aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore/subscription_pull.go
blob: dd07add5351e08d3b0dc2c448fe8fa5c08e06acb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
    "context"
    "errors"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/swarm/chunk"
    "github.com/ethereum/go-ethereum/swarm/shed"
    "github.com/ethereum/go-ethereum/swarm/spancontext"
    "github.com/opentracing/opentracing-go"
    olog "github.com/opentracing/opentracing-go/log"
    "github.com/syndtr/goleveldb/leveldb"
)

// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
// is not 0, the iteration will start from the first item stored after that id. If until is not 0,
// only chunks stored up to this id will be sent to the channel, and the returned channel will be
// closed. The since-until interval is open on since side, and closed on until side: (since,until] <=> [since+1,until]. Returned stop
// function will terminate current and further iterations without errors, and also close the returned channel.
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
// is false.
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
    metricName := "localstore.SubscribePull"
    metrics.GetOrRegisterCounter(metricName, nil).Inc(1)

    chunkDescriptors := make(chan chunk.Descriptor)
    trigger := make(chan struct{}, 1)

    db.pullTriggersMu.Lock()
    if _, ok := db.pullTriggers[bin]; !ok {
        db.pullTriggers[bin] = make([]chan struct{}, 0)
    }
    db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
    db.pullTriggersMu.Unlock()

    // send signal for the initial iteration
    trigger <- struct{}{}

    stopChan := make(chan struct{})
    var stopChanOnce sync.Once

    // used to provide information from the iterator to
    // stop subscription when until chunk descriptor is reached
    var errStopSubscription = errors.New("stop subscription")

    go func() {
        defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
        // close the returned chunk.Descriptor channel at the end to
        // signal that the subscription is done
        defer close(chunkDescriptors)
        // sinceItem is the Item from which the next iteration
        // should start. The first iteration starts from the first Item.
        var sinceItem *shed.Item
        if since > 0 {
            sinceItem = &shed.Item{
                Address: db.addressInBin(bin),
                BinID:   since,
            }
        }
        first := true // first iteration flag for SkipStartFromItem
        for {
            select {
            case <-trigger:
                // iterate until:
                // - last index Item is reached
                // - subscription stop is called
                // - context is done
                metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)

                ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
                sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))

                iterStart := time.Now()
                var count int
                err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
                    select {
                    case chunkDescriptors <- chunk.Descriptor{
                        Address: item.Address,
                        BinID:   item.BinID,
                    }:
                        count++
                        // until chunk descriptor is sent
                        // break the iteration
                        if until > 0 && item.BinID >= until {
                            return true, errStopSubscription
                        }
                        // set next iteration start item
                        // when its chunk is successfully sent to channel
                        sinceItem = &item
                        return false, nil
                    case <-stopChan:
                        // gracefully stop the iteration
                        // on stop
                        return true, nil
                    case <-db.close:
                        // gracefully stop the iteration
                        // on database close
                        return true, nil
                    case <-ctx.Done():
                        return true, ctx.Err()
                    }
                }, &shed.IterateOptions{
                    StartFrom: sinceItem,
                    // sinceItem was sent as the last Address in the previous
                    // iterator call, skip it in this one, but not the item with
                    // the provided since bin id as it should be sent to a channel
                    SkipStartFromItem: !first,
                    Prefix:            []byte{bin},
                })

                totalTimeMetric(metricName+".iter", iterStart)

                sp.FinishWithOptions(opentracing.FinishOptions{
                    LogRecords: []opentracing.LogRecord{
                        {
                            Timestamp: time.Now(),
                            Fields:    []olog.Field{olog.Int("count", count)},
                        },
                    },
                })

                if err != nil {
                    if err == errStopSubscription {
                        // stop subscription without any errors
                        // if until is reached
                        return
                    }
                    metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
                    log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
                    return
                }
                first = false
            case <-stopChan:
                // terminate the subscription
                // on stop
                return
            case <-db.close:
                // terminate the subscription
                // on database close
                return
            case <-ctx.Done():
                err := ctx.Err()
                if err != nil {
                    log.Error("localstore pull subscription", "bin", bin, "since", since, "until", until, "err", err)
                }
                return
            }
        }
    }()

    stop = func() {
        stopChanOnce.Do(func() {
            close(stopChan)
        })

        db.pullTriggersMu.Lock()
        defer db.pullTriggersMu.Unlock()

        for i, t := range db.pullTriggers[bin] {
            if t == trigger {
                db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
                break
            }
        }
    }

    return chunkDescriptors, stop
}

// LastPullSubscriptionBinID returns chunk bin id of the latest Chunk
// in pull syncing index for a provided bin. If there are no chunks in
// that bin, 0 value is returned.
func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
    metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)

    item, err := db.pullIndex.Last([]byte{bin})
    if err != nil {
        if err == leveldb.ErrNotFound {
            return 0, nil
        }
        return 0, err
    }
    return item.BinID, nil
}

// triggerPullSubscriptions is used internally for starting iterations
// on Pull subscriptions for a particular bin. When new item with address
// that is in particular bin for DB's baseKey is added to pull index
// this function should be called.
func (db *DB) triggerPullSubscriptions(bin uint8) {
    db.pullTriggersMu.RLock()
    triggers, ok := db.pullTriggers[bin]
    db.pullTriggersMu.RUnlock()
    if !ok {
        return
    }

    for _, t := range triggers {
        select {
        case t <- struct{}{}:
        default:
        }
    }
}

// addressInBin returns an address that is in a specific
// proximity order bin from database base key.
func (db *DB) addressInBin(bin uint8) (addr chunk.Address) {
    addr = append([]byte(nil), db.baseKey...)
    b := bin / 8
    addr[b] = addr[b] ^ (1 << (7 - bin%8))
    return addr
}