aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/netstore.go
blob: 80ac6f1989d4c2d9480bb10d3e1139806f059ae6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package storage

import (
    "context"
    "encoding/hex"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/p2p/discover"
    "github.com/ethereum/go-ethereum/swarm/log"

    lru "github.com/hashicorp/golang-lru"
)

type (
    NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher
)

type NetFetcher interface {
    Request(ctx context.Context)
    Offer(ctx context.Context, source *discover.NodeID)
}

// NetStore is an extension of local storage
// it implements the ChunkStore interface
// on request it initiates remote cloud retrieval using a fetcher
// fetchers are unique to a chunk and are stored in fetchers LRU memory cache
// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address
type NetStore struct {
    mu                sync.Mutex
    store             SyncChunkStore
    fetchers          *lru.Cache
    NewNetFetcherFunc NewNetFetcherFunc
    closeC            chan struct{}
}

var fetcherTimeout = 2 * time.Minute // timeout to cancel the fetcher even if requests are coming in

// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a
// constructor function that can create a fetch function for a specific chunk address.
func NewNetStore(store SyncChunkStore, nnf NewNetFetcherFunc) (*NetStore, error) {
    fetchers, err := lru.New(defaultChunkRequestsCacheCapacity)
    if err != nil {
        return nil, err
    }
    return &NetStore{
        store:             store,
        fetchers:          fetchers,
        NewNetFetcherFunc: nnf,
        closeC:            make(chan struct{}),
    }, nil
}

// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in
// the fetchers cache
func (n *NetStore) Put(ctx context.Context, ch Chunk) error {
    n.mu.Lock()
    defer n.mu.Unlock()

    // put to the chunk to the store, there should be no error
    err := n.store.Put(ctx, ch)
    if err != nil {
        return err
    }

    // if chunk is now put in the store, check if there was an active fetcher and call deliver on it
    // (this delivers the chunk to requestors via the fetcher)
    if f := n.getFetcher(ch.Address()); f != nil {
        f.deliver(ctx, ch)
    }
    return nil
}

// Get retrieves the chunk from the NetStore DPA synchronously.
// It calls NetStore.get, and if the chunk is not in local Storage
// it calls fetch with the request, which blocks until the chunk
// arrived or context is done
func (n *NetStore) Get(rctx context.Context, ref Address) (Chunk, error) {
    chunk, fetch, err := n.get(rctx, ref)
    if err != nil {
        return nil, err
    }
    if chunk != nil {
        return chunk, nil
    }
    return fetch(rctx)
}

func (n *NetStore) BinIndex(po uint8) uint64 {
    return n.store.BinIndex(po)
}

func (n *NetStore) Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error {
    return n.store.Iterator(from, to, po, f)
}

// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function,
// which returns after the chunk is available or the context is done
func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error {
    chunk, fetch, _ := n.get(ctx, ref)
    if chunk != nil {
        return nil
    }
    return func(ctx context.Context) error {
        _, err := fetch(ctx)
        return err
    }
}

// Close chunk store
func (n *NetStore) Close() {
    close(n.closeC)
    n.store.Close()
    // TODO: loop through fetchers to cancel them
}

// get attempts at retrieving the chunk from LocalStore
// If it is not found then using getOrCreateFetcher:
//     1. Either there is already a fetcher to retrieve it
//     2. A new fetcher is created and saved in the fetchers cache
// From here on, all Get will hit on this fetcher until the chunk is delivered
// or all fetcher contexts are done.
// It returns a chunk, a fetcher function and an error
// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk.
func (n *NetStore) get(ctx context.Context, ref Address) (Chunk, func(context.Context) (Chunk, error), error) {
    n.mu.Lock()
    defer n.mu.Unlock()

    chunk, err := n.store.Get(ctx, ref)
    if err != nil {
        if err != ErrChunkNotFound {
            log.Debug("Received error from LocalStore other than ErrNotFound", "err", err)
        }
        // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
        // if it doesn't exist yet
        f := n.getOrCreateFetcher(ref)
        // If the caller needs the chunk, it has to use the returned fetch function to get it
        return nil, f.Fetch, nil
    }

    return chunk, nil, nil
}

