// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
    "encoding/binary"
    "fmt"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/swarm/storage"
    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/iterator"
)

const counterKeyPrefix = 0x01

/*
syncDb is a queueing service for outgoing deliveries.
There is one instance per priority queue for each peer.

A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
Once the buffer is full, it switches to persisting items in the db,
and a dbRead iterator walks the items preserving their order.
Once the db read catches up (there are no more items in the db),
it switches back to the in-memory buffer.

When syncDb is stopped, all items in the buffer are saved to the db.
*/
type syncDb struct {
    start          []byte               // this syncDb's starting index in the request db
    key            storage.Key          // remote peer's address key
    counterKey     []byte               // db key to persist counter
    priority       uint                 // priority: High|Medium|Low
    buffer         chan interface{}     // incoming request channel
    db             *storage.LDBDatabase // underlying db (TODO should be interface)
    done           chan bool            // chan to signal goroutines finished quitting
    quit           chan bool            // chan to signal quitting to goroutines
    total, dbTotal int                  // counts for one session
    batch          chan chan int        // channel for batch requests
    dbBatchSize    uint                 // number of items before batch is saved
}

// newSyncDb is the constructor; it needs a shared request db (leveldb).
// priority is used in the index key.
// It uses an in-memory buffer and the leveldb for persistent storage.
// bufferSize and dbBatchSize are config parameters.
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
    start := make([]byte, 42)
    start[1] = byte(priorities - priority) // byte 0 stays 0x00; see dbRead for the full key layout
    copy(start[2:34], key)

    counterKey := make([]byte, 34)
    counterKey[0] = counterKeyPrefix
    copy(counterKey[1:], start[1:34])

    syncdb := &syncDb{
        start:       start,
        key:         key,
        counterKey:  counterKey,
        priority:    priority,
        buffer:      make(chan interface{}, bufferSize),
        db:          db,
        done:        make(chan bool),
        quit:        make(chan bool),
        batch:       make(chan chan int),
        dbBatchSize: dbBatchSize,
    }
    log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))

    // starts the main forever loop reading from buffer
    go syncdb.bufferRead(deliver)
    return syncdb
}
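
// An illustrative sketch (not part of the original code) of how a caller might
// wire up a syncDb. The function name exampleSyncDbUse is hypothetical and
// High is assumed to be the package's high priority constant. The deliver
// callback must honour the quit channel passed as its second argument and
// return false when delivery was aborted, so that pending items are saved to
// the db on shutdown (see bufferRead below).
func exampleSyncDbUse(db *storage.LDBDatabase, key storage.Key) {
    deliver := func(req interface{}, quit chan bool) bool {
        select {
        case <-quit:
            // syncDb is stopping: report failure so the item is persisted
            return false
        default:
            // hand req over to the network layer here
            return true
        }
    }
    sdb := newSyncDb(db, key, High, 512, 64, deliver)
    sdb.buffer <- key // the owning syncer pushes outgoing requests into the buffer
    sdb.stop()        // flushes any in-memory backlog to the db
}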

/*
bufferRead is a forever loop that reads from the incoming buffer and takes
care of delivering the outgoing store requests.

Its argument is the deliver function, taking the item as first argument
and a quit channel as second.
Closing this channel is supposed to abort all waiting for delivery
(typically a network write).

The iteration switches between 2 modes:
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db; in parallel, another
  routine is started that reads from the db and delivers the items

If there is buffer contention in buffer mode (slow network, high upload volume),
syncDb switches to db mode and starts dbRead.
Once the db backlog is delivered, it reverts back to the in-memory buffer.

It is automatically started when syncDb is initialised.

It saves the buffer to the db upon receiving the quit signal, see syncDb#stop().
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
    var buffer, db chan interface{} // channels representing the two read modes
    var more bool
    var req interface{}
    var entry *syncDbEntry
    var inBatch, inDb int
    batch := new(leveldb.Batch)
    var dbSize chan int
    quit := self.quit
    counterValue := make([]byte, 8)

    // the counter is used for keeping the items in order and is persisted to the db;
    // start the counter where the db left off, 0 if not found
    data, err := self.db.Get(self.counterKey)
    var counter uint64
    if err == nil {
        counter = binary.BigEndian.Uint64(data)
        log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
    } else {
        log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
    }

LOOP:
    for {
        // waiting for item next in the buffer, or quit signal or batch request
        select {
        // buffer only closes when writing to db
        case req = <-buffer:
            // deliver the request: this blocks on the network write, so it is
            // passed the quit channel as an argument and returns early if
            // syncDb is stopped. In that case we need to save the item to the db
            more = deliver(req, self.quit)
            if !more {
                log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
                // received quit signal, save request currently waiting delivery
                // by switching to db mode and closing the buffer
                buffer = nil
                db = self.buffer
                close(db)
                quit = nil // a nil channel blocks the quit case in the select
                break      // break from select, this item will be written to the db
            }
            self.total++
            log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
            // by the time deliver returns, there may have been new writes to the buffer;
            // if buffer contention is detected, switch to db mode, which drains
            // the buffer so no process will block on pushing store requests
            if len(buffer) == cap(buffer) {
                log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
                buffer = nil
                db = self.buffer
            }
            continue LOOP

            // incoming entry to put into db
        case req, more = <-db:
            if !more {
                // this only happens if quit was called and the whole buffer has been saved
                binary.BigEndian.PutUint64(counterValue, counter)
                batch.Put(self.counterKey, counterValue) // persist counter in batch
                self.writeSyncBatch(batch)               // save batch
                log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
                break LOOP
            }
            self.dbTotal++
            self.total++
            // otherwise break after select
        case dbSize = <-self.batch:
            // explicit request for batch
            if inBatch == 0 && quit != nil {
                // there were no writes since the last batch, so the db is depleted;
                // switch to buffer mode
                log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
                db = nil
                buffer = self.buffer
                dbSize <- 0 // indicates to 'caller' that batch has been written
                inDb = 0
                continue LOOP
            }
            binary.BigEndian.PutUint64(counterValue, counter)
            batch.Put(self.counterKey, counterValue)
            log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
            batch = self.writeSyncBatch(batch)
            dbSize <- inBatch // indicates to 'caller' that batch has been written
            inBatch = 0
            continue LOOP

            // closing the syncDb#quit channel signals all goroutines to quit
        case <-quit:
            // need to save backlog, so switch to db mode
            db = self.buffer
            buffer = nil
            quit = nil
            log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
            close(db)
            continue LOOP
        }

        // only get here if we put req into db
        entry, err = self.newSyncDbEntry(req, counter)
        if err != nil {
            log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
            continue LOOP
        }
        batch.Put(entry.key, entry.val)
        log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
        // if just switched to db mode and not quitting, then launch dbRead
        // in a parallel go routine to send deliveries from db
        if inDb == 0 && quit != nil {
            log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority))
            go self.dbRead(true, counter, deliver)
        }
        inDb++
        inBatch++
        counter++
        // need to save the batch if it gets too large (== dbBatchSize)
        if inBatch%int(self.dbBatchSize) == 0 {
            batch = self.writeSyncBatch(batch)
        }
    }
    log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
    close(self.done)
}
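
// An illustrative sketch (not part of the original code) of the handshake on
// the batch channel between dbRead and bufferRead: dbRead hands bufferRead a
// reply channel, bufferRead flushes the current write batch and answers with
// the number of items it contained. A count of 0 means the db backlog is
// drained and bufferRead switches back to buffer mode. The function name
// exampleBatchHandshake is hypothetical.
func exampleBatchHandshake(sdb *syncDb) int {
    sizeC := make(chan int)
    select {
    case sdb.batch <- sizeC: // ask bufferRead to write out the current batch
    case <-sdb.quit:
        return 0
    }
    return <-sizeC // item count of the batch just written
}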

// writeSyncBatch writes the batch to the db and returns a fresh batch object;
// on a write error the old batch is returned so its contents are retried on
// the next write
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
    err := self.db.Write(batch)
    if err != nil {
        log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
        return batch
    }
    return new(leveldb.Batch)
}

// abstract type for db entries (TODO could be a feature of Receipts)
type syncDbEntry struct {
    key, val []byte
}

func (self syncDbEntry) String() string {
    return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
}

/*
    dbRead iterates over the store requests to be sent over to the peer.
    Its main purpose is to prevent crashes due to network output buffer
    contention and to make synchronisation resilient to disconnects.
    The messages are supposed to be sent in the p2p priority queue.

    The request db is shared between peers, but the domains for each syncDb
    are disjoint. dbkeys (42 bytes) are structured:
    * 0: 0x00 (0x01 is reserved for the counter key)
    * 1: priorities - priority (so that high priority can be replayed first)
    * 2-33: peer's address
    * 34-41: syncDb counter to preserve order (this field is missing for the counter key)

    values (40 bytes) are:
    * 0-31: key
    * 32-39: request id

dbRead needs a boolean to indicate whether on the first round all the
historical records are synced. The second argument indicates the current
db counter. The third is the delivery function to apply.
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
    key := make([]byte, 42)
    copy(key, self.start)
    binary.BigEndian.PutUint64(key[34:], counter)
    var batches, n, cnt, total int
    var more bool
    var entry *syncDbEntry
    var it iterator.Iterator
    var del *leveldb.Batch
    batchSizes := make(chan int)

    for {
        // if useBatches is false, cnt is not set
        if useBatches {
            // this can be called before all cnt items have been sent out,
            // so that the loop is not blocked while delivering;
            // only relevant if cnt is large
            select {
            case self.batch <- batchSizes:
            case <-self.quit:
                return
            }
            // wait for the write to finish and get the item count in the next batch
            cnt = <-batchSizes
            batches++
            if cnt == 0 {
                // empty
                return
            }
        }
        it = self.db.NewIterator()
        it.Seek(key)
        if !it.Valid() {
            copy(key, self.start)
            useBatches = true
            continue
        }
        del = new(leveldb.Batch)
        log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))

        for n = 0; !useBatches || n < cnt; it.Next() {
            copy(key, it.Key())
            if len(key) == 0 || key[0] != 0 {
                copy(key, self.start)
                useBatches = true
                break
            }
            val := make([]byte, 40)
            copy(val, it.Value())
            entry = &syncDbEntry{key, val}
            // log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
            more = fun(entry, self.quit)
            if !more {
                // quit was received while waiting to deliver the entry; the entry will not be deleted
                log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
                break
            }
            // since subsequent batches of the same db session are indexed incrementally,
            // deleting earlier batches can be delayed and parallelised;
            // this could be a batch delete when the db is idle (but adds complexity, esp. when quitting)
            del.Delete(key)
            n++
            total++
        }
        log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
        self.db.Write(del) // this could be called asynchronously, only when the db is idle
        it.Release()
    }
}
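
// decodeSyncDbKey is an illustrative helper (not part of the original code)
// that unpacks a 42-byte entry key according to the layout documented above;
// the helper's name and existence are assumptions for demonstration only.
func decodeSyncDbKey(dbkey []byte) (priority uint, peer storage.Key, counter uint64) {
    // byte 0 is 0x00 for entry keys (0x01 marks the counter key)
    priority = priorities - uint(dbkey[1])          // byte 1 stores priorities - priority
    peer = storage.Key(dbkey[2:34])                 // bytes 2-33: the peer's address
    counter = binary.BigEndian.Uint64(dbkey[34:42]) // bytes 34-41: order-preserving counter
    return
}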

// stop signals quit to all goroutines and blocks until bufferRead has saved the buffer to the db
func (self *syncDb) stop() {
    close(self.quit)
    <-self.done
}

// newSyncDbEntry calculates a dbkey for the request so that it can be stored
// in the db; see the dbRead doc comment for the db key structure.
// Polymorphic: for the accepted types, see syncer#addRequest.
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
    var key storage.Key
    var chunk *storage.Chunk
    var id uint64
    var ok bool
    var sreq *storeRequestMsgData

    if key, ok = req.(storage.Key); ok {
        id = generateId()
    } else if chunk, ok = req.(*storage.Chunk); ok {
        key = chunk.Key
        id = generateId()
    } else if sreq, ok = req.(*storeRequestMsgData); ok {
        key = sreq.Key
        id = sreq.Id
    } else if entry, ok = req.(*syncDbEntry); !ok {
        return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
    }

    // order by priority > peer > seqid (counter)
    // the value carries the request id, if it exists
    if entry == nil {
        dbkey := make([]byte, 42)
        dbval := make([]byte, 40)

        // encode key
        copy(dbkey[:], self.start[:34]) // bytes 0-33: 0x00, priority, peer address
        binary.BigEndian.PutUint64(dbkey[34:], counter)
        // encode value
        copy(dbval, key[:])
        binary.BigEndian.PutUint64(dbval[32:], id)

        entry = &syncDbEntry{dbkey, dbval}
    }
    return
}
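
// A minimal sketch (not part of the original code) of why iteration replays
// requests in insertion order: leveldb iterates keys in lexicographic byte
// order, and big-endian encoded counters compare bytewise in the same order
// as their numeric values. The helper name counterKeyLess is hypothetical;
// counterKeyLess(i, j) is true exactly when i < j, which is what dbRead
// relies on when seeking past already delivered counters.
func counterKeyLess(i, j uint64) bool {
    ki := make([]byte, 8)
    kj := make([]byte, 8)
    binary.BigEndian.PutUint64(ki, i)
    binary.BigEndian.PutUint64(kj, j)
    for n := 0; n < 8; n++ {
        if ki[n] != kj[n] {
            return ki[n] < kj[n] // the first differing byte decides, as in leveldb
        }
    }
    return false // equal keys
}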