aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/protocol.go
diff options
context:
space:
mode:
authorethersphere <thesw@rm.eth>2018-06-20 20:06:27 +0800
committerethersphere <thesw@rm.eth>2018-06-22 03:10:31 +0800
commite187711c6545487d4cac3701f0f506bb536234e2 (patch)
treed2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/protocol.go
parent574378edb50c907b532946a1d4654dbd6701b20a (diff)
downloadgo-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst
go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/protocol.go')
-rw-r--r--swarm/network/protocol.go759
1 files changed, 327 insertions, 432 deletions
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go
index 1cbe00a97..39673f5a1 100644
--- a/swarm/network/protocol.go
+++ b/swarm/network/protocol.go
@@ -16,519 +16,414 @@
package network
-/*
-bzz implements the swarm wire protocol [bzz] (sister of eth and shh)
-the protocol instance is launched on each peer by the network layer if the
-bzz protocol handler is registered on the p2p server.
-
-The bzz protocol component speaks the bzz protocol
-* handle the protocol handshake
-* register peers in the KΛÐΞMLIΛ table via the hive logistic manager
-* dispatch to hive for handling the DHT logic
-* encode and decode requests for storage and retrieval
-* handle sync protocol messages via the syncer
-* talks the SWAP payment protocol (swap accounting is done within NetStore)
-*/
-
import (
+ "context"
"errors"
"fmt"
"net"
- "strconv"
+ "sync"
"time"
- "github.com/ethereum/go-ethereum/contracts/chequebook"
- "github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
- bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
- "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
- "github.com/ethereum/go-ethereum/swarm/storage"
-)
-
-//metrics variables
-var (
- storeRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.storerequest.count", nil)
- retrieveRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.retrieverequest.count", nil)
- peersMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.peers.count", nil)
- syncRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.syncrequest.count", nil)
- unsyncedKeysMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.unsyncedkeys.count", nil)
- deliverRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.deliverrequest.count", nil)
- paymentMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.payment.count", nil)
- invalidMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.invalid.count", nil)
- handleStatusMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.handlestatus.count", nil)
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/swarm/log"
+ "github.com/ethereum/go-ethereum/swarm/state"
)
const (
- Version = 0
- ProtocolLength = uint64(8)
+ DefaultNetworkID = 3
+ // ProtocolMaxMsgSize maximum allowed message size
ProtocolMaxMsgSize = 10 * 1024 * 1024
- NetworkId = 3
+ // timeout for waiting
+ bzzHandshakeTimeout = 3000 * time.Millisecond
)
-// bzz represents the swarm wire protocol
-// an instance is running on each peer
-type bzz struct {
- storage StorageHandler // handler storage/retrieval related requests coming via the bzz wire protocol
- hive *Hive // the logistic manager, peerPool, routing service and peer handler
- dbAccess *DbAccess // access to db storage counter and iterator for syncing
- requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
- remoteAddr *peerAddr // remote peers address
- peer *p2p.Peer // the p2p peer object
- rw p2p.MsgReadWriter // messageReadWriter to send messages to
- backend chequebook.Backend
- lastActive time.Time
- NetworkId uint64
-
- swap *swap.Swap // swap instance for the peer connection
- swapParams *bzzswap.SwapParams // swap settings both local and remote
- swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake)
- syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake)
- syncer *syncer // syncer instance for the peer connection
- syncParams *SyncParams // syncer params
- syncState *syncState // outgoing syncronisation state (contains reference to remote peers db counter)
-}
-
-// interface type for handler of storage/retrieval related requests coming
-// via the bzz wire protocol
-// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
-type StorageHandler interface {
- HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
- HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
- HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
- HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
+// BzzSpec is the spec of the generic swarm handshake
+var BzzSpec = &protocols.Spec{
+ Name: "bzz",
+ Version: 4,
+ MaxMsgSize: 10 * 1024 * 1024,
+ Messages: []interface{}{
+ HandshakeMsg{},
+ },
}
-/*
-main entrypoint, wrappers starting a server that will run the bzz protocol
-use this constructor to attach the protocol ("class") to server caps
-This is done by node.Node#Register(func(node.ServiceContext) (Service, error))
-Service implements Protocols() which is an array of protocol constructors
-at node startup the protocols are initialised
-the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error
-on each peer connection
-The Run function of the Bzz protocol class creates a bzz instance
-which will represent the peer for the swarm hive and all peer-aware components
-*/
-func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64) (p2p.Protocol, error) {
+// DiscoverySpec is the spec for the bzz discovery subprotocols
+var DiscoverySpec = &protocols.Spec{
+ Name: "hive",
+ Version: 4,
+ MaxMsgSize: 10 * 1024 * 1024,
+ Messages: []interface{}{
+ peersMsg{},
+ subPeersMsg{},
+ },
+}
- // a single global request db is created for all peer connections
- // this is to persist delivery backlog and aid syncronisation
- requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
- if err != nil {
- return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
- }
- if networkId == 0 {
- networkId = NetworkId
- }
- return p2p.Protocol{
- Name: "bzz",
- Version: Version,
- Length: ProtocolLength,
- Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, networkId, p, rw)
- },
- }, nil
+// Addr interface that peerPool needs
+type Addr interface {
+ OverlayPeer
+ Over() []byte
+ Under() []byte
+ String() string
+ Update(OverlayAddr) OverlayAddr
}
-/*
-the main protocol loop that
- * does the handshake by exchanging statusMsg
- * if peer is valid and accepted, registers with the hive
- * then enters into a forever loop handling incoming messages
- * storage and retrieval related queries coming via bzz are dispatched to StorageHandler
- * peer-related messages are dispatched to the hive
- * payment related messages are relayed to SWAP service
- * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook)
- * whenever the loop terminates, the peer will disconnect with Subprotocol error
- * whenever handlers return an error the loop terminates
-*/
-func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
-
- self := &bzz{
- storage: depo,
- backend: backend,
- hive: hive,
- dbAccess: dbaccess,
- requestDb: requestDb,
- peer: p,
- rw: rw,
- swapParams: sp,
- syncParams: sy,
- swapEnabled: hive.swapEnabled,
- syncEnabled: true,
- NetworkId: networkId,
- }
+// Peer interface represents an live peer connection
+type Peer interface {
+ Addr // the address of a peer
+ Conn // the live connection (protocols.Peer)
+ LastActive() time.Time // last time active
+}
- // handle handshake
- err = self.handleStatus()
- if err != nil {
- return err
- }
- defer func() {
- // if the handler loop exits, the peer is disconnecting
- // deregister the peer in the hive
- self.hive.removePeer(&peer{bzz: self})
- if self.syncer != nil {
- self.syncer.stop() // quits request db and delivery loops, save requests
- }
- if self.swap != nil {
- self.swap.Stop() // quits chequebox autocash etc
- }
- }()
+// Conn interface represents an live peer connection
+type Conn interface {
+ ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool
+ Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages
+ Send(interface{}) error // can send messages
+ Drop(error) // disconnect this peer
+ Run(func(interface{}) error) error // the run function to run a protocol
+ Off() OverlayAddr
+}
- // the main forever loop that handles incoming requests
- for {
- if self.hive.blockRead {
- log.Warn(fmt.Sprintf("Cannot read network"))
- time.Sleep(100 * time.Millisecond)
- continue
- }
- err = self.handle()
- if err != nil {
- return
- }
+// BzzConfig captures the config params used by the hive
+type BzzConfig struct {
+ OverlayAddr []byte // base address of the overlay network
+ UnderlayAddr []byte // node's underlay address
+ HiveParams *HiveParams
+ NetworkID uint64
+}
+
+// Bzz is the swarm protocol bundle
+type Bzz struct {
+ *Hive
+ NetworkID uint64
+ localAddr *BzzAddr
+ mtx sync.Mutex
+ handshakes map[discover.NodeID]*HandshakeMsg
+ streamerSpec *protocols.Spec
+ streamerRun func(*BzzPeer) error
+}
+
+// NewBzz is the swarm protocol constructor
+// arguments
+// * bzz config
+// * overlay driver
+// * peer store
+func NewBzz(config *BzzConfig, kad Overlay, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
+ return &Bzz{
+ Hive: NewHive(config.HiveParams, kad, store),
+ NetworkID: config.NetworkID,
+ localAddr: &BzzAddr{config.OverlayAddr, config.UnderlayAddr},
+ handshakes: make(map[discover.NodeID]*HandshakeMsg),
+ streamerRun: streamerRun,
+ streamerSpec: streamerSpec,
}
}
-// TODO: may need to implement protocol drop only? don't want to kick off the peer
-// if they are useful for other protocols
-func (self *bzz) Drop() {
- self.peer.Disconnect(p2p.DiscSubprotocolError)
+// UpdateLocalAddr updates underlayaddress of the running node
+func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr {
+ b.localAddr = b.localAddr.Update(&BzzAddr{
+ UAddr: byteaddr,
+ OAddr: b.localAddr.OAddr,
+ }).(*BzzAddr)
+ return b.localAddr
}
-// one cycle of the main forever loop that handles and dispatches incoming messages
-func (self *bzz) handle() error {
- msg, err := self.rw.ReadMsg()
- log.Debug(fmt.Sprintf("<- %v", msg))
- if err != nil {
- return err
+// NodeInfo returns the node's overlay address
+func (b *Bzz) NodeInfo() interface{} {
+ return b.localAddr.Address()
+}
+
+// Protocols return the protocols swarm offers
+// Bzz implements the node.Service interface
+// * handshake/hive
+// * discovery
+func (b *Bzz) Protocols() []p2p.Protocol {
+ protocol := []p2p.Protocol{
+ {
+ Name: BzzSpec.Name,
+ Version: BzzSpec.Version,
+ Length: BzzSpec.Length(),
+ Run: b.runBzz,
+ NodeInfo: b.NodeInfo,
+ },
+ {
+ Name: DiscoverySpec.Name,
+ Version: DiscoverySpec.Version,
+ Length: DiscoverySpec.Length(),
+ Run: b.RunProtocol(DiscoverySpec, b.Hive.Run),
+ NodeInfo: b.Hive.NodeInfo,
+ PeerInfo: b.Hive.PeerInfo,
+ },
}
- if msg.Size > ProtocolMaxMsgSize {
- return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize)
+ if b.streamerSpec != nil && b.streamerRun != nil {
+ protocol = append(protocol, p2p.Protocol{
+ Name: b.streamerSpec.Name,
+ Version: b.streamerSpec.Version,
+ Length: b.streamerSpec.Length(),
+ Run: b.RunProtocol(b.streamerSpec, b.streamerRun),
+ })
}
- // make sure that the payload has been fully consumed
- defer msg.Discard()
-
- switch msg.Code {
-
- case statusMsg:
- // no extra status message allowed. The one needed already handled by
- // handleStatus
- log.Debug(fmt.Sprintf("Status message: %v", msg))
- return errors.New("extra status message")
-
- case storeRequestMsg:
- // store requests are dispatched to netStore
- storeRequestMsgCounter.Inc(1)
- var req storeRequestMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
- if n := len(req.SData); n < 9 {
- return fmt.Errorf("<- %v: Data too short (%v)", msg, n)
- }
- // last Active time is set only when receiving chunks
- self.lastActive = time.Now()
- log.Trace(fmt.Sprintf("incoming store request: %s", req.String()))
- // swap accounting is done within forwarding
- self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})
-
- case retrieveRequestMsg:
- // retrieve Requests are dispatched to netStore
- retrieveRequestMsgCounter.Inc(1)
- var req retrieveRequestMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
- req.from = &peer{bzz: self}
- // if request is lookup and not to be delivered
- if req.isLookup() {
- log.Trace(fmt.Sprintf("self lookup for %v: responding with peers only...", req.from))
- } else if req.Key == nil {
- return fmt.Errorf("protocol handler: req.Key == nil || req.Timeout == nil")
- } else {
- // swap accounting is done within netStore
- self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
- }
- // direct response with peers, TODO: sort this out
- self.hive.peers(&req)
-
- case peersMsg:
- // response to lookups and immediate response to retrieve requests
- // dispatches new peer data to the hive that adds them to KADDB
- peersMsgCounter.Inc(1)
- var req peersMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
- req.from = &peer{bzz: self}
- log.Trace(fmt.Sprintf("<- peer addresses: %v", req))
- self.hive.HandlePeersMsg(&req, &peer{bzz: self})
-
- case syncRequestMsg:
- syncRequestMsgCounter.Inc(1)
- var req syncRequestMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
- log.Debug(fmt.Sprintf("<- sync request: %v", req))
- self.lastActive = time.Now()
- self.sync(req.SyncState)
-
- case unsyncedKeysMsg:
- // coming from parent node offering
- unsyncedKeysMsgCounter.Inc(1)
- var req unsyncedKeysMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
- log.Debug(fmt.Sprintf("<- unsynced keys : %s", req.String()))
- err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
- self.lastActive = time.Now()
- if err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
+ return protocol
+}
+
+// APIs returns the APIs offered by bzz
+// * hive
+// Bzz implements the node.Service interface
+func (b *Bzz) APIs() []rpc.API {
+ return []rpc.API{{
+ Namespace: "hive",
+ Version: "3.0",
+ Service: b.Hive,
+ }}
+}
- case deliveryRequestMsg:
- // response to syncKeysMsg hashes filtered not existing in db
- // also relays the last synced state to the source
- deliverRequestMsgCounter.Inc(1)
- var req deliveryRequestMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<-msg %v: %v", msg, err)
+// RunProtocol is a wrapper for swarm subprotocols
+// returns a p2p protocol run function that can be assigned to p2p.Protocol#Run field
+// arguments:
+// * p2p protocol spec
+// * run function taking BzzPeer as argument
+// this run function is meant to block for the duration of the protocol session
+// on return the session is terminated and the peer is disconnected
+// the protocol waits for the bzz handshake is negotiated
+// the overlay address on the BzzPeer is set from the remote handshake
+func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*BzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error {
+ return func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ // wait for the bzz protocol to perform the handshake
+ handshake, _ := b.GetHandshake(p.ID())
+ defer b.removeHandshake(p.ID())
+ select {
+ case <-handshake.done:
+ case <-time.After(bzzHandshakeTimeout):
+ return fmt.Errorf("%08x: %s protocol timeout waiting for handshake on %08x", b.BaseAddr()[:4], spec.Name, p.ID().Bytes()[:4])
}
- log.Debug(fmt.Sprintf("<- delivery request: %s", req.String()))
- err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
- self.lastActive = time.Now()
- if err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
+ if handshake.err != nil {
+ return fmt.Errorf("%08x: %s protocol closed: %v", b.BaseAddr()[:4], spec.Name, handshake.err)
}
-
- case paymentMsg:
- // swap protocol message for payment, Units paid for, Cheque paid with
- paymentMsgCounter.Inc(1)
- if self.swapEnabled {
- var req paymentMsgData
- if err := msg.Decode(&req); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
- log.Debug(fmt.Sprintf("<- payment: %s", req.String()))
- self.swap.Receive(int(req.Units), req.Promise)
+ // the handshake has succeeded so construct the BzzPeer and run the protocol
+ peer := &BzzPeer{
+ Peer: protocols.NewPeer(p, rw, spec),
+ localAddr: b.localAddr,
+ BzzAddr: handshake.peerAddr,
+ lastActive: time.Now(),
}
+ return run(peer)
+ }
+}
- default:
- // no other message is allowed
- invalidMsgCounter.Inc(1)
- return fmt.Errorf("invalid message code: %v", msg.Code)
+// performHandshake implements the negotiation of the bzz handshake
+// shared among swarm subprotocols
+func (b *Bzz) performHandshake(p *protocols.Peer, handshake *HandshakeMsg) error {
+ ctx, cancel := context.WithTimeout(context.Background(), bzzHandshakeTimeout)
+ defer func() {
+ close(handshake.done)
+ cancel()
+ }()
+ rsh, err := p.Handshake(ctx, handshake, b.checkHandshake)
+ if err != nil {
+ handshake.err = err
+ return err
}
+ handshake.peerAddr = rsh.(*HandshakeMsg).Addr
return nil
}
-func (self *bzz) handleStatus() (err error) {
-
- handshake := &statusMsgData{
- Version: uint64(Version),
- ID: "honey",
- Addr: self.selfAddr(),
- NetworkId: self.NetworkId,
- Swap: &bzzswap.SwapProfile{
- Profile: self.swapParams.Profile,
- PayProfile: self.swapParams.PayProfile,
- },
+// runBzz is the p2p protocol run function for the bzz base protocol
+// that negotiates the bzz handshake
+func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ handshake, _ := b.GetHandshake(p.ID())
+ if !<-handshake.init {
+ return fmt.Errorf("%08x: bzz already started on peer %08x", b.localAddr.Over()[:4], ToOverlayAddr(p.ID().Bytes())[:4])
}
-
- err = p2p.Send(self.rw, statusMsg, handshake)
+ close(handshake.init)
+ defer b.removeHandshake(p.ID())
+ peer := protocols.NewPeer(p, rw, BzzSpec)
+ err := b.performHandshake(peer, handshake)
if err != nil {
+ log.Warn(fmt.Sprintf("%08x: handshake failed with remote peer %08x: %v", b.localAddr.Over()[:4], ToOverlayAddr(p.ID().Bytes())[:4], err))
+
return err
}
-
- // read and handle remote status
- var msg p2p.Msg
- msg, err = self.rw.ReadMsg()
+ // fail if we get another handshake
+ msg, err := rw.ReadMsg()
if err != nil {
return err
}
+ msg.Discard()
+ return errors.New("received multiple handshakes")
+}
- if msg.Code != statusMsg {
- return fmt.Errorf("first msg has code %x (!= %x)", msg.Code, statusMsg)
- }
-
- handleStatusMsgCounter.Inc(1)
-
- if msg.Size > ProtocolMaxMsgSize {
- return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize)
- }
-
- var status statusMsgData
- if err := msg.Decode(&status); err != nil {
- return fmt.Errorf("<- %v: %v", msg, err)
- }
+// BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer)
+// implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer
+type BzzPeer struct {
+ *protocols.Peer // represents the connection for online peers
+ localAddr *BzzAddr // local Peers address
+ *BzzAddr // remote address -> implements Addr interface = protocols.Peer
+ lastActive time.Time // time is updated whenever mutexes are releasing
+}
- if status.NetworkId != self.NetworkId {
- return fmt.Errorf("network id mismatch: %d (!= %d)", status.NetworkId, self.NetworkId)
+func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer {
+ return &BzzPeer{
+ Peer: p,
+ localAddr: addr,
+ BzzAddr: NewAddrFromNodeID(p.ID()),
}
+}
- if Version != status.Version {
- return fmt.Errorf("protocol version mismatch: %d (!= %d)", status.Version, Version)
- }
+// Off returns the overlay peer record for offline persistence
+func (p *BzzPeer) Off() OverlayAddr {
+ return p.BzzAddr
+}
- self.remoteAddr = self.peerAddr(status.Addr)
- log.Trace(fmt.Sprintf("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()))
+// LastActive returns the time the peer was last active
+func (p *BzzPeer) LastActive() time.Time {
+ return p.lastActive
+}
- if self.swapEnabled {
- // set remote profile for accounting
- self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
- if err != nil {
- return err
- }
- }
+/*
+ Handshake
- log.Info(fmt.Sprintf("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId))
- err = self.hive.addPeer(&peer{bzz: self})
- if err != nil {
- return err
- }
+* Version: 8 byte integer version of the protocol
+* NetworkID: 8 byte integer network identifier
+* Addr: the address advertised by the node including underlay and overlay connecctions
+*/
+type HandshakeMsg struct {
+ Version uint64
+ NetworkID uint64
+ Addr *BzzAddr
- // hive sets syncstate so sync should start after node added
- log.Info(fmt.Sprintf("syncronisation request sent with %v", self.syncState))
- self.syncRequest()
+ // peerAddr is the address received in the peer handshake
+ peerAddr *BzzAddr
- return nil
+ init chan bool
+ done chan struct{}
+ err error
}
-func (self *bzz) sync(state *syncState) error {
- // syncer setup
- if self.syncer != nil {
- return errors.New("sync request can only be sent once")
- }
+// String pretty prints the handshake
+func (bh *HandshakeMsg) String() string {
+ return fmt.Sprintf("Handshake: Version: %v, NetworkID: %v, Addr: %v", bh.Version, bh.NetworkID, bh.Addr)
+}
- cnt := self.dbAccess.counter()
- remoteaddr := self.remoteAddr.Addr
- start, stop := self.hive.kad.KeyRange(remoteaddr)
-
- // an explicitly received nil syncstate disables syncronisation
- if state == nil {
- self.syncEnabled = false
- log.Warn(fmt.Sprintf("syncronisation disabled for peer %v", self))
- state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
- } else {
- state.synced = make(chan bool)
- state.SessionAt = cnt
- if storage.IsZeroKey(state.Stop) && state.Synced {
- state.Start = storage.Key(start[:])
- state.Stop = storage.Key(stop[:])
- }
- log.Debug(fmt.Sprintf("syncronisation requested by peer %v at state %v", self, state))
+// Perform initiates the handshake and validates the remote handshake message
+func (b *Bzz) checkHandshake(hs interface{}) error {
+ rhs := hs.(*HandshakeMsg)
+ if rhs.NetworkID != b.NetworkID {
+ return fmt.Errorf("network id mismatch %d (!= %d)", rhs.NetworkID, b.NetworkID)
}
- var err error
- self.syncer, err = newSyncer(
- self.requestDb,
- storage.Key(remoteaddr[:]),
- self.dbAccess,
- self.unsyncedKeys, self.store,
- self.syncParams, state, func() bool { return self.syncEnabled },
- )
- if err != nil {
- return nil
+ if rhs.Version != uint64(BzzSpec.Version) {
+ return fmt.Errorf("version mismatch %d (!= %d)", rhs.Version, BzzSpec.Version)
}
- log.Trace(fmt.Sprintf("syncer set for peer %v", self))
return nil
}
-func (self *bzz) String() string {
- return self.remoteAddr.String()
+// removeHandshake removes handshake for peer with peerID
+// from the bzz handshake store
+func (b *Bzz) removeHandshake(peerID discover.NodeID) {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ delete(b.handshakes, peerID)
}
-// repair reported address if IP missing
-func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
- if base.IP.IsUnspecified() {
- host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
- base.IP = net.ParseIP(host)
- }
- return base
-}
-
-// returns self advertised node connection info (listening address w enodes)
-// IP will get repaired on the other end if missing
-// or resolved via ID by discovery at dialout
-func (self *bzz) selfAddr() *peerAddr {
- id := self.hive.id
- host, port, _ := net.SplitHostPort(self.hive.listenAddr())
- intport, _ := strconv.Atoi(port)
- addr := &peerAddr{
- Addr: self.hive.addr,
- ID: id[:],
- IP: net.ParseIP(host),
- Port: uint16(intport),
+// GetHandshake returns the bzz handhake that the remote peer with peerID sent
+func (b *Bzz) GetHandshake(peerID discover.NodeID) (*HandshakeMsg, bool) {
+ b.mtx.Lock()
+ defer b.mtx.Unlock()
+ handshake, found := b.handshakes[peerID]
+ if !found {
+ handshake = &HandshakeMsg{
+ Version: uint64(BzzSpec.Version),
+ NetworkID: b.NetworkID,
+ Addr: b.localAddr,
+ init: make(chan bool, 1),
+ done: make(chan struct{}),
+ }
+ // when handhsake is first created for a remote peer
+ // it is initialised with the init
+ handshake.init <- true
+ b.handshakes[peerID] = handshake
}
- return addr
+
+ return handshake, found
}
-// outgoing messages
-// send retrieveRequestMsg
-func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
- return self.send(retrieveRequestMsg, req)
+// BzzAddr implements the PeerAddr interface
+type BzzAddr struct {
+ OAddr []byte
+ UAddr []byte
}
-// send storeRequestMsg
-func (self *bzz) store(req *storeRequestMsgData) error {
- return self.send(storeRequestMsg, req)
+// Address implements OverlayPeer interface to be used in Overlay
+func (a *BzzAddr) Address() []byte {
+ return a.OAddr
}
-func (self *bzz) syncRequest() error {
- req := &syncRequestMsgData{}
- if self.hive.syncEnabled {
- log.Debug(fmt.Sprintf("syncronisation request to peer %v at state %v", self, self.syncState))
- req.SyncState = self.syncState
- }
- if self.syncState == nil {
- log.Warn(fmt.Sprintf("syncronisation disabled for peer %v at state %v", self, self.syncState))
- }
- return self.send(syncRequestMsg, req)
+// Over returns the overlay address
+func (a *BzzAddr) Over() []byte {
+ return a.OAddr
}
-// queue storeRequestMsg in request db
-func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
- req := &deliveryRequestMsgData{
- Deliver: reqs,
- }
- return self.send(deliveryRequestMsg, req)
+// Under returns the underlay address
+func (a *BzzAddr) Under() []byte {
+ return a.UAddr
}
-// batch of syncRequests to send off
-func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
- req := &unsyncedKeysMsgData{
- Unsynced: reqs,
- State: state,
- }
- return self.send(unsyncedKeysMsg, req)
+// ID returns the nodeID from the underlay enode address
+func (a *BzzAddr) ID() discover.NodeID {
+ return discover.MustParseNode(string(a.UAddr)).ID
}
-// send paymentMsg
-func (self *bzz) Pay(units int, promise swap.Promise) {
- req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
- self.payment(req)
+// Update updates the underlay address of a peer record
+func (a *BzzAddr) Update(na OverlayAddr) OverlayAddr {
+ return &BzzAddr{a.OAddr, na.(Addr).Under()}
}
-// send paymentMsg
-func (self *bzz) payment(req *paymentMsgData) error {
- return self.send(paymentMsg, req)
+// String pretty prints the address
+func (a *BzzAddr) String() string {
+ return fmt.Sprintf("%x <%s>", a.OAddr, a.UAddr)
+}
+
+// RandomAddr is a utility method generating an address from a public key
+func RandomAddr() *BzzAddr {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ panic("unable to generate key")
+ }
+ pubkey := crypto.FromECDSAPub(&key.PublicKey)
+ var id discover.NodeID
+ copy(id[:], pubkey[1:])
+ return NewAddrFromNodeID(id)
}
-// sends peersMsg
-func (self *bzz) peers(req *peersMsgData) error {
- return self.send(peersMsg, req)
+// NewNodeIDFromAddr transforms the underlay address to an adapters.NodeID
+func NewNodeIDFromAddr(addr Addr) discover.NodeID {
+ log.Info(fmt.Sprintf("uaddr=%s", string(addr.Under())))
+ node := discover.MustParseNode(string(addr.Under()))
+ return node.ID
}
-func (self *bzz) send(msg uint64, data interface{}) error {
- if self.hive.blockWrite {
- return fmt.Errorf("network write blocked")
+// NewAddrFromNodeID constucts a BzzAddr from a discover.NodeID
+// the overlay address is derived as the hash of the nodeID
+func NewAddrFromNodeID(id discover.NodeID) *BzzAddr {
+ return &BzzAddr{
+ OAddr: ToOverlayAddr(id.Bytes()),
+ UAddr: []byte(discover.NewNode(id, net.IP{127, 0, 0, 1}, 30303, 30303).String()),
}
- log.Trace(fmt.Sprintf("-> %v: %v (%T) to %v", msg, data, data, self))
- err := p2p.Send(self.rw, msg, data)
- if err != nil {
- self.Drop()
+}
+
+// NewAddrFromNodeIDAndPort constucts a BzzAddr from a discover.NodeID and port uint16
+// the overlay address is derived as the hash of the nodeID
+func NewAddrFromNodeIDAndPort(id discover.NodeID, host net.IP, port uint16) *BzzAddr {
+ return &BzzAddr{
+ OAddr: ToOverlayAddr(id.Bytes()),
+ UAddr: []byte(discover.NewNode(id, host, port, port).String()),
}
- return err
+}
+
+// ToOverlayAddr creates an overlayaddress from a byte slice
+func ToOverlayAddr(id []byte) []byte {
+ return crypto.Keccak256(id)
}