aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/syncer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/syncer.go')
-rw-r--r--swarm/network/syncer.go781
1 files changed, 0 insertions, 781 deletions
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go
deleted file mode 100644
index 6d729fcb9..000000000
--- a/swarm/network/syncer.go
+++ /dev/null
@@ -1,781 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package network
-
-import (
- "encoding/binary"
- "encoding/json"
- "fmt"
- "path/filepath"
-
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/swarm/storage"
-)
-
-// syncer parameters (global, not peer specific) default values
-const (
- requestDbBatchSize = 512 // size of batch before written to request db
- keyBufferSize = 1024 // size of buffer for unsynced keys
- syncBatchSize = 128 // maximum batchsize for outgoing requests
- syncBufferSize = 128 // size of buffer for delivery requests
- syncCacheSize = 1024 // cache capacity to store request queue in memory
-)
-
-// priorities
-const (
- Low = iota // 0
- Medium // 1
- High // 2
- priorities // 3 number of priority levels
-)
-
-// request types
-const (
- DeliverReq = iota // 0
- PushReq // 1
- PropagateReq // 2
- HistoryReq // 3
- BacklogReq // 4
-)
-
-// json serialisable struct to record the syncronisation state between 2 peers
-type syncState struct {
- *storage.DbSyncState // embeds the following 4 fields:
- // Start Key // lower limit of address space
- // Stop Key // upper limit of address space
- // First uint64 // counter taken from last sync state
- // Last uint64 // counter of remote peer dbStore at the time of last connection
- SessionAt uint64 // set at the time of connection
- LastSeenAt uint64 // set at the time of connection
- Latest storage.Key // cursor of dbstore when last (continuously set by syncer)
- Synced bool // true iff Sync is done up to the last disconnect
- synced chan bool // signal that sync stage finished
-}
-
-// wrapper of db-s to provide mockable custom local chunk store access to syncer
-type DbAccess struct {
- db *storage.DbStore
- loc *storage.LocalStore
-}
-
-func NewDbAccess(loc *storage.LocalStore) *DbAccess {
- return &DbAccess{loc.DbStore.(*storage.DbStore), loc}
-}
-
-// to obtain the chunks from key or request db entry only
-func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
- return self.loc.Get(key)
-}
-
-// current storage counter of chunk db
-func (self *DbAccess) counter() uint64 {
- return self.db.Counter()
-}
-
-// implemented by dbStoreSyncIterator
-type keyIterator interface {
- Next() storage.Key
-}
-
-// generator function for iteration by address range and storage counter
-func (self *DbAccess) iterator(s *syncState) keyIterator {
- it, err := self.db.NewSyncIterator(*(s.DbSyncState))
- if err != nil {
- return nil
- }
- return keyIterator(it)
-}
-
-func (self syncState) String() string {
- if self.Synced {
- return fmt.Sprintf(
- "session started at: %v, last seen at: %v, latest key: %v",
- self.SessionAt, self.LastSeenAt,
- self.Latest.Log(),
- )
- } else {
- return fmt.Sprintf(
- "address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
- self.Start.Log(), self.Stop.Log(),
- self.First, self.Last,
- self.SessionAt, self.LastSeenAt,
- self.Latest.Log(),
- )
- }
-}
-
-// syncer parameters (global, not peer specific)
-type SyncParams struct {
- RequestDbPath string // path for request db (leveldb)
- RequestDbBatchSize uint // nuber of items before batch is saved to requestdb
- KeyBufferSize uint // size of key buffer
- SyncBatchSize uint // maximum batchsize for outgoing requests
- SyncBufferSize uint // size of buffer for
- SyncCacheSize uint // cache capacity to store request queue in memory
- SyncPriorities []uint // list of priority levels for req types 0-3
- SyncModes []bool // list of sync modes for for req types 0-3
-}
-
-// constructor with default values
-func NewDefaultSyncParams() *SyncParams {
- return &SyncParams{
- RequestDbBatchSize: requestDbBatchSize,
- KeyBufferSize: keyBufferSize,
- SyncBufferSize: syncBufferSize,
- SyncBatchSize: syncBatchSize,
- SyncCacheSize: syncCacheSize,
- SyncPriorities: []uint{High, Medium, Medium, Low, Low},
- SyncModes: []bool{true, true, true, true, false},
- }
-}
-
-//this can only finally be set after all config options (file, cmd line, env vars)
-//have been evaluated
-func (self *SyncParams) Init(path string) {
- self.RequestDbPath = filepath.Join(path, "requests")
-}
-
-// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding
-type syncer struct {
- *SyncParams // sync parameters
- syncF func() bool // if syncing is needed
- key storage.Key // remote peers address key
- state *syncState // sync state for our dbStore
- syncStates chan *syncState // different stages of sync
- deliveryRequest chan bool // one of two triggers needed to send unsyncedKeys
- newUnsyncedKeys chan bool // one of two triggers needed to send unsynced keys
- quit chan bool // signal to quit loops
-
- // DB related fields
- dbAccess *DbAccess // access to dbStore
-
- // native fields
- queues [priorities]*syncDb // in-memory cache / queues for sync reqs
- keys [priorities]chan interface{} // buffer for unsynced keys
- deliveries [priorities]chan *storeRequestMsgData // delivery
-
- // bzz protocol instance outgoing message callbacks (mockable for testing)
- unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg
- store func(*storeRequestMsgData) error // send storeRequestMsg
-}
-
-// a syncer instance is linked to each peer connection
-// constructor is called from protocol after successful handshake
-// the returned instance is attached to the peer and can be called
-// by the forwarder
-func newSyncer(
- db *storage.LDBDatabase, remotekey storage.Key,
- dbAccess *DbAccess,
- unsyncedKeys func([]*syncRequest, *syncState) error,
- store func(*storeRequestMsgData) error,
- params *SyncParams,
- state *syncState,
- syncF func() bool,
-) (*syncer, error) {
-
- syncBufferSize := params.SyncBufferSize
- keyBufferSize := params.KeyBufferSize
- dbBatchSize := params.RequestDbBatchSize
-
- self := &syncer{
- syncF: syncF,
- key: remotekey,
- dbAccess: dbAccess,
- syncStates: make(chan *syncState, 20),
- deliveryRequest: make(chan bool, 1),
- newUnsyncedKeys: make(chan bool, 1),
- SyncParams: params,
- state: state,
- quit: make(chan bool),
- unsyncedKeys: unsyncedKeys,
- store: store,
- }
-
- // initialising
- for i := 0; i < priorities; i++ {
- self.keys[i] = make(chan interface{}, keyBufferSize)
- self.deliveries[i] = make(chan *storeRequestMsgData)
- // initialise a syncdb instance for each priority queue
- self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
- }
- log.Info(fmt.Sprintf("syncer started: %v", state))
- // launch chunk delivery service
- go self.syncDeliveries()
- // launch sync task manager
- if self.syncF() {
- go self.sync()
- }
- // process unsynced keys to broadcast
- go self.syncUnsyncedKeys()
-
- return self, nil
-}
-
-// metadata serialisation
-func encodeSync(state *syncState) (*json.RawMessage, error) {
- data, err := json.MarshalIndent(state, "", " ")
- if err != nil {
- return nil, err
- }
- meta := json.RawMessage(data)
- return &meta, nil
-}
-
-func decodeSync(meta *json.RawMessage) (*syncState, error) {
- if meta == nil {
- return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
- }
- data := []byte(*(meta))
- if len(data) == 0 {
- return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
- }
- state := &syncState{DbSyncState: &storage.DbSyncState{}}
- err := json.Unmarshal(data, state)
- return state, err
-}
-
-/*
- sync implements the syncing script
- * first all items left in the request Db are replayed
- * type = StaleSync
- * Mode: by default once again via confirmation roundtrip
- * Priority: the items are replayed as the proirity specified for StaleSync
- * but within the order respects earlier priority level of request
- * after all items are consumed for a priority level, the the respective
- queue for delivery requests is open (this way new reqs not written to db)
- (TODO: this should be checked)
- * the sync state provided by the remote peer is used to sync history
- * all the backlog from earlier (aborted) syncing is completed starting from latest
- * if Last < LastSeenAt then all items in between then process all
- backlog from upto last disconnect
- * if Last > 0 &&
-
- sync is called from the syncer constructor and is not supposed to be used externally
-*/
-func (self *syncer) sync() {
- state := self.state
- // sync finished
- defer close(self.syncStates)
-
- // 0. first replay stale requests from request db
- if state.SessionAt == 0 {
- log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log()))
- return
- }
- log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log()))
- for p := priorities - 1; p >= 0; p-- {
- self.queues[p].dbRead(false, 0, self.replay())
- }
- log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log()))
-
- // unless peer is synced sync unfinished history beginning on
- if !state.Synced {
- start := state.Start
-
- if !storage.IsZeroKey(state.Latest) {
- // 1. there is unfinished earlier sync
- state.Start = state.Latest
- log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state))
- // blocks while the entire history upto state is synced
- self.syncState(state)
- if state.Last < state.SessionAt {
- state.First = state.Last + 1
- }
- }
- state.Latest = storage.ZeroKey
- state.Start = start
- // 2. sync up to last disconnect1
- if state.First < state.LastSeenAt {
- state.Last = state.LastSeenAt
- log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state))
- self.syncState(state)
- state.First = state.LastSeenAt
- }
- state.Latest = storage.ZeroKey
-
- } else {
- // synchronisation starts at end of last session
- state.First = state.LastSeenAt
- }
-
- // 3. sync up to current session start
- // if there have been new chunks since last session
- if state.LastSeenAt < state.SessionAt {
- state.Last = state.SessionAt
- log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state))
- // blocks until state syncing is finished
- self.syncState(state)
- }
- log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log()))
-
-}
-
-// wait till syncronised block uptil state is synced
-func (self *syncer) syncState(state *syncState) {
- self.syncStates <- state
- select {
- case <-state.synced:
- case <-self.quit:
- }
-}
-
-// stop quits both request processor and saves the request cache to disk
-func (self *syncer) stop() {
- close(self.quit)
- log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log()))
- for _, db := range self.queues {
- db.stop()
- }
-}
-
-// rlp serialisable sync request
-type syncRequest struct {
- Key storage.Key
- Priority uint
-}
-
-func (self *syncRequest) String() string {
- return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority)
-}
-
-func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
- key, _, _, _, err := parseRequest(req)
- // TODO: if req has chunk, it should be put in a cache
- // create
- if err != nil {
- return nil, err
- }
- return &syncRequest{key, uint(p)}, nil
-}
-
-// serves historical items from the DB
-// * read is on demand, blocking unless history channel is read
-// * accepts sync requests (syncStates) to create new db iterator
-// * closes the channel one iteration finishes
-func (self *syncer) syncHistory(state *syncState) chan interface{} {
- var n uint
- history := make(chan interface{})
- log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop))
- it := self.dbAccess.iterator(state)
- if it != nil {
- go func() {
- // signal end of the iteration ended
- defer close(history)
- IT:
- for {
- key := it.Next()
- if key == nil {
- break IT
- }
- select {
- // blocking until history channel is read from
- case history <- key:
- n++
- log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n))
- state.Latest = key
- case <-self.quit:
- return
- }
- }
- log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n))
- }()
- }
- return history
-}
-
-// triggers key syncronisation
-func (self *syncer) sendUnsyncedKeys() {
- select {
- case self.deliveryRequest <- true:
- default:
- }
-}
-
-// assembles a new batch of unsynced keys
-// * keys are drawn from the key buffers in order of priority queue
-// * if the queues of priority for History (HistoryReq) or higher are depleted,
-// historical data is used so historical items are lower priority within
-// their priority group.
-// * Order of historical data is unspecified
-func (self *syncer) syncUnsyncedKeys() {
- // send out new
- var unsynced []*syncRequest
- var more, justSynced bool
- var keyCount, historyCnt int
- var history chan interface{}
-
- priority := High
- keys := self.keys[priority]
- var newUnsyncedKeys, deliveryRequest chan bool
- keyCounts := make([]int, priorities)
- histPrior := self.SyncPriorities[HistoryReq]
- syncStates := self.syncStates
- state := self.state
-
-LOOP:
- for {
-
- var req interface{}
- // select the highest priority channel to read from
- // keys channels are buffered so the highest priority ones
- // are checked first - integrity can only be guaranteed if writing
- // is locked while selecting
- if priority != High || len(keys) == 0 {
- // selection is not needed if the High priority queue has items
- keys = nil
- PRIORITIES:
- for priority = High; priority >= 0; priority-- {
- // the first priority channel that is non-empty will be assigned to keys
- if len(self.keys[priority]) > 0 {
- log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority))
- keys = self.keys[priority]
- break PRIORITIES
- }
- log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])))
- // if the input queue is empty on this level, resort to history if there is any
- if uint(priority) == histPrior && history != nil {
- log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key))
- keys = history
- break PRIORITIES
- }
- }
- }
-
- // if peer ready to receive but nothing to send
- if keys == nil && deliveryRequest == nil {
- // if no items left and switch to waiting mode
- log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log()))
- newUnsyncedKeys = self.newUnsyncedKeys
- }
-
- // send msg iff
- // * peer is ready to receive keys AND (
- // * all queues and history are depleted OR
- // * batch full OR
- // * all history have been consumed, synced)
- if deliveryRequest == nil &&
- (justSynced ||
- len(unsynced) > 0 && keys == nil ||
- len(unsynced) == int(self.SyncBatchSize)) {
- justSynced = false
- // listen to requests
- deliveryRequest = self.deliveryRequest
- newUnsyncedKeys = nil // not care about data until next req comes in
- // set sync to current counter
- // (all nonhistorical outgoing traffic sheduled and persisted
- state.LastSeenAt = self.dbAccess.counter()
- state.Latest = storage.ZeroKey
- log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced))
- // send the unsynced keys
- stateCopy := *state
- err := self.unsyncedKeys(unsynced, &stateCopy)
- if err != nil {
- log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", self.key.Log(), err))
- }
- self.state = state
- log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy))
- unsynced = nil
- keys = nil
- }
-
- // process item and add it to the batch
- select {
- case <-self.quit:
- break LOOP
- case req, more = <-keys:
- if keys == history && !more {
- log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log()))
- // history channel is closed, waiting for new state (called from sync())
- syncStates = self.syncStates
- state.Synced = true // this signals that the current segment is complete
- select {
- case state.synced <- false:
- case <-self.quit:
- break LOOP
- }
- justSynced = true
- history = nil
- }
- case <-deliveryRequest:
- log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log()))
-
- // this 1 cap channel can wake up the loop
- // signaling that peer is ready to receive unsynced Keys
- // the channel is set to nil any further writes will be ignored
- deliveryRequest = nil
-
- case <-newUnsyncedKeys:
- log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log()))
- // this 1 cap channel can wake up the loop
- // signals that data is available to send if peer is ready to receive
- newUnsyncedKeys = nil
- keys = self.keys[High]
-
- case state, more = <-syncStates:
- // this resets the state
- if !more {
- state = self.state
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state))
- state.Synced = true
- syncStates = nil
- } else {
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior))
- state.Synced = false
- history = self.syncHistory(state)
- // only one history at a time, only allow another one once the
- // history channel is closed
- syncStates = nil
- }
- }
- if req == nil {
- continue LOOP
- }
-
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req))
- keyCounts[priority]++
- keyCount++
- if keys == history {
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
- historyCnt++
- }
- if sreq, err := self.newSyncRequest(req, priority); err == nil {
- // extract key from req
- log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
- unsynced = append(unsynced, sreq)
- } else {
- log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, err))
- }
-
- }
-}
-
-// delivery loop
-// takes into account priority, send store Requests with chunk (delivery)
-// idle blocking if no new deliveries in any of the queues
-func (self *syncer) syncDeliveries() {
- var req *storeRequestMsgData
- p := High
- var deliveries chan *storeRequestMsgData
- var msg *storeRequestMsgData
- var err error
- var c = [priorities]int{}
- var n = [priorities]int{}
- var total, success uint
-
- for {
- deliveries = self.deliveries[p]
- select {
- case req = <-deliveries:
- n[p]++
- c[p]++
- default:
- if p == Low {
- // blocking, depletion on all channels, no preference for priority
- select {
- case req = <-self.deliveries[High]:
- n[High]++
- case req = <-self.deliveries[Medium]:
- n[Medium]++
- case req = <-self.deliveries[Low]:
- n[Low]++
- case <-self.quit:
- return
- }
- p = High
- } else {
- p--
- continue
- }
- }
- total++
- msg, err = self.newStoreRequestMsgData(req)
- if err != nil {
- log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err))
- } else {
- err = self.store(msg)
- if err != nil {
- log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err))
- } else {
- success++
- log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req))
- }
- }
- if total%self.SyncBatchSize == 0 {
- log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]))
- }
- }
-}
-
-/*
- addRequest handles requests for delivery
- it accepts 4 types:
-
- * storeRequestMsgData: coming from netstore propagate response
- * chunk: coming from forwarding (questionable: id?)
- * key: from incoming syncRequest
- * syncDbEntry: key,id encoded in db
-
- If sync mode is on for the type of request, then
- it sends the request to the keys queue of the correct priority
- channel buffered with capacity (SyncBufferSize)
-
- If sync mode is off then, requests are directly sent to deliveries
-*/
-func (self *syncer) addRequest(req interface{}, ty int) {
- // retrieve priority for request type name int8
-
- priority := self.SyncPriorities[ty]
- // sync mode for this type ON
- if self.syncF() || ty == DeliverReq {
- if self.SyncModes[ty] {
- self.addKey(req, priority, self.quit)
- } else {
- self.addDelivery(req, priority, self.quit)
- }
- }
-}
-
-// addKey queues sync request for sync confirmation with given priority
-// ie the key will go out in an unsyncedKeys message
-func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
- select {
- case self.keys[priority] <- req:
- // this wakes up the unsynced keys loop if idle
- select {
- case self.newUnsyncedKeys <- true:
- default:
- }
- return true
- case <-quit:
- return false
- }
-}
-
-// addDelivery queues delivery request for with given priority
-// ie the chunk will be delivered ASAP mod priority queueing handled by syncdb
-// requests are persisted across sessions for correct sync
-func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
- select {
- case self.queues[priority].buffer <- req:
- return true
- case <-quit:
- return false
- }
-}
-
-// doDelivery delivers the chunk for the request with given priority
-// without queuing
-func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
- msgdata, err := self.newStoreRequestMsgData(req)
- if err != nil {
- log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err))
- return false
- }
- select {
- case self.deliveries[priority] <- msgdata:
- return true
- case <-quit:
- return false
- }
-}
-
-// returns the delivery function for given priority
-// passed on to syncDb
-func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
- return func(req interface{}, quit chan bool) bool {
- return self.doDelivery(req, priority, quit)
- }
-}
-
-// returns the replay function passed on to syncDb
-// depending on sync mode settings for BacklogReq,
-// re play of request db backlog sends items via confirmation
-// or directly delivers
-func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
- sync := self.SyncModes[BacklogReq]
- priority := self.SyncPriorities[BacklogReq]
- // sync mode for this type ON
- if sync {
- return func(req interface{}, quit chan bool) bool {
- return self.addKey(req, priority, quit)
- }
- } else {
- return func(req interface{}, quit chan bool) bool {
- return self.doDelivery(req, priority, quit)
- }
-
- }
-}
-
-// given a request, extends it to a full storeRequestMsgData
-// polimorphic: see addRequest for the types accepted
-func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
-
- key, id, chunk, sreq, err := parseRequest(req)
- if err != nil {
- return nil, err
- }
-
- if sreq == nil {
- if chunk == nil {
- var err error
- chunk, err = self.dbAccess.get(key)
- if err != nil {
- return nil, err
- }
- }
-
- sreq = &storeRequestMsgData{
- Id: id,
- Key: chunk.Key,
- SData: chunk.SData,
- }
- }
-
- return sreq, nil
-}
-
-// parse request types and extracts, key, id, chunk, request if available
-// does not do chunk lookup !
-func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
- var key storage.Key
- var entry *syncDbEntry
- var chunk *storage.Chunk
- var id uint64
- var ok bool
- var sreq *storeRequestMsgData
- var err error
-
- if key, ok = req.(storage.Key); ok {
- id = generateId()
-
- } else if entry, ok = req.(*syncDbEntry); ok {
- id = binary.BigEndian.Uint64(entry.val[32:])
- key = storage.Key(entry.val[:32])
-
- } else if chunk, ok = req.(*storage.Chunk); ok {
- key = chunk.Key
- id = generateId()
-
- } else if sreq, ok = req.(*storeRequestMsgData); ok {
- key = sreq.Key
- } else {
- err = fmt.Errorf("type not allowed: %v (%T)", req, req)
- }
-
- return key, id, chunk, sreq, err
-}