diff options
Diffstat (limited to 'swarm/network/syncdb.go')
-rw-r--r-- | swarm/network/syncdb.go | 390 |
1 files changed, 390 insertions, 0 deletions
diff --git a/swarm/network/syncdb.go b/swarm/network/syncdb.go new file mode 100644 index 000000000..cef32610f --- /dev/null +++ b/swarm/network/syncdb.go @@ -0,0 +1,390 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "encoding/binary" + "fmt" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +const counterKeyPrefix = 0x01 + +/* +syncDb is a queueing service for outgoing deliveries. +One instance per priority queue for each peer + +a syncDb instance maintains an in-memory buffer (of capacity bufferSize) +once its in-memory buffer is full it switches to persisting in db +and dbRead iterator iterates through the items keeping their order +once the db read catches up (there is no more items in the db) then +it switches back to in-memory buffer. + +when syncdb is stopped all items in the buffer are saved to the db +*/ +type syncDb struct { + start []byte // this syncdb starting index in requestdb + key storage.Key // remote peers address key + counterKey []byte // db key to persist counter + priority uint // priotity High|Medium|Low + buffer chan interface{} // incoming request channel + db *storage.LDBDatabase // underlying db (TODO should be interface) + done chan bool // chan to signal goroutines finished quitting + quit chan bool // chan to signal quitting to goroutines + total, dbTotal int // counts for one session + batch chan chan int // channel for batch requests + dbBatchSize uint // number of items before batch is saved +} + +// constructor needs a shared request db (leveldb) +// priority is used in the index key +// uses a buffer and a leveldb for persistent storage +// bufferSize, dbBatchSize are config parameters +func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb { + start := make([]byte, 42) + start[1] = byte(priorities - priority) + copy(start[2:34], key) + + counterKey := make([]byte, 34) + counterKey[0] = counterKeyPrefix + copy(counterKey[1:], start[1:34]) + + syncdb := &syncDb{ + start: start, + key: key, + counterKey: counterKey, + priority: priority, + buffer: make(chan interface{}, bufferSize), + db: db, + done: make(chan bool), + quit: make(chan bool), + batch: make(chan chan int), + dbBatchSize: dbBatchSize, + } + glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority) + + // starts the main forever loop reading from buffer + go syncdb.bufferRead(deliver) + return syncdb +} + +/* +bufferRead is a forever iterator loop that takes care of delivering +outgoing store requests reads from incoming buffer + +its argument is the deliver function taking the item as first argument +and a quit channel as second. +Closing of this channel is supposed to abort all waiting for delivery +(typically network write) + +The iteration switches between 2 modes, +* buffer mode reads the in-memory buffer and delivers the items directly +* db mode reads from the buffer and writes to the db, parallelly another +routine is started that reads from the db and delivers items + +If there is buffer contention in buffer mode (slow network, high upload volume) +syncdb switches to db mode and starts dbRead +Once db backlog is delivered, it reverts back to in-memory buffer + +It is automatically started when syncdb is initialised. + +It saves the buffer to db upon receiving quit signal. syncDb#stop() +*/ +func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) { + var buffer, db chan interface{} // channels representing the two read modes + var more bool + var req interface{} + var entry *syncDbEntry + var inBatch, inDb int + batch := new(leveldb.Batch) + var dbSize chan int + quit := self.quit + counterValue := make([]byte, 8) + + // counter is used for keeping the items in order, persisted to db + // start counter where db was at, 0 if not found + data, err := self.db.Get(self.counterKey) + var counter uint64 + if err == nil { + counter = binary.BigEndian.Uint64(data) + glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter) + } else { + glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter) + } + +LOOP: + for { + // waiting for item next in the buffer, or quit signal or batch request + select { + // buffer only closes when writing to db + case req = <-buffer: + // deliver request : this is blocking on network write so + // it is passed the quit channel as argument, so that it returns + // if syncdb is stopped. In this case we need to save the item to the db + more = deliver(req, self.quit) + if !more { + glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) + // received quit signal, save request currently waiting delivery + // by switching to db mode and closing the buffer + buffer = nil + db = self.buffer + close(db) + quit = nil // needs to block the quit case in select + break // break from select, this item will be written to the db + } + self.total++ + glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) + // by the time deliver returns, there were new writes to the buffer + // if buffer contention is detected, switch to db mode which drains + // the buffer so no process will block on pushing store requests + if len(buffer) == cap(buffer) { + glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total) + buffer = nil + db = self.buffer + } + continue LOOP + + // incoming entry to put into db + case req, more = <-db: + if !more { + // only if quit is called, saved all the buffer + binary.BigEndian.PutUint64(counterValue, counter) + batch.Put(self.counterKey, counterValue) // persist counter in batch + self.writeSyncBatch(batch) // save batch + glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority) + break LOOP + } + self.dbTotal++ + self.total++ + // otherwise break after select + case dbSize = <-self.batch: + // explicit request for batch + if inBatch == 0 && quit != nil { + // there was no writes since the last batch so db depleted + // switch to buffer mode + glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority) + db = nil + buffer = self.buffer + dbSize <- 0 // indicates to 'caller' that batch has been written + inDb = 0 + continue LOOP + } + binary.BigEndian.PutUint64(counterValue, counter) + batch.Put(self.counterKey, counterValue) + glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue) + batch = self.writeSyncBatch(batch) + dbSize <- inBatch // indicates to 'caller' that batch has been written + inBatch = 0 + continue LOOP + + // closing syncDb#quit channel is used to signal to all goroutines to quit + case <-quit: + // need to save backlog, so switch to db mode + db = self.buffer + buffer = nil + quit = nil + glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority) + close(db) + continue LOOP + } + + // only get here if we put req into db + entry, err = self.newSyncDbEntry(req, counter) + if err != nil { + glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err) + continue LOOP + } + batch.Put(entry.key, entry.val) + glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter) + // if just switched to db mode and not quitting, then launch dbRead + // in a parallel go routine to send deliveries from db + if inDb == 0 && quit != nil { + glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead") + go self.dbRead(true, counter, deliver) + } + inDb++ + inBatch++ + counter++ + // need to save the batch if it gets too large (== dbBatchSize) + if inBatch%int(self.dbBatchSize) == 0 { + batch = self.writeSyncBatch(batch) + } + } + glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter) + close(self.done) +} + +// writes the batch to the db and returns a new batch object +func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch { + err := self.db.Write(batch) + if err != nil { + glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err) + return batch + } + return new(leveldb.Batch) +} + +// abstract type for db entries (TODO could be a feature of Receipts) +type syncDbEntry struct { + key, val []byte +} + +func (self syncDbEntry) String() string { + return fmt.Sprintf("key: %x, value: %x", self.key, self.val) +} + +/* + dbRead is iterating over store requests to be sent over to the peer + this is mainly to prevent crashes due to network output buffer contention (???) + as well as to make syncronisation resilient to disconnects + the messages are supposed to be sent in the p2p priority queue. + + the request DB is shared between peers, but domains for each syncdb + are disjoint. dbkeys (42 bytes) are structured: + * 0: 0x00 (0x01 reserved for counter key) + * 1: priorities - priority (so that high priority can be replayed first) + * 2-33: peers address + * 34-41: syncdb counter to preserve order (this field is missing for the counter key) + + values (40 bytes) are: + * 0-31: key + * 32-39: request id + +dbRead needs a boolean to indicate if on first round all the historical +record is synced. Second argument to indicate current db counter +The third is the function to apply +*/ +func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) { + key := make([]byte, 42) + copy(key, self.start) + binary.BigEndian.PutUint64(key[34:], counter) + var batches, n, cnt, total int + var more bool + var entry *syncDbEntry + var it iterator.Iterator + var del *leveldb.Batch + batchSizes := make(chan int) + + for { + // if useBatches is false, cnt is not set + if useBatches { + // this could be called before all cnt items sent out + // so that loop is not blocking while delivering + // only relevant if cnt is large + select { + case self.batch <- batchSizes: + case <-self.quit: + return + } + // wait for the write to finish and get the item count in the next batch + cnt = <-batchSizes + batches++ + if cnt == 0 { + // empty + return + } + } + it = self.db.NewIterator() + it.Seek(key) + if !it.Valid() { + copy(key, self.start) + useBatches = true + continue + } + del = new(leveldb.Batch) + glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt) + + for n = 0; !useBatches || n < cnt; it.Next() { + copy(key, it.Key()) + if len(key) == 0 || key[0] != 0 { + copy(key, self.start) + useBatches = true + break + } + val := make([]byte, 40) + copy(val, it.Value()) + entry = &syncDbEntry{key, val} + // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total) + more = fun(entry, self.quit) + if !more { + // quit received when waiting to deliver entry, the entry will not be deleted + glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt) + break + } + // since subsequent batches of the same db session are indexed incrementally + // deleting earlier batches can be delayed and parallelised + // this could be batch delete when db is idle (but added complexity esp when quitting) + del.Delete(key) + n++ + total++ + } + glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total) + self.db.Write(del) // this could be async called only when db is idle + it.Release() + } +} + +// +func (self *syncDb) stop() { + close(self.quit) + <-self.done +} + +// calculate a dbkey for the request, for the db to work +// see syncdb for db key structure +// polimorphic: accepted types, see syncer#addRequest +func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) { + var key storage.Key + var chunk *storage.Chunk + var id uint64 + var ok bool + var sreq *storeRequestMsgData + + if key, ok = req.(storage.Key); ok { + id = generateId() + } else if chunk, ok = req.(*storage.Chunk); ok { + key = chunk.Key + id = generateId() + } else if sreq, ok = req.(*storeRequestMsgData); ok { + key = sreq.Key + id = sreq.Id + } else if entry, ok = req.(*syncDbEntry); !ok { + return nil, fmt.Errorf("type not allowed: %v (%T)", req, req) + } + + // order by peer > priority > seqid + // value is request id if exists + if entry == nil { + dbkey := make([]byte, 42) + dbval := make([]byte, 40) + + // encode key + copy(dbkey[:], self.start[:34]) // db peer + binary.BigEndian.PutUint64(dbkey[34:], counter) + // encode value + copy(dbval, key[:]) + binary.BigEndian.PutUint64(dbval[32:], id) + + entry = &syncDbEntry{dbkey, dbval} + } + return +} |