1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
|
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package network
import (
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
const counterKeyPrefix = 0x01
/*
syncDb is a queueing service for outgoing deliveries.
One instance per priority queue for each peer
a syncDb instance maintains an in-memory buffer (of capacity bufferSize)
once its in-memory buffer is full it switches to persisting in db
and dbRead iterator iterates through the items keeping their order
once the db read catches up (there is no more items in the db) then
it switches back to in-memory buffer.
when syncdb is stopped all items in the buffer are saved to the db
*/
type syncDb struct {
start []byte // this syncdb starting index in requestdb
key storage.Key // remote peers address key
counterKey []byte // db key to persist counter
priority uint // priotity High|Medium|Low
buffer chan interface{} // incoming request channel
db *storage.LDBDatabase // underlying db (TODO should be interface)
done chan bool // chan to signal goroutines finished quitting
quit chan bool // chan to signal quitting to goroutines
total, dbTotal int // counts for one session
batch chan chan int // channel for batch requests
dbBatchSize uint // number of items before batch is saved
}
// constructor needs a shared request db (leveldb)
// priority is used in the index key
// uses a buffer and a leveldb for persistent storage
// bufferSize, dbBatchSize are config parameters
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
start := make([]byte, 42)
start[1] = byte(priorities - priority)
copy(start[2:34], key)
counterKey := make([]byte, 34)
counterKey[0] = counterKeyPrefix
copy(counterKey[1:], start[1:34])
syncdb := &syncDb{
start: start,
key: key,
counterKey: counterKey,
priority: priority,
buffer: make(chan interface{}, bufferSize),
db: db,
done: make(chan bool),
quit: make(chan bool),
batch: make(chan chan int),
dbBatchSize: dbBatchSize,
}
log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))
// starts the main forever loop reading from buffer
go syncdb.bufferRead(deliver)
return syncdb
}
/*
bufferRead is a forever iterator loop that takes care of delivering
outgoing store requests reads from incoming buffer
its argument is the deliver function taking the item as first argument
and a quit channel as second.
Closing of this channel is supposed to abort all waiting for delivery
(typically network write)
The iteration switches between 2 modes,
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db, parallelly another
routine is started that reads from the db and delivers items
If there is buffer contention in buffer mode (slow network, high upload volume)
syncdb switches to db mode and starts dbRead
Once db backlog is delivered, it reverts back to in-memory buffer
It is automatically started when syncdb is initialised.
It saves the buffer to db upon receiving quit signal. syncDb#stop()
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
var buffer, db chan interface{} // channels representing the two read modes
var more bool
var req interface{}
var entry *syncDbEntry
var inBatch, inDb int
batch := new(leveldb.Batch)
var dbSize chan int
quit := self.quit
counterValue := make([]byte, 8)
// counter is used for keeping the items in order, persisted to db
// start counter where db was at, 0 if not found
data, err := self.db.Get(self.counterKey)
var counter uint64
if err == nil {
counter = binary.BigEndian.Uint64(data)
log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
} else {
log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
}
LOOP:
for {
// waiting for item next in the buffer, or quit signal or batch request
select {
// buffer only closes when writing to db
case req = <-buffer:
// deliver request : this is blocking on network write so
// it is passed the quit channel as argument, so that it returns
// if syncdb is stopped. In this case we need to save the item to the db
more = deliver(req, self.quit)
if !more {
log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
// received quit signal, save request currently waiting delivery
// by switching to db mode and closing the buffer
buffer = nil
db = self.buffer
close(db)
quit = nil // needs to block the quit case in select
break // break from select, this item will be written to the db
}
self.total++
log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
// by the time deliver returns, there were new writes to the buffer
// if buffer contention is detected, switch to db mode which drains
// the buffer so no process will block on pushing store requests
if len(buffer) == cap(buffer) {
log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
buffer = nil
db = self.buffer
}
continue LOOP
// incoming entry to put into db
case req, more = <-db:
if !more {
// only if quit is called, saved all the buffer
binary.BigEndian.PutUint64(counterValue, counter)
batch.Put(self.counterKey, counterValue) // persist counter in batch
self.writeSyncBatch(batch) // save batch
log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
break LOOP
}
self.dbTotal++
self.total++
// otherwise break after select
case dbSize = <-self.batch:
// explicit request for batch
if inBatch == 0 && quit != nil {
// there was no writes since the last batch so db depleted
// switch to buffer mode
log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
db = nil
buffer = self.buffer
dbSize <- 0 // indicates to 'caller' that batch has been written
inDb = 0
continue LOOP
}
binary.BigEndian.PutUint64(counterValue, counter)
batch.Put(self.counterKey, counterValue)
log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
batch = self.writeSyncBatch(batch)
dbSize <- inBatch // indicates to 'caller' that batch has been written
inBatch = 0
continue LOOP
// closing syncDb#quit channel is used to signal to all goroutines to quit
case <-quit:
// need to save backlog, so switch to db mode
db = self.buffer
buffer = nil
quit = nil
log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
close(db)
continue LOOP
}
// only get here if we put req into db
entry, err = self.newSyncDbEntry(req, counter)
if err != nil {
log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
continue LOOP
}
batch.Put(entry.key, entry.val)
log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
// if just switched to db mode and not quitting, then launch dbRead
// in a parallel go routine to send deliveries from db
if inDb == 0 && quit != nil {
log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority))
go self.dbRead(true, counter, deliver)
}
inDb++
inBatch++
counter++
// need to save the batch if it gets too large (== dbBatchSize)
if inBatch%int(self.dbBatchSize) == 0 {
batch = self.writeSyncBatch(batch)
}
}
log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
close(self.done)
}
// writes the batch to the db and returns a new batch object
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
err := self.db.Write(batch)
if err != nil {
log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
return batch
}
return new(leveldb.Batch)
}
// abstract type for db entries (TODO could be a feature of Receipts)
type syncDbEntry struct {
key, val []byte
}
func (self syncDbEntry) String() string {
return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
}
/*
dbRead is iterating over store requests to be sent over to the peer
this is mainly to prevent crashes due to network output buffer contention (???)
as well as to make syncronisation resilient to disconnects
the messages are supposed to be sent in the p2p priority queue.
the request DB is shared between peers, but domains for each syncdb
are disjoint. dbkeys (42 bytes) are structured:
* 0: 0x00 (0x01 reserved for counter key)
* 1: priorities - priority (so that high priority can be replayed first)
* 2-33: peers address
* 34-41: syncdb counter to preserve order (this field is missing for the counter key)
values (40 bytes) are:
* 0-31: key
* 32-39: request id
dbRead needs a boolean to indicate if on first round all the historical
record is synced. Second argument to indicate current db counter
The third is the function to apply
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
key := make([]byte, 42)
copy(key, self.start)
binary.BigEndian.PutUint64(key[34:], counter)
var batches, n, cnt, total int
var more bool
var entry *syncDbEntry
var it iterator.Iterator
var del *leveldb.Batch
batchSizes := make(chan int)
for {
// if useBatches is false, cnt is not set
if useBatches {
// this could be called before all cnt items sent out
// so that loop is not blocking while delivering
// only relevant if cnt is large
select {
case self.batch <- batchSizes:
case <-self.quit:
return
}
// wait for the write to finish and get the item count in the next batch
cnt = <-batchSizes
batches++
if cnt == 0 {
// empty
return
}
}
it = self.db.NewIterator()
it.Seek(key)
if !it.Valid() {
copy(key, self.start)
useBatches = true
continue
}
del = new(leveldb.Batch)
log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))
for n = 0; !useBatches || n < cnt; it.Next() {
copy(key, it.Key())
if len(key) == 0 || key[0] != 0 {
copy(key, self.start)
useBatches = true
break
}
val := make([]byte, 40)
copy(val, it.Value())
entry = &syncDbEntry{key, val}
// log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
more = fun(entry, self.quit)
if !more {
// quit received when waiting to deliver entry, the entry will not be deleted
log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
break
}
// since subsequent batches of the same db session are indexed incrementally
// deleting earlier batches can be delayed and parallelised
// this could be batch delete when db is idle (but added complexity esp when quitting)
del.Delete(key)
n++
total++
}
log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
self.db.Write(del) // this could be async called only when db is idle
it.Release()
}
}
//
func (self *syncDb) stop() {
close(self.quit)
<-self.done
}
// calculate a dbkey for the request, for the db to work
// see syncdb for db key structure
// polimorphic: accepted types, see syncer#addRequest
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
var key storage.Key
var chunk *storage.Chunk
var id uint64
var ok bool
var sreq *storeRequestMsgData
if key, ok = req.(storage.Key); ok {
id = generateId()
} else if chunk, ok = req.(*storage.Chunk); ok {
key = chunk.Key
id = generateId()
} else if sreq, ok = req.(*storeRequestMsgData); ok {
key = sreq.Key
id = sreq.Id
} else if entry, ok = req.(*syncDbEntry); !ok {
return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
}
// order by peer > priority > seqid
// value is request id if exists
if entry == nil {
dbkey := make([]byte, 42)
dbval := make([]byte, 40)
// encode key
copy(dbkey[:], self.start[:34]) // db peer
binary.BigEndian.PutUint64(dbkey[34:], counter)
// encode value
copy(dbval, key[:])
binary.BigEndian.PutUint64(dbval[32:], id)
entry = &syncDbEntry{dbkey, dbval}
}
return
}
|