diff options
author | ΞTHΞЯSPHΞЯΞ <{viktor.tron,nagydani,zsfelfoldi}@gmail.com> | 2016-08-30 03:18:00 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2016-08-31 22:19:40 +0800 |
commit | 4d300e4dece56535f56ccc32330340ce89e42581 (patch) | |
tree | 135838bfae03437eb2a50c6d66de4d66b20c3220 /swarm/network/syncer.go | |
parent | 1f58b2d084b65eaec9aa2c2ecb1d3aae50d894b3 (diff) | |
download | go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.gz go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.bz2 go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.lz go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.xz go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.tar.zst go-tangerine-4d300e4dece56535f56ccc32330340ce89e42581.zip |
swarm: plan bee for content storage and distribution on web3
This change imports the Swarm protocol codebase. Compared to the 'swarm'
branch, a few mostly cosmetic changes had to be made:
* The various redundant log message prefixes are gone.
* All files now have LGPLv3 license headers.
* Minor code changes were needed to please go vet and make the tests
pass on Windows.
* Further changes were required to adapt to the go-ethereum develop
branch and its new Go APIs.
Some code has not (yet) been brought over:
* swarm/cmd/bzzhash: will reappear as cmd/bzzhash later
* swarm/cmd/bzzup.sh: will be reimplemented in cmd/bzzup
* swarm/cmd/makegenesis: will reappear somehow
* swarm/examples/album: will move to a separate repository
* swarm/examples/filemanager: ditto
* swarm/examples/files: will not be merged
* swarm/test/*: will not be merged
* swarm/services/swear: will reappear as contracts/swear when needed
Diffstat (limited to 'swarm/network/syncer.go')
-rw-r--r-- | swarm/network/syncer.go | 778 |
1 files changed, 778 insertions, 0 deletions
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go new file mode 100644 index 000000000..e871666bd --- /dev/null +++ b/swarm/network/syncer.go @@ -0,0 +1,778 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "path/filepath" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/swarm/storage" +) + +// syncer parameters (global, not peer specific) default values +const ( + requestDbBatchSize = 512 // size of batch before written to request db + keyBufferSize = 1024 // size of buffer for unsynced keys + syncBatchSize = 128 // maximum batchsize for outgoing requests + syncBufferSize = 128 // size of buffer for delivery requests + syncCacheSize = 1024 // cache capacity to store request queue in memory +) + +// priorities +const ( + Low = iota // 0 + Medium // 1 + High // 2 + priorities // 3 number of priority levels +) + +// request types +const ( + DeliverReq = iota // 0 + PushReq // 1 + PropagateReq // 2 + HistoryReq // 3 + BacklogReq // 4 +) + +// json serialisable struct to record the syncronisation state between 2 peers +type syncState struct { + *storage.DbSyncState // embeds the following 4 fields: + // Start Key // lower limit of address space + // Stop Key // upper limit of address space + // First uint64 // counter taken from last sync state + // Last uint64 // counter of remote peer dbStore at the time of last connection + SessionAt uint64 // set at the time of connection + LastSeenAt uint64 // set at the time of connection + Latest storage.Key // cursor of dbstore when last (continuously set by syncer) + Synced bool // true iff Sync is done up to the last disconnect + synced chan bool // signal that sync stage finished +} + +// wrapper of db-s to provide mockable custom local chunk store access to syncer +type DbAccess struct { + db *storage.DbStore + loc *storage.LocalStore +} + +func NewDbAccess(loc *storage.LocalStore) *DbAccess { + return &DbAccess{loc.DbStore.(*storage.DbStore), loc} +} + +// to obtain the chunks from key or request db entry only +func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) { + return self.loc.Get(key) +} + +// current storage counter of chunk db +func (self *DbAccess) counter() uint64 { + return self.db.Counter() +} + +// implemented by dbStoreSyncIterator +type keyIterator interface { + Next() storage.Key +} + +// generator function for iteration by address range and storage counter +func (self *DbAccess) iterator(s *syncState) keyIterator { + it, err := self.db.NewSyncIterator(*(s.DbSyncState)) + if err != nil { + return nil + } + return keyIterator(it) +} + +func (self syncState) String() string { + if self.Synced { + return fmt.Sprintf( + "session started at: %v, last seen at: %v, latest key: %v", + self.SessionAt, self.LastSeenAt, + self.Latest.Log(), + ) + } else { + return fmt.Sprintf( + "address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v", + self.Start.Log(), self.Stop.Log(), + self.First, self.Last, + self.SessionAt, self.LastSeenAt, + self.Latest.Log(), + ) + } +} + +// syncer parameters (global, not peer specific) +type SyncParams struct { + RequestDbPath string // path for request db (leveldb) + RequestDbBatchSize uint // nuber of items before batch is saved to requestdb + KeyBufferSize uint // size of key buffer + SyncBatchSize uint // maximum batchsize for outgoing requests + SyncBufferSize uint // size of buffer for + SyncCacheSize uint // cache capacity to store request queue in memory + SyncPriorities []uint // list of priority levels for req types 0-3 + SyncModes []bool // list of sync modes for for req types 0-3 +} + +// constructor with default values +func NewSyncParams(bzzdir string) *SyncParams { + return &SyncParams{ + RequestDbPath: filepath.Join(bzzdir, "requests"), + RequestDbBatchSize: requestDbBatchSize, + KeyBufferSize: keyBufferSize, + SyncBufferSize: syncBufferSize, + SyncBatchSize: syncBatchSize, + SyncCacheSize: syncCacheSize, + SyncPriorities: []uint{High, Medium, Medium, Low, Low}, + SyncModes: []bool{true, true, true, true, false}, + } +} + +// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding +type syncer struct { + *SyncParams // sync parameters + syncF func() bool // if syncing is needed + key storage.Key // remote peers address key + state *syncState // sync state for our dbStore + syncStates chan *syncState // different stages of sync + deliveryRequest chan bool // one of two triggers needed to send unsyncedKeys + newUnsyncedKeys chan bool // one of two triggers needed to send unsynced keys + quit chan bool // signal to quit loops + + // DB related fields + dbAccess *DbAccess // access to dbStore + db *storage.LDBDatabase // delivery msg db + + // native fields + queues [priorities]*syncDb // in-memory cache / queues for sync reqs + keys [priorities]chan interface{} // buffer for unsynced keys + deliveries [priorities]chan *storeRequestMsgData // delivery + + // bzz protocol instance outgoing message callbacks (mockable for testing) + unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg + store func(*storeRequestMsgData) error // send storeRequestMsg +} + +// a syncer instance is linked to each peer connection +// constructor is called from protocol after successful handshake +// the returned instance is attached to the peer and can be called +// by the forwarder +func newSyncer( + db *storage.LDBDatabase, remotekey storage.Key, + dbAccess *DbAccess, + unsyncedKeys func([]*syncRequest, *syncState) error, + store func(*storeRequestMsgData) error, + params *SyncParams, + state *syncState, + syncF func() bool, +) (*syncer, error) { + + syncBufferSize := params.SyncBufferSize + keyBufferSize := params.KeyBufferSize + dbBatchSize := params.RequestDbBatchSize + + self := &syncer{ + syncF: syncF, + key: remotekey, + dbAccess: dbAccess, + syncStates: make(chan *syncState, 20), + deliveryRequest: make(chan bool, 1), + newUnsyncedKeys: make(chan bool, 1), + SyncParams: params, + state: state, + quit: make(chan bool), + unsyncedKeys: unsyncedKeys, + store: store, + } + + // initialising + for i := 0; i < priorities; i++ { + self.keys[i] = make(chan interface{}, keyBufferSize) + self.deliveries[i] = make(chan *storeRequestMsgData) + // initialise a syncdb instance for each priority queue + self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i))) + } + glog.V(logger.Info).Infof("syncer started: %v", state) + // launch chunk delivery service + go self.syncDeliveries() + // launch sync task manager + if self.syncF() { + go self.sync() + } + // process unsynced keys to broadcast + go self.syncUnsyncedKeys() + + return self, nil +} + +// metadata serialisation +func encodeSync(state *syncState) (*json.RawMessage, error) { + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return nil, err + } + meta := json.RawMessage(data) + return &meta, nil +} + +func decodeSync(meta *json.RawMessage) (*syncState, error) { + if meta == nil { + return nil, fmt.Errorf("unable to deserialise sync state from <nil>") + } + data := []byte(*(meta)) + if len(data) == 0 { + return nil, fmt.Errorf("unable to deserialise sync state from <nil>") + } + state := &syncState{DbSyncState: &storage.DbSyncState{}} + err := json.Unmarshal(data, state) + return state, err +} + +/* + sync implements the syncing script + * first all items left in the request Db are replayed + * type = StaleSync + * Mode: by default once again via confirmation roundtrip + * Priority: the items are replayed as the proirity specified for StaleSync + * but within the order respects earlier priority level of request + * after all items are consumed for a priority level, the the respective + queue for delivery requests is open (this way new reqs not written to db) + (TODO: this should be checked) + * the sync state provided by the remote peer is used to sync history + * all the backlog from earlier (aborted) syncing is completed starting from latest + * if Last < LastSeenAt then all items in between then process all + backlog from upto last disconnect + * if Last > 0 && + + sync is called from the syncer constructor and is not supposed to be used externally +*/ +func (self *syncer) sync() { + state := self.state + // sync finished + defer close(self.syncStates) + + // 0. first replay stale requests from request db + if state.SessionAt == 0 { + glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log()) + return + } + glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log()) + for p := priorities - 1; p >= 0; p-- { + self.queues[p].dbRead(false, 0, self.replay()) + } + glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log()) + + // unless peer is synced sync unfinished history beginning on + if !state.Synced { + start := state.Start + + if !storage.IsZeroKey(state.Latest) { + // 1. there is unfinished earlier sync + state.Start = state.Latest + glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state) + // blocks while the entire history upto state is synced + self.syncState(state) + if state.Last < state.SessionAt { + state.First = state.Last + 1 + } + } + state.Latest = storage.ZeroKey + state.Start = start + // 2. sync up to last disconnect1 + if state.First < state.LastSeenAt { + state.Last = state.LastSeenAt + glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state) + self.syncState(state) + state.First = state.LastSeenAt + } + state.Latest = storage.ZeroKey + + } else { + // synchronisation starts at end of last session + state.First = state.LastSeenAt + } + + // 3. sync up to current session start + // if there have been new chunks since last session + if state.LastSeenAt < state.SessionAt { + state.Last = state.SessionAt + glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state) + // blocks until state syncing is finished + self.syncState(state) + } + glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log()) + +} + +// wait till syncronised block uptil state is synced +func (self *syncer) syncState(state *syncState) { + self.syncStates <- state + select { + case <-state.synced: + case <-self.quit: + } +} + +// stop quits both request processor and saves the request cache to disk +func (self *syncer) stop() { + close(self.quit) + glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log()) + for _, db := range self.queues { + db.stop() + } +} + +// rlp serialisable sync request +type syncRequest struct { + Key storage.Key + Priority uint +} + +func (self *syncRequest) String() string { + return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority) +} + +func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) { + key, _, _, _, err := parseRequest(req) + // TODO: if req has chunk, it should be put in a cache + // create + if err != nil { + return nil, err + } + return &syncRequest{key, uint(p)}, nil +} + +// serves historical items from the DB +// * read is on demand, blocking unless history channel is read +// * accepts sync requests (syncStates) to create new db iterator +// * closes the channel one iteration finishes +func (self *syncer) syncHistory(state *syncState) chan interface{} { + var n uint + history := make(chan interface{}) + glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop) + it := self.dbAccess.iterator(state) + if it != nil { + go func() { + // signal end of the iteration ended + defer close(history) + IT: + for { + key := it.Next() + if key == nil { + break IT + } + select { + // blocking until history channel is read from + case history <- storage.Key(key): + n++ + glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n) + state.Latest = key + case <-self.quit: + return + } + } + glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n) + }() + } + return history +} + +// triggers key syncronisation +func (self *syncer) sendUnsyncedKeys() { + select { + case self.deliveryRequest <- true: + default: + } +} + +// assembles a new batch of unsynced keys +// * keys are drawn from the key buffers in order of priority queue +// * if the queues of priority for History (HistoryReq) or higher are depleted, +// historical data is used so historical items are lower priority within +// their priority group. +// * Order of historical data is unspecified +func (self *syncer) syncUnsyncedKeys() { + // send out new + var unsynced []*syncRequest + var more, justSynced bool + var keyCount, historyCnt int + var history chan interface{} + + priority := High + keys := self.keys[priority] + var newUnsyncedKeys, deliveryRequest chan bool + keyCounts := make([]int, priorities) + histPrior := self.SyncPriorities[HistoryReq] + syncStates := self.syncStates + state := self.state + +LOOP: + for { + + var req interface{} + // select the highest priority channel to read from + // keys channels are buffered so the highest priority ones + // are checked first - integrity can only be guaranteed if writing + // is locked while selecting + if priority != High || len(keys) == 0 { + // selection is not needed if the High priority queue has items + keys = nil + PRIORITIES: + for priority = High; priority >= 0; priority-- { + // the first priority channel that is non-empty will be assigned to keys + if len(self.keys[priority]) > 0 { + glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority) + keys = self.keys[priority] + break PRIORITIES + } + glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])) + // if the input queue is empty on this level, resort to history if there is any + if uint(priority) == histPrior && history != nil { + glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key) + keys = history + break PRIORITIES + } + } + } + + // if peer ready to receive but nothing to send + if keys == nil && deliveryRequest == nil { + // if no items left and switch to waiting mode + glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log()) + newUnsyncedKeys = self.newUnsyncedKeys + } + + // send msg iff + // * peer is ready to receive keys AND ( + // * all queues and history are depleted OR + // * batch full OR + // * all history have been consumed, synced) + if deliveryRequest == nil && + (justSynced || + len(unsynced) > 0 && keys == nil || + len(unsynced) == int(self.SyncBatchSize)) { + justSynced = false + // listen to requests + deliveryRequest = self.deliveryRequest + newUnsyncedKeys = nil // not care about data until next req comes in + // set sync to current counter + // (all nonhistorical outgoing traffic sheduled and persisted + state.LastSeenAt = self.dbAccess.counter() + state.Latest = storage.ZeroKey + glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced) + // send the unsynced keys + stateCopy := *state + err := self.unsyncedKeys(unsynced, &stateCopy) + if err != nil { + glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err) + } + self.state = state + glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy) + unsynced = nil + keys = nil + } + + // process item and add it to the batch + select { + case <-self.quit: + break LOOP + case req, more = <-keys: + if keys == history && !more { + glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log()) + // history channel is closed, waiting for new state (called from sync()) + syncStates = self.syncStates + state.Synced = true // this signals that the current segment is complete + select { + case state.synced <- false: + case <-self.quit: + break LOOP + } + justSynced = true + history = nil + } + case <-deliveryRequest: + glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log()) + + // this 1 cap channel can wake up the loop + // signaling that peer is ready to receive unsynced Keys + // the channel is set to nil any further writes will be ignored + deliveryRequest = nil + + case <-newUnsyncedKeys: + glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log()) + // this 1 cap channel can wake up the loop + // signals that data is available to send if peer is ready to receive + newUnsyncedKeys = nil + keys = self.keys[High] + + case state, more = <-syncStates: + // this resets the state + if !more { + state = self.state + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state) + state.Synced = true + syncStates = nil + } else { + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior) + state.Synced = false + history = self.syncHistory(state) + // only one history at a time, only allow another one once the + // history channel is closed + syncStates = nil + } + } + if req == nil { + continue LOOP + } + + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req) + keyCounts[priority]++ + keyCount++ + if keys == history { + glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + historyCnt++ + } + if sreq, err := self.newSyncRequest(req, priority); err == nil { + // extract key from req + glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced) + unsynced = append(unsynced, sreq) + } else { + glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err) + } + + } +} + +// delivery loop +// takes into account priority, send store Requests with chunk (delivery) +// idle blocking if no new deliveries in any of the queues +func (self *syncer) syncDeliveries() { + var req *storeRequestMsgData + p := High + var deliveries chan *storeRequestMsgData + var msg *storeRequestMsgData + var err error + var c = [priorities]int{} + var n = [priorities]int{} + var total, success uint + + for { + deliveries = self.deliveries[p] + select { + case req = <-deliveries: + n[p]++ + c[p]++ + default: + if p == Low { + // blocking, depletion on all channels, no preference for priority + select { + case req = <-self.deliveries[High]: + n[High]++ + case req = <-self.deliveries[Medium]: + n[Medium]++ + case req = <-self.deliveries[Low]: + n[Low]++ + case <-self.quit: + return + } + p = High + } else { + p-- + continue + } + } + total++ + msg, err = self.newStoreRequestMsgData(req) + if err != nil { + glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err) + } else { + err = self.store(msg) + if err != nil { + glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err) + } else { + success++ + glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req) + } + } + if total%self.SyncBatchSize == 0 { + glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]) + } + } +} + +/* + addRequest handles requests for delivery + it accepts 4 types: + + * storeRequestMsgData: coming from netstore propagate response + * chunk: coming from forwarding (questionable: id?) + * key: from incoming syncRequest + * syncDbEntry: key,id encoded in db + + If sync mode is on for the type of request, then + it sends the request to the keys queue of the correct priority + channel buffered with capacity (SyncBufferSize) + + If sync mode is off then, requests are directly sent to deliveries +*/ +func (self *syncer) addRequest(req interface{}, ty int) { + // retrieve priority for request type name int8 + + priority := self.SyncPriorities[ty] + // sync mode for this type ON + if self.syncF() || ty == DeliverReq { + if self.SyncModes[ty] { + self.addKey(req, priority, self.quit) + } else { + self.addDelivery(req, priority, self.quit) + } + } +} + +// addKey queues sync request for sync confirmation with given priority +// ie the key will go out in an unsyncedKeys message +func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool { + select { + case self.keys[priority] <- req: + // this wakes up the unsynced keys loop if idle + select { + case self.newUnsyncedKeys <- true: + default: + } + return true + case <-quit: + return false + } +} + +// addDelivery queues delivery request for with given priority +// ie the chunk will be delivered ASAP mod priority queueing handled by syncdb +// requests are persisted across sessions for correct sync +func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool { + select { + case self.queues[priority].buffer <- req: + return true + case <-quit: + return false + } +} + +// doDelivery delivers the chunk for the request with given priority +// without queuing +func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool { + msgdata, err := self.newStoreRequestMsgData(req) + if err != nil { + glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err) + return false + } + select { + case self.deliveries[priority] <- msgdata: + return true + case <-quit: + return false + } +} + +// returns the delivery function for given priority +// passed on to syncDb +func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool { + return func(req interface{}, quit chan bool) bool { + return self.doDelivery(req, priority, quit) + } +} + +// returns the replay function passed on to syncDb +// depending on sync mode settings for BacklogReq, +// re play of request db backlog sends items via confirmation +// or directly delivers +func (self *syncer) replay() func(req interface{}, quit chan bool) bool { + sync := self.SyncModes[BacklogReq] + priority := self.SyncPriorities[BacklogReq] + // sync mode for this type ON + if sync { + return func(req interface{}, quit chan bool) bool { + return self.addKey(req, priority, quit) + } + } else { + return func(req interface{}, quit chan bool) bool { + return self.doDelivery(req, priority, quit) + } + + } +} + +// given a request, extends it to a full storeRequestMsgData +// polimorphic: see addRequest for the types accepted +func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) { + + key, id, chunk, sreq, err := parseRequest(req) + if err != nil { + return nil, err + } + + if sreq == nil { + if chunk == nil { + var err error + chunk, err = self.dbAccess.get(key) + if err != nil { + return nil, err + } + } + + sreq = &storeRequestMsgData{ + Id: id, + Key: chunk.Key, + SData: chunk.SData, + } + } + + return sreq, nil +} + +// parse request types and extracts, key, id, chunk, request if available +// does not do chunk lookup ! +func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) { + var key storage.Key + var entry *syncDbEntry + var chunk *storage.Chunk + var id uint64 + var ok bool + var sreq *storeRequestMsgData + var err error + + if key, ok = req.(storage.Key); ok { + id = generateId() + + } else if entry, ok = req.(*syncDbEntry); ok { + id = binary.BigEndian.Uint64(entry.val[32:]) + key = storage.Key(entry.val[:32]) + + } else if chunk, ok = req.(*storage.Chunk); ok { + key = chunk.Key + id = generateId() + + } else if sreq, ok = req.(*storeRequestMsgData); ok { + key = sreq.Key + } else { + err = fmt.Errorf("type not allowed: %v (%T)", req, req) + } + + return key, id, chunk, sreq, err +} |