aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/localstore/subscription_push.go
blob: 4b554c38c3441c56bef047083222dfd2d383bbea (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
    "context"
    "sync"

    "github.com/dexon-foundation/dexon/log"
    "github.com/dexon-foundation/dexon/swarm/shed"
    "github.com/dexon-foundation/dexon/swarm/storage"
)

// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
// Returned stop function will terminate current and further iterations, and also it will close
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
//
// The returned channel is also closed when ctx is done, when the database
// is closed or when an iteration over the push index returns an error.
// The subscription trigger is only unregistered by the stop function, so
// stop should be called even if the subscription ended for another reason.
// It is safe to call stop more than once.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan storage.Chunk, stop func()) {
    chunks := make(chan storage.Chunk)
    trigger := make(chan struct{}, 1)

    // Queue the signal for the initial iteration before the trigger is
    // registered. The channel has a buffer of one and nobody else can see
    // it yet, so this send can never block. Sending after registration
    // could block forever if triggerPushSubscriptions filled the buffer
    // first, as the receiving goroutine is not started yet.
    trigger <- struct{}{}

    db.pushTriggersMu.Lock()
    db.pushTriggers = append(db.pushTriggers, trigger)
    db.pushTriggersMu.Unlock()

    stopChan := make(chan struct{})
    var stopChanOnce sync.Once

    go func() {
        // close the returned chunks channel at the end to
        // signal that the subscription is done
        defer close(chunks)
        // sinceItem is the Item from which the next iteration
        // should start. The first iteration starts from the first Item.
        var sinceItem *shed.Item
        for {
            select {
            case <-trigger:
                // iterate until:
                // - last index Item is reached
                // - subscription stop is called
                // - database is closed
                // - context is done
                err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
                    // get chunk data
                    dataItem, err := db.retrievalDataIndex.Get(item)
                    if err != nil {
                        return true, err
                    }

                    select {
                    case chunks <- storage.NewChunk(dataItem.Address, dataItem.Data):
                        // set next iteration start item
                        // when its chunk is successfully sent to channel
                        sinceItem = &item
                        return false, nil
                    case <-stopChan:
                        // gracefully stop the iteration
                        // on stop
                        return true, nil
                    case <-db.close:
                        // gracefully stop the iteration
                        // on database close
                        return true, nil
                    case <-ctx.Done():
                        return true, ctx.Err()
                    }
                }, &shed.IterateOptions{
                    StartFrom: sinceItem,
                    // sinceItem was sent as the last Address in the previous
                    // iterator call, skip it in this one
                    SkipStartFromItem: true,
                })
                if err != nil {
                    log.Error("localstore push subscription iteration", "err", err)
                    return
                }
            case <-stopChan:
                // terminate the subscription
                // on stop
                return
            case <-db.close:
                // terminate the subscription
                // on database close
                return
            case <-ctx.Done():
                err := ctx.Err()
                if err != nil {
                    log.Error("localstore push subscription", "err", err)
                }
                return
            }
        }
    }()

    stop = func() {
        // closing stopChan more than once would panic,
        // so guard it to keep stop safe for repeated calls
        stopChanOnce.Do(func() {
            close(stopChan)
        })

        db.pushTriggersMu.Lock()
        defer db.pushTriggersMu.Unlock()

        for i, t := range db.pushTriggers {
            if t == trigger {
                // Limit the capacity of the head slice so that append
                // copies into a new backing array instead of shifting
                // elements in place. triggerPushSubscriptions may still
                // be ranging over the old backing array without the lock.
                db.pushTriggers = append(db.pushTriggers[:i:i], db.pushTriggers[i+1:]...)
                break
            }
        }
    }

    return chunks, stop
}

// triggerPushSubscriptions is used internally for starting iterations
// on Push subscriptions. Whenever new item is added to the push index,
// this function should be called.
//
// Every send is non-blocking: a subscription whose trigger channel cannot
// accept the signal right now is skipped instead of being waited for.
func (db *DB) triggerPushSubscriptions() {
    // Hold the read lock for the whole loop. The stop function returned by
    // SubscribePush removes triggers under the write lock, so iterating
    // a copied slice header after unlocking would race with that removal
    // and could skip a subscription. The sends below never block, so the
    // lock is held only briefly.
    db.pushTriggersMu.RLock()
    defer db.pushTriggersMu.RUnlock()

    for _, t := range db.pushTriggers {
        select {
        case t <- struct{}{}:
        default:
        }
    }
}