// getOrCreateFetcher attempts at retrieving an existing fetchers
// if none exists, creates one and saves it in the fetchers cache
// caller must hold the lock
func (n *NetStore) getOrCreateFetcher(ref Address) *fetcher {
    if f := n.getFetcher(ref); f != nil {
        return f
    }

    // no fetcher for the given address, we have to create a new one
    key := hex.EncodeToString(ref)
    // create the context during which fetching is kept alive
    ctx, cancel := context.WithTimeout(context.Background(), fetcherTimeout)
    // destroy is called when all requests finish
    destroy := func() {
        // remove fetcher from fetchers
        n.fetchers.Remove(key)
        // stop fetcher by cancelling context called when
        // all requests cancelled/timedout or chunk is delivered
        cancel()
    }
    // peers always stores all the peers which have an active request for the chunk. It is shared
    // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because
    // the peers which requested the chunk should not be requested to deliver it.
    peers := &sync.Map{}

    fetcher := newFetcher(ref, n.NewNetFetcherFunc(ctx, ref, peers), destroy, peers, n.closeC)
    n.fetchers.Add(key, fetcher)

    return fetcher
}

// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists,
// otherwise it returns nil
func (n *NetStore) getFetcher(ref Address) *fetcher {
    key := hex.EncodeToString(ref)
    f, ok := n.fetchers.Get(key)
    if ok {
        return f.(*fetcher)
    }
    return nil
}

// RequestsCacheLen returns the current number of outgoing requests stored in the cache
func (n *NetStore) RequestsCacheLen() int {
    return n.fetchers.Len()
}

// One fetcher object is responsible to fetch one chunk for one address, and keep track of all the
// peers who have requested it and did not receive it yet.
type fetcher struct {
    addr        Address       // address of chunk
    chunk       Chunk         // fetcher can set the chunk on the fetcher
    deliveredC  chan struct{} // chan signalling chunk delivery to requests
    cancelledC  chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore)
    netFetcher  NetFetcher    // remote fetch function to be called with a request source taken from the context
    cancel      func()        // cleanup function for the remote fetcher to call when all upstream contexts are called
    peers       *sync.Map     // the peers which asked for the chunk
    requestCnt  int32         // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
    deliverOnce *sync.Once    // guarantees that we only close deliveredC once
}

// newFetcher creates a new fetcher object for the fiven addr. fetch is the function which actually
// does the retrieval (in non-test cases this is coming from the network package). cancel function is
// called either
//     1. when the chunk has been fetched all peers have been either notified or their context has been done
//     2. the chunk has not been fetched but all context from all the requests has been done
// The peers map stores all the peers which have requested chunk.
func newFetcher(addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
    cancelOnce := &sync.Once{} // cancel should only be called once
    return &fetcher{
        addr:        addr,
        deliveredC:  make(chan struct{}),
        deliverOnce: &sync.Once{},
        cancelledC:  closeC,
        netFetcher:  nf,
        cancel: func() {
            cancelOnce.Do(func() {
                cancel()
            })
        },
        peers: peers,
    }
}

// Fetch fetches the chunk synchronously, it is called by NetStore.Get is the chunk is not available
// locally.
func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
    atomic.AddInt32(&f.requestCnt, 1)
    defer func() {
        // if all the requests are done the fetcher can be cancelled
        if atomic.AddInt32(&f.requestCnt, -1) == 0 {
            f.cancel()
        }
    }()

    // The peer asking for the chunk. Store in the shared peers map, but delete after the request
    // has been delivered
    peer := rctx.Value("peer")
    if peer != nil {
        f.peers.Store(peer, time.Now())
        defer f.peers.Delete(peer)
    }

    // If there is a source in the context then it is an offer, otherwise a request
    sourceIF := rctx.Value("source")
    if sourceIF != nil {
        var source *discover.NodeID
        id := discover.MustHexID(sourceIF.(string))
        source = &id
        f.netFetcher.Offer(rctx, source)
    } else {
        f.netFetcher.Request(rctx)
    }

    // wait until either the chunk is delivered or the context is done
    select {
    case <-rctx.Done():
        return nil, rctx.Err()
    case <-f.deliveredC:
        return f.chunk, nil
    case <-f.cancelledC:
        return nil, fmt.Errorf("fetcher cancelled")
    }
}

// deliver is called by NetStore.Put to notify all pending requests
func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
    f.deliverOnce.Do(func() {
        f.chunk = ch
        // closing the deliveredC channel will terminate ongoing requests
        close(f.deliveredC)
    })
}