aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/syncdb.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/syncdb.go')
-rw-r--r--swarm/network/syncdb.go389
1 files changed, 0 insertions, 389 deletions
diff --git a/swarm/network/syncdb.go b/swarm/network/syncdb.go
deleted file mode 100644
index 88b4b68dd..000000000
--- a/swarm/network/syncdb.go
+++ /dev/null
@@ -1,389 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package network
-
-import (
- "encoding/binary"
- "fmt"
-
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/swarm/storage"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/iterator"
-)
-
-const counterKeyPrefix = 0x01
-
-/*
-syncDb is a queueing service for outgoing deliveries.
-One instance per priority queue for each peer
-
-a syncDb instance maintains an in-memory buffer (of capacity bufferSize)
-once its in-memory buffer is full it switches to persisting in db
-and dbRead iterator iterates through the items keeping their order
-once the db read catches up (there is no more items in the db) then
-it switches back to in-memory buffer.
-
-when syncdb is stopped all items in the buffer are saved to the db
-*/
-type syncDb struct {
- start []byte // this syncdb starting index in requestdb
- key storage.Key // remote peers address key
- counterKey []byte // db key to persist counter
- priority uint // priotity High|Medium|Low
- buffer chan interface{} // incoming request channel
- db *storage.LDBDatabase // underlying db (TODO should be interface)
- done chan bool // chan to signal goroutines finished quitting
- quit chan bool // chan to signal quitting to goroutines
- total, dbTotal int // counts for one session
- batch chan chan int // channel for batch requests
- dbBatchSize uint // number of items before batch is saved
-}
-
-// constructor needs a shared request db (leveldb)
-// priority is used in the index key
-// uses a buffer and a leveldb for persistent storage
-// bufferSize, dbBatchSize are config parameters
-func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
- start := make([]byte, 42)
- start[1] = byte(priorities - priority)
- copy(start[2:34], key)
-
- counterKey := make([]byte, 34)
- counterKey[0] = counterKeyPrefix
- copy(counterKey[1:], start[1:34])
-
- syncdb := &syncDb{
- start: start,
- key: key,
- counterKey: counterKey,
- priority: priority,
- buffer: make(chan interface{}, bufferSize),
- db: db,
- done: make(chan bool),
- quit: make(chan bool),
- batch: make(chan chan int),
- dbBatchSize: dbBatchSize,
- }
- log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))
-
- // starts the main forever loop reading from buffer
- go syncdb.bufferRead(deliver)
- return syncdb
-}
-
-/*
-bufferRead is a forever iterator loop that takes care of delivering
-outgoing store requests reads from incoming buffer
-
-its argument is the deliver function taking the item as first argument
-and a quit channel as second.
-Closing of this channel is supposed to abort all waiting for delivery
-(typically network write)
-
-The iteration switches between 2 modes,
-* buffer mode reads the in-memory buffer and delivers the items directly
-* db mode reads from the buffer and writes to the db, parallelly another
-routine is started that reads from the db and delivers items
-
-If there is buffer contention in buffer mode (slow network, high upload volume)
-syncdb switches to db mode and starts dbRead
-Once db backlog is delivered, it reverts back to in-memory buffer
-
-It is automatically started when syncdb is initialised.
-
-It saves the buffer to db upon receiving quit signal. syncDb#stop()
-*/
-func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
- var buffer, db chan interface{} // channels representing the two read modes
- var more bool
- var req interface{}
- var entry *syncDbEntry
- var inBatch, inDb int
- batch := new(leveldb.Batch)
- var dbSize chan int
- quit := self.quit
- counterValue := make([]byte, 8)
-
- // counter is used for keeping the items in order, persisted to db
- // start counter where db was at, 0 if not found
- data, err := self.db.Get(self.counterKey)
- var counter uint64
- if err == nil {
- counter = binary.BigEndian.Uint64(data)
- log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
- } else {
- log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
- }
-
-LOOP:
- for {
- // waiting for item next in the buffer, or quit signal or batch request
- select {
- // buffer only closes when writing to db
- case req = <-buffer:
- // deliver request : this is blocking on network write so
- // it is passed the quit channel as argument, so that it returns
- // if syncdb is stopped. In this case we need to save the item to the db
- more = deliver(req, self.quit)
- if !more {
- log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
- // received quit signal, save request currently waiting delivery
- // by switching to db mode and closing the buffer
- buffer = nil
- db = self.buffer
- close(db)
- quit = nil // needs to block the quit case in select
- break // break from select, this item will be written to the db
- }
- self.total++
- log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
- // by the time deliver returns, there were new writes to the buffer
- // if buffer contention is detected, switch to db mode which drains
- // the buffer so no process will block on pushing store requests
- if len(buffer) == cap(buffer) {
- log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
- buffer = nil
- db = self.buffer
- }
- continue LOOP
-
- // incoming entry to put into db
- case req, more = <-db:
- if !more {
- // only if quit is called, saved all the buffer
- binary.BigEndian.PutUint64(counterValue, counter)
- batch.Put(self.counterKey, counterValue) // persist counter in batch
- self.writeSyncBatch(batch) // save batch
- log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
- break LOOP
- }
- self.dbTotal++
- self.total++
- // otherwise break after select
- case dbSize = <-self.batch:
- // explicit request for batch
- if inBatch == 0 && quit != nil {
- // there was no writes since the last batch so db depleted
- // switch to buffer mode
- log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
- db = nil
- buffer = self.buffer
- dbSize <- 0 // indicates to 'caller' that batch has been written
- inDb = 0
- continue LOOP
- }
- binary.BigEndian.PutUint64(counterValue, counter)
- batch.Put(self.counterKey, counterValue)
- log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
- batch = self.writeSyncBatch(batch)
- dbSize <- inBatch // indicates to 'caller' that batch has been written
- inBatch = 0
- continue LOOP
-
- // closing syncDb#quit channel is used to signal to all goroutines to quit
- case <-quit:
- // need to save backlog, so switch to db mode
- db = self.buffer
- buffer = nil
- quit = nil
- log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
- close(db)
- continue LOOP
- }
-
- // only get here if we put req into db
- entry, err = self.newSyncDbEntry(req, counter)
- if err != nil {
- log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
- continue LOOP
- }
- batch.Put(entry.key, entry.val)
- log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
- // if just switched to db mode and not quitting, then launch dbRead
- // in a parallel go routine to send deliveries from db
- if inDb == 0 && quit != nil {
- log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority))
- go self.dbRead(true, counter, deliver)
- }
- inDb++
- inBatch++
- counter++
- // need to save the batch if it gets too large (== dbBatchSize)
- if inBatch%int(self.dbBatchSize) == 0 {
- batch = self.writeSyncBatch(batch)
- }
- }
- log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
- close(self.done)
-}
-
-// writes the batch to the db and returns a new batch object
-func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
- err := self.db.Write(batch)
- if err != nil {
- log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
- return batch
- }
- return new(leveldb.Batch)
-}
-
-// abstract type for db entries (TODO could be a feature of Receipts)
-type syncDbEntry struct {
- key, val []byte
-}
-
-func (self syncDbEntry) String() string {
- return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
-}
-
-/*
- dbRead is iterating over store requests to be sent over to the peer
- this is mainly to prevent crashes due to network output buffer contention (???)
- as well as to make syncronisation resilient to disconnects
- the messages are supposed to be sent in the p2p priority queue.
-
- the request DB is shared between peers, but domains for each syncdb
- are disjoint. dbkeys (42 bytes) are structured:
- * 0: 0x00 (0x01 reserved for counter key)
- * 1: priorities - priority (so that high priority can be replayed first)
- * 2-33: peers address
- * 34-41: syncdb counter to preserve order (this field is missing for the counter key)
-
- values (40 bytes) are:
- * 0-31: key
- * 32-39: request id
-
-dbRead needs a boolean to indicate if on first round all the historical
-record is synced. Second argument to indicate current db counter
-The third is the function to apply
-*/
-func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
- key := make([]byte, 42)
- copy(key, self.start)
- binary.BigEndian.PutUint64(key[34:], counter)
- var batches, n, cnt, total int
- var more bool
- var entry *syncDbEntry
- var it iterator.Iterator
- var del *leveldb.Batch
- batchSizes := make(chan int)
-
- for {
- // if useBatches is false, cnt is not set
- if useBatches {
- // this could be called before all cnt items sent out
- // so that loop is not blocking while delivering
- // only relevant if cnt is large
- select {
- case self.batch <- batchSizes:
- case <-self.quit:
- return
- }
- // wait for the write to finish and get the item count in the next batch
- cnt = <-batchSizes
- batches++
- if cnt == 0 {
- // empty
- return
- }
- }
- it = self.db.NewIterator()
- it.Seek(key)
- if !it.Valid() {
- copy(key, self.start)
- useBatches = true
- continue
- }
- del = new(leveldb.Batch)
- log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))
-
- for n = 0; !useBatches || n < cnt; it.Next() {
- copy(key, it.Key())
- if len(key) == 0 || key[0] != 0 {
- copy(key, self.start)
- useBatches = true
- break
- }
- val := make([]byte, 40)
- copy(val, it.Value())
- entry = &syncDbEntry{key, val}
- // log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
- more = fun(entry, self.quit)
- if !more {
- // quit received when waiting to deliver entry, the entry will not be deleted
- log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
- break
- }
- // since subsequent batches of the same db session are indexed incrementally
- // deleting earlier batches can be delayed and parallelised
- // this could be batch delete when db is idle (but added complexity esp when quitting)
- del.Delete(key)
- n++
- total++
- }
- log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
- self.db.Write(del) // this could be async called only when db is idle
- it.Release()
- }
-}
-
-//
-func (self *syncDb) stop() {
- close(self.quit)
- <-self.done
-}
-
-// calculate a dbkey for the request, for the db to work
-// see syncdb for db key structure
-// polimorphic: accepted types, see syncer#addRequest
-func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
- var key storage.Key
- var chunk *storage.Chunk
- var id uint64
- var ok bool
- var sreq *storeRequestMsgData
-
- if key, ok = req.(storage.Key); ok {
- id = generateId()
- } else if chunk, ok = req.(*storage.Chunk); ok {
- key = chunk.Key
- id = generateId()
- } else if sreq, ok = req.(*storeRequestMsgData); ok {
- key = sreq.Key
- id = sreq.Id
- } else if entry, ok = req.(*syncDbEntry); !ok {
- return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
- }
-
- // order by peer > priority > seqid
- // value is request id if exists
- if entry == nil {
- dbkey := make([]byte, 42)
- dbval := make([]byte, 40)
-
- // encode key
- copy(dbkey[:], self.start[:34]) // db peer
- binary.BigEndian.PutUint64(dbkey[34:], counter)
- // encode value
- copy(dbval, key[:])
- binary.BigEndian.PutUint64(dbval[32:], id)
-
- entry = &syncDbEntry{dbkey, dbval}
- }
- return
-}