// Copyright 2016 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package network import ( "encoding/binary" "fmt" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" ) const counterKeyPrefix = 0x01 /* syncDb is a queueing service for outgoing deliveries. One instance per priority queue for each peer a syncDb instance maintains an in-memory buffer (of capacity bufferSize) once its in-memory buffer is full it switches to persisting in db and dbRead iterator iterates through the items keeping their order once the db read catches up (there is no more items in the db) then it switches back to in-memory buffer. when syncdb is stopped all items in the buffer are saved to the db */ type syncDb struct { start []byte // this syncdb starting index in requestdb key storage.Key // remote peers address key counterKey []byte // db key to persist counter priority uint // priotity High|Medium|Low buffer chan interface{} // incoming request channel db *storage.LDBDatabase // underlying db (TODO should be interface) done chan bool // chan to signal goroutines finished quitting quit chan bool // chan to signal quitting to goroutines total, dbTotal int // counts for one session batch chan chan int // channel for batch requests dbBatchSize uint // number of items before batch is saved } // constructor needs a shared request db (leveldb) // priority is used in the index key // uses a buffer and a leveldb for persistent storage // bufferSize, dbBatchSize are config parameters func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb { start := make([]byte, 42) start[1] = byte(priorities - priority) copy(start[2:34], key) counterKey := make([]byte, 34) counterKey[0] = counterKeyPrefix copy(counterKey[1:], start[1:34]) syncdb := &syncDb{ start: start, key: key, counterKey: counterKey, priority: priority, buffer: make(chan interface{}, bufferSize), db: db, done: make(chan bool), quit: make(chan bool), batch: make(chan chan int), dbBatchSize: dbBatchSize, } glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority) // starts the main forever loop reading from buffer go syncdb.bufferRead(deliver) return syncdb } /* bufferRead is a forever iterator loop that takes care of delivering outgoing store requests reads from incoming buffer its argument is the deliver function taking the item as first argument and a quit channel as second. Closing of this channel is supposed to abort all waiting for delivery (typically network write) The iteration switches between 2 modes, * buffer mode reads the in-memory buffer and delivers the items directly * db mode reads from the buffer and writes to the db, parallelly another routine is started that reads from the db and delivers items If there is buffer contention in buffer mode (slow network, high upload volume) syncdb switches to db mode and starts dbRead Once db backlog is delivered, it reverts back to in-memory buffer It is automatically started when syncdb is initialised. It saves the buffer to db upon receiving quit signal. syncDb#stop() */ func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) { var buffer, db chan interface{} // channels representing the two read modes var more bool var req interface{} var entry *syncDbEntry var inBatch, inDb int batch := new(leveldb.Batch) var dbSize chan int quit := self.quit counterValue := make([]byte, 8) // counter is used for keeping the items in order, persisted to db // start counter where db was at, 0 if not found data, err := self.db.Get(self.counterKey) var counter uint64 if err == nil { counter = binary.BigEndian.Uint64(data) glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter) } else { glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter) } LOOP: for { // waiting for item next in the buffer, or quit signal or batch request select { // buffer only closes when writing to db case req = <-buffer: // deliver request : this is blocking on network write so // it is passed the quit channel as argument, so that it returns // if syncdb is stopped. In this case we need to save the item to the db more = deliver(req, self.quit) if !more { glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) // received quit signal, save request currently waiting delivery // by switching to db mode and closing the buffer buffer = nil db = self.buffer close(db) quit = nil // needs to block the quit case in select break // break from select, this item will be written to the db } self.total++ glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total) // by the time deliver returns, there were new writes to the buffer // if buffer contention is detected, switch to db mode which drains // the buffer so no process will block on pushing store requests if len(buffer) == cap(buffer) { glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total) buffer = nil db = self.buffer } continue LOOP // incoming entry to put into db case req, more = <-db: if !more { // only if quit is called, saved all the buffer binary.BigEndian.PutUint64(counterValue, counter) batch.Put(self.counterKey, counterValue) // persist counter in batch self.writeSyncBatch(batch) // save batch glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority) break LOOP } self.dbTotal++ self.total++ // otherwise break after select case dbSize = <-self.batch: // explicit request for batch if inBatch == 0 && quit != nil { // there was no writes since the last batch so db depleted // switch to buffer mode glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority) db = nil buffer = self.buffer dbSize <- 0 // indicates to 'caller' that batch has been written inDb = 0 continue LOOP } binary.BigEndian.PutUint64(counterValue, counter) batch.Put(self.counterKey, counterValue) glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue) batch = self.writeSyncBatch(batch) dbSize <- inBatch // indicates to 'caller' that batch has been written inBatch = 0 continue LOOP // closing syncDb#quit channel is used to signal to all goroutines to quit case <-quit: // need to save backlog, so switch to db mode db = self.buffer buffer = nil quit = nil glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority) close(db) continue LOOP } // only get here if we put req into db entry, err = self.newSyncDbEntry(req, counter) if err != nil { glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err) continue LOOP } batch.Put(entry.key, entry.val) glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter) // if just switched to db mode and not quitting, then launch dbRead // in a parallel go routine to send deliveries from db if inDb == 0 && quit != nil { glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead") go self.dbRead(true, counter, deliver) } inDb++ inBatch++ counter++ // need to save the batch if it gets too large (== dbBatchSize) if inBatch%int(self.dbBatchSize) == 0 { batch = self.writeSyncBatch(batch) } } glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter) close(self.done) } // writes the batch to the db and returns a new batch object func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch { err := self.db.Write(batch) if err != nil { glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err) return batch } return new(leveldb.Batch) } // abstract type for db entries (TODO could be a feature of Receipts) type syncDbEntry struct { key, val []byte } func (self syncDbEntry) String() string { return fmt.Sprintf("key: %x, value: %x", self.key, self.val) } /* dbRead is iterating over store requests to be sent over to the peer this is mainly to prevent crashes due to network output buffer contention (???) as well as to make syncronisation resilient to disconnects the messages are supposed to be sent in the p2p priority queue. the request DB is shared between peers, but domains for each syncdb are disjoint. dbkeys (42 bytes) are structured: * 0: 0x00 (0x01 reserved for counter key) * 1: priorities - priority (so that high priority can be replayed first) * 2-33: peers address * 34-41: syncdb counter to preserve order (this field is missing for the counter key) values (40 bytes) are: * 0-31: key * 32-39: request id dbRead needs a boolean to indicate if on first round all the historical record is synced. Second argument to indicate current db counter The third is the function to apply */ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) { key := make([]byte, 42) copy(key, self.start) binary.BigEndian.PutUint64(key[34:], counter) var batches, n, cnt, total int var more bool var entry *syncDbEntry var it iterator.Iterator var del *leveldb.Batch batchSizes := make(chan int) for { // if useBatches is false, cnt is not set if useBatches { // this could be called before all cnt items sent out // so that loop is not blocking while delivering // only relevant if cnt is large select { case self.batch <- batchSizes: case <-self.quit: return } // wait for the write to finish and get the item count in the next batch cnt = <-batchSizes batches++ if cnt == 0 { // empty return } } it = self.db.NewIterator() it.Seek(key) if !it.Valid() { copy(key, self.start) useBatches = true continue } del = new(leveldb.Batch) glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt) for n = 0; !useBatches || n < cnt; it.Next() { copy(key, it.Key()) if len(key) == 0 || key[0] != 0 { copy(key, self.start) useBatches = true break } val := make([]byte, 40) copy(val, it.Value()) entry = &syncDbEntry{key, val} // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total) more = fun(entry, self.quit) if !more { // quit received when waiting to deliver entry, the entry will not be deleted glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt) break } // since subsequent batches of the same db session are indexed incrementally // deleting earlier batches can be delayed and parallelised // this could be batch delete when db is idle (but added complexity esp when quitting) del.Delete(key) n++ total++ } glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total) self.db.Write(del) // this could be async called only when db is idle it.Release() } } // func (self *syncDb) stop() { close(self.quit) <-self.done } // calculate a dbkey for the request, for the db to work // see syncdb for db key structure // polimorphic: accepted types, see syncer#addRequest func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) { var key storage.Key var chunk *storage.Chunk var id uint64 var ok bool var sreq *storeRequestMsgData if key, ok = req.(storage.Key); ok { id = generateId() } else if chunk, ok = req.(*storage.Chunk); ok { key = chunk.Key id = generateId() } else if sreq, ok = req.(*storeRequestMsgData); ok { key = sreq.Key id = sreq.Id } else if entry, ok = req.(*syncDbEntry); !ok { return nil, fmt.Errorf("type not allowed: %v (%T)", req, req) } // order by peer > priority > seqid // value is request id if exists if entry == nil { dbkey := make([]byte, 42) dbval := make([]byte, 40) // encode key copy(dbkey[:], self.start[:34]) // db peer binary.BigEndian.PutUint64(dbkey[34:], counter) // encode value copy(dbval, key[:]) binary.BigEndian.PutUint64(dbval[32:], id) entry = &syncDbEntry{dbkey, dbval} } return }