path: root/swarm/storage/localstore/subscription_pull.go
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "sync"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/swarm/shed"
    "github.com/ethereum/go-ethereum/swarm/storage"
)

// SubscribePull returns a channel that provides chunk addresses and store timestamps from the pull syncing index.
// The pull syncing index can be subscribed to only for a particular proximity order bin. If since
// is not nil, the iteration will start from the first item stored after the since descriptor. If until is not nil,
// only chunks stored up to the until descriptor will be sent to the channel, and the returned channel will be
// closed. The since-until interval is open on the left and closed on the right (since,until]. The returned stop
// function terminates the current and any further iterations without errors, and also closes the returned channel.
// Make sure to check the second value of the channel receive operation and stop iterating when it is false.
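//
// A minimal usage sketch (db, bin and handleDescriptor here are
// assumptions standing in for caller-provided values, not part of
// this API):
//
//     ctx, cancel := context.WithCancel(context.Background())
//     defer cancel()
//     ch, stop := db.SubscribePull(ctx, bin, nil, nil)
//     defer stop()
//     for {
//         d, ok := <-ch
//         if !ok {
//             return // subscription channel is closed
//         }
//         handleDescriptor(d)
//     }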
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until *ChunkDescriptor) (c <-chan ChunkDescriptor, stop func()) {
    chunkDescriptors := make(chan ChunkDescriptor)
    trigger := make(chan struct{}, 1)

    db.pullTriggersMu.Lock()
    if _, ok := db.pullTriggers[bin]; !ok {
        db.pullTriggers[bin] = make([]chan struct{}, 0)
    }
    db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
    db.pullTriggersMu.Unlock()

    // send signal for the initial iteration
    trigger <- struct{}{}

    stopChan := make(chan struct{})
    var stopChanOnce sync.Once

    // errStopSubscription is a sentinel error returned by the iterator
    // function to stop the subscription when the until chunk descriptor is reached
    var errStopSubscription = errors.New("stop subscription")

    go func() {
        // close the returned ChunkDescriptor channel at the end to
        // signal that the subscription is done
        defer close(chunkDescriptors)
        // sinceItem is the Item from which the next iteration
        // should start. The first iteration starts from the first Item.
        var sinceItem *shed.Item
        if since != nil {
            sinceItem = &shed.Item{
                Address:        since.Address,
                StoreTimestamp: since.StoreTimestamp,
            }
        }
        for {
            select {
            case <-trigger:
                // iterate until:
                // - last index Item is reached
                // - subscription stop is called
                // - context is done
                err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
                    select {
                    case chunkDescriptors <- ChunkDescriptor{
                        Address:        item.Address,
                        StoreTimestamp: item.StoreTimestamp,
                    }:
                        // if the until chunk descriptor is reached,
                        // stop the iteration
                        if until != nil &&
                            (item.StoreTimestamp >= until.StoreTimestamp ||
                                bytes.Equal(item.Address, until.Address)) {
                            return true, errStopSubscription
                        }
                        // set the next iteration start item
                        // once its chunk is successfully sent to the channel
                        sinceItem = &item
                        return false, nil
                    case <-stopChan:
                        // gracefully stop the iteration
                        // on stop
                        return true, nil
                    case <-db.close:
                        // gracefully stop the iteration
                        // on database close
                        return true, nil
                    case <-ctx.Done():
                        return true, ctx.Err()
                    }
                }, &shed.IterateOptions{
                    StartFrom: sinceItem,
                    // sinceItem was already sent in the previous
                    // iterator call, skip it in this one
                    SkipStartFromItem: true,
                    Prefix:            []byte{bin},
                })
                if err != nil {
                    if err == errStopSubscription {
                        // stop subscription without any errors
                        // if until is reached
                        return
                    }
                    log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
                    return
                }
            case <-stopChan:
                // terminate the subscription
                // on stop
                return
            case <-db.close:
                // terminate the subscription
                // on database close
                return
            case <-ctx.Done():
                err := ctx.Err()
                if err != nil {
                    log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
                }
                return
            }
        }
    }()

    stop = func() {
        stopChanOnce.Do(func() {
            close(stopChan)
        })

        db.pullTriggersMu.Lock()
        defer db.pullTriggersMu.Unlock()

        for i, t := range db.pullTriggers[bin] {
            if t == trigger {
                db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
                break
            }
        }
    }

    return chunkDescriptors, stop
}

// ChunkDescriptor holds the information required for pull syncing. Values
// of this type are sent on the channel returned by SubscribePull.
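//
// A descriptor received on one subscription can later be reused as the
// until bound of another, so that the new channel closes once that
// descriptor is reached (a sketch; last is a hypothetical previously
// received ChunkDescriptor):
//
//     ch, stop := db.SubscribePull(ctx, bin, nil, &last)
//     defer stop()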
type ChunkDescriptor struct {
    Address        storage.Address
    StoreTimestamp int64
}

func (c *ChunkDescriptor) String() string {
    if c == nil {
        return "none"
    }
    return fmt.Sprintf("%s stored at %v", c.Address.Hex(), c.StoreTimestamp)
}

// triggerPullSubscriptions is used internally to start iterations
// on pull subscriptions for a particular bin. It should be called
// whenever a new item whose address falls into that bin for the
// DB's baseKey is added to the pull index.
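//
// A hypothetical call-site sketch (the put method and the db.po
// proximity order helper are assumptions, not the actual write path):
//
//     func (db *DB) put(ch storage.Chunk) error {
//         // ... store the chunk and update the pull index ...
//         db.triggerPullSubscriptions(db.po(ch.Address()))
//         return nil
//     }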
func (db *DB) triggerPullSubscriptions(bin uint8) {
    db.pullTriggersMu.RLock()
    triggers, ok := db.pullTriggers[bin]
    db.pullTriggersMu.RUnlock()
    if !ok {
        return
    }

    for _, t := range triggers {
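        // non-blocking send: if a trigger is already pending for this
        // subscription, the new signal is coalesced into it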
        select {
        case t <- struct{}{}:
        default:
        }
    }
}