Diffstat (limited to 'swarm/network')
-rw-r--r--  swarm/network/depo.go                    211
-rw-r--r--  swarm/network/forwarding.go              150
-rw-r--r--  swarm/network/hive.go                    383
-rw-r--r--  swarm/network/kademlia/address.go        173
-rw-r--r--  swarm/network/kademlia/address_test.go    96
-rw-r--r--  swarm/network/kademlia/kaddb.go          351
-rw-r--r--  swarm/network/kademlia/kademlia.go       429
-rw-r--r--  swarm/network/kademlia/kademlia_test.go  392
-rw-r--r--  swarm/network/messages.go                317
-rw-r--r--  swarm/network/protocol.go                554
-rw-r--r--  swarm/network/protocol_test.go            17
-rw-r--r--  swarm/network/syncdb.go                  390
-rw-r--r--  swarm/network/syncdb_test.go             221
-rw-r--r--  swarm/network/syncer.go                  778
14 files changed, 4462 insertions, 0 deletions
diff --git a/swarm/network/depo.go b/swarm/network/depo.go
new file mode 100644
index 000000000..79987cc6b
--- /dev/null
+++ b/swarm/network/depo.go
@@ -0,0 +1,211 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "bytes"
+ "encoding/binary"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// Handler for storage/retrieval related protocol requests
+// implements the StorageHandler interface used by the bzz protocol
+type Depo struct {
+ hashfunc storage.Hasher
+ localStore storage.ChunkStore
+ netStore storage.ChunkStore
+}
+
+func NewDepo(hash storage.Hasher, localStore, remoteStore storage.ChunkStore) *Depo {
+ return &Depo{
+ hashfunc: hash,
+ localStore: localStore,
+ netStore: remoteStore, // entrypoint internal
+ }
+}
+
+// Handles UnsyncedKeysMsg after msg decoding - unsynced hashes up to sync state
+// * the remote sync state is just stored and handled in protocol
+// * filters through the new syncRequests and sends the missing ones
+//   back immediately as a deliveryRequest message
+// * empty message just pings back for more (is this needed?)
+// * strict signed sync states may be needed.
+func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error {
+ unsynced := req.Unsynced
+ var missing []*syncRequest
+ var chunk *storage.Chunk
+ var err error
+ for _, req := range unsynced {
+ // skip keys that are found,
+ chunk, err = self.localStore.Get(storage.Key(req.Key[:]))
+ if err != nil || chunk.SData == nil {
+ missing = append(missing, req)
+ }
+ }
+ glog.V(logger.Debug).Infof("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State)
+ glog.V(logger.Detail).Infof("Depo.HandleUnsyncedKeysMsg: received %v", unsynced)
+ // send delivery request with missing keys
+ err = p.deliveryRequest(missing)
+ if err != nil {
+ return err
+ }
+ // set the peer's sync state to be persisted
+ p.syncState = req.State
+ return nil
+}
+
+// Handles deliveryRequestMsg
+// * serves actual chunks asked by the remote peer
+// by pushing to the delivery queue (sync db) of the correct priority
+// (remote peer is free to reprioritize)
+// * the message implies the remote peer wants more, so a new outgoing
+//   unsynced keys message is triggered
+func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
+ deliver := req.Deliver
+ // queue the actual delivery of each requested chunk
+ glog.V(logger.Detail).Infof("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver)
+ for _, sreq := range deliver {
+ // TODO: look up in cache here or in deliveries
+ // priorities are taken from the message so the remote party can
+ // reprioritise at their leisure
+ // r = self.pullCached(sreq.Key) // pulls and deletes from cache
+ Push(p, sreq.Key, sreq.Priority)
+ }
+
+ // sends it out as unsyncedKeysMsg
+ p.syncer.sendUnsyncedKeys()
+ return nil
+}
+
+// the entrypoint for store requests coming from the bzz wire protocol
+// if the key is found locally, return; otherwise
+// the remote is untrusted, so the hash is verified and the chunk passed on to NetStore
+func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
+ req.from = p
+ chunk, err := self.localStore.Get(req.Key)
+ switch {
+ case err != nil:
+ glog.V(logger.Detail).Infof("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key)
+ // not found in memory cache, ie., a genuine store request
+ // create chunk
+ chunk = storage.NewChunk(req.Key, nil)
+
+ case chunk.SData == nil:
+ // found chunk in memory store, needs the data, validate now
+ hasher := self.hashfunc()
+ hasher.Write(req.SData)
+ if !bytes.Equal(hasher.Sum(nil), req.Key) {
+ // data does not validate, ignore
+ // TODO: peer should be penalised/dropped?
+ glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
+ return
+ }
+ glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req)
+
+ default:
+ // data is found, store request ignored
+ // this should update access count?
+ glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req)
+ return
+ }
+
+ // update chunk with size and data
+ chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
+ chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
+ glog.V(logger.Detail).Infof("delivery of %p from %v", chunk, p)
+ chunk.Source = p
+ self.netStore.Put(chunk)
+}
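
The store-request path above boils down to three checks on an untrusted chunk: a minimum length, a payload hash that must equal the key, and a size prefix read from the first 8 bytes. The following standalone sketch is an editor's illustration, not part of this patch; it uses sha256 as a stand-in for the configured storage.Hasher, and validateChunk is a hypothetical helper.

package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// validateChunk mirrors the checks in HandleStoreRequestMsg: the payload must
// be at least 9 bytes (int64 size + one data byte), its hash must match the
// key, and the content size is read from the first 8 bytes (little endian).
func validateChunk(key, sdata []byte) (size int64, ok bool) {
	if len(sdata) < 9 {
		return 0, false
	}
	h := sha256.New() // stand-in hasher; the Depo uses its configured storage.Hasher
	h.Write(sdata)
	if !bytes.Equal(h.Sum(nil), key) {
		return 0, false // data does not validate against the key
	}
	return int64(binary.LittleEndian.Uint64(sdata[:8])), true
}

func main() {
	data := []byte("hello swarm")
	payload := make([]byte, 8+len(data))
	binary.LittleEndian.PutUint64(payload[:8], uint64(len(data)))
	copy(payload[8:], data)

	h := sha256.New()
	h.Write(payload)
	key := h.Sum(nil)

	size, ok := validateChunk(key, payload)
	fmt.Println(size, ok) // 11 true
}
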
+
+// entrypoint for retrieve requests coming from the bzz wire protocol
+// checks swap balance - return if peer has no credit
+func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) {
+ req.from = p
+ // swap - record credit for 1 request
+ // note that we only charge for actual searches
+ var err error
+ if p.swap != nil {
+ err = p.swap.Add(1)
+ }
+ if err != nil {
+ glog.V(logger.Warn).Infof("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err)
+ return
+ }
+
+ // call storage.NetStore#Get which
+ // blocks until local retrieval finished
+ // launches cloud retrieval
+ chunk, _ := self.netStore.Get(req.Key)
+ req = self.strategyUpdateRequest(chunk.Req, req)
+ // check if we can immediately deliver
+ if chunk.SData != nil {
+ glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log())
+
+ if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
+ sreq := &storeRequestMsgData{
+ Id: req.Id,
+ Key: chunk.Key,
+ SData: chunk.SData,
+ requestTimeout: req.timeout,
+ }
+ p.syncer.addRequest(sreq, DeliverReq)
+ } else {
+ glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log())
+ }
+ } else {
+ glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log())
+ }
+}
+
+// adds the requesting peer to the chunk's request status and decides the timeout for the response if still searching
+func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
+ glog.V(logger.Detail).Infof("Depo.strategyUpdateRequest: key %v", origReq.Key.Log())
+ // we do not create an alternative one
+ req = origReq
+ if rs != nil {
+ self.addRequester(rs, req)
+ req.setTimeout(self.searchTimeout(rs, req))
+ }
+ return
+}
+
+// decides the timeout promise sent with the immediate peer's response to a retrieve request
+// if a timeout is explicitly set on the request and expires before the default search timeout, that one is used
+func (self *Depo) searchTimeout(rs *storage.RequestStatus, req *retrieveRequestMsgData) (timeout *time.Time) {
+ reqt := req.getTimeout()
+ t := time.Now().Add(searchTimeout)
+ if reqt != nil && reqt.Before(t) {
+ return reqt
+ } else {
+ return &t
+ }
+}
+
+/*
+adds a new peer to an existing open request
+only add if less than requesterCount peers forwarded the same request id so far
+note this is done irrespective of status (searching or found)
+*/
+func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
+ glog.V(logger.Detail).Infof("Depo.addRequester: key %v - add peer %v to req.Id %v", req.Key.Log(), req.from, req.Id)
+ list := rs.Requesters[req.Id]
+ rs.Requesters[req.Id] = append(list, req)
+}
diff --git a/swarm/network/forwarding.go b/swarm/network/forwarding.go
new file mode 100644
index 000000000..fef79c70b
--- /dev/null
+++ b/swarm/network/forwarding.go
@@ -0,0 +1,150 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "math/rand"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const requesterCount = 3
+
+/*
+forwarder implements the CloudStore interface (used by storage.NetStore)
+and serves as the cloud store backend orchestrating storage/retrieval/delivery
+via the native bzz protocol
+which uses an MSB logarithmic distance-based semi-permanent Kademlia table for
+* recursive forwarding style routing for retrieval
+* smart synchronisation
+*/
+
+type forwarder struct {
+ hive *Hive
+}
+
+func NewForwarder(hive *Hive) *forwarder {
+ return &forwarder{hive: hive}
+}
+
+// generate a unique id uint64
+func generateId() uint64 {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ return uint64(r.Int63())
+}
+
+var searchTimeout = 3 * time.Second
+
+// forwarding logic
+// propagates retrieve requests to peers given by the kademlia hive
+func (self *forwarder) Retrieve(chunk *storage.Chunk) {
+ peers := self.hive.getPeers(chunk.Key, 0)
+ glog.V(logger.Detail).Infof("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers))
+OUT:
+ for _, p := range peers {
+ glog.V(logger.Detail).Infof("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p)
+ for _, recipients := range chunk.Req.Requesters {
+ for _, recipient := range recipients {
+ req := recipient.(*retrieveRequestMsgData)
+ if req.from.Addr() == p.Addr() {
+ continue OUT
+ }
+ }
+ }
+ req := &retrieveRequestMsgData{
+ Key: chunk.Key,
+ Id: generateId(),
+ }
+ var err error
+ if p.swap != nil {
+ err = p.swap.Add(-1)
+ }
+ if err == nil {
+ p.retrieve(req)
+ break OUT
+ }
+ glog.V(logger.Warn).Infof("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err)
+ }
+}
+
+// Store forwards the store request to specific peers given by the kademlia hive
+// except for the peer that the store request came from (if any)
+// delivery queueing is taken care of by the syncer
+func (self *forwarder) Store(chunk *storage.Chunk) {
+ var n int
+ msg := &storeRequestMsgData{
+ Key: chunk.Key,
+ SData: chunk.SData,
+ }
+ var source *peer
+ if chunk.Source != nil {
+ source = chunk.Source.(*peer)
+ }
+ for _, p := range self.hive.getPeers(chunk.Key, 0) {
+ glog.V(logger.Detail).Infof("forwarder.Store: %v %v", p, chunk)
+
+ if p.syncer != nil && (source == nil || p.Addr() != source.Addr()) {
+ n++
+ Deliver(p, msg, PropagateReq)
+ }
+ }
+ glog.V(logger.Detail).Infof("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk)
+}
+
+// once a chunk is found deliver it to its requesters unless timed out
+func (self *forwarder) Deliver(chunk *storage.Chunk) {
+ // iterate over request entries
+ for id, requesters := range chunk.Req.Requesters {
+ counter := requesterCount
+ msg := &storeRequestMsgData{
+ Key: chunk.Key,
+ SData: chunk.SData,
+ }
+ var n int
+ var req *retrieveRequestMsgData
+ // iterate over requesters with the same id
+ for id, r := range requesters {
+ req = r.(*retrieveRequestMsgData)
+ if req.timeout == nil || req.timeout.After(time.Now()) {
+ glog.V(logger.Detail).Infof("forwarder.Deliver: %v -> %v", req.Id, req.from)
+ msg.Id = uint64(id)
+ Deliver(req.from, msg, DeliverReq)
+ n++
+ counter--
+ if counter <= 0 {
+ break
+ }
+ }
+ }
+ glog.V(logger.Detail).Infof("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n)
+ }
+}
+
+// initiate delivery of a chunk to a particular peer via syncer#addRequest
+// depending on syncer mode, priority settings and sync request type
+// this either goes via a confirmation roundtrip, or is queued or pushed directly
+func Deliver(p *peer, req interface{}, ty int) {
+ p.syncer.addRequest(req, ty)
+}
+
+// push chunk over to peer
+func Push(p *peer, key storage.Key, priority uint) {
+ p.syncer.doDelivery(key, priority, p.syncer.quit)
+}
diff --git a/swarm/network/hive.go b/swarm/network/hive.go
new file mode 100644
index 000000000..f5ebdd008
--- /dev/null
+++ b/swarm/network/hive.go
@@ -0,0 +1,383 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "fmt"
+ "math/rand"
+ "path/filepath"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/swarm/network/kademlia"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// Hive is the logistic manager of the swarm
+// it uses a generic kademlia nodetable to find best peer list
+// for any target
+// this is used by the netstore to search for content in the swarm
+// the bzz protocol peersMsgData exchange is relayed to Kademlia
+// for db storage and filtering
+// connections and disconnections are reported and relayed
+// to keep the nodetable up to date
+
+type Hive struct {
+ listenAddr func() string
+ callInterval uint64
+ id discover.NodeID
+ addr kademlia.Address
+ kad *kademlia.Kademlia
+ path string
+ quit chan bool
+ toggle chan bool
+ more chan bool
+
+ // for testing only
+ swapEnabled bool
+ syncEnabled bool
+ blockRead bool
+ blockWrite bool
+}
+
+const (
+ callInterval = 3000000000 // 3 seconds, in nanoseconds
+ // bucketSize = 3
+ // maxProx = 8
+ // proxBinSize = 4
+)
+
+type HiveParams struct {
+ CallInterval uint64
+ KadDbPath string
+ *kademlia.KadParams
+}
+
+func NewHiveParams(path string) *HiveParams {
+ kad := kademlia.NewKadParams()
+ // kad.BucketSize = bucketSize
+ // kad.MaxProx = maxProx
+ // kad.ProxBinSize = proxBinSize
+
+ return &HiveParams{
+ CallInterval: callInterval,
+ KadDbPath: filepath.Join(path, "bzz-peers.json"),
+ KadParams: kad,
+ }
+}
+
+func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive {
+ kad := kademlia.New(kademlia.Address(addr), params.KadParams)
+ return &Hive{
+ callInterval: params.CallInterval,
+ kad: kad,
+ addr: kad.Addr(),
+ path: params.KadDbPath,
+ swapEnabled: swapEnabled,
+ syncEnabled: syncEnabled,
+ }
+}
+
+func (self *Hive) SyncEnabled(on bool) {
+ self.syncEnabled = on
+}
+
+func (self *Hive) SwapEnabled(on bool) {
+ self.swapEnabled = on
+}
+
+func (self *Hive) BlockNetworkRead(on bool) {
+ self.blockRead = on
+}
+
+func (self *Hive) BlockNetworkWrite(on bool) {
+ self.blockWrite = on
+}
+
+// public accessor to the hive base address
+func (self *Hive) Addr() kademlia.Address {
+ return self.addr
+}
+
+// Start receives network info only at startup
+// listenAddr is a function to retrieve the listening address to advertise to peers
+// connectPeer is a function to connect to a peer based on its NodeID or enode URL
+// these are called on the p2p.Server which runs on the node
+func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) {
+ self.toggle = make(chan bool)
+ self.more = make(chan bool)
+ self.quit = make(chan bool)
+ self.id = id
+ self.listenAddr = listenAddr
+ err = self.kad.Load(self.path, nil)
+ if err != nil {
+ glog.V(logger.Warn).Infof("Warning: error reading kaddb '%s' (skipping): %v", self.path, err)
+ err = nil
+ }
+ // this loop does bootstrapping and maintains a healthy table
+ go self.keepAlive()
+ go func() {
+ // whenever toggled ask kademlia about most preferred peer
+ for alive := range self.more {
+ if !alive {
+ // receiving false closes the loop while allowing parallel routines
+ // to attempt to write to more (remove Peer when shutting down)
+ return
+ }
+ node, need, proxLimit := self.kad.Suggest()
+
+ if node != nil && len(node.Url) > 0 {
+ glog.V(logger.Detail).Infof("call known bee %v", node.Url)
+ // enode or any lower level connection address is unnecessary in future
+ // discovery table is used to look it up.
+ connectPeer(node.Url)
+ }
+ if need {
+ // a random peer is taken from the table
+ peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1)
+ if len(peers) > 0 {
+ // a random address in the proxLimit bin is sent for lookup
+ randAddr := kademlia.RandomAddressAt(self.addr, proxLimit)
+ req := &retrieveRequestMsgData{
+ Key: storage.Key(randAddr[:]),
+ }
+ glog.V(logger.Detail).Infof("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0])
+ peers[0].(*peer).retrieve(req)
+ } else {
+ glog.V(logger.Warn).Infof("no peer")
+ }
+ glog.V(logger.Detail).Infof("buzz kept alive")
+ } else {
+ glog.V(logger.Info).Infof("no need for more bees")
+ }
+ select {
+ case self.toggle <- need:
+ case <-self.quit:
+ return
+ }
+ glog.V(logger.Debug).Infof("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount())
+ }
+ }()
+ return
+}
+
+// keepAlive is a forever loop
+// in its awake state it periodically triggers connection attempts
+// by writing to self.more until the Kademlia table is saturated
+// the wake state is toggled by writing to self.toggle
+// it restarts if the table becomes non-full again due to disconnections
+func (self *Hive) keepAlive() {
+ alarm := time.NewTicker(time.Duration(self.callInterval)).C
+ for {
+ select {
+ case <-alarm:
+ if self.kad.DBCount() > 0 {
+ select {
+ case self.more <- true:
+ glog.V(logger.Debug).Infof("buzz wakeup")
+ default:
+ }
+ }
+ case need := <-self.toggle:
+ if alarm == nil && need {
+ alarm = time.NewTicker(time.Duration(self.callInterval)).C
+ }
+ if alarm != nil && !need {
+ alarm = nil
+
+ }
+ case <-self.quit:
+ return
+ }
+ }
+}
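
The interplay between keepAlive and the suggestion loop in Start is a small wake/sleep protocol over three channels: ticks are forwarded (non-blocking) to more, the worker answers each wakeup with a need flag on toggle, and that flag turns the ticker channel on or off. The reduced sketch below is an editor's illustration with hypothetical names, not part of this patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	more := make(chan bool)
	toggle := make(chan bool)
	quit := make(chan bool)

	// keepAlive-style loop: forward ticks to more, honour the toggle flag
	go func() {
		alarm := time.NewTicker(50 * time.Millisecond).C
		for {
			select {
			case <-alarm:
				select {
				case more <- true:
				default: // worker busy, skip this tick
				}
			case need := <-toggle:
				if alarm == nil && need {
					alarm = time.NewTicker(50 * time.Millisecond).C
				}
				if alarm != nil && !need {
					alarm = nil // stop waking the worker (the Ticker is not stopped here, mirroring keepAlive above)
				}
			case <-quit:
				return
			}
		}
	}()

	// suggestion-loop skeleton: one connection attempt per wakeup, then report need
	rounds := 0
	for range more {
		rounds++
		fmt.Println("wakeup", rounds)
		need := rounds < 3 // pretend the table saturates after three rounds
		select {
		case toggle <- need:
		case <-quit:
			return
		}
		if !need {
			close(quit)
			return
		}
	}
}
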
+
+func (self *Hive) Stop() error {
+ // closing the quit channel quits the update loop
+ close(self.quit)
+ return self.kad.Save(self.path, saveSync)
+}
+
+// called at the end of a successful protocol handshake
+func (self *Hive) addPeer(p *peer) error {
+ defer func() {
+ select {
+ case self.more <- true:
+ default:
+ }
+ }()
+ glog.V(logger.Detail).Infof("hi new bee %v", p)
+ err := self.kad.On(p, loadSync)
+ if err != nil {
+ return err
+ }
+ // self lookup (can be encoded as nil/zero key since the peer's addr is known) + no id
+ // the most common way of saying hi in bzz is initiation of gossip
+ // let me know about anyone new from my hood, here is the storage radius
+ // to send the 6 byte self lookup
+ // we do not record it as a request or forward it, just reply with peers
+ p.retrieve(&retrieveRequestMsgData{})
+ glog.V(logger.Detail).Infof("'whatsup wheresdaparty' sent to %v", p)
+
+ return nil
+}
+
+// called after peer disconnected
+func (self *Hive) removePeer(p *peer) {
+ glog.V(logger.Debug).Infof("bee %v removed", p)
+ self.kad.Off(p, saveSync)
+ select {
+ case self.more <- true:
+ default:
+ }
+ if self.kad.Count() == 0 {
+ glog.V(logger.Debug).Infof("empty, all bees gone")
+ }
+}
+
+// Retrieve a list of live peers that are closer to target than us
+func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {
+ var addr kademlia.Address
+ copy(addr[:], target[:])
+ for _, node := range self.kad.FindClosest(addr, max) {
+ peers = append(peers, node.(*peer))
+ }
+ return
+}
+
+// disconnects all the peers
+func (self *Hive) DropAll() {
+ glog.V(logger.Info).Infof("dropping all bees")
+ for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
+ node.Drop()
+ }
+}
+
+// constructor for kademlia.NodeRecord based on peer address alone
+// TODO: should go away and only addr passed to kademlia
+func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord {
+ now := time.Now()
+ return &kademlia.NodeRecord{
+ Addr: addr.Addr,
+ Url: addr.String(),
+ Seen: now,
+ After: now,
+ }
+}
+
+// called by the protocol when receiving peerset (for target address)
+// peersMsgData is converted to a slice of NodeRecords for Kademlia
+// this is to store all that's needed
+func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
+ var nrs []*kademlia.NodeRecord
+ for _, p := range req.Peers {
+ nrs = append(nrs, newNodeRecord(p))
+ }
+ self.kad.Add(nrs)
+}
+
+// peer wraps the protocol instance to represent a connected peer
+// it implements kademlia.Node interface
+type peer struct {
+ *bzz // protocol instance running on peer connection
+}
+
+// protocol instance implements kademlia.Node interface (embedded peer)
+func (self *peer) Addr() kademlia.Address {
+ return self.remoteAddr.Addr
+}
+
+func (self *peer) Url() string {
+ return self.remoteAddr.String()
+}
+
+// TODO take into account traffic
+func (self *peer) LastActive() time.Time {
+ return self.lastActive
+}
+
+// reads the serialised form of sync state persisted as the 'Meta' attribute
+// and sets the decoded syncState on the online node
+func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
+ p, ok := node.(*peer)
+ if !ok {
+ return fmt.Errorf("invalid type")
+ }
+ if record.Meta == nil {
+ glog.V(logger.Debug).Infof("no sync state for node record %v, setting default", record)
+ p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
+ return nil
+ }
+ state, err := decodeSync(record.Meta)
+ if err != nil {
+ return fmt.Errorf("error decoding kaddb record meta info into a sync state: %v", err)
+ }
+ glog.V(logger.Detail).Infof("sync state for node record %v read from Meta: %s", record, string(*(record.Meta)))
+ p.syncState = state
+ return err
+}
+
+// callback when saving a sync state
+func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
+ if p, ok := node.(*peer); ok {
+ meta, err := encodeSync(p.syncState)
+ if err != nil {
+ glog.V(logger.Warn).Infof("error saving sync state for %v: %v", node, err)
+ return
+ }
+ glog.V(logger.Detail).Infof("saved sync state for %v: %s", node, string(*meta))
+ record.Meta = meta
+ }
+}
+
+// the immediate response to a retrieve request,
+// sends relevant peer data given by the kademlia hive to the requester
+// TODO: remember peers sent for duration of the session, only new peers sent
+func (self *Hive) peers(req *retrieveRequestMsgData) {
+ if req != nil && req.MaxPeers >= 0 {
+ var addrs []*peerAddr
+ if req.timeout == nil || time.Now().Before(*(req.timeout)) {
+ key := req.Key
+ // self lookup from remote peer
+ if storage.IsZeroKey(key) {
+ addr := req.from.Addr()
+ key = storage.Key(addr[:])
+ req.Key = nil
+ }
+ // get peer addresses from hive
+ for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
+ addrs = append(addrs, peer.remoteAddr)
+ }
+ glog.V(logger.Debug).Infof("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log())
+
+ peersData := &peersMsgData{
+ Peers: addrs,
+ Key: req.Key,
+ Id: req.Id,
+ }
+ peersData.setTimeout(req.timeout)
+ req.from.peers(peersData)
+ }
+ }
+}
+
+func (self *Hive) String() string {
+ return self.kad.String()
+}
diff --git a/swarm/network/kademlia/address.go b/swarm/network/kademlia/address.go
new file mode 100644
index 000000000..16c5ce532
--- /dev/null
+++ b/swarm/network/kademlia/address.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package kademlia
+
+import (
+ "fmt"
+ "math/rand"
+ "strings"
+
+ "github.com/ethereum/go-ethereum/common"
+)
+
+type Address common.Hash
+
+func (a Address) String() string {
+ return fmt.Sprintf("%x", a[:])
+}
+
+func (a *Address) MarshalJSON() (out []byte, err error) {
+ return []byte(`"` + a.String() + `"`), nil
+}
+
+func (a *Address) UnmarshalJSON(value []byte) error {
+ *a = Address(common.HexToHash(string(value[1 : len(value)-1])))
+ return nil
+}
+
+// the string form of the binary representation of an address
+func (a Address) Bin() string {
+ var bs []string
+ for _, b := range a[:] {
+ bs = append(bs, fmt.Sprintf("%08b", b))
+ }
+ return strings.Join(bs, "")
+}
+
+/*
+Proximity(x, y) returns the proximity order of the MSB distance between x and y
+
+The distance metric MSB(x, y) of two equal length byte sequences x and y is the
+value of the binary integer cast of x^y, i.e. x and y bitwise xor-ed.
+The binary cast is big endian: most significant bit first (=MSB).
+
+Proximity(x, y) is a discrete logarithmic scaling of the MSB distance.
+It is defined as the reverse rank of the integer part of the base 2
+logarithm of the distance.
+It is calculated by counting the number of common leading zeros in the (MSB)
+binary representation of x^y.
+
+(0 farthest, 255 closest, 256 self)
+*/
+func proximity(one, other Address) (ret int) {
+ for i := 0; i < len(one); i++ {
+ oxo := one[i] ^ other[i]
+ for j := 0; j < 8; j++ {
+ if (uint8(oxo)>>uint8(7-j))&0x01 != 0 {
+ return i*8 + j
+ }
+ }
+ }
+ return len(one) * 8
+}
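
A few hand-checked values make the proximity order above more concrete. The snippet is an editor's illustration, not part of this patch; the function body is copied from above with Address replaced by [32]byte so it stands alone.

package main

import "fmt"

// proximity is the same algorithm as in address.go, on a plain [32]byte.
func proximity(one, other [32]byte) int {
	for i := 0; i < len(one); i++ {
		oxo := one[i] ^ other[i]
		for j := 0; j < 8; j++ {
			if (uint8(oxo)>>uint8(7-j))&0x01 != 0 {
				return i*8 + j
			}
		}
	}
	return len(one) * 8
}

func main() {
	var a, b, c [32]byte
	b[0] = 0x80 // differs from a in the most significant bit
	c[1] = 0x01 // differs from a in bit 15 (last bit of the second byte)

	fmt.Println(proximity(a, b)) // 0   -> farthest
	fmt.Println(proximity(a, c)) // 15
	fmt.Println(proximity(a, a)) // 256 -> identical addresses
}
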
+
+// Address.ProxCmp compares the distances a->target and b->target.
+// Returns -1 if a is closer to target, 1 if b is closer to target
+// and 0 if they are equal.
+func (target Address) ProxCmp(a, b Address) int {
+ for i := range target {
+ da := a[i] ^ target[i]
+ db := b[i] ^ target[i]
+ if da > db {
+ return 1
+ } else if da < db {
+ return -1
+ }
+ }
+ return 0
+}
+
+// RandomAddressAt(address, prox) generates a random address
+// at proximity order prox relative to address
+// if prox is negative a random address is generated
+func RandomAddressAt(self Address, prox int) (addr Address) {
+ addr = self
+ var pos int
+ if prox >= 0 {
+ pos = prox / 8
+ trans := prox % 8
+ transbytea := byte(0)
+ for j := 0; j <= trans; j++ {
+ transbytea |= 1 << uint8(7-j)
+ }
+ flipbyte := byte(1 << uint8(7-trans))
+ transbyteb := transbytea ^ byte(255)
+ randbyte := byte(rand.Intn(255))
+ addr[pos] = ((addr[pos] & transbytea) ^ flipbyte) | randbyte&transbyteb
+ }
+ for i := pos + 1; i < len(addr); i++ {
+ addr[i] = byte(rand.Intn(255))
+ }
+
+ return
+}
+
+// KeyRange(one, other, proxLimit) returns the inclusive address range
+// that contains addresses closer to one than to other
+func KeyRange(one, other Address, proxLimit int) (start, stop Address) {
+ prox := proximity(one, other)
+ if prox >= proxLimit {
+ prox = proxLimit
+ }
+ start = CommonBitsAddrByte(one, other, byte(0x00), prox)
+ stop = CommonBitsAddrByte(one, other, byte(0xff), prox)
+ return
+}
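
KeyRange in use: for the capped proximity order p = min(proximity(one, other), proxLimit), start keeps the first p bits of one and pads with 0x00 bytes, while stop keeps the same prefix and pads with 0xff, so [start, stop] is the inclusive range of addresses sharing that prefix with one. The snippet is an editor's illustration, not part of this patch; the import path follows this package's location in the go-ethereum tree.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/network/kademlia"
)

func main() {
	one := kademlia.RandomAddress()
	other := kademlia.RandomAddressAt(one, 3) // differs from one first at bit 3, i.e. proximity order 3
	start, stop := kademlia.KeyRange(one, other, 8)
	// start = first 3 bits of one padded with 0x00, stop = same prefix padded with 0xff
	fmt.Printf("one:   %v\nstart: %v\nstop:  %v\n", one, start, stop)
}
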
+
+func CommonBitsAddrF(self, other Address, f func() byte, p int) (addr Address) {
+ prox := proximity(self, other)
+ var pos int
+ if p <= prox {
+ prox = p
+ }
+ pos = prox / 8
+ addr = self
+ trans := byte(prox % 8)
+ var transbytea byte
+ if p > prox {
+ transbytea = byte(0x7f)
+ } else {
+ transbytea = byte(0xff)
+ }
+ transbytea >>= trans
+ transbyteb := transbytea ^ byte(0xff)
+ addrpos := addr[pos]
+ addrpos &= transbyteb
+ if p > prox {
+ addrpos ^= byte(0x80 >> trans)
+ }
+ addrpos |= transbytea & f()
+ addr[pos] = addrpos
+ for i := pos + 1; i < len(addr); i++ {
+ addr[i] = f()
+ }
+
+ return
+}
+
+func CommonBitsAddr(self, other Address, prox int) (addr Address) {
+ return CommonBitsAddrF(self, other, func() byte { return byte(rand.Intn(255)) }, prox)
+}
+
+func CommonBitsAddrByte(self, other Address, b byte, prox int) (addr Address) {
+ return CommonBitsAddrF(self, other, func() byte { return b }, prox)
+}
+
+// RandomAddress() generates a random address
+func RandomAddress() Address {
+ return RandomAddressAt(Address{}, -1)
+}
diff --git a/swarm/network/kademlia/address_test.go b/swarm/network/kademlia/address_test.go
new file mode 100644
index 000000000..c062c8eaf
--- /dev/null
+++ b/swarm/network/kademlia/address_test.go
@@ -0,0 +1,96 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package kademlia
+
+import (
+ "math/rand"
+ "reflect"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+)
+
+func (Address) Generate(rand *rand.Rand, size int) reflect.Value {
+ var id Address
+ for i := 0; i < len(id); i++ {
+ id[i] = byte(uint8(rand.Intn(255)))
+ }
+ return reflect.ValueOf(id)
+}
+
+func TestCommonBitsAddrF(t *testing.T) {
+ a := Address(common.HexToHash("0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ b := Address(common.HexToHash("0x8123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ c := Address(common.HexToHash("0x4123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ d := Address(common.HexToHash("0x0023456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ e := Address(common.HexToHash("0x01A3456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"))
+ ab := CommonBitsAddrF(a, b, func() byte { return byte(0x00) }, 10)
+ expab := Address(common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000"))
+
+ if ab != expab {
+ t.Fatalf("%v != %v", ab, expab)
+ }
+ ac := CommonBitsAddrF(a, c, func() byte { return byte(0x00) }, 10)
+ expac := Address(common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000"))
+
+ if ac != expac {
+ t.Fatalf("%v != %v", ac, expac)
+ }
+ ad := CommonBitsAddrF(a, d, func() byte { return byte(0x00) }, 10)
+ expad := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
+
+ if ad != expad {
+ t.Fatalf("%v != %v", ad, expad)
+ }
+ ae := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 10)
+ expae := Address(common.HexToHash("0x0180000000000000000000000000000000000000000000000000000000000000"))
+
+ if ae != expae {
+ t.Fatalf("%v != %v", ae, expae)
+ }
+ acf := CommonBitsAddrF(a, c, func() byte { return byte(0xff) }, 10)
+ expacf := Address(common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
+
+ if acf != expacf {
+ t.Fatalf("%v != %v", acf, expacf)
+ }
+ aeo := CommonBitsAddrF(a, e, func() byte { return byte(0x00) }, 2)
+ expaeo := Address(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"))
+
+ if aeo != expaeo {
+ t.Fatalf("%v != %v", aeo, expaeo)
+ }
+ aep := CommonBitsAddrF(a, e, func() byte { return byte(0xff) }, 2)
+ expaep := Address(common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
+
+ if aep != expaep {
+ t.Fatalf("%v != %v", aep, expaep)
+ }
+
+}
+
+func TestRandomAddressAt(t *testing.T) {
+ var a Address
+ for i := 0; i < 100; i++ {
+ a = RandomAddress()
+ prox := rand.Intn(255)
+ b := RandomAddressAt(a, prox)
+ if proximity(a, b) != prox {
+ t.Fatalf("incorrect address prox(%v, %v) == %v (expected %v)", a, b, proximity(a, b), prox)
+ }
+ }
+}
diff --git a/swarm/network/kademlia/kaddb.go b/swarm/network/kademlia/kaddb.go
new file mode 100644
index 000000000..53a7db451
--- /dev/null
+++ b/swarm/network/kademlia/kaddb.go
@@ -0,0 +1,351 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package kademlia
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+type NodeData interface {
+ json.Marshaler
+ json.Unmarshaler
+}
+
+// allow inactive peers under
+type NodeRecord struct {
+ Addr Address // address of node
+ Url string // Url, used to connect to node
+ After time.Time // next call after time
+ Seen time.Time // last connected at time
+ Meta *json.RawMessage // arbitrary metadata saved for a peer
+
+ node Node
+}
+
+func (self *NodeRecord) setSeen() {
+ t := time.Now()
+ self.Seen = t
+ self.After = t
+}
+
+func (self *NodeRecord) String() string {
+ return fmt.Sprintf("<%v>", self.Addr)
+}
+
+// persisted node record database
+type KadDb struct {
+ Address Address
+ Nodes [][]*NodeRecord
+ index map[Address]*NodeRecord
+ cursors []int
+ lock sync.RWMutex
+ purgeInterval time.Duration
+ initialRetryInterval time.Duration
+ connRetryExp int
+}
+
+func newKadDb(addr Address, params *KadParams) *KadDb {
+ return &KadDb{
+ Address: addr,
+ Nodes: make([][]*NodeRecord, params.MaxProx+1), // overwritten by load
+ cursors: make([]int, params.MaxProx+1),
+ index: make(map[Address]*NodeRecord),
+ purgeInterval: params.PurgeInterval,
+ initialRetryInterval: params.InitialRetryInterval,
+ connRetryExp: params.ConnRetryExp,
+ }
+}
+
+func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+
+ record, found := self.index[a]
+ if !found {
+ record = &NodeRecord{
+ Addr: a,
+ Url: url,
+ }
+ glog.V(logger.Info).Infof("add new record %v to kaddb", record)
+ // insert in kaddb
+ self.index[a] = record
+ self.Nodes[index] = append(self.Nodes[index], record)
+ } else {
+ glog.V(logger.Info).Infof("found record %v in kaddb", record)
+ }
+ // update last seen time
+ record.setSeen()
+ // update with url in case IP/port changes
+ record.Url = url
+ return record
+}
+
+// add adds node records to kaddb (persisted node record db)
+func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ var n int
+ var nodes []*NodeRecord
+ for _, node := range nrs {
+ _, found := self.index[node.Addr]
+ if !found && node.Addr != self.Address {
+ node.setSeen()
+ self.index[node.Addr] = node
+ index := proximityBin(node.Addr)
+ dbcursor := self.cursors[index]
+ nodes = self.Nodes[index]
+ // this is inefficient for allocation, need to just append then shift
+ newnodes := make([]*NodeRecord, len(nodes)+1)
+ copy(newnodes[:], nodes[:dbcursor])
+ newnodes[dbcursor] = node
+ copy(newnodes[dbcursor+1:], nodes[dbcursor:])
+ glog.V(logger.Detail).Infof("new nodes: %v\nnodes: %v", newnodes, nodes)
+ self.Nodes[index] = newnodes
+ n++
+ }
+ }
+ if n > 0 {
+ glog.V(logger.Debug).Infof("%d/%d node records (new/known)", n, len(nrs))
+ }
+}
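
The comment above notes that rebuilding the slice on every add is wasteful; an append-then-shift insertion does the same work in place. The helper below is an editor's sketch with a hypothetical name, assumed to live in package kademlia, not part of this patch.

// insertAt places node at index i, growing nodes by one element.
func insertAt(nodes []*NodeRecord, i int, node *NodeRecord) []*NodeRecord {
	nodes = append(nodes, nil)   // grow by one (amortised allocation)
	copy(nodes[i+1:], nodes[i:]) // shift the tail right by one
	nodes[i] = node              // place the new record at the cursor
	return nodes
}

With it, the loop body above could use self.Nodes[index] = insertAt(self.Nodes[index], dbcursor, node), which reproduces the current behaviour without a fresh allocation per record.
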
+
+/*
+next returns one node record with the highest priority for a desired
+connection.
+This is used to pick candidates for live nodes that are most wanted for
+a highly connected, low-centrality network structure for Swarm, which best
+suits Kademlia-style routing.
+
+* Starting as naive node with empty db, this implements Kademlia bootstrapping
+* As a mature node, it fills short lines. All on demand.
+
+The candidate is chosen using the following strategy:
+We check for missing online nodes in the buckets for 1 up to max BucketSize rounds.
+On each round we proceed from the low to high proximity order buckets.
+If the number of active nodes (=connected peers) is < rounds, then start looking
+for a known candidate. To determine if there is a candidate to recommend the
+kaddb node record database row corresponding to the bucket is checked.
+
+If the row cursor is on position i, the ith element in the row is chosen.
+If the record is scheduled not to be retried before NOW, the next element is taken.
+If the record is scheduled to be retried, it is set as checked, scheduled for
+checking and is returned. The time of the next check is in X (duration) such that
+X = ConnRetryExp * delta where delta is the time passed since the last check and
+ConnRetryExp is a constant obsolescence factor. (Note that when node records are added
+from peer messages, they are marked as checked and placed at the cursor, i.e.
+given priority over older entries). Entries which were checked more than
+purgeInterval ago are deleted from the kaddb row. If no candidate is found after
+a full round of checking the next bucket up is considered. If no candidate is
+found when we reach the maximum-proximity bucket, the next round starts.
+
+A node record a is more favoured than b (a > b) iff a is a passive node (record of
+an offline past peer) and
+|proxBin(a)| < |proxBin(b)|
+|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|)
+|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b))
+
+
+The returned proxLimit value names the proximity order of the first missing slot found.
+*/
+func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) {
+ // return nil, proxLimit indicates that all buckets are filled
+ defer self.lock.Unlock()
+ self.lock.Lock()
+
+ var interval time.Duration
+ var found bool
+ var purge []bool
+ var delta time.Duration
+ var cursor int
+ var count int
+ var after time.Time
+
+ // iterate over columns maximum bucketsize times
+ for rounds := 1; rounds <= maxBinSize; rounds++ {
+ ROUND:
+ // iterate over rows from PO 0 up to MaxProx
+ for po, dbrow := range self.Nodes {
+ // if row has rounds connected peers, then take the next
+ if binSize(po) >= rounds {
+ continue ROUND
+ }
+ if !need {
+ // set proxlimit to the PO where the first missing slot is found
+ proxLimit = po
+ need = true
+ }
+ purge = make([]bool, len(dbrow))
+
+ // there is a missing slot - find a node to connect to
+ // select a node record from the relevant kaddb row (of identical prox order)
+ ROW:
+ for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) {
+ count++
+ node = dbrow[cursor]
+
+ // skip already connected nodes
+ if node.node != nil {
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow))
+ continue ROW
+ }
+
+ // if the node is scheduled to be retried only later, skip it for now
+ if time.Time(node.After).After(time.Now()) {
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)
+ continue ROW
+ }
+
+ delta = time.Since(time.Time(node.Seen))
+ if delta < self.initialRetryInterval {
+ delta = self.initialRetryInterval
+ }
+ if delta > self.purgeInterval {
+ // remove node
+ purge[cursor] = true
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen)
+ continue ROW
+ }
+
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)
+
+ // scheduling next check
+ interval = time.Duration(delta * time.Duration(self.connRetryExp))
+ after = time.Now().Add(interval)
+
+ glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval)
+ node.After = after
+ found = true
+ } // ROW
+ self.cursors[po] = cursor
+ self.delete(po, purge)
+ if found {
+ return node, need, proxLimit
+ }
+ } // ROUND
+ } // ROUNDS
+
+ return nil, need, proxLimit
+}
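
The preference order in the comment above findBest can be written out as a predicate. The sketch below is an editor's illustration, not part of this patch; it assumes it sits in package kademlia, and proxBin, binSize and lastChecked are hypothetical accessors standing for the record's proximity order, the number of connected peers in that bin (|proxBin(x)|), and the time of its last connection attempt.

// moreFavoured restates the ordering from the findBest doc comment:
// passive records only, emptier bins first, then lower proximity order,
// then the least recently checked record.
func moreFavoured(a, b *NodeRecord,
	proxBin func(*NodeRecord) int, // proximity order of the record's bin
	binSize func(*NodeRecord) int, // connected peers in that bin, |proxBin(x)|
	lastChecked func(*NodeRecord) time.Time) bool {

	if a.node != nil { // only passive records (offline past peers) are candidates
		return false
	}
	switch {
	case binSize(a) < binSize(b):
		return true
	case proxBin(a) < proxBin(b) && binSize(a) == binSize(b):
		return true
	case proxBin(a) == proxBin(b) && lastChecked(a).Before(lastChecked(b)):
		return true
	default:
		return false
	}
}
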
+
+// deletes the node records of a kaddb row corresponding to the indexes
+// caller must hold the db lock
+// the call is unsafe, no index checks
+func (self *KadDb) delete(row int, purge []bool) {
+ var nodes []*NodeRecord
+ dbrow := self.Nodes[row]
+ for i, del := range purge {
+ if i == self.cursors[row] {
+ //reset cursor
+ self.cursors[row] = len(nodes)
+ }
+ // delete the entry to be purged
+ if del {
+ delete(self.index, dbrow[i].Addr)
+ continue
+ }
+ // otherwise append to new list
+ nodes = append(nodes, dbrow[i])
+ }
+ self.Nodes[row] = nodes
+}
+
+// save persists kaddb on disk (written to file on path in json format).
+func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+
+ var n int
+
+ for _, b := range self.Nodes {
+ for _, node := range b {
+ n++
+ node.After = time.Now()
+ node.Seen = time.Now()
+ if cb != nil {
+ cb(node, node.node)
+ }
+ }
+ }
+
+ data, err := json.MarshalIndent(self, "", " ")
+ if err != nil {
+ return err
+ }
+ err = ioutil.WriteFile(path, data, os.ModePerm)
+ if err != nil {
+ glog.V(logger.Warn).Infof("unable to save kaddb with %v nodes to %v: %v", n, path, err)
+ } else {
+ glog.V(logger.Info).Infof("saved kaddb with %v nodes to %v", n, path)
+ }
+ return err
+}
+
+// Load(path) loads the node record database (kaddb) from file on path.
+func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+
+ var data []byte
+ data, err = ioutil.ReadFile(path)
+ if err != nil {
+ return
+ }
+
+ err = json.Unmarshal(data, self)
+ if err != nil {
+ return
+ }
+ var n int
+ var purge []bool
+ for po, b := range self.Nodes {
+ purge = make([]bool, len(b))
+ ROW:
+ for i, node := range b {
+ if cb != nil {
+ err = cb(node, node.node)
+ if err != nil {
+ purge[i] = true
+ continue ROW
+ }
+ }
+ n++
+ if (node.After == time.Time{}) {
+ node.After = time.Now()
+ }
+ self.index[node.Addr] = node
+ }
+ self.delete(po, purge)
+ }
+ glog.V(logger.Info).Infof("loaded kaddb with %v nodes from %v", n, path)
+
+ return
+}
+
+// accessor for KAD offline db count
+func (self *KadDb) count() int {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ return len(self.index)
+}
diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go
new file mode 100644
index 000000000..87c57cefe
--- /dev/null
+++ b/swarm/network/kademlia/kademlia.go
@@ -0,0 +1,429 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package kademlia
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+const (
+ bucketSize = 4
+ proxBinSize = 2
+ maxProx = 8
+ connRetryExp = 2
+ maxPeers = 100
+)
+
+var (
+ purgeInterval = 42 * time.Hour
+ initialRetryInterval = 42 * time.Millisecond
+ maxIdleInterval = 42 * 1000 * time.Millisecond
+ // maxIdleInterval = 42 * 10 0 * time.Millisecond
+)
+
+type KadParams struct {
+ // adjustable parameters
+ MaxProx int
+ ProxBinSize int
+ BucketSize int
+ PurgeInterval time.Duration
+ InitialRetryInterval time.Duration
+ MaxIdleInterval time.Duration
+ ConnRetryExp int
+}
+
+func NewKadParams() *KadParams {
+ return &KadParams{
+ MaxProx: maxProx,
+ ProxBinSize: proxBinSize,
+ BucketSize: bucketSize,
+ PurgeInterval: purgeInterval,
+ InitialRetryInterval: initialRetryInterval,
+ MaxIdleInterval: maxIdleInterval,
+ ConnRetryExp: connRetryExp,
+ }
+}
+
+// Kademlia is a table of active nodes
+type Kademlia struct {
+ addr Address // immutable baseaddress of the table
+ *KadParams // Kademlia configuration parameters
+ proxLimit int // state, the PO of the first row of the most proximate bin
+ proxSize int // state, the number of peers in the most proximate bin
+ count int // number of active peers (w live connection)
+ buckets [][]Node // the actual bins
+ db *KadDb // kaddb, node record database
+ lock sync.RWMutex // mutex to access buckets
+}
+
+type Node interface {
+ Addr() Address
+ Url() string
+ LastActive() time.Time
+ Drop()
+}
+
+// public constructor
+// addr is the base address of the table
+// params is KadParams configuration
+func New(addr Address, params *KadParams) *Kademlia {
+ buckets := make([][]Node, params.MaxProx+1)
+ return &Kademlia{
+ addr: addr,
+ KadParams: params,
+ buckets: buckets,
+ db: newKadDb(addr, params),
+ }
+}
+
+// accessor for KAD base address
+func (self *Kademlia) Addr() Address {
+ return self.addr
+}
+
+// accessor for KAD active node count
+func (self *Kademlia) Count() int {
+ defer self.lock.Unlock()
+ self.lock.Lock()
+ return self.count
+}
+
+// accessor for the KAD node record db count
+func (self *Kademlia) DBCount() int {
+ return self.db.count()
+}
+
+// On is the entry point called when a new node is added
+// unsafe in that the node is not checked to be an already active node (to be called once)
+func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) {
+ glog.V(logger.Warn).Infof("%v", self)
+ defer self.lock.Unlock()
+ self.lock.Lock()
+
+ index := self.proximityBin(node.Addr())
+ record := self.db.findOrCreate(index, node.Addr(), node.Url())
+
+ if cb != nil {
+ err = cb(record, node)
+ glog.V(logger.Detail).Infof("cb(%v, %v) ->%v", record, node, err)
+ if err != nil {
+ return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err)
+ }
+ glog.V(logger.Debug).Infof("add node record %v with node %v", record, node)
+ }
+
+ // insert in kademlia table of active nodes
+ bucket := self.buckets[index]
+ // if bucket is full insertion replaces the worst node
+ // TODO: give priority to peers with active traffic
+ if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
+ self.buckets[index] = append(bucket, node)
+ glog.V(logger.Debug).Infof("add node %v to table", node)
+ self.setProxLimit(index, true)
+ record.node = node
+ self.count++
+ return nil
+ }
+
+ // always rotate peers
+ idle := self.MaxIdleInterval
+ var pos int
+ var replaced Node
+ for i, p := range bucket {
+ idleInt := time.Since(p.LastActive())
+ if idleInt > idle {
+ idle = idleInt
+ pos = i
+ replaced = p
+ }
+ }
+ if replaced == nil {
+ glog.V(logger.Debug).Infof("all peers wanted, PO%03d bucket full", index)
+ return fmt.Errorf("bucket full")
+ }
+ glog.V(logger.Debug).Infof("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval)
+ replaced.Drop()
+ // actually replace in the row. When off(node) is called, the peer is no longer in the row
+ bucket[pos] = node
+ // there is no change in bucket cardinalities so no prox limit adjustment is needed
+ record.node = node
+ self.count++
+ return nil
+
+}
+
+// Off is called when a node is taken offline (from the protocol main loop exit)
+func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ index := self.proximityBin(node.Addr())
+ bucket := self.buckets[index]
+ for i := 0; i < len(bucket); i++ {
+ if node.Addr() == bucket[i].Addr() {
+ self.buckets[index] = append(bucket[:i], bucket[(i+1):]...)
+ self.setProxLimit(index, false)
+ break
+ }
+ }
+
+ record := self.db.index[node.Addr()]
+ // callback on remove
+ if cb != nil {
+ cb(record, record.node)
+ }
+ record.node = nil
+ self.count--
+ glog.V(logger.Debug).Infof("remove node %v from table, population now is %v", node, self.count)
+
+ return
+}
+
+// proxLimit is dynamically adjusted so that
+// 1) there are no empty buckets in bins < proxLimit and
+// 2) the sum of all items is the minimum possible but higher than ProxBinSize
+// adjusts proxLimit and proxSize after an insertion/removal of nodes
+// caller holds the lock
+func (self *Kademlia) setProxLimit(r int, on bool) {
+ // if the change is outside the core (PO lower)
+ // and the change does not leave a bucket empty then
+ // no adjustment needed
+ if r < self.proxLimit && len(self.buckets[r]) > 0 {
+ return
+ }
+ // if on (a node was added), then r must be within the prox limit, so increment cardinality
+ if on {
+ self.proxSize++
+ curr := len(self.buckets[self.proxLimit])
+ // if now core is big enough without the furthest bucket, then contract
+ // this can result in more than one bucket change
+ for self.proxSize >= self.ProxBinSize+curr && curr > 0 {
+ self.proxSize -= curr
+ self.proxLimit++
+ curr = len(self.buckets[self.proxLimit])
+
+ glog.V(logger.Detail).Infof("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)
+ }
+ return
+ }
+ // otherwise
+ if r >= self.proxLimit {
+ self.proxSize--
+ }
+ // expand the core by lowering the prox limit until it hits zero, covers the empty bucket, or reaches the target cardinality
+ for (self.proxSize < self.ProxBinSize || r < self.proxLimit) &&
+ self.proxLimit > 0 {
+ //
+ self.proxLimit--
+ self.proxSize += len(self.buckets[self.proxLimit])
+ glog.V(logger.Detail).Infof("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)
+ }
+}
+
+/*
+returns the list of nodes belonging to the same proximity bin
+as the target. The most proximate bin will be the union of the bins between
+proxLimit and MaxProx.
+*/
+func (self *Kademlia) FindClosest(target Address, max int) []Node {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ r := nodesByDistance{
+ target: target,
+ }
+
+ po := self.proximityBin(target)
+ index := po
+ step := 1
+ glog.V(logger.Detail).Infof("serving %v nodes at %v (PO%02d)", max, index, po)
+
+ // if max is set to 0, we just want a full bucket (dynamic number)
+ min := max
+ // set limit to max
+ limit := max
+ if max == 0 {
+ min = 1
+ limit = maxPeers
+ }
+
+ var n int
+ for index >= 0 {
+ // add entire bucket
+ for _, p := range self.buckets[index] {
+ r.push(p, limit)
+ n++
+ }
+ // terminate if index reached the bottom or enough peers > min
+ glog.V(logger.Detail).Infof("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po)
+ if n >= min && (step < 0 || max == 0) {
+ break
+ }
+ // reach top most non-empty PO bucket, turn around
+ if index == self.MaxProx {
+ index = po
+ step = -1
+ }
+ index += step
+ }
+ glog.V(logger.Detail).Infof("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po)
+ return r.nodes
+}
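
A short end-to-end usage sketch of the table (editor's illustration, not part of this patch): stubNode is a hypothetical type that only satisfies the Node interface defined above, and the import path follows this package's location in the go-ethereum tree.

package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/swarm/network/kademlia"
)

type stubNode struct{ addr kademlia.Address }

func (n *stubNode) Addr() kademlia.Address { return n.addr }
func (n *stubNode) Url() string            { return "" }
func (n *stubNode) LastActive() time.Time  { return time.Now() }
func (n *stubNode) Drop()                  {}

func main() {
	base := kademlia.RandomAddress()
	kad := kademlia.New(base, kademlia.NewKadParams())
	for i := 0; i < 20; i++ {
		// On returns an error once a bucket already holds BucketSize live peers
		_ = kad.On(&stubNode{addr: kademlia.RandomAddress()}, nil)
	}
	target := kademlia.RandomAddress()
	for _, n := range kad.FindClosest(target, 3) {
		fmt.Println(n.Addr())
	}
}
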
+
+func (self *Kademlia) Suggest() (*NodeRecord, bool, int) {
+ defer self.lock.RUnlock()
+ self.lock.RLock()
+ return self.db.findBest(self.BucketSize, func(i int) int { return len(self.buckets[i]) })
+}
+
+// adds node records to kaddb (persisted node record db)
+func (self *Kademlia) Add(nrs []*NodeRecord) {
+ self.db.add(nrs, self.proximityBin)
+}
+
+// nodesByDistance is a list of nodes, ordered by distance to target.
+type nodesByDistance struct {
+ nodes []Node
+ target Address
+}
+
+func sortedByDistanceTo(target Address, slice []Node) bool {
+ var last Address
+ for i, node := range slice {
+ if i > 0 {
+ if target.ProxCmp(node.Addr(), last) < 0 {
+ return false
+ }
+ }
+ last = node.Addr()
+ }
+ return true
+}
+
+// push(node, max) adds the given node to the list, keeping the total size
+// at most max elements.
+func (h *nodesByDistance) push(node Node, max int) {
+ // returns the first index ix such that func(i) returns true
+ ix := sort.Search(len(h.nodes), func(i int) bool {
+ return h.target.ProxCmp(h.nodes[i].Addr(), node.Addr()) >= 0
+ })
+
+ if len(h.nodes) < max {
+ h.nodes = append(h.nodes, node)
+ }
+ if ix < len(h.nodes) {
+ copy(h.nodes[ix+1:], h.nodes[ix:])
+ h.nodes[ix] = node
+ }
+}
+
+/*
+Taking the proximity order relative to a fixed point x classifies the points in
+the space (n byte long byte sequences) into bins. Items in each are at
+most half as distant from x as items in the previous bin. Given a sample of
+uniformly distributed items (a hash function over arbitrary sequence) the
+proximity scale maps onto series of subsets with cardinalities on a negative
+exponential scale.
+
+It also has the property that any two items belonging to the same bin are at
+most half as distant from each other as they are from x.
+
+If we think of a random sample of items in the bins as connections in a network
+of interconnected nodes, then relative proximity can serve as the basis for local
+decisions for graph traversal where the task is to find a route between two
+points. Since in every hop, the finite distance halves, there is
+a guaranteed constant maximum limit on the number of hops needed to reach one
+node from the other.
+*/
+
+func (self *Kademlia) proximityBin(other Address) (ret int) {
+ ret = proximity(self.addr, other)
+ if ret > self.MaxProx {
+ ret = self.MaxProx
+ }
+ return
+}
+
+// provides keyrange for chunk db iteration
+func (self *Kademlia) KeyRange(other Address) (start, stop Address) {
+ defer self.lock.RUnlock()
+ self.lock.RLock()
+ return KeyRange(self.addr, other, self.proxLimit)
+}
+
+// save persists kaddb on disk (written to file on path in json format).
+func (self *Kademlia) Save(path string, cb func(*NodeRecord, Node)) error {
+ return self.db.save(path, cb)
+}
+
+// Load(path) loads the node record database (kaddb) from file on path.
+func (self *Kademlia) Load(path string, cb func(*NodeRecord, Node) error) (err error) {
+ return self.db.load(path, cb)
+}
+
+// kademlia table + kaddb table displayed with ascii
+func (self *Kademlia) String() string {
+ defer self.lock.RUnlock()
+ self.lock.RLock()
+ defer self.db.lock.RUnlock()
+ self.db.lock.RLock()
+
+ var rows []string
+ rows = append(rows, "=========================================================================")
+ rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %v", time.Now().UTC().Format(time.UnixDate), self.addr.String()[:6]))
+ rows = append(rows, fmt.Sprintf("population: %d (%d), proxLimit: %d, proxSize: %d", self.count, len(self.db.index), self.proxLimit, self.proxSize))
+ rows = append(rows, fmt.Sprintf("MaxProx: %d, ProxBinSize: %d, BucketSize: %d", self.MaxProx, self.ProxBinSize, self.BucketSize))
+
+ for i, bucket := range self.buckets {
+
+ if i == self.proxLimit {
+ rows = append(rows, fmt.Sprintf("============ PROX LIMIT: %d ==========================================", i))
+ }
+ row := []string{fmt.Sprintf("%03d", i), fmt.Sprintf("%2d", len(bucket))}
+ var k int
+ c := self.db.cursors[i]
+ for ; k < len(bucket); k++ {
+ p := bucket[(c+k)%len(bucket)]
+ row = append(row, p.Addr().String()[:6])
+ if k == 4 {
+ break
+ }
+ }
+ for ; k < 4; k++ {
+ row = append(row, " ")
+ }
+ row = append(row, fmt.Sprintf("| %2d %2d", len(self.db.Nodes[i]), self.db.cursors[i]))
+
+ for j, p := range self.db.Nodes[i] {
+ row = append(row, p.Addr.String()[:6])
+ if j == 3 {
+ break
+ }
+ }
+ rows = append(rows, strings.Join(row, " "))
+ }
+ rows = append(rows, "=========================================================================")
+ return strings.Join(rows, "\n")
+}
diff --git a/swarm/network/kademlia/kademlia_test.go b/swarm/network/kademlia/kademlia_test.go
new file mode 100644
index 000000000..66edfe654
--- /dev/null
+++ b/swarm/network/kademlia/kademlia_test.go
@@ -0,0 +1,392 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package kademlia
+
+import (
+ "fmt"
+ "math"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+ "testing/quick"
+ "time"
+)
+
+var (
+ quickrand = rand.New(rand.NewSource(time.Now().Unix()))
+ quickcfgFindClosest = &quick.Config{MaxCount: 50, Rand: quickrand}
+ quickcfgBootStrap = &quick.Config{MaxCount: 100, Rand: quickrand}
+)
+
+type testNode struct {
+ addr Address
+}
+
+func (n *testNode) String() string {
+ return fmt.Sprintf("%x", n.addr[:])
+}
+
+func (n *testNode) Addr() Address {
+ return n.addr
+}
+
+func (n *testNode) Drop() {
+}
+
+func (n *testNode) Url() string {
+ return ""
+}
+
+func (n *testNode) LastActive() time.Time {
+ return time.Now()
+}
+
+func TestOn(t *testing.T) {
+ addr, ok1 := gen(Address{}, quickrand).(Address)
+ other, ok2 := gen(Address{}, quickrand).(Address)
+ if !ok1 || !ok2 {
+ t.Errorf("address generation failed")
+ }
+ kad := New(addr, NewKadParams())
+ if err := kad.On(&testNode{addr: other}, nil); err != nil && err.Error() != "bucket full" {
+ t.Errorf("backend not accepting node: %v", err)
+ }
+}
+
+func TestBootstrap(t *testing.T) {
+
+ test := func(test *bootstrapTest) bool {
+ // for any kademlia table, Target and N
+ params := NewKadParams()
+ params.MaxProx = test.MaxProx
+ params.BucketSize = test.BucketSize
+ params.ProxBinSize = test.BucketSize
+ kad := New(test.Self, params)
+ var err error
+
+ for p := 0; p < 9; p++ {
+ var nrs []*NodeRecord
+ n := math.Pow(float64(2), float64(7-p))
+ for i := 0; i < int(n); i++ {
+ addr := RandomAddressAt(test.Self, p)
+ nrs = append(nrs, &NodeRecord{
+ Addr: addr,
+ })
+ }
+ kad.Add(nrs)
+ }
+
+ node := &testNode{test.Self}
+
+ n := 0
+ for n < 100 {
+ err = kad.On(node, nil)
+ if err != nil {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+
+ record, need, _ := kad.Suggest()
+ if !need {
+ break
+ }
+ n++
+ if record == nil {
+ continue
+ }
+ node = &testNode{record.Addr}
+ }
+ exp := test.BucketSize * (test.MaxProx + 1)
+ if kad.Count() != exp {
+ t.Errorf("incorrect number of peers, expected %d, got %d\n%v", exp, kad.Count(), kad)
+ return false
+ }
+ return true
+ }
+ if err := quick.Check(test, quickcfgBootStrap); err != nil {
+ t.Error(err)
+ }
+
+}
+
+func TestFindClosest(t *testing.T) {
+
+ test := func(test *FindClosestTest) bool {
+ // for any kademlia table, Target and N
+ params := NewKadParams()
+ params.MaxProx = 7
+ kad := New(test.Self, params)
+ var err error
+ for _, node := range test.All {
+ err = kad.On(node, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ }
+
+ if len(test.All) == 0 || test.N == 0 {
+ return true
+ }
+ nodes := kad.FindClosest(test.Target, test.N)
+
+ // check that the number of results is min(N, kad.len)
+ wantN := test.N
+ if tlen := kad.Count(); tlen < test.N {
+ wantN = tlen
+ }
+
+ if len(nodes) != wantN {
+ t.Errorf("wrong number of nodes: got %d, want %d", len(nodes), wantN)
+ return false
+ }
+
+ if hasDuplicates(nodes) {
+ t.Errorf("result contains duplicates")
+ return false
+ }
+
+ if !sortedByDistanceTo(test.Target, nodes) {
+ t.Errorf("result is not sorted by distance to target")
+ return false
+ }
+
+ // check that the result nodes have minimum distance to target.
+ farthestResult := nodes[len(nodes)-1].Addr()
+ for _, b := range kad.buckets {
+ for _, n := range b {
+ if contains(nodes, n.Addr()) {
+ continue // don't run the check below for nodes in result
+ }
+ if test.Target.ProxCmp(n.Addr(), farthestResult) < 0 {
+ t.Errorf("kademlia table contains a node closer to the target that is not in the result")
+ return false
+ }
+ }
+ }
+ return true
+ }
+ if err := quick.Check(test, quickcfgFindClosest); err != nil {
+ t.Error(err)
+ }
+}
+
+type proxTest struct {
+ add bool
+ index int
+ addr Address
+}
+
+var (
+ addresses []Address
+)
+
+func TestProxAdjust(t *testing.T) {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ self := gen(Address{}, r).(Address)
+ params := NewKadParams()
+ params.MaxProx = 7
+ kad := New(self, params)
+
+ var err error
+ for i := 0; i < 100; i++ {
+ a := gen(Address{}, r).(Address)
+ addresses = append(addresses, a)
+ err = kad.On(&testNode{addr: a}, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ if !kad.proxCheck(t) {
+ return
+ }
+ }
+ test := func(test *proxTest) bool {
+ node := &testNode{test.addr}
+ if test.add {
+ kad.On(node, nil)
+ } else {
+ kad.Off(node, nil)
+ }
+ return kad.proxCheck(t)
+ }
+ if err := quick.Check(test, quickcfgFindClosest); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestSaveLoad(t *testing.T) {
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ addresses := gen([]Address{}, r).([]Address)
+ self := RandomAddress()
+ params := NewKadParams()
+ params.MaxProx = 7
+ kad := New(self, params)
+
+ var err error
+
+ for _, a := range addresses {
+ err = kad.On(&testNode{addr: a}, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ }
+ nodes := kad.FindClosest(self, 100)
+
+ path := filepath.Join(os.TempDir(), "bzz-kad-test-save-load.peers")
+ err = kad.Save(path, nil)
+ if err != nil {
+ t.Fatalf("unexpected error saving kaddb: %v", err)
+ }
+ kad = New(self, params)
+ err = kad.Load(path, nil)
+ if err != nil {
+ t.Fatalf("unexpected error loading kaddb: %v", err)
+ }
+ for _, b := range kad.db.Nodes {
+ for _, node := range b {
+ err = kad.On(&testNode{node.Addr}, nil)
+ if err != nil && err.Error() != "bucket full" {
+ t.Fatalf("backend not accepting node: %v", err)
+ }
+ }
+ }
+ loadednodes := kad.FindClosest(self, 100)
+ for i, node := range loadednodes {
+ if nodes[i].Addr() != node.Addr() {
+ t.Errorf("node mismatch at %d/%d: %v != %v", i, len(nodes), nodes[i].Addr(), node.Addr())
+ }
+ }
+}
+
+func (self *Kademlia) proxCheck(t *testing.T) bool {
+ var sum int
+ for i, b := range self.buckets {
+ l := len(b)
+ // if we are in the high prox multibucket
+ if i >= self.proxLimit {
+ sum += l
+ } else if l == 0 {
+ t.Errorf("bucket %d empty, yet proxLimit is %d\n%v", len(b), self.proxLimit, self)
+ return false
+ }
+ }
+ // check if merged high prox bucket does not exceed size
+ if sum > 0 {
+ if sum != self.proxSize {
+ t.Errorf("proxSize incorrect, expected %v, got %v", sum, self.proxSize)
+ return false
+ }
+ last := len(self.buckets[self.proxLimit])
+ if last > 0 && sum >= self.ProxBinSize+last {
+ t.Errorf("proxLimit %v incorrect, redundant non-empty bucket %d added to proxBin with %v (target %v)\n%v", self.proxLimit, last, sum-last, self.ProxBinSize, self)
+ return false
+ }
+ if self.proxLimit > 0 && sum < self.ProxBinSize {
+ t.Errorf("proxLimit %v incorrect. proxSize %v is less than target %v, yet there is more peers", self.proxLimit, sum, self.ProxBinSize)
+ return false
+ }
+ }
+ return true
+}
+
+type bootstrapTest struct {
+ MaxProx int
+ BucketSize int
+ Self Address
+}
+
+func (*bootstrapTest) Generate(rand *rand.Rand, size int) reflect.Value {
+ t := &bootstrapTest{
+ Self: gen(Address{}, rand).(Address),
+ MaxProx: 5 + rand.Intn(2),
+ BucketSize: rand.Intn(3) + 1,
+ }
+ return reflect.ValueOf(t)
+}
+
+type FindClosestTest struct {
+ Self Address
+ Target Address
+ All []Node
+ N int
+}
+
+func (c FindClosestTest) String() string {
+ return fmt.Sprintf("A: %064x\nT: %064x\n(%d)\n", c.Self[:], c.Target[:], c.N)
+}
+
+func (*FindClosestTest) Generate(rand *rand.Rand, size int) reflect.Value {
+ t := &FindClosestTest{
+ Self: gen(Address{}, rand).(Address),
+ Target: gen(Address{}, rand).(Address),
+ N: rand.Intn(bucketSize),
+ }
+ for _, a := range gen([]Address{}, rand).([]Address) {
+ t.All = append(t.All, &testNode{addr: a})
+ }
+ return reflect.ValueOf(t)
+}
+
+func (*proxTest) Generate(rand *rand.Rand, size int) reflect.Value {
+ var add bool
+ if rand.Intn(2) == 0 {
+ add = true
+ }
+ var t *proxTest
+ if add {
+ t = &proxTest{
+ addr: gen(Address{}, rand).(Address),
+ add: add,
+ }
+ } else {
+ t = &proxTest{
+ index: rand.Intn(len(addresses)),
+ add: add,
+ }
+ }
+ return reflect.ValueOf(t)
+}
+
+func hasDuplicates(slice []Node) bool {
+ seen := make(map[Address]bool)
+ for _, node := range slice {
+ if seen[node.Addr()] {
+ return true
+ }
+ seen[node.Addr()] = true
+ }
+ return false
+}
+
+func contains(nodes []Node, addr Address) bool {
+ for _, n := range nodes {
+ if n.Addr() == addr {
+ return true
+ }
+ }
+ return false
+}
+
+// gen wraps quick.Value so it's easier to use.
+// it generates a random value of the given value's type.
+func gen(typ interface{}, rand *rand.Rand) interface{} {
+ v, ok := quick.Value(reflect.TypeOf(typ), rand)
+ if !ok {
+ panic(fmt.Sprintf("couldn't generate random value of type %T", typ))
+ }
+ return v.Interface()
+}
diff --git a/swarm/network/messages.go b/swarm/network/messages.go
new file mode 100644
index 000000000..d3858c424
--- /dev/null
+++ b/swarm/network/messages.go
@@ -0,0 +1,317 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/ethereum/go-ethereum/contracts/chequebook"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/swarm/network/kademlia"
+ "github.com/ethereum/go-ethereum/swarm/services/swap"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+/*
+BZZ protocol Message Types and Message Data Types
+*/
+
+// bzz protocol message codes
+const (
+ statusMsg = iota // 0x00
+ storeRequestMsg // 0x01
+ retrieveRequestMsg // 0x02
+ peersMsg // 0x03
+ syncRequestMsg // 0x04
+ deliveryRequestMsg // 0x05
+ unsyncedKeysMsg // 0x06
+ paymentMsg // 0x07
+)
+
+/*
+ Handshake
+
+* Version: 8 byte integer version of the protocol
+* ID: arbitrary byte sequence, a human readable client identifier
+* Addr: the address advertised by the node, format similar to the devp2p wire protocol
+* Swap: info for the swarm accounting protocol
+* NetworkID: 8 byte integer network identifier
+* Caps: swarm-specific capabilities, format identical to devp2p
+* SyncState: synchronisation state (db iterator key and address space etc.) persisted about the peer
+
+*/
+type statusMsgData struct {
+ Version uint64
+ ID string
+ Addr *peerAddr
+ Swap *swap.SwapProfile
+ NetworkId uint64
+}
+
+func (self *statusMsgData) String() string {
+ return fmt.Sprintf("Status: Version: %v, ID: %v, Addr: %v, Swap: %v, NetworkId: %v", self.Version, self.ID, self.Addr, self.Swap, self.NetworkId)
+}
+
+/*
+ store requests are forwarded to the peers in their kademlia proximity bin
+ if the chunk is distant from us;
+ if the chunk falls within our storage radius (within our proxLimit, i.e., the
+ last row of the routing table) or we have any other incentive to store it,
+ we attach our nodeID to the metadata
+*/
+type storeRequestMsgData struct {
+ Key storage.Key // hash of datasize | data
+ SData []byte // the actual chunk Data
+ // optional
+ Id uint64 // request ID. if delivery, the ID is retrieve request ID
+ requestTimeout *time.Time // expiry for forwarding - [not serialised][not currently used]
+ storageTimeout *time.Time // expiry of content - [not serialised][not currently used]
+ from *peer // [not serialised] protocol registers the requester
+}
+
+func (self storeRequestMsgData) String() string {
+ var from string
+ if self.from == nil {
+ from = "self"
+ } else {
+ from = self.from.Addr().String()
+ }
+ end := len(self.SData)
+ if len(self.SData) > 10 {
+ end = 10
+ }
+ return fmt.Sprintf("from: %v, Key: %v; ID: %v, requestTimeout: %v, storageTimeout: %v, SData %x", from, self.Key, self.Id, self.requestTimeout, self.storageTimeout, self.SData[:end])
+}
+
+/*
+Retrieve request
+
+Timeout is the absolute expiry of the request. Note that zero timeout retrieval
+requests do not request forwarding, but prompt for a peers message response;
+therefore they also serve as messages to retrieve peers.
+
+MaxSize specifies the maximum size that the peer will accept. This is useful in
+particular if we allow storage and delivery of multichunk payload representing
+the entire or partial subtree unfolding from the requested root key.
+So when we are only interested in a limited part of a stream (infinite trees) or
+are only testing chunk availability, we can indicate it by limiting the size here.
+
+Request ID can be newly generated or kept from the request originator.
+If the request ID is missing or zero, the request is handled as a lookup only,
+prompting a peers response but not launching a search. Lookup requests are meant
+to be used to bootstrap kademlia tables.
+
+In the special case that the key is the zero value as well, the remote peer's
+address is assumed (the message is to be handled as a self lookup request).
+The response is a PeersMsg with the peers in the kademlia proximity bin
+corresponding to the address.
+*/
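+
+// Editor's illustrative sketch of the cases described above (hedged; the
+// variable names and values are assumptions, not from the original code):
+//
+//	lookup := retrieveRequestMsgData{MaxPeers: 10}                     // Id == 0: peers lookup only
+//	selfLookup := retrieveRequestMsgData{}                             // zero Key too: self lookup
+//	fetch := retrieveRequestMsgData{Key: key, Id: id, MaxSize: 10000}  // real retrieval request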
+
+type retrieveRequestMsgData struct {
+ Key storage.Key // target Key address of chunk to be retrieved
+ Id uint64 // request id, request is a lookup if missing or zero
+ MaxSize uint64 // maximum size of delivery accepted
+ MaxPeers uint64 // maximum number of peers returned
+ Timeout uint64 // the longest time we are expecting a response
+ timeout *time.Time // [not serialised]
+ from *peer //
+}
+
+func (self retrieveRequestMsgData) String() string {
+ var from string
+ if self.from == nil {
+ from = "ourselves"
+ } else {
+ from = self.from.Addr().String()
+ }
+ var target []byte
+ if len(self.Key) > 3 {
+ target = self.Key[:4]
+ }
+ return fmt.Sprintf("from: %v, Key: %x; ID: %v, MaxSize: %v, MaxPeers: %d", from, target, self.Id, self.MaxSize, self.MaxPeers)
+}
+
+// lookups are encoded by missing request ID
+func (self retrieveRequestMsgData) isLookup() bool {
+ return self.Id == 0
+}
+
+// sets timeout fields
+func (self *retrieveRequestMsgData) setTimeout(t *time.Time) {
+ self.timeout = t
+ if t != nil {
+ self.Timeout = uint64(t.UnixNano())
+ } else {
+ self.Timeout = 0
+ }
+}
+
+func (self *retrieveRequestMsgData) getTimeout() (t *time.Time) {
+ if self.Timeout > 0 && self.timeout == nil {
+ timeout := time.Unix(0, int64(self.Timeout))
+ t = &timeout
+ self.timeout = t
+ }
+ return
+}
+
+// peerAddr is sent in StatusMsg as part of the handshake
+type peerAddr struct {
+ IP net.IP
+ Port uint16
+ ID []byte // the 64 byte NodeID (ECDSA Public Key)
+ Addr kademlia.Address
+}
+
+// peerAddr pretty prints as enode
+func (self peerAddr) String() string {
+ var nodeid discover.NodeID
+ copy(nodeid[:], self.ID)
+ return discover.NewNode(nodeid, self.IP, 0, self.Port).String()
+}
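+
+// Editor's illustrative sketch (hedged; the literal values and the use of
+// crypto.Sha3Hash for deriving the swarm address are assumptions):
+//
+//	addr := &peerAddr{
+//		IP:   net.ParseIP("10.0.0.1"),
+//		Port: 30399,
+//		ID:   nodeid[:],                                    // 64 byte ECDSA public key
+//		Addr: kademlia.Address(crypto.Sha3Hash(nodeid[:])), // hash of the NodeID
+//	}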
+
+/*
+peers Msg is one response to retrieval; it is always encouraged after a retrieval
+request to respond with a list of peers in the same kademlia proximity bin.
+The encoding of a peer is identical to that in the devp2p base protocol peers
+messages: [IP, Port, NodeID]
+note that a node's DPA address is not the NodeID but the hash of the NodeID.
+
+Timeout serves to indicate whether the responder is forwarding the query within
+the timeout or not.
+
+NodeID serves as the owner of payment contracts and signer of proofs of transfer.
+
+The Key is the target (if a response to a retrieval request), or missing (zero
+value) if the retrieval request was a self lookup of the peer's address (hash of NodeID).
+
+Peers message is requested by retrieval requests with a missing or zero value request ID
+*/
+type peersMsgData struct {
+ Peers []*peerAddr //
+ Timeout uint64 //
+ timeout *time.Time // indicate whether responder is expected to deliver content
+ Key storage.Key // present if a response to a retrieval request
+ Id uint64 // present if a response to a retrieval request
+ from *peer
+}
+
+// peers msg pretty printer
+func (self peersMsgData) String() string {
+ var from string
+ if self.from == nil {
+ from = "ourselves"
+ } else {
+ from = self.from.Addr().String()
+ }
+ var target []byte
+ if len(self.Key) > 3 {
+ target = self.Key[:4]
+ }
+ return fmt.Sprintf("from: %v, Key: %x; ID: %v, Peers: %v", from, target, self.Id, self.Peers)
+}
+
+func (self *peersMsgData) setTimeout(t *time.Time) {
+ self.timeout = t
+ if t != nil {
+ self.Timeout = uint64(t.UnixNano())
+ } else {
+ self.Timeout = 0
+ }
+}
+
+func (self *peersMsgData) getTimeout() (t *time.Time) {
+ if self.Timeout > 0 && self.timeout == nil {
+ timeout := time.Unix(0, int64(self.Timeout))
+ t = &timeout
+ self.timeout = t
+ }
+ return
+}
+
+/*
+syncRequest
+
+is sent after the handshake to initiate syncing
+the syncState of the remote node is persisted in kaddb and set on the
+peer/protocol instance when the node is registered by the hive as online
+*/
+
+type syncRequestMsgData struct {
+ SyncState *syncState `rlp:"nil"`
+}
+
+func (self *syncRequestMsgData) String() string {
+ return fmt.Sprintf("%v", self.SyncState)
+}
+
+/*
+deliveryRequest
+
+is sent once a batch of sync keys is filtered. The ones not found are
+sent as a list of syncRequest (hash, priority) in the Deliver field.
+When the source receives the delivery request it continues to iterate
+and fetch at most N items as yet unsynced.
+At the same time it responds with deliveries of the items.
+*/
+type deliveryRequestMsgData struct {
+ Deliver []*syncRequest
+}
+
+func (self *deliveryRequestMsgData) String() string {
+ return fmt.Sprintf("sync request for new chunks\ndelivery request for %v chunks", len(self.Deliver))
+}
+
+/*
+unsyncedKeys
+
+is sent first after the handshake when the SyncState iterator brings up a batch
+of new keys (possibly hundreds or thousands), and subsequently sent as a
+response to deliveryRequestMsgData.
+
+Syncing is the iterative process of exchanging unsyncedKeys and deliveryRequestMsgs
+both ways.
+
+State contains the sync state sent by the source. When the source receives the
+sync state it continues to iterate and fetch at most N items as yet unsynced.
+At the same time it responds with deliveries of the items.
+*/
+type unsyncedKeysMsgData struct {
+ Unsynced []*syncRequest
+ State *syncState
+}
+
+func (self *unsyncedKeysMsgData) String() string {
+ return fmt.Sprintf("sync: keys of %d new chunks (state %v) => synced: %v", len(self.Unsynced), self.State, self.State.Synced)
+}
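+
+// Editor's sketch of one sync round as described above (hedged, message flow only):
+//
+//	source -> peer:   unsyncedKeysMsgData{Unsynced: keys, State: state}
+//	peer -> source:   deliveryRequestMsgData{Deliver: missing}
+//	source -> peer:   one storeRequestMsgData per missing key, then the next unsyncedKeysMsgData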
+
+/*
+payment
+
+is sent when the swap balance is tilted in favour of the remote peer
+and in absolute units exceeds the PayAt parameter in the remote peer's profile
+*/
+
+type paymentMsgData struct {
+ Units uint // units actually paid for (checked against amount by swap)
+ Promise *chequebook.Cheque // payment with cheque
+}
+
+func (self *paymentMsgData) String() string {
+ return fmt.Sprintf("payment for %d units: %v", self.Units, self.Promise)
+}
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go
new file mode 100644
index 000000000..5e65108d6
--- /dev/null
+++ b/swarm/network/protocol.go
@@ -0,0 +1,554 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+/*
+bzz implements the swarm wire protocol [bzz] (sister of eth and shh)
+the protocol instance is launched on each peer by the network layer if the
+bzz protocol handler is registered on the p2p server.
+
+The bzz protocol component speaks the bzz protocol. It
+* handles the protocol handshake
+* registers peers in the KΛÐΞMLIΛ table via the hive logistic manager
+* dispatches to the hive for handling the DHT logic
+* encodes and decodes requests for storage and retrieval
+* handles sync protocol messages via the syncer
+* talks the SWAP payment protocol (swap accounting is done within NetStore)
+*/
+
+import (
+ "fmt"
+ "net"
+ "strconv"
+ "time"
+
+ "github.com/ethereum/go-ethereum/contracts/chequebook"
+ "github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
+ "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+const (
+ Version = 0
+ ProtocolLength = uint64(8)
+ ProtocolMaxMsgSize = 10 * 1024 * 1024
+ NetworkId = 322
+)
+
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrVersionMismatch
+ ErrNetworkIdMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+ ErrSwap
+ ErrSync
+ ErrUnwanted
+)
+
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+ ErrSwap: "SWAP error",
+ ErrSync: "Sync error",
+ ErrUnwanted: "Unwanted peer",
+}
+
+// bzz represents the swarm wire protocol
+// an instance is running on each peer
+type bzz struct {
+ selfID discover.NodeID // peer's node id used in peer advertising in handshake
+ key storage.Key // baseaddress as storage.Key
+ storage StorageHandler // handler for storage/retrieval related requests coming via the bzz wire protocol
+ hive *Hive // the logistic manager, peerPool, routing service and peer handler
+ dbAccess *DbAccess // access to db storage counter and iterator for syncing
+ requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
+ remoteAddr *peerAddr // remote peers address
+ peer *p2p.Peer // the p2p peer object
+ rw p2p.MsgReadWriter // messageReadWriter to send messages to
+ errors *errs.Errors // errors table
+ backend chequebook.Backend
+ lastActive time.Time
+
+ swap *swap.Swap // swap instance for the peer connection
+ swapParams *bzzswap.SwapParams // swap settings both local and remote
+ swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake)
+ syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake)
+ syncer *syncer // syncer instance for the peer connection
+ syncParams *SyncParams // syncer params
+ syncState *syncState // outgoing synchronisation state (contains reference to remote peer's db counter)
+}
+
+// interface type for handler of storage/retrieval related requests coming
+// via the bzz wire protocol
+// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
+type StorageHandler interface {
+ HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
+ HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
+ HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
+ HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
+}
+
+/*
+Bzz is the main entrypoint: it constructs the p2p.Protocol that runs the bzz protocol.
+Use this constructor to attach the protocol ("class") to the server caps.
+This is done by node.Node#Register(func(node.ServiceContext) (Service, error));
+Service implements Protocols(), which returns an array of protocol constructors.
+At node startup the protocols are initialised, and the devp2p layer then calls
+Run(p *p2p.Peer, rw p2p.MsgReadWriter) error on each peer connection.
+The Run function of the bzz protocol creates a bzz instance
+which represents the peer for the swarm hive and all peer-aware components.
+*/
+func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams) (p2p.Protocol, error) {
+
+ // a single global request db is created for all peer connections
+ // this is to persist delivery backlog and aid synchronisation
+ requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
+ if err != nil {
+ return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
+ }
+
+ return p2p.Protocol{
+ Name: "bzz",
+ Version: Version,
+ Length: ProtocolLength,
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, p, rw)
+ },
+ }, nil
+}
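+
+// Editor's usage sketch (hedged; the surrounding service variables such as
+// depo, hive and the parameter structs are assumptions about the caller):
+//
+//	proto, err := Bzz(depo, chequebookBackend, hive, dbAccess, swapParams, syncParams)
+//	if err != nil {
+//		return err
+//	}
+//	// the returned p2p.Protocol is appended to the node's protocol list so
+//	// devp2p invokes its Run hook for every peer connection
+//	protocols = append(protocols, proto)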
+
+/*
+the main protocol loop that
+ * does the handshake by exchanging statusMsg
+ * if peer is valid and accepted, registers with the hive
+ * then enters into a forever loop handling incoming messages
+ * storage and retrieval related queries coming via bzz are dispatched to StorageHandler
+ * peer-related messages are dispatched to the hive
+ * payment related messages are relayed to SWAP service
+ * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook)
+ * whenever the loop terminates, the peer will disconnect with Subprotocol error
+ * whenever handlers return an error the loop terminates
+*/
+func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+
+ self := &bzz{
+ storage: depo,
+ backend: backend,
+ hive: hive,
+ dbAccess: dbaccess,
+ requestDb: requestDb,
+ peer: p,
+ rw: rw,
+ errors: &errs.Errors{
+ Package: "BZZ",
+ Errors: errorToString,
+ },
+ swapParams: sp,
+ syncParams: sy,
+ swapEnabled: hive.swapEnabled,
+ syncEnabled: true,
+ }
+
+ // handle handshake
+ err = self.handleStatus()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ // if the handler loop exits, the peer is disconnecting
+ // deregister the peer in the hive
+ self.hive.removePeer(&peer{bzz: self})
+ if self.syncer != nil {
+ self.syncer.stop() // quits request db and delivery loops, save requests
+ }
+ if self.swap != nil {
+ self.swap.Stop() // quits chequebox autocash etc
+ }
+ }()
+
+ // the main forever loop that handles incoming requests
+ for {
+ if self.hive.blockRead {
+ glog.V(logger.Warn).Infof("Cannot read network")
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ err = self.handle()
+ if err != nil {
+ return
+ }
+ }
+}
+
+// TODO: may need to implement protocol drop only? don't want to kick off the peer
+// if they are useful for other protocols
+func (self *bzz) Drop() {
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
+}
+
+// one cycle of the main forever loop that handles and dispatches incoming messages
+func (self *bzz) handle() error {
+ msg, err := self.rw.ReadMsg()
+ glog.V(logger.Debug).Infof("<- %v", msg)
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+
+ case statusMsg:
+ // no extra status message allowed. The only one needed was already handled
+ // by handleStatus
+ glog.V(logger.Debug).Infof("Status message: %v", msg)
+ return self.protoError(ErrExtraStatusMsg, "")
+
+ case storeRequestMsg:
+ // store requests are dispatched to netStore
+ var req storeRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ if len(req.SData) < 9 {
+ return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg)
+ }
+ // last Active time is set only when receiving chunks
+ self.lastActive = time.Now()
+ glog.V(logger.Detail).Infof("incoming store request: %s", req.String())
+ // swap accounting is done within forwarding
+ self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})
+
+ case retrieveRequestMsg:
+ // retrieve Requests are dispatched to netStore
+ var req retrieveRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ req.from = &peer{bzz: self}
+ // if request is lookup and not to be delivered
+ if req.isLookup() {
+ glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from)
+ } else if req.Key == nil {
+ return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil")
+ } else {
+ // swap accounting is done within netStore
+ self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
+ }
+ // direct response with peers, TODO: sort this out
+ self.hive.peers(&req)
+
+ case peersMsg:
+ // response to lookups and immediate response to retrieve requests
+ // dispatches new peer data to the hive that adds them to KADDB
+ var req peersMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ req.from = &peer{bzz: self}
+ glog.V(logger.Detail).Infof("<- peer addresses: %v", req)
+ self.hive.HandlePeersMsg(&req, &peer{bzz: self})
+
+ case syncRequestMsg:
+ var req syncRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- sync request: %v", req)
+ self.lastActive = time.Now()
+ self.sync(req.SyncState)
+
+ case unsyncedKeysMsg:
+ // coming from parent node offering
+ var req unsyncedKeysMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String())
+ err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
+ self.lastActive = time.Now()
+ if err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+
+ case deliveryRequestMsg:
+ // response to unsyncedKeysMsg: hashes filtered as not existing in the db
+ // also relays the last synced state to the source
+ var req deliveryRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<-msg %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- delivery request: %s", req.String())
+ err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
+ self.lastActive = time.Now()
+ if err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+
+ case paymentMsg:
+ // swap protocol message for payment, Units paid for, Cheque paid with
+ if self.swapEnabled {
+ var req paymentMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- payment: %s", req.String())
+ self.swap.Receive(int(req.Units), req.Promise)
+ }
+
+ default:
+ // no other message is allowed
+ return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+func (self *bzz) handleStatus() (err error) {
+
+ handshake := &statusMsgData{
+ Version: uint64(Version),
+ ID: "honey",
+ Addr: self.selfAddr(),
+ NetworkId: uint64(NetworkId),
+ Swap: &bzzswap.SwapProfile{
+ Profile: self.swapParams.Profile,
+ PayProfile: self.swapParams.PayProfile,
+ },
+ }
+
+ err = p2p.Send(self.rw, statusMsg, handshake)
+ if err != nil {
+ return self.protoError(ErrNoStatusMsg, err.Error())
+ }
+
+ // read and handle remote status
+ var msg p2p.Msg
+ msg, err = self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+
+ if msg.Code != statusMsg {
+ self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg)
+ }
+
+ if msg.Size > ProtocolMaxMsgSize {
+ return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+
+ var status statusMsgData
+ if err := msg.Decode(&status); err != nil {
+ return self.protoError(ErrDecode, " %v: %v", msg, err)
+ }
+
+ if status.NetworkId != NetworkId {
+ return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
+ }
+
+ if Version != status.Version {
+ return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version)
+ }
+
+ self.remoteAddr = self.peerAddr(status.Addr)
+ glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())
+
+ if self.swapEnabled {
+ // set remote profile for accounting
+ self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
+ if err != nil {
+ return self.protoError(ErrSwap, "%v", err)
+ }
+ }
+
+ glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)
+ err = self.hive.addPeer(&peer{bzz: self})
+ if err != nil {
+ return self.protoError(ErrUnwanted, "%v", err)
+ }
+
+ // the hive sets the sync state, so sync should start after the node is added
+ glog.V(logger.Info).Infof("synchronisation request sent with %v", self.syncState)
+ self.syncRequest()
+
+ return nil
+}
+
+func (self *bzz) sync(state *syncState) error {
+ // syncer setup
+ if self.syncer != nil {
+ return self.protoError(ErrSync, "sync request can only be sent once")
+ }
+
+ cnt := self.dbAccess.counter()
+ remoteaddr := self.remoteAddr.Addr
+ start, stop := self.hive.kad.KeyRange(remoteaddr)
+
+ // an explicitly received nil syncstate disables synchronisation
+ if state == nil {
+ self.syncEnabled = false
+ glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self)
+ state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
+ } else {
+ state.synced = make(chan bool)
+ state.SessionAt = cnt
+ if storage.IsZeroKey(state.Stop) && state.Synced {
+ state.Start = storage.Key(start[:])
+ state.Stop = storage.Key(stop[:])
+ }
+ glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state)
+ }
+ var err error
+ self.syncer, err = newSyncer(
+ self.requestDb,
+ storage.Key(remoteaddr[:]),
+ self.dbAccess,
+ self.unsyncedKeys, self.store,
+ self.syncParams, state, func() bool { return self.syncEnabled },
+ )
+ if err != nil {
+ return self.protoError(ErrSync, "%v", err)
+ }
+ glog.V(logger.Detail).Infof("syncer set for peer %v", self)
+ return nil
+}
+
+func (self *bzz) String() string {
+ return self.remoteAddr.String()
+}
+
+// repair reported address if IP missing
+func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
+ if base.IP.IsUnspecified() {
+ host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
+ base.IP = net.ParseIP(host)
+ }
+ return base
+}
+
+// returns the self advertised node connection info (listening address as an enode)
+// IP will get repaired on the other end if missing
+// or resolved via ID by discovery at dialout
+func (self *bzz) selfAddr() *peerAddr {
+ id := self.hive.id
+ host, port, _ := net.SplitHostPort(self.hive.listenAddr())
+ intport, _ := strconv.Atoi(port)
+ addr := &peerAddr{
+ Addr: self.hive.addr,
+ ID: id[:],
+ IP: net.ParseIP(host),
+ Port: uint16(intport),
+ }
+ return addr
+}
+
+// outgoing messages
+// send retrieveRequestMsg
+func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
+ return self.send(retrieveRequestMsg, req)
+}
+
+// send storeRequestMsg
+func (self *bzz) store(req *storeRequestMsgData) error {
+ return self.send(storeRequestMsg, req)
+}
+
+func (self *bzz) syncRequest() error {
+ req := &syncRequestMsgData{}
+ if self.hive.syncEnabled {
+ glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState)
+ req.SyncState = self.syncState
+ }
+ if self.syncState == nil {
+ glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState)
+ }
+ return self.send(syncRequestMsg, req)
+}
+
+// send deliveryRequestMsg with the batch of missing sync requests
+func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
+ req := &deliveryRequestMsgData{
+ Deliver: reqs,
+ }
+ return self.send(deliveryRequestMsg, req)
+}
+
+// send unsyncedKeysMsg with a batch of syncRequests and the current sync state
+func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
+ req := &unsyncedKeysMsgData{
+ Unsynced: reqs,
+ State: state,
+ }
+ return self.send(unsyncedKeysMsg, req)
+}
+
+// Pay wraps the cheque in a paymentMsg and sends it
+func (self *bzz) Pay(units int, promise swap.Promise) {
+ req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
+ self.payment(req)
+}
+
+// send paymentMsg
+func (self *bzz) payment(req *paymentMsgData) error {
+ return self.send(paymentMsg, req)
+}
+
+// sends peersMsg
+func (self *bzz) peers(req *peersMsgData) error {
+ return self.send(peersMsg, req)
+}
+
+func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
+ err = self.errors.New(code, format, params...)
+ err.Log(glog.V(logger.Info))
+ return
+}
+
+func (self *bzz) protoErrorDisconnect(err *errs.Error) {
+ err.Log(glog.V(logger.Info))
+ if err.Fatal() {
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
+ }
+}
+
+func (self *bzz) send(msg uint64, data interface{}) error {
+ if self.hive.blockWrite {
+ return fmt.Errorf("network write blocked")
+ }
+ glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self)
+ err := p2p.Send(self.rw, msg, data)
+ if err != nil {
+ self.Drop()
+ }
+ return err
+}
diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go
new file mode 100644
index 000000000..91dea8cac
--- /dev/null
+++ b/swarm/network/protocol_test.go
@@ -0,0 +1,17 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
diff --git a/swarm/network/syncdb.go b/swarm/network/syncdb.go
new file mode 100644
index 000000000..cef32610f
--- /dev/null
+++ b/swarm/network/syncdb.go
@@ -0,0 +1,390 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+)
+
+const counterKeyPrefix = 0x01
+
+/*
+syncDb is a queueing service for outgoing deliveries.
+One instance per priority queue for each peer
+
+a syncDb instance maintains an in-memory buffer (of capacity bufferSize);
+once its in-memory buffer is full it switches to persisting in the db,
+and the dbRead iterator iterates through the items keeping their order.
+Once the db read catches up (there are no more items in the db)
+it switches back to the in-memory buffer.
+
+when syncdb is stopped all items in the buffer are saved to the db
+*/
+type syncDb struct {
+ start []byte // this syncdb starting index in requestdb
+ key storage.Key // remote peers address key
+ counterKey []byte // db key to persist counter
+ priority uint // priority High|Medium|Low
+ buffer chan interface{} // incoming request channel
+ db *storage.LDBDatabase // underlying db (TODO should be interface)
+ done chan bool // chan to signal goroutines finished quitting
+ quit chan bool // chan to signal quitting to goroutines
+ total, dbTotal int // counts for one session
+ batch chan chan int // channel for batch requests
+ dbBatchSize uint // number of items before batch is saved
+}
+
+// constructor needs a shared request db (leveldb)
+// priority is used in the index key
+// uses a buffer and a leveldb for persistent storage
+// bufferSize, dbBatchSize are config parameters
+func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
+ start := make([]byte, 42)
+ start[1] = byte(priorities - priority)
+ copy(start[2:34], key)
+
+ counterKey := make([]byte, 34)
+ counterKey[0] = counterKeyPrefix
+ copy(counterKey[1:], start[1:34])
+
+ syncdb := &syncDb{
+ start: start,
+ key: key,
+ counterKey: counterKey,
+ priority: priority,
+ buffer: make(chan interface{}, bufferSize),
+ db: db,
+ done: make(chan bool),
+ quit: make(chan bool),
+ batch: make(chan chan int),
+ dbBatchSize: dbBatchSize,
+ }
+ glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)
+
+ // starts the main forever loop reading from buffer
+ go syncdb.bufferRead(deliver)
+ return syncdb
+}
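+
+// Editor's usage sketch (hedged; the deliver callback, buffer and batch sizes
+// are assumptions, not values taken from the original code):
+//
+//	deliver := func(req interface{}, quit chan bool) bool {
+//		// push req to the network; return false if quit was closed while waiting
+//		return true
+//	}
+//	sdb := newSyncDb(requestDb, peerKey, High, 1024, 256, deliver)
+//	sdb.buffer <- chunk.Key // queue an outgoing delivery
+//	defer sdb.stop()        // flushes the in-memory buffer to the db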
+
+/*
+bufferRead is a forever iterator loop that takes care of delivering
+outgoing store requests read from the incoming buffer
+
+its argument is the deliver function taking the item as first argument
+and a quit channel as second.
+Closing of this channel is supposed to abort all waiting for delivery
+(typically network write)
+
+The iteration switches between 2 modes,
+* buffer mode reads the in-memory buffer and delivers the items directly
+* db mode reads from the buffer and writes to the db; in parallel another
+routine is started that reads from the db and delivers items
+
+If there is buffer contention in buffer mode (slow network, high upload volume)
+syncdb switches to db mode and starts dbRead
+Once db backlog is delivered, it reverts back to in-memory buffer
+
+It is automatically started when syncdb is initialised.
+
+It saves the buffer to the db upon receiving the quit signal (see syncDb#stop()).
+*/
+func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
+ var buffer, db chan interface{} // channels representing the two read modes
+ var more bool
+ var req interface{}
+ var entry *syncDbEntry
+ var inBatch, inDb int
+ batch := new(leveldb.Batch)
+ var dbSize chan int
+ quit := self.quit
+ counterValue := make([]byte, 8)
+
+ // counter is used for keeping the items in order, persisted to db
+ // start counter where db was at, 0 if not found
+ data, err := self.db.Get(self.counterKey)
+ var counter uint64
+ if err == nil {
+ counter = binary.BigEndian.Uint64(data)
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
+ } else {
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
+ }
+
+LOOP:
+ for {
+ // waiting for the next item in the buffer, or a quit signal or a batch request
+ select {
+ // buffer only closes when writing to db
+ case req = <-buffer:
+ // deliver request : this is blocking on network write so
+ // it is passed the quit channel as argument, so that it returns
+ // if syncdb is stopped. In this case we need to save the item to the db
+ more = deliver(req, self.quit)
+ if !more {
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
+ // received quit signal, save request currently waiting delivery
+ // by switching to db mode and closing the buffer
+ buffer = nil
+ db = self.buffer
+ close(db)
+ quit = nil // needs to block the quit case in select
+ break // break from select, this item will be written to the db
+ }
+ self.total++
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
+ // by the time deliver returns, there may have been new writes to the buffer
+ // if buffer contention is detected, switch to db mode which drains
+ // the buffer so no process will block on pushing store requests
+ if len(buffer) == cap(buffer) {
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
+ buffer = nil
+ db = self.buffer
+ }
+ continue LOOP
+
+ // incoming entry to put into db
+ case req, more = <-db:
+ if !more {
+ // the db channel is only closed when quitting: the whole buffer has been saved
+ binary.BigEndian.PutUint64(counterValue, counter)
+ batch.Put(self.counterKey, counterValue) // persist counter in batch
+ self.writeSyncBatch(batch) // save batch
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
+ break LOOP
+ }
+ self.dbTotal++
+ self.total++
+ // otherwise break after select
+ case dbSize = <-self.batch:
+ // explicit request for batch
+ if inBatch == 0 && quit != nil {
+ // there were no writes since the last batch, so the db is depleted
+ // switch to buffer mode
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
+ db = nil
+ buffer = self.buffer
+ dbSize <- 0 // indicates to 'caller' that batch has been written
+ inDb = 0
+ continue LOOP
+ }
+ binary.BigEndian.PutUint64(counterValue, counter)
+ batch.Put(self.counterKey, counterValue)
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
+ batch = self.writeSyncBatch(batch)
+ dbSize <- inBatch // indicates to 'caller' that batch has been written
+ inBatch = 0
+ continue LOOP
+
+ // closing syncDb#quit channel is used to signal to all goroutines to quit
+ case <-quit:
+ // need to save backlog, so switch to db mode
+ db = self.buffer
+ buffer = nil
+ quit = nil
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
+ close(db)
+ continue LOOP
+ }
+
+ // only get here if we put req into db
+ entry, err = self.newSyncDbEntry(req, counter)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
+ continue LOOP
+ }
+ batch.Put(entry.key, entry.val)
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
+ // if just switched to db mode and not quitting, then launch dbRead
+ // in a parallel go routine to send deliveries from db
+ if inDb == 0 && quit != nil {
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead")
+ go self.dbRead(true, counter, deliver)
+ }
+ inDb++
+ inBatch++
+ counter++
+ // need to save the batch if it gets too large (== dbBatchSize)
+ if inBatch%int(self.dbBatchSize) == 0 {
+ batch = self.writeSyncBatch(batch)
+ }
+ }
+ glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
+ close(self.done)
+}
+
+// writes the batch to the db and returns a new batch object
+func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
+ err := self.db.Write(batch)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
+ return batch
+ }
+ return new(leveldb.Batch)
+}
+
+// abstract type for db entries (TODO could be a feature of Receipts)
+type syncDbEntry struct {
+ key, val []byte
+}
+
+func (self syncDbEntry) String() string {
+ return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
+}
+
+/*
+ dbRead iterates over store requests to be sent over to the peer,
+ mainly to prevent crashes due to network output buffer contention
+ as well as to make synchronisation resilient to disconnects;
+ the messages are supposed to be sent in the p2p priority queue.
+
+ the request DB is shared between peers, but domains for each syncdb
+ are disjoint. dbkeys (42 bytes) are structured:
+ * 0: 0x00 (0x01 reserved for counter key)
+ * 1: priorities - priority (so that high priority can be replayed first)
+ * 2-33: peers address
+ * 34-41: syncdb counter to preserve order (this field is missing for the counter key)
+
+ values (40 bytes) are:
+ * 0-31: key
+ * 32-39: request id
+
+dbRead takes a boolean indicating whether to coordinate reads in batches with
+bufferRead, the current db counter as its second argument,
+and the delivery function to apply as its third.
+*/
+func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
+ key := make([]byte, 42)
+ copy(key, self.start)
+ binary.BigEndian.PutUint64(key[34:], counter)
+ var batches, n, cnt, total int
+ var more bool
+ var entry *syncDbEntry
+ var it iterator.Iterator
+ var del *leveldb.Batch
+ batchSizes := make(chan int)
+
+ for {
+ // if useBatches is false, cnt is not set
+ if useBatches {
+ // this could be called before all cnt items sent out
+ // so that loop is not blocking while delivering
+ // only relevant if cnt is large
+ select {
+ case self.batch <- batchSizes:
+ case <-self.quit:
+ return
+ }
+ // wait for the write to finish and get the item count in the next batch
+ cnt = <-batchSizes
+ batches++
+ if cnt == 0 {
+ // empty
+ return
+ }
+ }
+ it = self.db.NewIterator()
+ it.Seek(key)
+ if !it.Valid() {
+ copy(key, self.start)
+ useBatches = true
+ continue
+ }
+ del = new(leveldb.Batch)
+ glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)
+
+ for n = 0; !useBatches || n < cnt; it.Next() {
+ copy(key, it.Key())
+ if len(key) == 0 || key[0] != 0 {
+ copy(key, self.start)
+ useBatches = true
+ break
+ }
+ val := make([]byte, 40)
+ copy(val, it.Value())
+ entry = &syncDbEntry{key, val}
+ // glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
+ more = fun(entry, self.quit)
+ if !more {
+ // quit received when waiting to deliver entry, the entry will not be deleted
+ glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
+ break
+ }
+ // since subsequent batches of the same db session are indexed incrementally
+ // deleting earlier batches can be delayed and parallelised
+ // this could be batch delete when db is idle (but added complexity esp when quitting)
+ del.Delete(key)
+ n++
+ total++
+ }
+ glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
+ self.db.Write(del) // this could be async called only when db is idle
+ it.Release()
+ }
+}
+
+// stop signals the bufferRead loop to quit, saving the in-memory buffer to the db, and waits for it to finish
+func (self *syncDb) stop() {
+ close(self.quit)
+ <-self.done
+}
+
+// newSyncDbEntry calculates a dbkey for the request so it can be stored in the db
+// (see the syncDb db key structure documented above)
+// polymorphic: for the accepted request types, see syncer#addRequest
+func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
+ var key storage.Key
+ var chunk *storage.Chunk
+ var id uint64
+ var ok bool
+ var sreq *storeRequestMsgData
+
+ if key, ok = req.(storage.Key); ok {
+ id = generateId()
+ } else if chunk, ok = req.(*storage.Chunk); ok {
+ key = chunk.Key
+ id = generateId()
+ } else if sreq, ok = req.(*storeRequestMsgData); ok {
+ key = sreq.Key
+ id = sreq.Id
+ } else if entry, ok = req.(*syncDbEntry); !ok {
+ return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
+ }
+
+ // order by peer > priority > seqid
+ // value is request id if exists
+ if entry == nil {
+ dbkey := make([]byte, 42)
+ dbval := make([]byte, 40)
+
+ // encode key
+ copy(dbkey[:], self.start[:34]) // db peer
+ binary.BigEndian.PutUint64(dbkey[34:], counter)
+ // encode value
+ copy(dbval, key[:])
+ binary.BigEndian.PutUint64(dbval[32:], id)
+
+ entry = &syncDbEntry{dbkey, dbval}
+ }
+ return
+}
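+
+// Editor's sketch of the 42 byte db key layout described above (hedged
+// illustration; peerAddrKey and counter are assumed inputs):
+//
+//	dbkey := make([]byte, 42)                       // byte 0 stays 0x00
+//	dbkey[1] = byte(priorities - High)              // so high priority is replayed first
+//	copy(dbkey[2:34], peerAddrKey)                  // 32 byte peer address
+//	binary.BigEndian.PutUint64(dbkey[34:], counter) // per-syncdb sequence counter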
diff --git a/swarm/network/syncdb_test.go b/swarm/network/syncdb_test.go
new file mode 100644
index 000000000..e46d32a2e
--- /dev/null
+++ b/swarm/network/syncdb_test.go
@@ -0,0 +1,221 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "bytes"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+func init() {
+ glog.SetV(0)
+ glog.SetToStderr(true)
+}
+
+type testSyncDb struct {
+ *syncDb
+ c int
+ t *testing.T
+ fromDb chan bool
+ delivered [][]byte
+ sent []int
+ dbdir string
+ at int
+}
+
+func newTestSyncDb(priority, bufferSize, batchSize int, dbdir string, t *testing.T) *testSyncDb {
+ if len(dbdir) == 0 {
+ tmp, err := ioutil.TempDir(os.TempDir(), "syncdb-test")
+ if err != nil {
+ t.Fatalf("unable to create temporary direcory %v: %v", tmp, err)
+ }
+ dbdir = tmp
+ }
+ db, err := storage.NewLDBDatabase(filepath.Join(dbdir, "requestdb"))
+ if err != nil {
+ t.Fatalf("unable to create db: %v", err)
+ }
+ self := &testSyncDb{
+ fromDb: make(chan bool),
+ dbdir: dbdir,
+ t: t,
+ }
+ h := crypto.Sha3Hash([]byte{0})
+ key := storage.Key(h[:])
+ self.syncDb = newSyncDb(db, key, uint(priority), uint(bufferSize), uint(batchSize), self.deliver)
+ // the db iterator (dbRead) is kicked off explicitly by the tests; if there are
+ // no items in the db this allows reading from the buffer
+ return self
+
+}
+
+func (self *testSyncDb) close() {
+ self.db.Close()
+ os.RemoveAll(self.dbdir)
+}
+
+func (self *testSyncDb) push(n int) {
+ for i := 0; i < n; i++ {
+ self.buffer <- storage.Key(crypto.Sha3([]byte{byte(self.c)}))
+ self.sent = append(self.sent, self.c)
+ self.c++
+ }
+ glog.V(logger.Debug).Infof("pushed %v requests", n)
+}
+
+func (self *testSyncDb) draindb() {
+ it := self.db.NewIterator()
+ defer it.Release()
+ for {
+ it.Seek(self.start)
+ if !it.Valid() {
+ return
+ }
+ k := it.Key()
+ if len(k) == 0 || k[0] == 1 {
+ return
+ }
+ it.Release()
+ it = self.db.NewIterator()
+ }
+}
+
+func (self *testSyncDb) deliver(req interface{}, quit chan bool) bool {
+ _, db := req.(*syncDbEntry)
+ key, _, _, _, err := parseRequest(req)
+ if err != nil {
+ self.t.Fatalf("unexpected error of key %v: %v", key, err)
+ }
+ self.delivered = append(self.delivered, key)
+ select {
+ case self.fromDb <- db:
+ return true
+ case <-quit:
+ return false
+ }
+}
+
+func (self *testSyncDb) expect(n int, db bool) {
+ var ok bool
+ // for n items
+ for i := 0; i < n; i++ {
+ ok = <-self.fromDb
+ if self.at+1 > len(self.delivered) {
+ self.t.Fatalf("expected %v, got %v", self.at+1, len(self.delivered))
+ }
+ if len(self.sent) > self.at && !bytes.Equal(crypto.Sha3([]byte{byte(self.sent[self.at])}), self.delivered[self.at]) {
+ self.t.Fatalf("expected delivery %v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
+ glog.V(logger.Debug).Infof("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
+ }
+ if !ok && db {
+ self.t.Fatalf("expected delivery %v/%v/%v from db", i, n, self.at)
+ }
+ if ok && !db {
+ self.t.Fatalf("expected delivery %v/%v/%v from cache", i, n, self.at)
+ }
+ self.at++
+ }
+}
+
+func TestSyncDb(t *testing.T) {
+ priority := High
+ bufferSize := 5
+ batchSize := 2 * bufferSize
+ s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
+ defer s.close()
+ defer s.stop()
+ s.dbRead(false, 0, s.deliver)
+ s.draindb()
+
+ s.push(4)
+ s.expect(1, false)
+ // 3 in buffer
+ time.Sleep(100 * time.Millisecond)
+ s.push(3)
+ // push over limit
+ s.expect(1, false)
+ // one popped from the buffer, then contention detected
+ s.expect(4, true)
+ s.push(4)
+ s.expect(5, true)
+ // depleted db, switch back to buffer
+ s.draindb()
+ s.push(5)
+ s.expect(4, false)
+ s.push(3)
+ s.expect(4, false)
+ // buffer depleted
+ time.Sleep(100 * time.Millisecond)
+ s.push(6)
+ s.expect(1, false)
+ // push into buffer full, switch to db
+ s.expect(5, true)
+ s.draindb()
+ s.push(1)
+ s.expect(1, false)
+}
+
+func TestSaveSyncDb(t *testing.T) {
+ amount := 30
+ priority := High
+ bufferSize := amount
+ batchSize := 10
+ s := newTestSyncDb(priority, bufferSize, batchSize, "", t)
+ go s.dbRead(false, 0, s.deliver)
+ s.push(amount)
+ s.stop()
+ s.db.Close()
+
+ s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
+ go s.dbRead(false, 0, s.deliver)
+ s.expect(amount, true)
+ for i, key := range s.delivered {
+ expKey := crypto.Sha3([]byte{byte(i)})
+ if !bytes.Equal(key, expKey) {
+ t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
+ }
+ }
+ s.push(amount)
+ s.expect(amount, false)
+ for i := amount; i < 2*amount; i++ {
+ key := s.delivered[i]
+ expKey := crypto.Sha3([]byte{byte(i - amount)})
+ if !bytes.Equal(key, expKey) {
+ t.Fatalf("delivery %v expected to be key %x, got %x", i, expKey, key)
+ }
+ }
+ s.stop()
+ s.db.Close()
+
+ s = newTestSyncDb(priority, bufferSize, batchSize, s.dbdir, t)
+ defer s.close()
+ defer s.stop()
+
+ go s.dbRead(false, 0, s.deliver)
+ s.push(1)
+ s.expect(1, false)
+
+}
diff --git a/swarm/network/syncer.go b/swarm/network/syncer.go
new file mode 100644
index 000000000..e871666bd
--- /dev/null
+++ b/swarm/network/syncer.go
@@ -0,0 +1,778 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "path/filepath"
+
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// syncer parameters (global, not peer specific) default values
+const (
+ requestDbBatchSize = 512 // size of batch before written to request db
+ keyBufferSize = 1024 // size of buffer for unsynced keys
+ syncBatchSize = 128 // maximum batchsize for outgoing requests
+ syncBufferSize = 128 // size of buffer for delivery requests
+ syncCacheSize = 1024 // cache capacity to store request queue in memory
+)
+
+// priorities
+const (
+ Low = iota // 0
+ Medium // 1
+ High // 2
+ priorities // 3 number of priority levels
+)
+
+// request types
+const (
+ DeliverReq = iota // 0
+ PushReq // 1
+ PropagateReq // 2
+ HistoryReq // 3
+ BacklogReq // 4
+)
+
+// json serialisable struct to record the synchronisation state between 2 peers
+type syncState struct {
+ *storage.DbSyncState // embeds the following 4 fields:
+ // Start Key // lower limit of address space
+ // Stop Key // upper limit of address space
+ // First uint64 // counter taken from last sync state
+ // Last uint64 // counter of remote peer dbStore at the time of last connection
+ SessionAt uint64 // set at the time of connection
+ LastSeenAt uint64 // set at the time of connection
+ Latest storage.Key // cursor of dbStore when last synced (continuously set by syncer)
+ Synced bool // true iff Sync is done up to the last disconnect
+ synced chan bool // signal that sync stage finished
+}
+
+// wrapper of the chunk dbs to provide mockable custom local chunk store access to the syncer
+type DbAccess struct {
+ db *storage.DbStore
+ loc *storage.LocalStore
+}
+
+func NewDbAccess(loc *storage.LocalStore) *DbAccess {
+ return &DbAccess{loc.DbStore.(*storage.DbStore), loc}
+}
+
+// to obtain the chunks from key or request db entry only
+func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
+ return self.loc.Get(key)
+}
+
+// current storage counter of chunk db
+func (self *DbAccess) counter() uint64 {
+ return self.db.Counter()
+}
+
+// implemented by dbStoreSyncIterator
+type keyIterator interface {
+ Next() storage.Key
+}
+
+// generator function for iteration by address range and storage counter
+func (self *DbAccess) iterator(s *syncState) keyIterator {
+ it, err := self.db.NewSyncIterator(*(s.DbSyncState))
+ if err != nil {
+ return nil
+ }
+ return keyIterator(it)
+}
+
+func (self syncState) String() string {
+ if self.Synced {
+ return fmt.Sprintf(
+ "session started at: %v, last seen at: %v, latest key: %v",
+ self.SessionAt, self.LastSeenAt,
+ self.Latest.Log(),
+ )
+ } else {
+ return fmt.Sprintf(
+ "address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
+ self.Start.Log(), self.Stop.Log(),
+ self.First, self.Last,
+ self.SessionAt, self.LastSeenAt,
+ self.Latest.Log(),
+ )
+ }
+}
+
+// syncer parameters (global, not peer specific)
+type SyncParams struct {
+ RequestDbPath string // path for request db (leveldb)
+ RequestDbBatchSize uint // number of items before batch is saved to requestdb
+ KeyBufferSize uint // size of key buffer
+ SyncBatchSize uint // maximum batchsize for outgoing requests
+ SyncBufferSize uint // size of buffer for delivery requests
+ SyncCacheSize uint // cache capacity to store request queue in memory
+ SyncPriorities []uint // list of priority levels for req types 0-4
+ SyncModes []bool // list of sync modes for req types 0-4
+}
+
+// constructor with default values
+func NewSyncParams(bzzdir string) *SyncParams {
+ return &SyncParams{
+ RequestDbPath: filepath.Join(bzzdir, "requests"),
+ RequestDbBatchSize: requestDbBatchSize,
+ KeyBufferSize: keyBufferSize,
+ SyncBufferSize: syncBufferSize,
+ SyncBatchSize: syncBatchSize,
+ SyncCacheSize: syncCacheSize,
+ SyncPriorities: []uint{High, Medium, Medium, Low, Low},
+ SyncModes: []bool{true, true, true, true, false},
+ }
+}
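+
+// For reference, the default SyncPriorities/SyncModes above are indexed by the
+// request type constants (DeliverReq..BacklogReq), so they expand to:
+//
+//   DeliverReq   -> High   priority, confirm first via unsyncedKeys (mode true)
+//   PushReq      -> Medium priority, confirm first via unsyncedKeys (mode true)
+//   PropagateReq -> Medium priority, confirm first via unsyncedKeys (mode true)
+//   HistoryReq   -> Low    priority, confirm first via unsyncedKeys (mode true)
+//   BacklogReq   -> Low    priority, deliver directly (mode false)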
+
+// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding
+type syncer struct {
+ *SyncParams // sync parameters
+ syncF func() bool // if syncing is needed
+ key storage.Key // remote peer's address key
+ state *syncState // sync state for our dbStore
+ syncStates chan *syncState // different stages of sync
+ deliveryRequest chan bool // one of two triggers needed to send unsyncedKeys
+ newUnsyncedKeys chan bool // one of two triggers needed to send unsynced keys
+ quit chan bool // signal to quit loops
+
+ // DB related fields
+ dbAccess *DbAccess // access to dbStore
+ db *storage.LDBDatabase // delivery msg db
+
+ // native fields
+ queues [priorities]*syncDb // in-memory cache / queues for sync reqs
+ keys [priorities]chan interface{} // buffer for unsynced keys
+ deliveries [priorities]chan *storeRequestMsgData // delivery
+
+ // bzz protocol instance outgoing message callbacks (mockable for testing)
+ unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg
+ store func(*storeRequestMsgData) error // send storeRequestMsg
+}
+
+// a syncer instance is linked to each peer connection
+// constructor is called from protocol after successful handshake
+// the returned instance is attached to the peer and can be called
+// by the forwarder
+func newSyncer(
+ db *storage.LDBDatabase, remotekey storage.Key,
+ dbAccess *DbAccess,
+ unsyncedKeys func([]*syncRequest, *syncState) error,
+ store func(*storeRequestMsgData) error,
+ params *SyncParams,
+ state *syncState,
+ syncF func() bool,
+) (*syncer, error) {
+
+ syncBufferSize := params.SyncBufferSize
+ keyBufferSize := params.KeyBufferSize
+ dbBatchSize := params.RequestDbBatchSize
+
+ self := &syncer{
+ syncF: syncF,
+ key: remotekey,
+ dbAccess: dbAccess,
+ syncStates: make(chan *syncState, 20),
+ deliveryRequest: make(chan bool, 1),
+ newUnsyncedKeys: make(chan bool, 1),
+ SyncParams: params,
+ state: state,
+ quit: make(chan bool),
+ unsyncedKeys: unsyncedKeys,
+ store: store,
+ }
+
+ // initialising
+ for i := 0; i < priorities; i++ {
+ self.keys[i] = make(chan interface{}, keyBufferSize)
+ self.deliveries[i] = make(chan *storeRequestMsgData)
+ // initialise a syncdb instance for each priority queue
+ self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
+ }
+ glog.V(logger.Info).Infof("syncer started: %v", state)
+ // launch chunk delivery service
+ go self.syncDeliveries()
+ // launch sync task manager
+ if self.syncF() {
+ go self.sync()
+ }
+ // process unsynced keys to broadcast
+ go self.syncUnsyncedKeys()
+
+ return self, nil
+}
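+
+// A minimal construction sketch (the variable and callback names here are
+// hypothetical; in practice the bzz protocol supplies the outgoing message
+// callbacks after a successful handshake):
+//
+//   sync, err := newSyncer(
+//       requestDb, remoteKey, dbAccess,
+//       sendUnsyncedKeysFn, sendStoreFn,
+//       NewSyncParams(bzzdir), state,
+//       func() bool { return true }, // always sync
+//   )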
+
+// metadata serialisation
+func encodeSync(state *syncState) (*json.RawMessage, error) {
+ data, err := json.MarshalIndent(state, "", " ")
+ if err != nil {
+ return nil, err
+ }
+ meta := json.RawMessage(data)
+ return &meta, nil
+}
+
+func decodeSync(meta *json.RawMessage) (*syncState, error) {
+ if meta == nil {
+ return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
+ }
+ data := []byte(*(meta))
+ if len(data) == 0 {
+ return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
+ }
+ state := &syncState{DbSyncState: &storage.DbSyncState{}}
+ err := json.Unmarshal(data, state)
+ return state, err
+}
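+
+// Illustrative round trip through the (de)serialisation helpers above:
+//
+//   meta, err := encodeSync(state)    // *json.RawMessage holding indented JSON
+//   state2, err := decodeSync(meta)   // errors on nil or empty metadata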
+
+/*
+ sync implements the syncing script
+ * first all items left in the request Db are replayed
+ * type = StaleSync
+ * Mode: by default once again via confirmation roundtrip
+ * Priority: the items are replayed at the priority specified for StaleSync,
+ * but within that the order respects the earlier priority level of the request
+ * after all items are consumed for a priority level, the respective
+ queue for delivery requests is open (this way new reqs not written to db)
+ (TODO: this should be checked)
+ * the sync state provided by the remote peer is used to sync history
+ * all the backlog from earlier (aborted) syncing is completed starting from latest
+ * if Last < LastSeenAt then all items in between then process all
+ backlog from upto last disconnect
+ * if Last > 0 &&
+
+ sync is called from the syncer constructor and is not supposed to be used externally
+*/
+func (self *syncer) sync() {
+ state := self.state
+ // sync finished
+ defer close(self.syncStates)
+
+ // 0. first replay stale requests from request db
+ if state.SessionAt == 0 {
+ glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log())
+ return
+ }
+ glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log())
+ for p := priorities - 1; p >= 0; p-- {
+ self.queues[p].dbRead(false, 0, self.replay())
+ }
+ glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log())
+
+ // unless the peer is synced, sync the unfinished history from earlier sessions
+ if !state.Synced {
+ start := state.Start
+
+ if !storage.IsZeroKey(state.Latest) {
+ // 1. there is unfinished earlier sync
+ state.Start = state.Latest
+ glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)
+ // blocks while the entire history upto state is synced
+ self.syncState(state)
+ if state.Last < state.SessionAt {
+ state.First = state.Last + 1
+ }
+ }
+ state.Latest = storage.ZeroKey
+ state.Start = start
+ // 2. sync up to last disconnect
+ if state.First < state.LastSeenAt {
+ state.Last = state.LastSeenAt
+ glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)
+ self.syncState(state)
+ state.First = state.LastSeenAt
+ }
+ state.Latest = storage.ZeroKey
+
+ } else {
+ // synchronisation starts at end of last session
+ state.First = state.LastSeenAt
+ }
+
+ // 3. sync up to current session start
+ // if there have been new chunks since last session
+ if state.LastSeenAt < state.SessionAt {
+ state.Last = state.SessionAt
+ glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)
+ // blocks until state syncing is finished
+ self.syncState(state)
+ }
+ glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log())
+
+}
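+
+// Illustrative timeline for the steps above (counter values are made up):
+// with LastSeenAt = 1500 and SessionAt = 2000, step 2 replays history up to
+// the last disconnect at counter 1500, and step 3 then syncs the window
+// 1500..2000, i.e. chunks stored while the peers were disconnected.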
+
+// syncState hands the state to the sync task manager and blocks until the history up to that state is synced (or the syncer quits)
+func (self *syncer) syncState(state *syncState) {
+ self.syncStates <- state
+ select {
+ case <-state.synced:
+ case <-self.quit:
+ }
+}
+
+// stop quits the loops and saves the request db backlog of each priority queue to disk
+func (self *syncer) stop() {
+ close(self.quit)
+ glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log())
+ for _, db := range self.queues {
+ db.stop()
+ }
+}
+
+// rlp serialisable sync request
+type syncRequest struct {
+ Key storage.Key
+ Priority uint
+}
+
+func (self *syncRequest) String() string {
+ return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority)
+}
+
+func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
+ key, _, _, _, err := parseRequest(req)
+ // TODO: if req has a chunk, it should be put in a cache
+ if err != nil {
+ return nil, err
+ }
+ return &syncRequest{key, uint(p)}, nil
+}
+
+// serves historical items from the DB
+// * read is on demand, blocking unless the history channel is read from
+// * accepts sync requests (syncStates) to create a new db iterator
+// * closes the channel once the iteration finishes
+func (self *syncer) syncHistory(state *syncState) chan interface{} {
+ var n uint
+ history := make(chan interface{})
+ glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)
+ it := self.dbAccess.iterator(state)
+ if it != nil {
+ go func() {
+ // signal that the iteration has ended
+ defer close(history)
+ IT:
+ for {
+ key := it.Next()
+ if key == nil {
+ break IT
+ }
+ select {
+ // blocking until history channel is read from
+ case history <- storage.Key(key):
+ n++
+ glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)
+ state.Latest = key
+ case <-self.quit:
+ return
+ }
+ }
+ glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)
+ }()
+ }
+ return history
+}
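+
+// The returned channel yields storage.Key items and is closed once the
+// iterator is exhausted, so a consumer could, illustratively, drain it with:
+//
+//   for key := range self.syncHistory(state) {
+//       // state.Latest already tracks the key last handed out
+//   }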
+
+// triggers key synchronisation
+func (self *syncer) sendUnsyncedKeys() {
+ select {
+ case self.deliveryRequest <- true:
+ default:
+ }
+}
+
+// assembles a new batch of unsynced keys
+// * keys are drawn from the key buffers in order of priority queue
+// * if the queues of priority for History (HistoryReq) or higher are depleted,
+// historical data is used so historical items are lower priority within
+// their priority group.
+// * Order of historical data is unspecified
+func (self *syncer) syncUnsyncedKeys() {
+ // send out new unsynced keys in batches
+ var unsynced []*syncRequest
+ var more, justSynced bool
+ var keyCount, historyCnt int
+ var history chan interface{}
+
+ priority := High
+ keys := self.keys[priority]
+ var newUnsyncedKeys, deliveryRequest chan bool
+ keyCounts := make([]int, priorities)
+ histPrior := self.SyncPriorities[HistoryReq]
+ syncStates := self.syncStates
+ state := self.state
+
+LOOP:
+ for {
+
+ var req interface{}
+ // select the highest priority channel to read from
+ // keys channels are buffered so the highest priority ones
+ // are checked first - integrity can only be guaranteed if writing
+ // is locked while selecting
+ if priority != High || len(keys) == 0 {
+ // selection is not needed if the High priority queue has items
+ keys = nil
+ PRIORITIES:
+ for priority = High; priority >= 0; priority-- {
+ // the first priority channel that is non-empty will be assigned to keys
+ if len(self.keys[priority]) > 0 {
+ glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
+ keys = self.keys[priority]
+ break PRIORITIES
+ }
+ glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))
+ // if the input queue is empty on this level, resort to history if there is any
+ if uint(priority) == histPrior && history != nil {
+ glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key)
+ keys = history
+ break PRIORITIES
+ }
+ }
+ }
+
+ // if peer ready to receive but nothing to send
+ if keys == nil && deliveryRequest == nil {
+ // if no items are left, switch to waiting mode
+ glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log())
+ newUnsyncedKeys = self.newUnsyncedKeys
+ }
+
+ // send msg iff
+ // * peer is ready to receive keys AND (
+ // * all queues and history are depleted OR
+ // * batch full OR
+ // * all history has been consumed, i.e. just synced)
+ if deliveryRequest == nil &&
+ (justSynced ||
+ len(unsynced) > 0 && keys == nil ||
+ len(unsynced) == int(self.SyncBatchSize)) {
+ justSynced = false
+ // listen to requests
+ deliveryRequest = self.deliveryRequest
+ newUnsyncedKeys = nil // not care about data until next req comes in
+ // set sync to current counter
+ // (all non-historical outgoing traffic scheduled and persisted)
+ state.LastSeenAt = self.dbAccess.counter()
+ state.Latest = storage.ZeroKey
+ glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced)
+ // send the unsynced keys
+ stateCopy := *state
+ err := self.unsyncedKeys(unsynced, &stateCopy)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err)
+ }
+ self.state = state
+ glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)
+ unsynced = nil
+ keys = nil
+ }
+
+ // process item and add it to the batch
+ select {
+ case <-self.quit:
+ break LOOP
+ case req, more = <-keys:
+ if keys == history && !more {
+ glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log())
+ // history channel is closed, waiting for new state (called from sync())
+ syncStates = self.syncStates
+ state.Synced = true // this signals that the current segment is complete
+ select {
+ case state.synced <- false:
+ case <-self.quit:
+ break LOOP
+ }
+ justSynced = true
+ history = nil
+ }
+ case <-deliveryRequest:
+ glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log())
+
+ // this 1-cap channel can wake up the loop,
+ // signaling that the peer is ready to receive unsynced keys
+ // the local channel var is set to nil so this case is disabled until re-armed
+ deliveryRequest = nil
+
+ case <-newUnsyncedKeys:
+ glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log())
+ // this 1 cap channel can wake up the loop
+ // signals that data is available to send if peer is ready to receive
+ newUnsyncedKeys = nil
+ keys = self.keys[High]
+
+ case state, more = <-syncStates:
+ // this resets the state
+ if !more {
+ state = self.state
+ glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)
+ state.Synced = true
+ syncStates = nil
+ } else {
+ glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)
+ state.Synced = false
+ history = self.syncHistory(state)
+ // only one history at a time, only allow another one once the
+ // history channel is closed
+ syncStates = nil
+ }
+ }
+ if req == nil {
+ continue LOOP
+ }
+
+ glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)
+ keyCounts[priority]++
+ keyCount++
+ if keys == history {
+ glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
+ historyCnt++
+ }
+ if sreq, err := self.newSyncRequest(req, priority); err == nil {
+ // extract key from req
+ glog.V(logger.Detail).Infof("syncer(priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
+ unsynced = append(unsynced, sreq)
+ } else {
+ glog.V(logger.Warn).Infof("syncer(priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
+ }
+
+ }
+}
+
+// delivery loop
+// takes into account priority, send store Requests with chunk (delivery)
+// idle blocking if no new deliveries in any of the queues
+func (self *syncer) syncDeliveries() {
+ var req *storeRequestMsgData
+ p := High
+ var deliveries chan *storeRequestMsgData
+ var msg *storeRequestMsgData
+ var err error
+ var c = [priorities]int{}
+ var n = [priorities]int{}
+ var total, success uint
+
+ for {
+ deliveries = self.deliveries[p]
+ select {
+ case req = <-deliveries:
+ n[p]++
+ c[p]++
+ default:
+ if p == Low {
+ // blocking, depletion on all channels, no preference for priority
+ select {
+ case req = <-self.deliveries[High]:
+ n[High]++
+ case req = <-self.deliveries[Medium]:
+ n[Medium]++
+ case req = <-self.deliveries[Low]:
+ n[Low]++
+ case <-self.quit:
+ return
+ }
+ p = High
+ } else {
+ p--
+ continue
+ }
+ }
+ total++
+ msg, err = self.newStoreRequestMsgData(req)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)
+ } else {
+ err = self.store(msg)
+ if err != nil {
+ glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)
+ } else {
+ success++
+ glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req)
+ }
+ }
+ if total%self.SyncBatchSize == 0 {
+ glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])
+ }
+ }
+}
+
+/*
+ addRequest handles requests for delivery
+ it accepts 4 types:
+
+ * storeRequestMsgData: coming from netstore propagate response
+ * chunk: coming from forwarding (questionable: id?)
+ * key: from incoming syncRequest
+ * syncDbEntry: key,id encoded in db
+
+ If sync mode is on for the type of request, then
+ it sends the request to the keys queue of the correct priority
+ channel buffered with capacity (SyncBufferSize)
+
+ If sync mode is off then, requests are directly sent to deliveries
+*/
+func (self *syncer) addRequest(req interface{}, ty int) {
+ // retrieve the priority configured for this request type
+
+ priority := self.SyncPriorities[ty]
+ // sync mode for this type ON
+ if self.syncF() || ty == DeliverReq {
+ if self.SyncModes[ty] {
+ self.addKey(req, priority, self.quit)
+ } else {
+ self.addDelivery(req, priority, self.quit)
+ }
+ }
+}
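+
+// Illustrative routing under the default SyncParams (assuming syncing is on):
+//
+//   self.addRequest(chunk, PropagateReq) // SyncModes[PropagateReq] is true:
+//                                        // key queued at Medium priority for an unsyncedKeys confirmation roundtrip
+//   self.addRequest(entry, BacklogReq)   // SyncModes[BacklogReq] is false:
+//                                        // request queued at Low priority straight into the delivery syncDb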
+
+// addKey queues sync request for sync confirmation with given priority
+// ie the key will go out in an unsyncedKeys message
+func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
+ select {
+ case self.keys[priority] <- req:
+ // this wakes up the unsynced keys loop if idle
+ select {
+ case self.newUnsyncedKeys <- true:
+ default:
+ }
+ return true
+ case <-quit:
+ return false
+ }
+}
+
+// addDelivery queues a delivery request with the given priority
+// ie the chunk will be delivered ASAP mod priority queueing handled by syncdb
+// requests are persisted across sessions for correct sync
+func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
+ select {
+ case self.queues[priority].buffer <- req:
+ return true
+ case <-quit:
+ return false
+ }
+}
+
+// doDelivery delivers the chunk for the request with given priority
+// without queuing
+func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
+ msgdata, err := self.newStoreRequestMsgData(req)
+ if err != nil {
+ glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err)
+ return false
+ }
+ select {
+ case self.deliveries[priority] <- msgdata:
+ return true
+ case <-quit:
+ return false
+ }
+}
+
+// returns the delivery function for given priority
+// passed on to syncDb
+func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
+ return func(req interface{}, quit chan bool) bool {
+ return self.doDelivery(req, priority, quit)
+ }
+}
+
+// returns the replay function passed on to syncDb
+// depending on the sync mode setting for BacklogReq,
+// replay of the request db backlog sends items via confirmation
+// or delivers them directly
+func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
+ sync := self.SyncModes[BacklogReq]
+ priority := self.SyncPriorities[BacklogReq]
+ // sync mode for this type ON
+ if sync {
+ return func(req interface{}, quit chan bool) bool {
+ return self.addKey(req, priority, quit)
+ }
+ } else {
+ return func(req interface{}, quit chan bool) bool {
+ return self.doDelivery(req, priority, quit)
+ }
+
+ }
+}
+
+// given a request, extends it to a full storeRequestMsgData
+// polymorphic: see addRequest for the types accepted
+func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
+
+ key, id, chunk, sreq, err := parseRequest(req)
+ if err != nil {
+ return nil, err
+ }
+
+ if sreq == nil {
+ if chunk == nil {
+ var err error
+ chunk, err = self.dbAccess.get(key)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ sreq = &storeRequestMsgData{
+ Id: id,
+ Key: chunk.Key,
+ SData: chunk.SData,
+ }
+ }
+
+ return sreq, nil
+}
+
+// parse request types and extracts, key, id, chunk, request if available
+// does not do chunk lookup !
+func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
+ var key storage.Key
+ var entry *syncDbEntry
+ var chunk *storage.Chunk
+ var id uint64
+ var ok bool
+ var sreq *storeRequestMsgData
+ var err error
+
+ if key, ok = req.(storage.Key); ok {
+ id = generateId()
+
+ } else if entry, ok = req.(*syncDbEntry); ok {
+ id = binary.BigEndian.Uint64(entry.val[32:])
+ key = storage.Key(entry.val[:32])
+
+ } else if chunk, ok = req.(*storage.Chunk); ok {
+ key = chunk.Key
+ id = generateId()
+
+ } else if sreq, ok = req.(*storeRequestMsgData); ok {
+ key = sreq.Key
+ } else {
+ err = fmt.Errorf("type not allowed: %v (%T)", req, req)
+ }
+
+ return key, id, chunk, sreq, err
+}
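+
+// Summary of the accepted types, mirroring the branches above:
+//
+//   storage.Key          -> key as given,     freshly generated id
+//   *syncDbEntry         -> key = val[:32],   id = big-endian val[32:]
+//   *storage.Chunk       -> key = chunk.Key,  freshly generated id
+//   *storeRequestMsgData -> key = sreq.Key,   id left zero here (sreq carries its own Id)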