aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/hive.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/hive.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloadgo-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/hive.go')
-rw-r--r--swarm/network/hive.go514
1 files changed, 186 insertions, 328 deletions
diff --git a/swarm/network/hive.go b/swarm/network/hive.go
index 8404ffcc2..a54a17d29 100644
--- a/swarm/network/hive.go
+++ b/swarm/network/hive.go
@@ -18,386 +18,244 @@ package network
import (
"fmt"
- "math/rand"
- "path/filepath"
+ "sync"
"time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
- "github.com/ethereum/go-ethereum/p2p/netutil"
- "github.com/ethereum/go-ethereum/swarm/network/kademlia"
- "github.com/ethereum/go-ethereum/swarm/storage"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/state"
)
-// Hive is the logistic manager of the swarm
-// it uses a generic kademlia nodetable to find best peer list
-// for any target
-// this is used by the netstore to search for content in the swarm
-// the bzz protocol peersMsgData exchange is relayed to Kademlia
-// for db storage and filtering
-// connections and disconnections are reported and relayed
-// to keep the nodetable uptodate
-
-var (
- peersNumGauge = metrics.NewRegisteredGauge("network.peers.num", nil)
- addPeerCounter = metrics.NewRegisteredCounter("network.addpeer.count", nil)
- removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil)
-)
-
-type Hive struct {
- listenAddr func() string
- callInterval uint64
- id discover.NodeID
- addr kademlia.Address
- kad *kademlia.Kademlia
- path string
- quit chan bool
- toggle chan bool
- more chan bool
-
- // for testing only
- swapEnabled bool
- syncEnabled bool
- blockRead bool
- blockWrite bool
+/*
+Hive is the logistic manager of the swarm
+
+When the hive is started, a forever loop is launched that
+asks the Overlay Topology driver (e.g., generic kademlia nodetable)
+to suggest peers to bootstrap connectivity
+*/
+
+// Overlay is the interface for kademlia (or other topology drivers)
+type Overlay interface {
+ // suggest peers to connect to
+ SuggestPeer() (OverlayAddr, int, bool)
+ // register and deregister peer connections
+ On(OverlayConn) (depth uint8, changed bool)
+ Off(OverlayConn)
+ // register peer addresses
+ Register([]OverlayAddr) error
+ // iterate over connected peers
+ EachConn([]byte, int, func(OverlayConn, int, bool) bool)
+ // iterate over known peers (address records)
+ EachAddr([]byte, int, func(OverlayAddr, int, bool) bool)
+ // pretty print the connectivity
+ String() string
+ // base Overlay address of the node itself
+ BaseAddr() []byte
+ // connectivity health check used for testing
+ Healthy(*PeerPot) *Health
}
-const (
- callInterval = 3000000000
- // bucketSize = 3
- // maxProx = 8
- // proxBinSize = 4
-)
-
+// HiveParams holds the config options to hive
type HiveParams struct {
- CallInterval uint64
- KadDbPath string
- *kademlia.KadParams
+ Discovery bool // if want discovery of not
+ PeersBroadcastSetSize uint8 // how many peers to use when relaying
+ MaxPeersPerRequest uint8 // max size for peer address batches
+ KeepAliveInterval time.Duration
}
-//create default params
-func NewDefaultHiveParams() *HiveParams {
- kad := kademlia.NewDefaultKadParams()
- // kad.BucketSize = bucketSize
- // kad.MaxProx = maxProx
- // kad.ProxBinSize = proxBinSize
-
+// NewHiveParams returns hive config with only the
+func NewHiveParams() *HiveParams {
return &HiveParams{
- CallInterval: callInterval,
- KadParams: kad,
+ Discovery: true,
+ PeersBroadcastSetSize: 3,
+ MaxPeersPerRequest: 5,
+ KeepAliveInterval: 500 * time.Millisecond,
}
}
-//this can only finally be set after all config options (file, cmd line, env vars)
-//have been evaluated
-func (self *HiveParams) Init(path string) {
- self.KadDbPath = filepath.Join(path, "bzz-peers.json")
+// Hive manages network connections of the swarm node
+type Hive struct {
+ *HiveParams // settings
+ Overlay // the overlay connectiviy driver
+ Store state.Store // storage interface to save peers across sessions
+ addPeer func(*discover.Node) // server callback to connect to a peer
+ // bookkeeping
+ lock sync.Mutex
+ ticker *time.Ticker
}
-func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive {
- kad := kademlia.New(kademlia.Address(addr), params.KadParams)
+// NewHive constructs a new hive
+// HiveParams: config parameters
+// Overlay: connectivity driver using a network topology
+// StateStore: to save peers across sessions
+func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive {
return &Hive{
- callInterval: params.CallInterval,
- kad: kad,
- addr: kad.Addr(),
- path: params.KadDbPath,
- swapEnabled: swapEnabled,
- syncEnabled: syncEnabled,
+ HiveParams: params,
+ Overlay: overlay,
+ Store: store,
}
}
-func (self *Hive) SyncEnabled(on bool) {
- self.syncEnabled = on
-}
-
-func (self *Hive) SwapEnabled(on bool) {
- self.swapEnabled = on
-}
-
-func (self *Hive) BlockNetworkRead(on bool) {
- self.blockRead = on
-}
-
-func (self *Hive) BlockNetworkWrite(on bool) {
- self.blockWrite = on
-}
-
-// public accessor to the hive base address
-func (self *Hive) Addr() kademlia.Address {
- return self.addr
-}
-
-// Start receives network info only at startup
-// listedAddr is a function to retrieve listening address to advertise to peers
-// connectPeer is a function to connect to a peer based on its NodeID or enode URL
-// there are called on the p2p.Server which runs on the node
-func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) {
- self.toggle = make(chan bool)
- self.more = make(chan bool)
- self.quit = make(chan bool)
- self.id = id
- self.listenAddr = listenAddr
- err = self.kad.Load(self.path, nil)
- if err != nil {
- log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err))
- err = nil
+// Start stars the hive, receives p2p.Server only at startup
+// server is used to connect to a peer based on its NodeID or enode URL
+// these are called on the p2p.Server which runs on the node
+func (h *Hive) Start(server *p2p.Server) error {
+ log.Info(fmt.Sprintf("%08x hive starting", h.BaseAddr()[:4]))
+ // if state store is specified, load peers to prepopulate the overlay address book
+ if h.Store != nil {
+ log.Info("detected an existing store. trying to load peers")
+ if err := h.loadPeers(); err != nil {
+ log.Error(fmt.Sprintf("%08x hive encoutered an error trying to load peers", h.BaseAddr()[:4]))
+ return err
+ }
}
+ // assigns the p2p.Server#AddPeer function to connect to peers
+ h.addPeer = server.AddPeer
+ // ticker to keep the hive alive
+ h.ticker = time.NewTicker(h.KeepAliveInterval)
// this loop is doing bootstrapping and maintains a healthy table
- go self.keepAlive()
- go func() {
- // whenever toggled ask kademlia about most preferred peer
- for alive := range self.more {
- if !alive {
- // receiving false closes the loop while allowing parallel routines
- // to attempt to write to more (remove Peer when shutting down)
- return
- }
- node, need, proxLimit := self.kad.Suggest()
-
- if node != nil && len(node.Url) > 0 {
- log.Trace(fmt.Sprintf("call known bee %v", node.Url))
- // enode or any lower level connection address is unnecessary in future
- // discovery table is used to look it up.
- connectPeer(node.Url)
- }
- if need {
- // a random peer is taken from the table
- peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1)
- if len(peers) > 0 {
- // a random address at prox bin 0 is sent for lookup
- randAddr := kademlia.RandomAddressAt(self.addr, proxLimit)
- req := &retrieveRequestMsgData{
- Key: storage.Key(randAddr[:]),
- }
- log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]))
- peers[0].(*peer).retrieve(req)
- } else {
- log.Warn(fmt.Sprintf("no peer"))
- }
- log.Trace(fmt.Sprintf("buzz kept alive"))
- } else {
- log.Info(fmt.Sprintf("no need for more bees"))
- }
- select {
- case self.toggle <- need:
- case <-self.quit:
- return
- }
- log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()))
- }
- }()
- return
+ go h.connect()
+ return nil
}
-// keepAlive is a forever loop
-// in its awake state it periodically triggers connection attempts
-// by writing to self.more until Kademlia Table is saturated
-// wake state is toggled by writing to self.toggle
-// it restarts if the table becomes non-full again due to disconnections
-func (self *Hive) keepAlive() {
- alarm := time.NewTicker(time.Duration(self.callInterval)).C
- for {
- peersNumGauge.Update(int64(self.kad.Count()))
- select {
- case <-alarm:
- if self.kad.DBCount() > 0 {
- select {
- case self.more <- true:
- log.Debug(fmt.Sprintf("buzz wakeup"))
- default:
- }
- }
- case need := <-self.toggle:
- if alarm == nil && need {
- alarm = time.NewTicker(time.Duration(self.callInterval)).C
- }
- if alarm != nil && !need {
- alarm = nil
-
- }
- case <-self.quit:
- return
+// Stop terminates the updateloop and saves the peers
+func (h *Hive) Stop() error {
+ log.Info(fmt.Sprintf("%08x hive stopping, saving peers", h.BaseAddr()[:4]))
+ h.ticker.Stop()
+ if h.Store != nil {
+ if err := h.savePeers(); err != nil {
+ return fmt.Errorf("could not save peers to persistence store: %v", err)
}
- }
-}
-
-func (self *Hive) Stop() error {
- // closing toggle channel quits the updateloop
- close(self.quit)
- return self.kad.Save(self.path, saveSync)
-}
-
-// called at the end of a successful protocol handshake
-func (self *Hive) addPeer(p *peer) error {
- addPeerCounter.Inc(1)
- defer func() {
- select {
- case self.more <- true:
- default:
+ if err := h.Store.Close(); err != nil {
+ return fmt.Errorf("could not close file handle to persistence store: %v", err)
}
- }()
- log.Trace(fmt.Sprintf("hi new bee %v", p))
- err := self.kad.On(p, loadSync)
- if err != nil {
- return err
}
- // self lookup (can be encoded as nil/zero key since peers addr known) + no id ()
- // the most common way of saying hi in bzz is initiation of gossip
- // let me know about anyone new from my hood , here is the storageradius
- // to send the 6 byte self lookup
- // we do not record as request or forward it, just reply with peers
- p.retrieve(&retrieveRequestMsgData{})
- log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p))
-
+ log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
+ h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool {
+ log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
+ p.Drop(nil)
+ return true
+ })
+
+ log.Info(fmt.Sprintf("%08x all peers dropped", h.BaseAddr()[:4]))
return nil
}
-// called after peer disconnected
-func (self *Hive) removePeer(p *peer) {
- removePeerCounter.Inc(1)
- log.Debug(fmt.Sprintf("bee %v removed", p))
- self.kad.Off(p, saveSync)
- select {
- case self.more <- true:
- default:
- }
- if self.kad.Count() == 0 {
- log.Debug(fmt.Sprintf("empty, all bees gone"))
- }
-}
+// connect is a forever loop
+// at each iteration, ask the overlay driver to suggest the most preferred peer to connect to
+// as well as advertises saturation depth if needed
+func (h *Hive) connect() {
+ for range h.ticker.C {
-// Retrieve a list of live peers that are closer to target than us
-func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {
- var addr kademlia.Address
- copy(addr[:], target[:])
- for _, node := range self.kad.FindClosest(addr, max) {
- peers = append(peers, node.(*peer))
- }
- return
-}
-
-// disconnects all the peers
-func (self *Hive) DropAll() {
- log.Info(fmt.Sprintf("dropping all bees"))
- for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
- node.Drop()
- }
-}
-
-// contructor for kademlia.NodeRecord based on peer address alone
-// TODO: should go away and only addr passed to kademlia
-func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord {
- now := time.Now()
- return &kademlia.NodeRecord{
- Addr: addr.Addr,
- Url: addr.String(),
- Seen: now,
- After: now,
- }
-}
+ addr, depth, changed := h.SuggestPeer()
+ if h.Discovery && changed {
+ NotifyDepth(uint8(depth), h)
+ }
+ if addr == nil {
+ continue
+ }
-// called by the protocol when receiving peerset (for target address)
-// peersMsgData is converted to a slice of NodeRecords for Kademlia
-// this is to store all thats needed
-func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
- var nrs []*kademlia.NodeRecord
- for _, p := range req.Peers {
- if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil {
- log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err))
+ log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
+ under, err := discover.ParseNode(string(addr.(Addr).Under()))
+ if err != nil {
+ log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
continue
}
- nrs = append(nrs, newNodeRecord(p))
+ log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4]))
+ h.addPeer(under)
}
- self.kad.Add(nrs)
}
-// peer wraps the protocol instance to represent a connected peer
-// it implements kademlia.Node interface
-type peer struct {
- *bzz // protocol instance running on peer connection
-}
-
-// protocol instance implements kademlia.Node interface (embedded peer)
-func (self *peer) Addr() kademlia.Address {
- return self.remoteAddr.Addr
+// Run protocol run function
+func (h *Hive) Run(p *BzzPeer) error {
+ dp := newDiscovery(p, h)
+ depth, changed := h.On(dp)
+ // if we want discovery, advertise change of depth
+ if h.Discovery {
+ if changed {
+ // if depth changed, send to all peers
+ NotifyDepth(depth, h)
+ } else {
+ // otherwise just send depth to new peer
+ dp.NotifyDepth(depth)
+ }
+ }
+ NotifyPeer(p.Off(), h)
+ defer h.Off(dp)
+ return dp.Run(dp.HandleMsg)
}
-func (self *peer) Url() string {
- return self.remoteAddr.String()
+// NodeInfo function is used by the p2p.server RPC interface to display
+// protocol specific node information
+func (h *Hive) NodeInfo() interface{} {
+ return h.String()
}
-// TODO take into account traffic
-func (self *peer) LastActive() time.Time {
- return self.lastActive
+// PeerInfo function is used by the p2p.server RPC interface to display
+// protocol specific information any connected peer referred to by their NodeID
+func (h *Hive) PeerInfo(id discover.NodeID) interface{} {
+ addr := NewAddrFromNodeID(id)
+ return struct {
+ OAddr hexutil.Bytes
+ UAddr hexutil.Bytes
+ }{
+ OAddr: addr.OAddr,
+ UAddr: addr.UAddr,
+ }
}
-// reads the serialised form of sync state persisted as the 'Meta' attribute
-// and sets the decoded syncState on the online node
-func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
- p, ok := node.(*peer)
- if !ok {
- return fmt.Errorf("invalid type")
+// ToAddr returns the serialisable version of u
+func ToAddr(pa OverlayPeer) *BzzAddr {
+ if addr, ok := pa.(*BzzAddr); ok {
+ return addr
}
- if record.Meta == nil {
- log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record))
- p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
- return nil
+ if p, ok := pa.(*discPeer); ok {
+ return p.BzzAddr
}
- state, err := decodeSync(record.Meta)
- if err != nil {
- return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err)
- }
- log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))))
- p.syncState = state
- return err
+ return pa.(*BzzPeer).BzzAddr
}
-// callback when saving a sync state
-func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
- if p, ok := node.(*peer); ok {
- meta, err := encodeSync(p.syncState)
- if err != nil {
- log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err))
- return
+// loadPeers, savePeer implement persistence callback/
+func (h *Hive) loadPeers() error {
+ var as []*BzzAddr
+ err := h.Store.Get("peers", &as)
+ if err != nil {
+ if err == state.ErrNotFound {
+ log.Info(fmt.Sprintf("hive %08x: no persisted peers found", h.BaseAddr()[:4]))
+ return nil
}
- log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta)))
- record.Meta = meta
+ return err
}
-}
+ log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4]))
-// the immediate response to a retrieve request,
-// sends relevant peer data given by the kademlia hive to the requester
-// TODO: remember peers sent for duration of the session, only new peers sent
-func (self *Hive) peers(req *retrieveRequestMsgData) {
- if req != nil {
- var addrs []*peerAddr
- if req.timeout == nil || time.Now().Before(*(req.timeout)) {
- key := req.Key
- // self lookup from remote peer
- if storage.IsZeroKey(key) {
- addr := req.from.Addr()
- key = storage.Key(addr[:])
- req.Key = nil
- }
- // get peer addresses from hive
- for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
- addrs = append(addrs, peer.remoteAddr)
- }
- log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()))
+ return h.Register(toOverlayAddrs(as...))
+}
- peersData := &peersMsgData{
- Peers: addrs,
- Key: req.Key,
- Id: req.Id,
- }
- peersData.setTimeout(req.timeout)
- req.from.peers(peersData)
- }
+// toOverlayAddrs transforms an array of BzzAddr to OverlayAddr
+func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) {
+ for _, a := range as {
+ oas = append(oas, OverlayAddr(a))
}
+ return
}
-func (self *Hive) String() string {
- return self.kad.String()
+// savePeers, savePeer implement persistence callback/
+func (h *Hive) savePeers() error {
+ var peers []*BzzAddr
+ h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool {
+ if pa == nil {
+ log.Warn(fmt.Sprintf("empty addr: %v", i))
+ return true
+ }
+ apa := ToAddr(pa)
+ log.Trace("saving peer", "peer", apa)
+ peers = append(peers, apa)
+ return true
+ })
+ if err := h.Store.Put("peers", peers); err != nil {
+ return fmt.Errorf("could not save peers: %v", err)
+ }
+ return nil
}