diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/hive.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/hive.go')
-rw-r--r-- | swarm/network/hive.go | 514 |
1 files changed, 186 insertions, 328 deletions
diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 8404ffcc2..a54a17d29 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -18,386 +18,244 @@ package network import ( "fmt" - "math/rand" - "path/filepath" + "sync" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/netutil" - "github.com/ethereum/go-ethereum/swarm/network/kademlia" - "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/state" ) -// Hive is the logistic manager of the swarm -// it uses a generic kademlia nodetable to find best peer list -// for any target -// this is used by the netstore to search for content in the swarm -// the bzz protocol peersMsgData exchange is relayed to Kademlia -// for db storage and filtering -// connections and disconnections are reported and relayed -// to keep the nodetable uptodate - -var ( - peersNumGauge = metrics.NewRegisteredGauge("network.peers.num", nil) - addPeerCounter = metrics.NewRegisteredCounter("network.addpeer.count", nil) - removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil) -) - -type Hive struct { - listenAddr func() string - callInterval uint64 - id discover.NodeID - addr kademlia.Address - kad *kademlia.Kademlia - path string - quit chan bool - toggle chan bool - more chan bool - - // for testing only - swapEnabled bool - syncEnabled bool - blockRead bool - blockWrite bool +/* +Hive is the logistic manager of the swarm + +When the hive is started, a forever loop is launched that +asks the Overlay Topology driver (e.g., generic kademlia nodetable) +to suggest peers to bootstrap connectivity +*/ + +// Overlay is the interface for kademlia (or other topology drivers) +type Overlay interface { + // suggest peers to connect to + SuggestPeer() (OverlayAddr, int, bool) + // register and deregister peer connections + On(OverlayConn) (depth uint8, changed bool) + Off(OverlayConn) + // register peer addresses + Register([]OverlayAddr) error + // iterate over connected peers + EachConn([]byte, int, func(OverlayConn, int, bool) bool) + // iterate over known peers (address records) + EachAddr([]byte, int, func(OverlayAddr, int, bool) bool) + // pretty print the connectivity + String() string + // base Overlay address of the node itself + BaseAddr() []byte + // connectivity health check used for testing + Healthy(*PeerPot) *Health } -const ( - callInterval = 3000000000 - // bucketSize = 3 - // maxProx = 8 - // proxBinSize = 4 -) - +// HiveParams holds the config options to hive type HiveParams struct { - CallInterval uint64 - KadDbPath string - *kademlia.KadParams + Discovery bool // if want discovery of not + PeersBroadcastSetSize uint8 // how many peers to use when relaying + MaxPeersPerRequest uint8 // max size for peer address batches + KeepAliveInterval time.Duration } -//create default params -func NewDefaultHiveParams() *HiveParams { - kad := kademlia.NewDefaultKadParams() - // kad.BucketSize = bucketSize - // kad.MaxProx = maxProx - // kad.ProxBinSize = proxBinSize - +// NewHiveParams returns hive config with only the +func NewHiveParams() *HiveParams { return &HiveParams{ - CallInterval: callInterval, - KadParams: kad, + Discovery: true, + PeersBroadcastSetSize: 3, + MaxPeersPerRequest: 5, + KeepAliveInterval: 500 * time.Millisecond, } } -//this can only finally be set after all config options (file, cmd line, env vars) -//have been evaluated -func (self *HiveParams) Init(path string) { - self.KadDbPath = filepath.Join(path, "bzz-peers.json") +// Hive manages network connections of the swarm node +type Hive struct { + *HiveParams // settings + Overlay // the overlay connectiviy driver + Store state.Store // storage interface to save peers across sessions + addPeer func(*discover.Node) // server callback to connect to a peer + // bookkeeping + lock sync.Mutex + ticker *time.Ticker } -func NewHive(addr common.Hash, params *HiveParams, swapEnabled, syncEnabled bool) *Hive { - kad := kademlia.New(kademlia.Address(addr), params.KadParams) +// NewHive constructs a new hive +// HiveParams: config parameters +// Overlay: connectivity driver using a network topology +// StateStore: to save peers across sessions +func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive { return &Hive{ - callInterval: params.CallInterval, - kad: kad, - addr: kad.Addr(), - path: params.KadDbPath, - swapEnabled: swapEnabled, - syncEnabled: syncEnabled, + HiveParams: params, + Overlay: overlay, + Store: store, } } -func (self *Hive) SyncEnabled(on bool) { - self.syncEnabled = on -} - -func (self *Hive) SwapEnabled(on bool) { - self.swapEnabled = on -} - -func (self *Hive) BlockNetworkRead(on bool) { - self.blockRead = on -} - -func (self *Hive) BlockNetworkWrite(on bool) { - self.blockWrite = on -} - -// public accessor to the hive base address -func (self *Hive) Addr() kademlia.Address { - return self.addr -} - -// Start receives network info only at startup -// listedAddr is a function to retrieve listening address to advertise to peers -// connectPeer is a function to connect to a peer based on its NodeID or enode URL -// there are called on the p2p.Server which runs on the node -func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPeer func(string) error) (err error) { - self.toggle = make(chan bool) - self.more = make(chan bool) - self.quit = make(chan bool) - self.id = id - self.listenAddr = listenAddr - err = self.kad.Load(self.path, nil) - if err != nil { - log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err)) - err = nil +// Start stars the hive, receives p2p.Server only at startup +// server is used to connect to a peer based on its NodeID or enode URL +// these are called on the p2p.Server which runs on the node +func (h *Hive) Start(server *p2p.Server) error { + log.Info(fmt.Sprintf("%08x hive starting", h.BaseAddr()[:4])) + // if state store is specified, load peers to prepopulate the overlay address book + if h.Store != nil { + log.Info("detected an existing store. trying to load peers") + if err := h.loadPeers(); err != nil { + log.Error(fmt.Sprintf("%08x hive encoutered an error trying to load peers", h.BaseAddr()[:4])) + return err + } } + // assigns the p2p.Server#AddPeer function to connect to peers + h.addPeer = server.AddPeer + // ticker to keep the hive alive + h.ticker = time.NewTicker(h.KeepAliveInterval) // this loop is doing bootstrapping and maintains a healthy table - go self.keepAlive() - go func() { - // whenever toggled ask kademlia about most preferred peer - for alive := range self.more { - if !alive { - // receiving false closes the loop while allowing parallel routines - // to attempt to write to more (remove Peer when shutting down) - return - } - node, need, proxLimit := self.kad.Suggest() - - if node != nil && len(node.Url) > 0 { - log.Trace(fmt.Sprintf("call known bee %v", node.Url)) - // enode or any lower level connection address is unnecessary in future - // discovery table is used to look it up. - connectPeer(node.Url) - } - if need { - // a random peer is taken from the table - peers := self.kad.FindClosest(kademlia.RandomAddressAt(self.addr, rand.Intn(self.kad.MaxProx)), 1) - if len(peers) > 0 { - // a random address at prox bin 0 is sent for lookup - randAddr := kademlia.RandomAddressAt(self.addr, proxLimit) - req := &retrieveRequestMsgData{ - Key: storage.Key(randAddr[:]), - } - log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0])) - peers[0].(*peer).retrieve(req) - } else { - log.Warn(fmt.Sprintf("no peer")) - } - log.Trace(fmt.Sprintf("buzz kept alive")) - } else { - log.Info(fmt.Sprintf("no need for more bees")) - } - select { - case self.toggle <- need: - case <-self.quit: - return - } - log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount())) - } - }() - return + go h.connect() + return nil } -// keepAlive is a forever loop -// in its awake state it periodically triggers connection attempts -// by writing to self.more until Kademlia Table is saturated -// wake state is toggled by writing to self.toggle -// it restarts if the table becomes non-full again due to disconnections -func (self *Hive) keepAlive() { - alarm := time.NewTicker(time.Duration(self.callInterval)).C - for { - peersNumGauge.Update(int64(self.kad.Count())) - select { - case <-alarm: - if self.kad.DBCount() > 0 { - select { - case self.more <- true: - log.Debug(fmt.Sprintf("buzz wakeup")) - default: - } - } - case need := <-self.toggle: - if alarm == nil && need { - alarm = time.NewTicker(time.Duration(self.callInterval)).C - } - if alarm != nil && !need { - alarm = nil - - } - case <-self.quit: - return +// Stop terminates the updateloop and saves the peers +func (h *Hive) Stop() error { + log.Info(fmt.Sprintf("%08x hive stopping, saving peers", h.BaseAddr()[:4])) + h.ticker.Stop() + if h.Store != nil { + if err := h.savePeers(); err != nil { + return fmt.Errorf("could not save peers to persistence store: %v", err) } - } -} - -func (self *Hive) Stop() error { - // closing toggle channel quits the updateloop - close(self.quit) - return self.kad.Save(self.path, saveSync) -} - -// called at the end of a successful protocol handshake -func (self *Hive) addPeer(p *peer) error { - addPeerCounter.Inc(1) - defer func() { - select { - case self.more <- true: - default: + if err := h.Store.Close(); err != nil { + return fmt.Errorf("could not close file handle to persistence store: %v", err) } - }() - log.Trace(fmt.Sprintf("hi new bee %v", p)) - err := self.kad.On(p, loadSync) - if err != nil { - return err } - // self lookup (can be encoded as nil/zero key since peers addr known) + no id () - // the most common way of saying hi in bzz is initiation of gossip - // let me know about anyone new from my hood , here is the storageradius - // to send the 6 byte self lookup - // we do not record as request or forward it, just reply with peers - p.retrieve(&retrieveRequestMsgData{}) - log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p)) - + log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) + h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool { + log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) + p.Drop(nil) + return true + }) + + log.Info(fmt.Sprintf("%08x all peers dropped", h.BaseAddr()[:4])) return nil } -// called after peer disconnected -func (self *Hive) removePeer(p *peer) { - removePeerCounter.Inc(1) - log.Debug(fmt.Sprintf("bee %v removed", p)) - self.kad.Off(p, saveSync) - select { - case self.more <- true: - default: - } - if self.kad.Count() == 0 { - log.Debug(fmt.Sprintf("empty, all bees gone")) - } -} +// connect is a forever loop +// at each iteration, ask the overlay driver to suggest the most preferred peer to connect to +// as well as advertises saturation depth if needed +func (h *Hive) connect() { + for range h.ticker.C { -// Retrieve a list of live peers that are closer to target than us -func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) { - var addr kademlia.Address - copy(addr[:], target[:]) - for _, node := range self.kad.FindClosest(addr, max) { - peers = append(peers, node.(*peer)) - } - return -} - -// disconnects all the peers -func (self *Hive) DropAll() { - log.Info(fmt.Sprintf("dropping all bees")) - for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) { - node.Drop() - } -} - -// contructor for kademlia.NodeRecord based on peer address alone -// TODO: should go away and only addr passed to kademlia -func newNodeRecord(addr *peerAddr) *kademlia.NodeRecord { - now := time.Now() - return &kademlia.NodeRecord{ - Addr: addr.Addr, - Url: addr.String(), - Seen: now, - After: now, - } -} + addr, depth, changed := h.SuggestPeer() + if h.Discovery && changed { + NotifyDepth(uint8(depth), h) + } + if addr == nil { + continue + } -// called by the protocol when receiving peerset (for target address) -// peersMsgData is converted to a slice of NodeRecords for Kademlia -// this is to store all thats needed -func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) { - var nrs []*kademlia.NodeRecord - for _, p := range req.Peers { - if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil { - log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err)) + log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4])) + under, err := discover.ParseNode(string(addr.(Addr).Under())) + if err != nil { + log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err)) continue } - nrs = append(nrs, newNodeRecord(p)) + log.Trace(fmt.Sprintf("%08x attempt to connect to bee %08x", h.BaseAddr()[:4], addr.Address()[:4])) + h.addPeer(under) } - self.kad.Add(nrs) } -// peer wraps the protocol instance to represent a connected peer -// it implements kademlia.Node interface -type peer struct { - *bzz // protocol instance running on peer connection -} - -// protocol instance implements kademlia.Node interface (embedded peer) -func (self *peer) Addr() kademlia.Address { - return self.remoteAddr.Addr +// Run protocol run function +func (h *Hive) Run(p *BzzPeer) error { + dp := newDiscovery(p, h) + depth, changed := h.On(dp) + // if we want discovery, advertise change of depth + if h.Discovery { + if changed { + // if depth changed, send to all peers + NotifyDepth(depth, h) + } else { + // otherwise just send depth to new peer + dp.NotifyDepth(depth) + } + } + NotifyPeer(p.Off(), h) + defer h.Off(dp) + return dp.Run(dp.HandleMsg) } -func (self *peer) Url() string { - return self.remoteAddr.String() +// NodeInfo function is used by the p2p.server RPC interface to display +// protocol specific node information +func (h *Hive) NodeInfo() interface{} { + return h.String() } -// TODO take into account traffic -func (self *peer) LastActive() time.Time { - return self.lastActive +// PeerInfo function is used by the p2p.server RPC interface to display +// protocol specific information any connected peer referred to by their NodeID +func (h *Hive) PeerInfo(id discover.NodeID) interface{} { + addr := NewAddrFromNodeID(id) + return struct { + OAddr hexutil.Bytes + UAddr hexutil.Bytes + }{ + OAddr: addr.OAddr, + UAddr: addr.UAddr, + } } -// reads the serialised form of sync state persisted as the 'Meta' attribute -// and sets the decoded syncState on the online node -func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error { - p, ok := node.(*peer) - if !ok { - return fmt.Errorf("invalid type") +// ToAddr returns the serialisable version of u +func ToAddr(pa OverlayPeer) *BzzAddr { + if addr, ok := pa.(*BzzAddr); ok { + return addr } - if record.Meta == nil { - log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record)) - p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}} - return nil + if p, ok := pa.(*discPeer); ok { + return p.BzzAddr } - state, err := decodeSync(record.Meta) - if err != nil { - return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err) - } - log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta)))) - p.syncState = state - return err + return pa.(*BzzPeer).BzzAddr } -// callback when saving a sync state -func saveSync(record *kademlia.NodeRecord, node kademlia.Node) { - if p, ok := node.(*peer); ok { - meta, err := encodeSync(p.syncState) - if err != nil { - log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err)) - return +// loadPeers, savePeer implement persistence callback/ +func (h *Hive) loadPeers() error { + var as []*BzzAddr + err := h.Store.Get("peers", &as) + if err != nil { + if err == state.ErrNotFound { + log.Info(fmt.Sprintf("hive %08x: no persisted peers found", h.BaseAddr()[:4])) + return nil } - log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta))) - record.Meta = meta + return err } -} + log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4])) -// the immediate response to a retrieve request, -// sends relevant peer data given by the kademlia hive to the requester -// TODO: remember peers sent for duration of the session, only new peers sent -func (self *Hive) peers(req *retrieveRequestMsgData) { - if req != nil { - var addrs []*peerAddr - if req.timeout == nil || time.Now().Before(*(req.timeout)) { - key := req.Key - // self lookup from remote peer - if storage.IsZeroKey(key) { - addr := req.from.Addr() - key = storage.Key(addr[:]) - req.Key = nil - } - // get peer addresses from hive - for _, peer := range self.getPeers(key, int(req.MaxPeers)) { - addrs = append(addrs, peer.remoteAddr) - } - log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log())) + return h.Register(toOverlayAddrs(as...)) +} - peersData := &peersMsgData{ - Peers: addrs, - Key: req.Key, - Id: req.Id, - } - peersData.setTimeout(req.timeout) - req.from.peers(peersData) - } +// toOverlayAddrs transforms an array of BzzAddr to OverlayAddr +func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) { + for _, a := range as { + oas = append(oas, OverlayAddr(a)) } + return } -func (self *Hive) String() string { - return self.kad.String() +// savePeers, savePeer implement persistence callback/ +func (h *Hive) savePeers() error { + var peers []*BzzAddr + h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool { + if pa == nil { + log.Warn(fmt.Sprintf("empty addr: %v", i)) + return true + } + apa := ToAddr(pa) + log.Trace("saving peer", "peer", apa) + peers = append(peers, apa) + return true + }) + if err := h.Store.Put("peers", peers); err != nil { + return fmt.Errorf("could not save peers: %v", err) + } + return nil } |