Diffstat (limited to 'swarm/network/syncdb.go')
-rw-r--r--  swarm/network/syncdb.go  390
1 file changed, 390 insertions(+), 0 deletions(-)
diff --git a/swarm/network/syncdb.go b/swarm/network/syncdb.go
new file mode 100644
index 000000000..cef32610f
--- /dev/null
+++ b/swarm/network/syncdb.go
@@ -0,0 +1,390 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+const counterKeyPrefix = 0x01
+
+/*
+syncDb is a queueing service for outgoing deliveries.
+One instance exists per priority queue for each peer.
+
+A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
+Once the buffer is full it switches to persisting items in the db, and a
+dbRead iterator delivers items from the db, preserving their order.
+Once the db read catches up (there are no more items in the db), it
+switches back to the in-memory buffer.
+
+When syncDb is stopped, all items in the buffer are saved to the db.
+*/
+type syncDb struct {
+ start []byte // this syncDb's starting index in the request db
+ key storage.Key // remote peer's address key
+ counterKey []byte // db key to persist the counter
+ priority uint // priority High|Medium|Low
+ buffer chan interface{} // incoming request channel
+ db *storage.LDBDatabase // underlying db (TODO: should be an interface)
+ done chan bool // chan to signal that goroutines have finished quitting
+ quit chan bool // chan to signal quitting to goroutines
+ total, dbTotal int // item counts for one session
+ batch chan chan int // channel for batch requests
+ dbBatchSize uint // number of items before a batch is saved
+}
+
+// newSyncDb is the constructor; it needs a shared request db (leveldb);
+// priority is used in the index key;
+// it uses an in-memory buffer and the leveldb for persistent storage;
+// bufferSize and dbBatchSize are config parameters
+func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
+ // dbkey prefix for this peer/priority: byte 0 stays 0x00 (the entry prefix),
+ // byte 1 encodes the priority, bytes 2-33 hold the peer address
+ start := make([]byte, 42)
+ start[1] = byte(priorities - priority)
+ copy(start[2:34], key)
+
+ // the counter key shares bytes 1-33 with start, under prefix 0x01
+ counterKey := make([]byte, 34)
+ counterKey[0] = counterKeyPrefix
+ copy(counterKey[1:], start[1:34])
+
+ syncdb := &syncDb{
+ start: start,
+ key: key,
+ counterKey: counterKey,
+ priority: priority,
+ buffer: make(chan interface{}, bufferSize),
+ db: db,
+ done: make(chan bool),
+ quit: make(chan bool),
+ batch: make(chan chan int),
+ dbBatchSize: dbBatchSize,
+ }
+ glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)
+
+ // starts the main forever loop reading from buffer
+ go syncdb.bufferRead(deliver)
+ return syncdb
+}
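+
+// Illustrative usage sketch, not part of the original file: a syncer would
+// typically create one syncDb per priority, passing a deliver callback that
+// pushes the item onto the peer's outgoing queue. peerQueue and the High
+// priority constant below are assumptions for illustration only.
+//
+//	deliver := func(req interface{}, quit chan bool) bool {
+//		select {
+//		case peerQueue <- req: // hypothetical outgoing queue
+//			return true
+//		case <-quit: // syncDb is stopping; the item will be saved to the db
+//			return false
+//		}
+//	}
+//	sdb := newSyncDb(db, remoteKey, High, 1000, 100, deliver)
+//	defer sdb.stop()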
+
+/*
+bufferRead is a forever loop that takes care of delivering
+outgoing store requests read from the incoming buffer.
+
+Its argument is the deliver function, which takes the item as first argument
+and a quit channel as second.
+Closing this channel is supposed to abort all waiting for delivery
+(typically a network write).
+
+The iteration switches between 2 modes:
+* buffer mode reads the in-memory buffer and delivers the items directly
+* db mode reads from the buffer and writes to the db; in parallel another
+goroutine is started that reads from the db and delivers items
+
+If there is buffer contention in buffer mode (slow network, high upload volume),
+syncDb switches to db mode and starts dbRead.
+Once the db backlog is delivered, it reverts back to the in-memory buffer.
+
+It is automatically started when syncDb is initialised.
+
+Upon receiving the quit signal it saves the buffer to the db (see syncDb#stop()).
+*/
+func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
+ var buffer, db chan interface{} // channels representing the two read modes
+ var more bool
+ var req interface{}
+ var entry *syncDbEntry
+ var inBatch, inDb int
+ batch := new(leveldb.Batch)
+ var dbSize chan int
+ quit := self.quit
+ counterValue := make([]byte, 8)
+
+ // the counter is used for keeping the items in order, persisted to db;
+ // start the counter where the db left off, 0 if not found
+ data, err := self.db.Get(self.counterKey)
+ var counter uint64
+ if err == nil {
+ counter = binary.BigEndian.Uint64(data)
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
+ } else {
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
+ }
+
+LOOP:
+ for {
+ // waiting for the next item in the buffer, a quit signal or a batch request
+ select {
+ // buffer only closes when writing to db
+ case req = <-buffer:
+ // deliver the request: this blocks on the network write, so it
+ // is passed the quit channel as argument and returns if syncDb
+ // is stopped. In that case we need to save the item to the db
+ more = deliver(req, self.quit)
+ if !more {
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
+ // received quit signal, save request currently waiting delivery
+ // by switching to db mode and closing the buffer
+ buffer = nil
+ db = self.buffer
+ close(db)
+ quit = nil // a nil channel blocks the quit case in the select
+ break // break from select, this item will be written to the db
+ }
+ self.total++
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
+ // by the time deliver returns, there may have been new writes to the buffer;
+ // if buffer contention is detected, switch to db mode, which drains
+ // the buffer so that no process will block on pushing store requests
+ if len(buffer) == cap(buffer) {
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
+ buffer = nil
+ db = self.buffer
+ }
+ continue LOOP
+
+ // incoming entry to put into db
+ case req, more = <-db:
+ if !more {
+ // the db channel only closes after quit was called and the
+ // whole buffer has been drained; save the final batch
+ binary.BigEndian.PutUint64(counterValue, counter)
+ batch.Put(self.counterKey, counterValue) // persist counter in batch
+ self.writeSyncBatch(batch) // save batch
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
+ break LOOP
+ }
+ self.dbTotal++
+ self.total++
+ // otherwise break after select
+ case dbSize = <-self.batch:
+ // explicit request for batch
+ if inBatch == 0 && quit != nil {
+ // there have been no writes since the last batch, so the db is
+ // depleted: switch back to buffer mode
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
+ db = nil
+ buffer = self.buffer
+ dbSize <- 0 // indicates to 'caller' that batch has been written
+ inDb = 0
+ continue LOOP
+ }
+ binary.BigEndian.PutUint64(counterValue, counter)
+ batch.Put(self.counterKey, counterValue)
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
+ batch = self.writeSyncBatch(batch)
+ dbSize <- inBatch // indicates to 'caller' that batch has been written
+ inBatch = 0
+ continue LOOP
+
+ // closing syncDb#quit channel is used to signal to all goroutines to quit
+ case <-quit:
+ // need to save backlog, so switch to db mode
+ db = self.buffer
+ buffer = nil
+ quit = nil
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
+ close(db)
+ continue LOOP
+ }
+
+ // only get here if we put req into db
+ entry, err = self.newSyncDbEntry(req, counter)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
+ continue LOOP
+ }
+ batch.Put(entry.key, entry.val)
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
+ // if we have just switched to db mode and are not quitting, launch dbRead
+ // in a parallel goroutine to send deliveries from the db
+ if inDb == 0 && quit != nil {
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority)
+ go self.dbRead(true, counter, deliver)
+ }
+ inDb++
+ inBatch++
+ counter++
+ // need to save the batch if it gets too large (== dbBatchSize)
+ if inBatch%int(self.dbBatchSize) == 0 {
+ batch = self.writeSyncBatch(batch)
+ }
+ }
+ glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
+ close(self.done)
+}
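+
+// The batch handshake above is a synchronous rendezvous: dbRead sends a reply
+// channel over self.batch, bufferRead flushes the pending batch and answers
+// with the item count. A minimal sketch of the pattern from dbRead's side
+// (illustrative, not part of the original file):
+//
+//	sizes := make(chan int)
+//	self.batch <- sizes // ask bufferRead to flush the current batch
+//	cnt := <-sizes      // blocks until the batch has been written
+//	if cnt == 0 {
+//		// db depleted: bufferRead switches back to buffer mode
+//	}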
+
+// writes the batch to the db and returns a new batch object
+func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
+ err := self.db.Write(batch)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
+ return batch
+ }
+ return new(leveldb.Batch)
+}
+
+// abstract type for db entries (TODO could be a feature of Receipts)
+type syncDbEntry struct {
+ key, val []byte
+}
+
+func (self syncDbEntry) String() string {
+ return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
+}
+
+/*
+dbRead iterates over store requests to be sent over to the peer.
+This is mainly to prevent crashes due to network output buffer contention (???)
+as well as to make synchronisation resilient to disconnects.
+The messages are supposed to be sent via the p2p priority queue.
+
+The request db is shared between peers, but the domains for each syncDb
+are disjoint. dbkeys (42 bytes) are structured as follows:
+* 0: 0x00 (0x01 is reserved for the counter key)
+* 1: priorities - priority (so that high priority can be replayed first)
+* 2-33: peer's address
+* 34-41: syncDb counter to preserve order (this field is missing for the counter key)
+
+values (40 bytes) are:
+* 0-31: chunk key
+* 32-39: request id
+
+dbRead's first argument indicates whether to use the batch handshake from the
+start; it is false on the first round, while the historical backlog from
+earlier sessions is drained. The second argument is the current db counter,
+and the third is the delivery function to apply.
+*/
+func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
+ key := make([]byte, 42)
+ copy(key, self.start)
+ binary.BigEndian.PutUint64(key[34:], counter)
+ var batches, n, cnt, total int
+ var more bool
+ var entry *syncDbEntry
+ var it iterator.Iterator
+ var del *leveldb.Batch
+ batchSizes := make(chan int)
+
+ for {
+ // if useBatches is false, cnt is not set
+ if useBatches {
+ // this could be called before all cnt items have been sent out,
+ // so that the loop does not block while delivering;
+ // only relevant if cnt is large
+ select {
+ case self.batch <- batchSizes:
+ case <-self.quit:
+ return
+ }
+ // wait for the write to finish and get the item count in the next batch
+ cnt = <-batchSizes
+ batches++
+ if cnt == 0 {
+ // empty
+ return
+ }
+ }
+ it = self.db.NewIterator()
+ it.Seek(key)
+ if !it.Valid() {
+ copy(key, self.start)
+ useBatches = true
+ continue
+ }
+ del = new(leveldb.Batch)
+ glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)
+
+ for n = 0; !useBatches || n < cnt; it.Next() {
+ copy(key, it.Key())
+ if len(key) == 0 || key[0] != 0 {
+ copy(key, self.start)
+ useBatches = true
+ break
+ }
+ val := make([]byte, 40)
+ copy(val, it.Value())
+ entry = &syncDbEntry{key, val}
+ // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
+ more = fun(entry, self.quit)
+ if !more {
+ // quit was received while waiting to deliver the entry; the entry will not be deleted
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
+ break
+ }
+ // since subsequent batches of the same db session are indexed incrementally,
+ // deleting earlier batches can be delayed and parallelised;
+ // this could be a batch delete performed when the db is idle
+ // (but that adds complexity, especially when quitting)
+ del.Delete(key)
+ n++
+ total++
+ }
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
+ self.db.Write(del) // this could be called asynchronously, only when the db is idle
+ it.Release()
+ }
+}
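+
+// Worked example of the key layout described above (illustrative values only,
+// assuming priorities == 3, so that High priority maps to index byte 0x01):
+// the 42-byte db key of the item with counter value 5 queued for a peer with
+// address aa..aa would be
+//
+//	00 01 aa .. aa 00 00 00 00 00 00 00 05
+//	|  |  |        '--- 8-byte big-endian counter
+//	|  |  '--- 32-byte peer address
+//	|  '--- priorities - priority
+//	'--- 0x00 entry prefix (0x01 is the counter key prefix)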
+
+// stop signals all goroutines to quit and blocks until bufferRead
+// has saved the buffer to the db and finished
+func (self *syncDb) stop() {
+ close(self.quit)
+ <-self.done
+}
+
+// newSyncDbEntry calculates a dbkey and value for the request;
+// see the dbRead comment above for the db key structure.
+// polymorphic: for accepted types, see syncer#addRequest
+func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
+ var key storage.Key
+ var chunk *storage.Chunk
+ var id uint64
+ var ok bool
+ var sreq *storeRequestMsgData
+
+ if key, ok = req.(storage.Key); ok {
+ id = generateId()
+ } else if chunk, ok = req.(*storage.Chunk); ok {
+ key = chunk.Key
+ id = generateId()
+ } else if sreq, ok = req.(*storeRequestMsgData); ok {
+ key = sreq.Key
+ id = sreq.Id
+ } else if entry, ok = req.(*syncDbEntry); !ok {
+ return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
+ }
+
+ // order by peer > priority > seqid
+ // value is request id if exists
+ if entry == nil {
+ dbkey := make([]byte, 42)
+ dbval := make([]byte, 40)
+
+ // encode key
+ copy(dbkey[:], self.start[:34]) // db peer
+ binary.BigEndian.PutUint64(dbkey[34:], counter)
+ // encode value
+ copy(dbval, key[:])
+ binary.BigEndian.PutUint64(dbval[32:], id)
+
+ entry = &syncDbEntry{dbkey, dbval}
+ }
+ return
+}
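+
+// Illustrative inverse of the encoding above (a sketch, not part of the
+// original file): recovering the chunk key and request id from an entry
+// value produced by newSyncDbEntry.
+func decodeSyncDbEntry(entry *syncDbEntry) (key storage.Key, id uint64) {
+	key = storage.Key(entry.val[:32])              // value bytes 0-31: chunk key
+	id = binary.BigEndian.Uint64(entry.val[32:40]) // value bytes 32-39: request id
+	return key, id
+}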