diff options
Diffstat (limited to 'swarm/network/syncer.go')
-rw-r--r-- | swarm/network/syncer.go | 781 |
1 files changed, 0 insertions, 781 deletions
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go deleted file mode 100644 index 6d729fcb9..000000000 --- a/swarm/network/syncer.go +++ /dev/null @@ -1,781 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package network - -import ( - "encoding/binary" - "encoding/json" - "fmt" - "path/filepath" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -// syncer parameters (global, not peer specific) default values -const ( - requestDbBatchSize = 512 // size of batch before written to request db - keyBufferSize = 1024 // size of buffer for unsynced keys - syncBatchSize = 128 // maximum batchsize for outgoing requests - syncBufferSize = 128 // size of buffer for delivery requests - syncCacheSize = 1024 // cache capacity to store request queue in memory -) - -// priorities -const ( - Low = iota // 0 - Medium // 1 - High // 2 - priorities // 3 number of priority levels -) - -// request types -const ( - DeliverReq = iota // 0 - PushReq // 1 - PropagateReq // 2 - HistoryReq // 3 - BacklogReq // 4 -) - -// json serialisable struct to record the syncronisation state between 2 peers -type syncState struct { - *storage.DbSyncState // embeds the following 4 fields: - // Start Key // lower limit of address space - // Stop Key // upper limit of address space - // First uint64 // counter taken from last sync state - // Last uint64 // counter of remote peer dbStore at the time of last connection - SessionAt uint64 // set at the time of connection - LastSeenAt uint64 // set at the time of connection - Latest storage.Key // cursor of dbstore when last (continuously set by syncer) - Synced bool // true iff Sync is done up to the last disconnect - synced chan bool // signal that sync stage finished -} - -// wrapper of db-s to provide mockable custom local chunk store access to syncer -type DbAccess struct { - db *storage.DbStore - loc *storage.LocalStore -} - -func NewDbAccess(loc *storage.LocalStore) *DbAccess { - return &DbAccess{loc.DbStore.(*storage.DbStore), loc} -} - -// to obtain the chunks from key or request db entry only -func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) { - return self.loc.Get(key) -} - -// current storage counter of chunk db -func (self *DbAccess) counter() uint64 { - return self.db.Counter() -} - -// implemented by dbStoreSyncIterator -type keyIterator interface { - Next() storage.Key -} - -// generator function for iteration by address range and storage counter -func (self *DbAccess) iterator(s *syncState) keyIterator { - it, err := self.db.NewSyncIterator(*(s.DbSyncState)) - if err != nil { - return nil - } - return keyIterator(it) -} - -func (self syncState) String() string { - if self.Synced { - return fmt.Sprintf( - "session started at: %v, last seen at: %v, latest key: %v", - self.SessionAt, self.LastSeenAt, - self.Latest.Log(), - ) - } else { - return fmt.Sprintf( - "address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v", - self.Start.Log(), self.Stop.Log(), - self.First, self.Last, - self.SessionAt, self.LastSeenAt, - self.Latest.Log(), - ) - } -} - -// syncer parameters (global, not peer specific) -type SyncParams struct { - RequestDbPath string // path for request db (leveldb) - RequestDbBatchSize uint // nuber of items before batch is saved to requestdb - KeyBufferSize uint // size of key buffer - SyncBatchSize uint // maximum batchsize for outgoing requests - SyncBufferSize uint // size of buffer for - SyncCacheSize uint // cache capacity to store request queue in memory - SyncPriorities []uint // list of priority levels for req types 0-3 - SyncModes []bool // list of sync modes for for req types 0-3 -} - -// constructor with default values -func NewDefaultSyncParams() *SyncParams { - return &SyncParams{ - RequestDbBatchSize: requestDbBatchSize, - KeyBufferSize: keyBufferSize, - SyncBufferSize: syncBufferSize, - SyncBatchSize: syncBatchSize, - SyncCacheSize: syncCacheSize, - SyncPriorities: []uint{High, Medium, Medium, Low, Low}, - SyncModes: []bool{true, true, true, true, false}, - } -} - -//this can only finally be set after all config options (file, cmd line, env vars) -//have been evaluated -func (self *SyncParams) Init(path string) { - self.RequestDbPath = filepath.Join(path, "requests") -} - -// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding -type syncer struct { - *SyncParams // sync parameters - syncF func() bool // if syncing is needed - key storage.Key // remote peers address key - state *syncState // sync state for our dbStore - syncStates chan *syncState // different stages of sync - deliveryRequest chan bool // one of two triggers needed to send unsyncedKeys - newUnsyncedKeys chan bool // one of two triggers needed to send unsynced keys - quit chan bool // signal to quit loops - - // DB related fields - dbAccess *DbAccess // access to dbStore - - // native fields - queues [priorities]*syncDb // in-memory cache / queues for sync reqs - keys [priorities]chan interface{} // buffer for unsynced keys - deliveries [priorities]chan *storeRequestMsgData // delivery - - // bzz protocol instance outgoing message callbacks (mockable for testing) - unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg - store func(*storeRequestMsgData) error // send storeRequestMsg -} - -// a syncer instance is linked to each peer connection -// constructor is called from protocol after successful handshake -// the returned instance is attached to the peer and can be called -// by the forwarder -func newSyncer( - db *storage.LDBDatabase, remotekey storage.Key, - dbAccess *DbAccess, - unsyncedKeys func([]*syncRequest, *syncState) error, - store func(*storeRequestMsgData) error, - params *SyncParams, - state *syncState, - syncF func() bool, -) (*syncer, error) { - - syncBufferSize := params.SyncBufferSize - keyBufferSize := params.KeyBufferSize - dbBatchSize := params.RequestDbBatchSize - - self := &syncer{ - syncF: syncF, - key: remotekey, - dbAccess: dbAccess, - syncStates: make(chan *syncState, 20), - deliveryRequest: make(chan bool, 1), - newUnsyncedKeys: make(chan bool, 1), - SyncParams: params, - state: state, - quit: make(chan bool), - unsyncedKeys: unsyncedKeys, - store: store, - } - - // initialising - for i := 0; i < priorities; i++ { - self.keys[i] = make(chan interface{}, keyBufferSize) - self.deliveries[i] = make(chan *storeRequestMsgData) - // initialise a syncdb instance for each priority queue - self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i))) - } - log.Info(fmt.Sprintf("syncer started: %v", state)) - // launch chunk delivery service - go self.syncDeliveries() - // launch sync task manager - if self.syncF() { - go self.sync() - } - // process unsynced keys to broadcast - go self.syncUnsyncedKeys() - - return self, nil -} - -// metadata serialisation -func encodeSync(state *syncState) (*json.RawMessage, error) { - data, err := json.MarshalIndent(state, "", " ") - if err != nil { - return nil, err - } - meta := json.RawMessage(data) - return &meta, nil -} - -func decodeSync(meta *json.RawMessage) (*syncState, error) { - if meta == nil { - return nil, fmt.Errorf("unable to deserialise sync state from <nil>") - } - data := []byte(*(meta)) - if len(data) == 0 { - return nil, fmt.Errorf("unable to deserialise sync state from <nil>") - } - state := &syncState{DbSyncState: &storage.DbSyncState{}} - err := json.Unmarshal(data, state) - return state, err -} - -/* - sync implements the syncing script - * first all items left in the request Db are replayed - * type = StaleSync - * Mode: by default once again via confirmation roundtrip - * Priority: the items are replayed as the proirity specified for StaleSync - * but within the order respects earlier priority level of request - * after all items are consumed for a priority level, the the respective - queue for delivery requests is open (this way new reqs not written to db) - (TODO: this should be checked) - * the sync state provided by the remote peer is used to sync history - * all the backlog from earlier (aborted) syncing is completed starting from latest - * if Last < LastSeenAt then all items in between then process all - backlog from upto last disconnect - * if Last > 0 && - - sync is called from the syncer constructor and is not supposed to be used externally -*/ -func (self *syncer) sync() { - state := self.state - // sync finished - defer close(self.syncStates) - - // 0. first replay stale requests from request db - if state.SessionAt == 0 { - log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log())) - return - } - log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log())) - for p := priorities - 1; p >= 0; p-- { - self.queues[p].dbRead(false, 0, self.replay()) - } - log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log())) - - // unless peer is synced sync unfinished history beginning on - if !state.Synced { - start := state.Start - - if !storage.IsZeroKey(state.Latest) { - // 1. there is unfinished earlier sync - state.Start = state.Latest - log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)) - // blocks while the entire history upto state is synced - self.syncState(state) - if state.Last < state.SessionAt { - state.First = state.Last + 1 - } - } - state.Latest = storage.ZeroKey - state.Start = start - // 2. sync up to last disconnect1 - if state.First < state.LastSeenAt { - state.Last = state.LastSeenAt - log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)) - self.syncState(state) - state.First = state.LastSeenAt - } - state.Latest = storage.ZeroKey - - } else { - // synchronisation starts at end of last session - state.First = state.LastSeenAt - } - - // 3. sync up to current session start - // if there have been new chunks since last session - if state.LastSeenAt < state.SessionAt { - state.Last = state.SessionAt - log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)) - // blocks until state syncing is finished - self.syncState(state) - } - log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log())) - -} - -// wait till syncronised block uptil state is synced -func (self *syncer) syncState(state *syncState) { - self.syncStates <- state - select { - case <-state.synced: - case <-self.quit: - } -} - -// stop quits both request processor and saves the request cache to disk -func (self *syncer) stop() { - close(self.quit) - log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log())) - for _, db := range self.queues { - db.stop() - } -} - -// rlp serialisable sync request -type syncRequest struct { - Key storage.Key - Priority uint -} - -func (self *syncRequest) String() string { - return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority) -} - -func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) { - key, _, _, _, err := parseRequest(req) - // TODO: if req has chunk, it should be put in a cache - // create - if err != nil { - return nil, err - } - return &syncRequest{key, uint(p)}, nil -} - -// serves historical items from the DB -// * read is on demand, blocking unless history channel is read -// * accepts sync requests (syncStates) to create new db iterator -// * closes the channel one iteration finishes -func (self *syncer) syncHistory(state *syncState) chan interface{} { - var n uint - history := make(chan interface{}) - log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)) - it := self.dbAccess.iterator(state) - if it != nil { - go func() { - // signal end of the iteration ended - defer close(history) - IT: - for { - key := it.Next() - if key == nil { - break IT - } - select { - // blocking until history channel is read from - case history <- key: - n++ - log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)) - state.Latest = key - case <-self.quit: - return - } - } - log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)) - }() - } - return history -} - -// triggers key syncronisation -func (self *syncer) sendUnsyncedKeys() { - select { - case self.deliveryRequest <- true: - default: - } -} - -// assembles a new batch of unsynced keys -// * keys are drawn from the key buffers in order of priority queue -// * if the queues of priority for History (HistoryReq) or higher are depleted, -// historical data is used so historical items are lower priority within -// their priority group. -// * Order of historical data is unspecified -func (self *syncer) syncUnsyncedKeys() { - // send out new - var unsynced []*syncRequest - var more, justSynced bool - var keyCount, historyCnt int - var history chan interface{} - - priority := High - keys := self.keys[priority] - var newUnsyncedKeys, deliveryRequest chan bool - keyCounts := make([]int, priorities) - histPrior := self.SyncPriorities[HistoryReq] - syncStates := self.syncStates - state := self.state - -LOOP: - for { - - var req interface{} - // select the highest priority channel to read from - // keys channels are buffered so the highest priority ones - // are checked first - integrity can only be guaranteed if writing - // is locked while selecting - if priority != High || len(keys) == 0 { - // selection is not needed if the High priority queue has items - keys = nil - PRIORITIES: - for priority = High; priority >= 0; priority-- { - // the first priority channel that is non-empty will be assigned to keys - if len(self.keys[priority]) > 0 { - log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority)) - keys = self.keys[priority] - break PRIORITIES - } - log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))) - // if the input queue is empty on this level, resort to history if there is any - if uint(priority) == histPrior && history != nil { - log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key)) - keys = history - break PRIORITIES - } - } - } - - // if peer ready to receive but nothing to send - if keys == nil && deliveryRequest == nil { - // if no items left and switch to waiting mode - log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log())) - newUnsyncedKeys = self.newUnsyncedKeys - } - - // send msg iff - // * peer is ready to receive keys AND ( - // * all queues and history are depleted OR - // * batch full OR - // * all history have been consumed, synced) - if deliveryRequest == nil && - (justSynced || - len(unsynced) > 0 && keys == nil || - len(unsynced) == int(self.SyncBatchSize)) { - justSynced = false - // listen to requests - deliveryRequest = self.deliveryRequest - newUnsyncedKeys = nil // not care about data until next req comes in - // set sync to current counter - // (all nonhistorical outgoing traffic sheduled and persisted - state.LastSeenAt = self.dbAccess.counter() - state.Latest = storage.ZeroKey - log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced)) - // send the unsynced keys - stateCopy := *state - err := self.unsyncedKeys(unsynced, &stateCopy) - if err != nil { - log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", self.key.Log(), err)) - } - self.state = state - log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)) - unsynced = nil - keys = nil - } - - // process item and add it to the batch - select { - case <-self.quit: - break LOOP - case req, more = <-keys: - if keys == history && !more { - log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log())) - // history channel is closed, waiting for new state (called from sync()) - syncStates = self.syncStates - state.Synced = true // this signals that the current segment is complete - select { - case state.synced <- false: - case <-self.quit: - break LOOP - } - justSynced = true - history = nil - } - case <-deliveryRequest: - log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log())) - - // this 1 cap channel can wake up the loop - // signaling that peer is ready to receive unsynced Keys - // the channel is set to nil any further writes will be ignored - deliveryRequest = nil - - case <-newUnsyncedKeys: - log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log())) - // this 1 cap channel can wake up the loop - // signals that data is available to send if peer is ready to receive - newUnsyncedKeys = nil - keys = self.keys[High] - - case state, more = <-syncStates: - // this resets the state - if !more { - state = self.state - log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)) - state.Synced = true - syncStates = nil - } else { - log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)) - state.Synced = false - history = self.syncHistory(state) - // only one history at a time, only allow another one once the - // history channel is closed - syncStates = nil - } - } - if req == nil { - continue LOOP - } - - log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)) - keyCounts[priority]++ - keyCount++ - if keys == history { - log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)) - historyCnt++ - } - if sreq, err := self.newSyncRequest(req, priority); err == nil { - // extract key from req - log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)) - unsynced = append(unsynced, sreq) - } else { - log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, err)) - } - - } -} - -// delivery loop -// takes into account priority, send store Requests with chunk (delivery) -// idle blocking if no new deliveries in any of the queues -func (self *syncer) syncDeliveries() { - var req *storeRequestMsgData - p := High - var deliveries chan *storeRequestMsgData - var msg *storeRequestMsgData - var err error - var c = [priorities]int{} - var n = [priorities]int{} - var total, success uint - - for { - deliveries = self.deliveries[p] - select { - case req = <-deliveries: - n[p]++ - c[p]++ - default: - if p == Low { - // blocking, depletion on all channels, no preference for priority - select { - case req = <-self.deliveries[High]: - n[High]++ - case req = <-self.deliveries[Medium]: - n[Medium]++ - case req = <-self.deliveries[Low]: - n[Low]++ - case <-self.quit: - return - } - p = High - } else { - p-- - continue - } - } - total++ - msg, err = self.newStoreRequestMsgData(req) - if err != nil { - log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)) - } else { - err = self.store(msg) - if err != nil { - log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)) - } else { - success++ - log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req)) - } - } - if total%self.SyncBatchSize == 0 { - log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])) - } - } -} - -/* - addRequest handles requests for delivery - it accepts 4 types: - - * storeRequestMsgData: coming from netstore propagate response - * chunk: coming from forwarding (questionable: id?) - * key: from incoming syncRequest - * syncDbEntry: key,id encoded in db - - If sync mode is on for the type of request, then - it sends the request to the keys queue of the correct priority - channel buffered with capacity (SyncBufferSize) - - If sync mode is off then, requests are directly sent to deliveries -*/ -func (self *syncer) addRequest(req interface{}, ty int) { - // retrieve priority for request type name int8 - - priority := self.SyncPriorities[ty] - // sync mode for this type ON - if self.syncF() || ty == DeliverReq { - if self.SyncModes[ty] { - self.addKey(req, priority, self.quit) - } else { - self.addDelivery(req, priority, self.quit) - } - } -} - -// addKey queues sync request for sync confirmation with given priority -// ie the key will go out in an unsyncedKeys message -func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool { - select { - case self.keys[priority] <- req: - // this wakes up the unsynced keys loop if idle - select { - case self.newUnsyncedKeys <- true: - default: - } - return true - case <-quit: - return false - } -} - -// addDelivery queues delivery request for with given priority -// ie the chunk will be delivered ASAP mod priority queueing handled by syncdb -// requests are persisted across sessions for correct sync -func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool { - select { - case self.queues[priority].buffer <- req: - return true - case <-quit: - return false - } -} - -// doDelivery delivers the chunk for the request with given priority -// without queuing -func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool { - msgdata, err := self.newStoreRequestMsgData(req) - if err != nil { - log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err)) - return false - } - select { - case self.deliveries[priority] <- msgdata: - return true - case <-quit: - return false - } -} - -// returns the delivery function for given priority -// passed on to syncDb -func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool { - return func(req interface{}, quit chan bool) bool { - return self.doDelivery(req, priority, quit) - } -} - -// returns the replay function passed on to syncDb -// depending on sync mode settings for BacklogReq, -// re play of request db backlog sends items via confirmation -// or directly delivers -func (self *syncer) replay() func(req interface{}, quit chan bool) bool { - sync := self.SyncModes[BacklogReq] - priority := self.SyncPriorities[BacklogReq] - // sync mode for this type ON - if sync { - return func(req interface{}, quit chan bool) bool { - return self.addKey(req, priority, quit) - } - } else { - return func(req interface{}, quit chan bool) bool { - return self.doDelivery(req, priority, quit) - } - - } -} - -// given a request, extends it to a full storeRequestMsgData -// polimorphic: see addRequest for the types accepted -func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) { - - key, id, chunk, sreq, err := parseRequest(req) - if err != nil { - return nil, err - } - - if sreq == nil { - if chunk == nil { - var err error - chunk, err = self.dbAccess.get(key) - if err != nil { - return nil, err - } - } - - sreq = &storeRequestMsgData{ - Id: id, - Key: chunk.Key, - SData: chunk.SData, - } - } - - return sreq, nil -} - -// parse request types and extracts, key, id, chunk, request if available -// does not do chunk lookup ! -func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) { - var key storage.Key - var entry *syncDbEntry - var chunk *storage.Chunk - var id uint64 - var ok bool - var sreq *storeRequestMsgData - var err error - - if key, ok = req.(storage.Key); ok { - id = generateId() - - } else if entry, ok = req.(*syncDbEntry); ok { - id = binary.BigEndian.Uint64(entry.val[32:]) - key = storage.Key(entry.val[:32]) - - } else if chunk, ok = req.(*storage.Chunk); ok { - key = chunk.Key - id = generateId() - - } else if sreq, ok = req.(*storeRequestMsgData); ok { - key = sreq.Key - } else { - err = fmt.Errorf("type not allowed: %v (%T)", req, req) - } - - return key, id, chunk, sreq, err -} |