diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/protocol.go | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz dexon-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst dexon-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/protocol.go')
-rw-r--r-- | swarm/network/protocol.go | 759 |
1 files changed, 327 insertions, 432 deletions
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 1cbe00a97..39673f5a1 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -16,519 +16,414 @@ package network -/* -bzz implements the swarm wire protocol [bzz] (sister of eth and shh) -the protocol instance is launched on each peer by the network layer if the -bzz protocol handler is registered on the p2p server. - -The bzz protocol component speaks the bzz protocol -* handle the protocol handshake -* register peers in the KΛÐΞMLIΛ table via the hive logistic manager -* dispatch to hive for handling the DHT logic -* encode and decode requests for storage and retrieval -* handle sync protocol messages via the syncer -* talks the SWAP payment protocol (swap accounting is done within NetStore) -*/ - import ( + "context" "errors" "fmt" "net" - "strconv" + "sync" "time" - "github.com/ethereum/go-ethereum/contracts/chequebook" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" - bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap" - "github.com/ethereum/go-ethereum/swarm/services/swap/swap" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -//metrics variables -var ( - storeRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.storerequest.count", nil) - retrieveRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.retrieverequest.count", nil) - peersMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.peers.count", nil) - syncRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.syncrequest.count", nil) - unsyncedKeysMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.unsyncedkeys.count", nil) - deliverRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.deliverrequest.count", nil) - paymentMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.payment.count", nil) - invalidMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.invalid.count", nil) - handleStatusMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.handlestatus.count", nil) + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/state" ) const ( - Version = 0 - ProtocolLength = uint64(8) + DefaultNetworkID = 3 + // ProtocolMaxMsgSize maximum allowed message size ProtocolMaxMsgSize = 10 * 1024 * 1024 - NetworkId = 3 + // timeout for waiting + bzzHandshakeTimeout = 3000 * time.Millisecond ) -// bzz represents the swarm wire protocol -// an instance is running on each peer -type bzz struct { - storage StorageHandler // handler storage/retrieval related requests coming via the bzz wire protocol - hive *Hive // the logistic manager, peerPool, routing service and peer handler - dbAccess *DbAccess // access to db storage counter and iterator for syncing - requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing - remoteAddr *peerAddr // remote peers address - peer *p2p.Peer // the p2p peer object - rw p2p.MsgReadWriter // messageReadWriter to send messages to - backend chequebook.Backend - lastActive time.Time - NetworkId uint64 - - swap *swap.Swap // swap instance for the peer connection - swapParams *bzzswap.SwapParams // swap settings both local and remote - swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake) - syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake) - syncer *syncer // syncer instance for the peer connection - syncParams *SyncParams // syncer params - syncState *syncState // outgoing syncronisation state (contains reference to remote peers db counter) -} - -// interface type for handler of storage/retrieval related requests coming -// via the bzz wire protocol -// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest -type StorageHandler interface { - HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error - HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error - HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) - HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) +// BzzSpec is the spec of the generic swarm handshake +var BzzSpec = &protocols.Spec{ + Name: "bzz", + Version: 4, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + HandshakeMsg{}, + }, } -/* -main entrypoint, wrappers starting a server that will run the bzz protocol -use this constructor to attach the protocol ("class") to server caps -This is done by node.Node#Register(func(node.ServiceContext) (Service, error)) -Service implements Protocols() which is an array of protocol constructors -at node startup the protocols are initialised -the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error -on each peer connection -The Run function of the Bzz protocol class creates a bzz instance -which will represent the peer for the swarm hive and all peer-aware components -*/ -func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64) (p2p.Protocol, error) { +// DiscoverySpec is the spec for the bzz discovery subprotocols +var DiscoverySpec = &protocols.Spec{ + Name: "hive", + Version: 4, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + peersMsg{}, + subPeersMsg{}, + }, +} - // a single global request db is created for all peer connections - // this is to persist delivery backlog and aid syncronisation - requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath) - if err != nil { - return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err) - } - if networkId == 0 { - networkId = NetworkId - } - return p2p.Protocol{ - Name: "bzz", - Version: Version, - Length: ProtocolLength, - Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, networkId, p, rw) - }, - }, nil +// Addr interface that peerPool needs +type Addr interface { + OverlayPeer + Over() []byte + Under() []byte + String() string + Update(OverlayAddr) OverlayAddr } -/* -the main protocol loop that - * does the handshake by exchanging statusMsg - * if peer is valid and accepted, registers with the hive - * then enters into a forever loop handling incoming messages - * storage and retrieval related queries coming via bzz are dispatched to StorageHandler - * peer-related messages are dispatched to the hive - * payment related messages are relayed to SWAP service - * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook) - * whenever the loop terminates, the peer will disconnect with Subprotocol error - * whenever handlers return an error the loop terminates -*/ -func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, networkId uint64, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) { - - self := &bzz{ - storage: depo, - backend: backend, - hive: hive, - dbAccess: dbaccess, - requestDb: requestDb, - peer: p, - rw: rw, - swapParams: sp, - syncParams: sy, - swapEnabled: hive.swapEnabled, - syncEnabled: true, - NetworkId: networkId, - } +// Peer interface represents an live peer connection +type Peer interface { + Addr // the address of a peer + Conn // the live connection (protocols.Peer) + LastActive() time.Time // last time active +} - // handle handshake - err = self.handleStatus() - if err != nil { - return err - } - defer func() { - // if the handler loop exits, the peer is disconnecting - // deregister the peer in the hive - self.hive.removePeer(&peer{bzz: self}) - if self.syncer != nil { - self.syncer.stop() // quits request db and delivery loops, save requests - } - if self.swap != nil { - self.swap.Stop() // quits chequebox autocash etc - } - }() +// Conn interface represents an live peer connection +type Conn interface { + ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool + Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages + Send(interface{}) error // can send messages + Drop(error) // disconnect this peer + Run(func(interface{}) error) error // the run function to run a protocol + Off() OverlayAddr +} - // the main forever loop that handles incoming requests - for { - if self.hive.blockRead { - log.Warn(fmt.Sprintf("Cannot read network")) - time.Sleep(100 * time.Millisecond) - continue - } - err = self.handle() - if err != nil { - return - } +// BzzConfig captures the config params used by the hive +type BzzConfig struct { + OverlayAddr []byte // base address of the overlay network + UnderlayAddr []byte // node's underlay address + HiveParams *HiveParams + NetworkID uint64 +} + +// Bzz is the swarm protocol bundle +type Bzz struct { + *Hive + NetworkID uint64 + localAddr *BzzAddr + mtx sync.Mutex + handshakes map[discover.NodeID]*HandshakeMsg + streamerSpec *protocols.Spec + streamerRun func(*BzzPeer) error +} + +// NewBzz is the swarm protocol constructor +// arguments +// * bzz config +// * overlay driver +// * peer store +func NewBzz(config *BzzConfig, kad Overlay, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz { + return &Bzz{ + Hive: NewHive(config.HiveParams, kad, store), + NetworkID: config.NetworkID, + localAddr: &BzzAddr{config.OverlayAddr, config.UnderlayAddr}, + handshakes: make(map[discover.NodeID]*HandshakeMsg), + streamerRun: streamerRun, + streamerSpec: streamerSpec, } } -// TODO: may need to implement protocol drop only? don't want to kick off the peer -// if they are useful for other protocols -func (self *bzz) Drop() { - self.peer.Disconnect(p2p.DiscSubprotocolError) +// UpdateLocalAddr updates underlayaddress of the running node +func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr { + b.localAddr = b.localAddr.Update(&BzzAddr{ + UAddr: byteaddr, + OAddr: b.localAddr.OAddr, + }).(*BzzAddr) + return b.localAddr } -// one cycle of the main forever loop that handles and dispatches incoming messages -func (self *bzz) handle() error { - msg, err := self.rw.ReadMsg() - log.Debug(fmt.Sprintf("<- %v", msg)) - if err != nil { - return err +// NodeInfo returns the node's overlay address +func (b *Bzz) NodeInfo() interface{} { + return b.localAddr.Address() +} + +// Protocols return the protocols swarm offers +// Bzz implements the node.Service interface +// * handshake/hive +// * discovery +func (b *Bzz) Protocols() []p2p.Protocol { + protocol := []p2p.Protocol{ + { + Name: BzzSpec.Name, + Version: BzzSpec.Version, + Length: BzzSpec.Length(), + Run: b.runBzz, + NodeInfo: b.NodeInfo, + }, + { + Name: DiscoverySpec.Name, + Version: DiscoverySpec.Version, + Length: DiscoverySpec.Length(), + Run: b.RunProtocol(DiscoverySpec, b.Hive.Run), + NodeInfo: b.Hive.NodeInfo, + PeerInfo: b.Hive.PeerInfo, + }, } - if msg.Size > ProtocolMaxMsgSize { - return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize) + if b.streamerSpec != nil && b.streamerRun != nil { + protocol = append(protocol, p2p.Protocol{ + Name: b.streamerSpec.Name, + Version: b.streamerSpec.Version, + Length: b.streamerSpec.Length(), + Run: b.RunProtocol(b.streamerSpec, b.streamerRun), + }) } - // make sure that the payload has been fully consumed - defer msg.Discard() - - switch msg.Code { - - case statusMsg: - // no extra status message allowed. The one needed already handled by - // handleStatus - log.Debug(fmt.Sprintf("Status message: %v", msg)) - return errors.New("extra status message") - - case storeRequestMsg: - // store requests are dispatched to netStore - storeRequestMsgCounter.Inc(1) - var req storeRequestMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } - if n := len(req.SData); n < 9 { - return fmt.Errorf("<- %v: Data too short (%v)", msg, n) - } - // last Active time is set only when receiving chunks - self.lastActive = time.Now() - log.Trace(fmt.Sprintf("incoming store request: %s", req.String())) - // swap accounting is done within forwarding - self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self}) - - case retrieveRequestMsg: - // retrieve Requests are dispatched to netStore - retrieveRequestMsgCounter.Inc(1) - var req retrieveRequestMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } - req.from = &peer{bzz: self} - // if request is lookup and not to be delivered - if req.isLookup() { - log.Trace(fmt.Sprintf("self lookup for %v: responding with peers only...", req.from)) - } else if req.Key == nil { - return fmt.Errorf("protocol handler: req.Key == nil || req.Timeout == nil") - } else { - // swap accounting is done within netStore - self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self}) - } - // direct response with peers, TODO: sort this out - self.hive.peers(&req) - - case peersMsg: - // response to lookups and immediate response to retrieve requests - // dispatches new peer data to the hive that adds them to KADDB - peersMsgCounter.Inc(1) - var req peersMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } - req.from = &peer{bzz: self} - log.Trace(fmt.Sprintf("<- peer addresses: %v", req)) - self.hive.HandlePeersMsg(&req, &peer{bzz: self}) - - case syncRequestMsg: - syncRequestMsgCounter.Inc(1) - var req syncRequestMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } - log.Debug(fmt.Sprintf("<- sync request: %v", req)) - self.lastActive = time.Now() - self.sync(req.SyncState) - - case unsyncedKeysMsg: - // coming from parent node offering - unsyncedKeysMsgCounter.Inc(1) - var req unsyncedKeysMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } - log.Debug(fmt.Sprintf("<- unsynced keys : %s", req.String())) - err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self}) - self.lastActive = time.Now() - if err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } + return protocol +} + +// APIs returns the APIs offered by bzz +// * hive +// Bzz implements the node.Service interface +func (b *Bzz) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "hive", + Version: "3.0", + Service: b.Hive, + }} +} - case deliveryRequestMsg: - // response to syncKeysMsg hashes filtered not existing in db - // also relays the last synced state to the source - deliverRequestMsgCounter.Inc(1) - var req deliveryRequestMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<-msg %v: %v", msg, err) +// RunProtocol is a wrapper for swarm subprotocols +// returns a p2p protocol run function that can be assigned to p2p.Protocol#Run field +// arguments: +// * p2p protocol spec +// * run function taking BzzPeer as argument +// this run function is meant to block for the duration of the protocol session +// on return the session is terminated and the peer is disconnected +// the protocol waits for the bzz handshake is negotiated +// the overlay address on the BzzPeer is set from the remote handshake +func (b *Bzz) RunProtocol(spec *protocols.Spec, run func(*BzzPeer) error) func(*p2p.Peer, p2p.MsgReadWriter) error { + return func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + // wait for the bzz protocol to perform the handshake + handshake, _ := b.GetHandshake(p.ID()) + defer b.removeHandshake(p.ID()) + select { + case <-handshake.done: + case <-time.After(bzzHandshakeTimeout): + return fmt.Errorf("%08x: %s protocol timeout waiting for handshake on %08x", b.BaseAddr()[:4], spec.Name, p.ID().Bytes()[:4]) } - log.Debug(fmt.Sprintf("<- delivery request: %s", req.String())) - err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self}) - self.lastActive = time.Now() - if err != nil { - return fmt.Errorf("<- %v: %v", msg, err) + if handshake.err != nil { + return fmt.Errorf("%08x: %s protocol closed: %v", b.BaseAddr()[:4], spec.Name, handshake.err) } - - case paymentMsg: - // swap protocol message for payment, Units paid for, Cheque paid with - paymentMsgCounter.Inc(1) - if self.swapEnabled { - var req paymentMsgData - if err := msg.Decode(&req); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } - log.Debug(fmt.Sprintf("<- payment: %s", req.String())) - self.swap.Receive(int(req.Units), req.Promise) + // the handshake has succeeded so construct the BzzPeer and run the protocol + peer := &BzzPeer{ + Peer: protocols.NewPeer(p, rw, spec), + localAddr: b.localAddr, + BzzAddr: handshake.peerAddr, + lastActive: time.Now(), } + return run(peer) + } +} - default: - // no other message is allowed - invalidMsgCounter.Inc(1) - return fmt.Errorf("invalid message code: %v", msg.Code) +// performHandshake implements the negotiation of the bzz handshake +// shared among swarm subprotocols +func (b *Bzz) performHandshake(p *protocols.Peer, handshake *HandshakeMsg) error { + ctx, cancel := context.WithTimeout(context.Background(), bzzHandshakeTimeout) + defer func() { + close(handshake.done) + cancel() + }() + rsh, err := p.Handshake(ctx, handshake, b.checkHandshake) + if err != nil { + handshake.err = err + return err } + handshake.peerAddr = rsh.(*HandshakeMsg).Addr return nil } -func (self *bzz) handleStatus() (err error) { - - handshake := &statusMsgData{ - Version: uint64(Version), - ID: "honey", - Addr: self.selfAddr(), - NetworkId: self.NetworkId, - Swap: &bzzswap.SwapProfile{ - Profile: self.swapParams.Profile, - PayProfile: self.swapParams.PayProfile, - }, +// runBzz is the p2p protocol run function for the bzz base protocol +// that negotiates the bzz handshake +func (b *Bzz) runBzz(p *p2p.Peer, rw p2p.MsgReadWriter) error { + handshake, _ := b.GetHandshake(p.ID()) + if !<-handshake.init { + return fmt.Errorf("%08x: bzz already started on peer %08x", b.localAddr.Over()[:4], ToOverlayAddr(p.ID().Bytes())[:4]) } - - err = p2p.Send(self.rw, statusMsg, handshake) + close(handshake.init) + defer b.removeHandshake(p.ID()) + peer := protocols.NewPeer(p, rw, BzzSpec) + err := b.performHandshake(peer, handshake) if err != nil { + log.Warn(fmt.Sprintf("%08x: handshake failed with remote peer %08x: %v", b.localAddr.Over()[:4], ToOverlayAddr(p.ID().Bytes())[:4], err)) + return err } - - // read and handle remote status - var msg p2p.Msg - msg, err = self.rw.ReadMsg() + // fail if we get another handshake + msg, err := rw.ReadMsg() if err != nil { return err } + msg.Discard() + return errors.New("received multiple handshakes") +} - if msg.Code != statusMsg { - return fmt.Errorf("first msg has code %x (!= %x)", msg.Code, statusMsg) - } - - handleStatusMsgCounter.Inc(1) - - if msg.Size > ProtocolMaxMsgSize { - return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize) - } - - var status statusMsgData - if err := msg.Decode(&status); err != nil { - return fmt.Errorf("<- %v: %v", msg, err) - } +// BzzPeer is the bzz protocol view of a protocols.Peer (itself an extension of p2p.Peer) +// implements the Peer interface and all interfaces Peer implements: Addr, OverlayPeer +type BzzPeer struct { + *protocols.Peer // represents the connection for online peers + localAddr *BzzAddr // local Peers address + *BzzAddr // remote address -> implements Addr interface = protocols.Peer + lastActive time.Time // time is updated whenever mutexes are releasing +} - if status.NetworkId != self.NetworkId { - return fmt.Errorf("network id mismatch: %d (!= %d)", status.NetworkId, self.NetworkId) +func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { + return &BzzPeer{ + Peer: p, + localAddr: addr, + BzzAddr: NewAddrFromNodeID(p.ID()), } +} - if Version != status.Version { - return fmt.Errorf("protocol version mismatch: %d (!= %d)", status.Version, Version) - } +// Off returns the overlay peer record for offline persistence +func (p *BzzPeer) Off() OverlayAddr { + return p.BzzAddr +} - self.remoteAddr = self.peerAddr(status.Addr) - log.Trace(fmt.Sprintf("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())) +// LastActive returns the time the peer was last active +func (p *BzzPeer) LastActive() time.Time { + return p.lastActive +} - if self.swapEnabled { - // set remote profile for accounting - self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self) - if err != nil { - return err - } - } +/* + Handshake - log.Info(fmt.Sprintf("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)) - err = self.hive.addPeer(&peer{bzz: self}) - if err != nil { - return err - } +* Version: 8 byte integer version of the protocol +* NetworkID: 8 byte integer network identifier +* Addr: the address advertised by the node including underlay and overlay connecctions +*/ +type HandshakeMsg struct { + Version uint64 + NetworkID uint64 + Addr *BzzAddr - // hive sets syncstate so sync should start after node added - log.Info(fmt.Sprintf("syncronisation request sent with %v", self.syncState)) - self.syncRequest() + // peerAddr is the address received in the peer handshake + peerAddr *BzzAddr - return nil + init chan bool + done chan struct{} + err error } -func (self *bzz) sync(state *syncState) error { - // syncer setup - if self.syncer != nil { - return errors.New("sync request can only be sent once") - } +// String pretty prints the handshake +func (bh *HandshakeMsg) String() string { + return fmt.Sprintf("Handshake: Version: %v, NetworkID: %v, Addr: %v", bh.Version, bh.NetworkID, bh.Addr) +} - cnt := self.dbAccess.counter() - remoteaddr := self.remoteAddr.Addr - start, stop := self.hive.kad.KeyRange(remoteaddr) - - // an explicitly received nil syncstate disables syncronisation - if state == nil { - self.syncEnabled = false - log.Warn(fmt.Sprintf("syncronisation disabled for peer %v", self)) - state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true} - } else { - state.synced = make(chan bool) - state.SessionAt = cnt - if storage.IsZeroKey(state.Stop) && state.Synced { - state.Start = storage.Key(start[:]) - state.Stop = storage.Key(stop[:]) - } - log.Debug(fmt.Sprintf("syncronisation requested by peer %v at state %v", self, state)) +// Perform initiates the handshake and validates the remote handshake message +func (b *Bzz) checkHandshake(hs interface{}) error { + rhs := hs.(*HandshakeMsg) + if rhs.NetworkID != b.NetworkID { + return fmt.Errorf("network id mismatch %d (!= %d)", rhs.NetworkID, b.NetworkID) } - var err error - self.syncer, err = newSyncer( - self.requestDb, - storage.Key(remoteaddr[:]), - self.dbAccess, - self.unsyncedKeys, self.store, - self.syncParams, state, func() bool { return self.syncEnabled }, - ) - if err != nil { - return nil + if rhs.Version != uint64(BzzSpec.Version) { + return fmt.Errorf("version mismatch %d (!= %d)", rhs.Version, BzzSpec.Version) } - log.Trace(fmt.Sprintf("syncer set for peer %v", self)) return nil } -func (self *bzz) String() string { - return self.remoteAddr.String() +// removeHandshake removes handshake for peer with peerID +// from the bzz handshake store +func (b *Bzz) removeHandshake(peerID discover.NodeID) { + b.mtx.Lock() + defer b.mtx.Unlock() + delete(b.handshakes, peerID) } -// repair reported address if IP missing -func (self *bzz) peerAddr(base *peerAddr) *peerAddr { - if base.IP.IsUnspecified() { - host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String()) - base.IP = net.ParseIP(host) - } - return base -} - -// returns self advertised node connection info (listening address w enodes) -// IP will get repaired on the other end if missing -// or resolved via ID by discovery at dialout -func (self *bzz) selfAddr() *peerAddr { - id := self.hive.id - host, port, _ := net.SplitHostPort(self.hive.listenAddr()) - intport, _ := strconv.Atoi(port) - addr := &peerAddr{ - Addr: self.hive.addr, - ID: id[:], - IP: net.ParseIP(host), - Port: uint16(intport), +// GetHandshake returns the bzz handhake that the remote peer with peerID sent +func (b *Bzz) GetHandshake(peerID discover.NodeID) (*HandshakeMsg, bool) { + b.mtx.Lock() + defer b.mtx.Unlock() + handshake, found := b.handshakes[peerID] + if !found { + handshake = &HandshakeMsg{ + Version: uint64(BzzSpec.Version), + NetworkID: b.NetworkID, + Addr: b.localAddr, + init: make(chan bool, 1), + done: make(chan struct{}), + } + // when handhsake is first created for a remote peer + // it is initialised with the init + handshake.init <- true + b.handshakes[peerID] = handshake } - return addr + + return handshake, found } -// outgoing messages -// send retrieveRequestMsg -func (self *bzz) retrieve(req *retrieveRequestMsgData) error { - return self.send(retrieveRequestMsg, req) +// BzzAddr implements the PeerAddr interface +type BzzAddr struct { + OAddr []byte + UAddr []byte } -// send storeRequestMsg -func (self *bzz) store(req *storeRequestMsgData) error { - return self.send(storeRequestMsg, req) +// Address implements OverlayPeer interface to be used in Overlay +func (a *BzzAddr) Address() []byte { + return a.OAddr } -func (self *bzz) syncRequest() error { - req := &syncRequestMsgData{} - if self.hive.syncEnabled { - log.Debug(fmt.Sprintf("syncronisation request to peer %v at state %v", self, self.syncState)) - req.SyncState = self.syncState - } - if self.syncState == nil { - log.Warn(fmt.Sprintf("syncronisation disabled for peer %v at state %v", self, self.syncState)) - } - return self.send(syncRequestMsg, req) +// Over returns the overlay address +func (a *BzzAddr) Over() []byte { + return a.OAddr } -// queue storeRequestMsg in request db -func (self *bzz) deliveryRequest(reqs []*syncRequest) error { - req := &deliveryRequestMsgData{ - Deliver: reqs, - } - return self.send(deliveryRequestMsg, req) +// Under returns the underlay address +func (a *BzzAddr) Under() []byte { + return a.UAddr } -// batch of syncRequests to send off -func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error { - req := &unsyncedKeysMsgData{ - Unsynced: reqs, - State: state, - } - return self.send(unsyncedKeysMsg, req) +// ID returns the nodeID from the underlay enode address +func (a *BzzAddr) ID() discover.NodeID { + return discover.MustParseNode(string(a.UAddr)).ID } -// send paymentMsg -func (self *bzz) Pay(units int, promise swap.Promise) { - req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)} - self.payment(req) +// Update updates the underlay address of a peer record +func (a *BzzAddr) Update(na OverlayAddr) OverlayAddr { + return &BzzAddr{a.OAddr, na.(Addr).Under()} } -// send paymentMsg -func (self *bzz) payment(req *paymentMsgData) error { - return self.send(paymentMsg, req) +// String pretty prints the address +func (a *BzzAddr) String() string { + return fmt.Sprintf("%x <%s>", a.OAddr, a.UAddr) +} + +// RandomAddr is a utility method generating an address from a public key +func RandomAddr() *BzzAddr { + key, err := crypto.GenerateKey() + if err != nil { + panic("unable to generate key") + } + pubkey := crypto.FromECDSAPub(&key.PublicKey) + var id discover.NodeID + copy(id[:], pubkey[1:]) + return NewAddrFromNodeID(id) } -// sends peersMsg -func (self *bzz) peers(req *peersMsgData) error { - return self.send(peersMsg, req) +// NewNodeIDFromAddr transforms the underlay address to an adapters.NodeID +func NewNodeIDFromAddr(addr Addr) discover.NodeID { + log.Info(fmt.Sprintf("uaddr=%s", string(addr.Under()))) + node := discover.MustParseNode(string(addr.Under())) + return node.ID } -func (self *bzz) send(msg uint64, data interface{}) error { - if self.hive.blockWrite { - return fmt.Errorf("network write blocked") +// NewAddrFromNodeID constucts a BzzAddr from a discover.NodeID +// the overlay address is derived as the hash of the nodeID +func NewAddrFromNodeID(id discover.NodeID) *BzzAddr { + return &BzzAddr{ + OAddr: ToOverlayAddr(id.Bytes()), + UAddr: []byte(discover.NewNode(id, net.IP{127, 0, 0, 1}, 30303, 30303).String()), } - log.Trace(fmt.Sprintf("-> %v: %v (%T) to %v", msg, data, data, self)) - err := p2p.Send(self.rw, msg, data) - if err != nil { - self.Drop() +} + +// NewAddrFromNodeIDAndPort constucts a BzzAddr from a discover.NodeID and port uint16 +// the overlay address is derived as the hash of the nodeID +func NewAddrFromNodeIDAndPort(id discover.NodeID, host net.IP, port uint16) *BzzAddr { + return &BzzAddr{ + OAddr: ToOverlayAddr(id.Bytes()), + UAddr: []byte(discover.NewNode(id, host, port, port).String()), } - return err +} + +// ToOverlayAddr creates an overlayaddress from a byte slice +func ToOverlayAddr(id []byte) []byte { + return crypto.Keccak256(id) } |