aboutsummaryrefslogblamecommitdiffstats
path: root/swarm/storage/localstore/subscription_pull.go
blob: a18f0915d9386533b11d5dd12c3a99e614cdd5ea (plain) (tree)
































































































































































































                                                                                                                                                   
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "sync"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/swarm/shed"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
// is not nil, the iteration will start from the first item stored after that timestamp. If until is not nil,
// only chunks stored up to this timestamp will be sent to the channel, and the returned channel will be
// closed. The since-until interval is open on the left and closed on the right (since,until]. Returned stop
// function will terminate current and further iterations without errors, and also close the returned channel.
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false.
//
// The returned channel is also closed when ctx is done, when the database
// is closed, or when the index iteration returns an error.
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
    chunkDescriptors := make(chan ChunkDescriptor)
    trigger := make(chan struct{}, 1)

    // Queue the signal for the initial iteration while the trigger channel
    // is still private. Once the channel is registered below, another
    // goroutine may fill its single buffer slot, and a blocking send
    // performed after that point would hang because the consumer goroutine
    // has not been started yet.
    trigger <- struct{}{}

    db.pullTriggersMu.Lock()
    // append on a missing map entry starts from a nil slice
    db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
    db.pullTriggersMu.Unlock()

    stopChan := make(chan struct{})
    var stopChanOnce sync.Once

    // used to provide information from the iterator to
    // stop subscription when until chunk descriptor is reached
    var errStopSubscription = errors.New("stop subscription")

    go func() {
        // close the returned ChunkDescriptor channel at the end to
        // signal that the subscription is done
        defer close(chunkDescriptors)
        // sinceItem is the Item from which the next iteration
        // should start. The first iteration starts from the first Item.
        var sinceItem *shed.Item
        if since != nil {
            sinceItem = &shed.Item{
                Address:        since.Address,
                StoreTimestamp: since.StoreTimestamp,
            }
        }
        for {
            select {
            case <-trigger:
                // iterate until:
                // - last index Item is reached
                // - subscription stop is called
                // - context is done
                err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
                    select {
                    case chunkDescriptors <- ChunkDescriptor{
                        Address:        item.Address,
                        StoreTimestamp: item.StoreTimestamp,
                    }:
                        // until chunk descriptor is sent
                        // break the iteration
                        if until != nil &&
                            (item.StoreTimestamp >= until.StoreTimestamp ||
                                bytes.Equal(item.Address, until.Address)) {
                            return true, errStopSubscription
                        }
                        // set next iteration start item
                        // when its chunk is successfully sent to channel
                        sinceItem = &item
                        return false, nil
                    case <-stopChan:
                        // gracefully stop the iteration
                        // on stop
                        return true, nil
                    case <-db.close:
                        // gracefully stop the iteration
                        // on database close
                        return true, nil
                    case <-ctx.Done():
                        return true, ctx.Err()
                    }
                }, &shed.IterateOptions{
                    StartFrom: sinceItem,
                    // sinceItem was sent as the last Address in the previous
                    // iterator call, skip it in this one
                    SkipStartFromItem: true,
                    Prefix:            []byte{bin},
                })
                if err != nil {
                    if err == errStopSubscription {
                        // stop subscription without any errors
                        // if until is reached
                        return
                    }
                    log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
                    return
                }
            case <-stopChan:
                // terminate the subscription
                // on stop
                return
            case <-db.close:
                // terminate the subscription
                // on database close
                return
            case <-ctx.Done():
                err := ctx.Err()
                if err != nil {
                    log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
                }
                return
            }
        }
    }()

    stop = func() {
        // closing stopChan is guarded so that stop can be called
        // multiple times
        stopChanOnce.Do(func() {
            close(stopChan)
        })

        db.pullTriggersMu.Lock()
        defer db.pullTriggersMu.Unlock()

        triggers := db.pullTriggers[bin]
        for i, t := range triggers {
            if t == trigger {
                // Build a new slice rather than shifting elements in place,
                // so that any reader still holding the previous slice value
                // keeps seeing a consistent set of triggers.
                remaining := make([]chan struct{}, 0, len(triggers)-1)
                remaining = append(remaining, triggers[:i]...)
                remaining = append(remaining, triggers[i+1:]...)
                db.pullTriggers[bin] = remaining
                break
            }
        }
    }

    return chunkDescriptors, stop
}

// ChunkDescriptor holds information required for Pull syncing. This struct
// is provided by subscribing to pull index.
type ChunkDescriptor struct {
    // Address is the chunk address taken from the pull index item.
    Address        storage.Address
    // StoreTimestamp is the store timestamp taken from the pull index item.
    StoreTimestamp int64
}

// String returns a human readable form of the descriptor: the hex encoded
// chunk address followed by its store timestamp. It is safe to call on a
// nil receiver, in which case "none" is returned.
func (c *ChunkDescriptor) String() string {
    if c != nil {
        return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
    }
    return "none"
}

// triggerPullSubscriptions is used internally for starting iterations
// on Pull subscriptions for a particular bin. When new item with address
// that is in particular bin for DB's baseKey is added to pull index
// this function should be called.
//
// Every trigger channel registered for the bin receives a non-blocking
// signal; a channel whose buffer is already full is skipped, as an
// iteration is already pending for it. Bins without subscriptions are
// a no-op.
func (db *DB) triggerPullSubscriptions(bin uint8) {
    // Hold the read lock for the whole loop so that the trigger slice
    // cannot be modified by a concurrent subscribe or unsubscribe while it
    // is being iterated. The sends below never block, so the lock is only
    // held briefly.
    db.pullTriggersMu.RLock()
    defer db.pullTriggersMu.RUnlock()

    // ranging over a missing map entry iterates a nil slice
    for _, t := range db.pullTriggers[bin] {
        select {
        case t <- struct{}{}:
        default:
        }
    }
}