aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/protocol.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/protocol.go')
-rw-r--r--swarm/network/protocol.go554
1 files changed, 554 insertions, 0 deletions
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go
new file mode 100644
index 000000000..5e65108d6
--- /dev/null
+++ b/swarm/network/protocol.go
@@ -0,0 +1,554 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+/*
+bzz implements the swarm wire protocol [bzz] (sister of eth and shh)
+the protocol instance is launched on each peer by the network layer if the
+bzz protocol handler is registered on the p2p server.
+
+The bzz protocol component speaks the bzz protocol
+* handle the protocol handshake
+* register peers in the KΛÐΞMLIΛ table via the hive logistic manager
+* dispatch to hive for handling the DHT logic
+* encode and decode requests for storage and retrieval
+* handle sync protocol messages via the syncer
+* talks the SWAP payment protocol (swap accounting is done within NetStore)
+*/
+
+import (
+ "fmt"
+ "net"
+ "strconv"
+ "time"
+
+ "github.com/ethereum/go-ethereum/contracts/chequebook"
+ "github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
+ "github.com/ethereum/go-ethereum/swarm/services/swap/swap"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+// Protocol parameters advertised in the handshake and enforced on every message.
+const (
+ Version = 0 // bzz wire protocol version spoken by this implementation
+ ProtocolLength = uint64(8) // number of message codes used by the protocol
+ ProtocolMaxMsgSize = 10 * 1024 * 1024 // hard upper bound on incoming message size
+ NetworkId = 322
+)
+
+// Protocol error codes, used as indices into errorToString and passed to
+// protoError when terminating the peer connection.
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrVersionMismatch
+ ErrNetworkIdMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+ ErrSwap
+ ErrSync
+ ErrUnwanted
+)
+
+// errorToString maps the protocol error codes above to human-readable
+// descriptions; it is handed to the errs package for error formatting.
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+ ErrSwap: "SWAP error",
+ ErrSync: "Sync error",
+ ErrUnwanted: "Unwanted peer",
+}
+
+// bzz represents the swarm wire protocol
+// an instance is running on each peer
+type bzz struct {
+ selfID discover.NodeID // peer's node id used in peer advertising in handshake
+ key storage.Key // baseaddress as storage.Key
+ storage StorageHandler // handler storage/retrieval related requests coming via the bzz wire protocol
+ hive *Hive // the logistic manager, peerPool, routing service and peer handler
+ dbAccess *DbAccess // access to db storage counter and iterator for syncing
+ requestDb *storage.LDBDatabase // db to persist backlog of deliveries to aid syncing
+ remoteAddr *peerAddr // remote peers address
+ peer *p2p.Peer // the p2p peer object
+ rw p2p.MsgReadWriter // messageReadWriter to send messages to
+ errors *errs.Errors // errors table
+ backend chequebook.Backend // blockchain backend handed to bzzswap.NewSwap during the handshake
+ lastActive time.Time // set in handle() on incoming store/sync/delivery traffic
+
+ swap *swap.Swap // swap instance for the peer connection
+ swapParams *bzzswap.SwapParams // swap settings both local and remote
+ swapEnabled bool // flag to enable SWAP (will be set via Caps in handshake)
+ syncEnabled bool // flag to enable SYNC (will be set via Caps in handshake)
+ syncer *syncer // syncer instance for the peer connection
+ syncParams *SyncParams // syncer params
+ syncState *syncState // outgoing synchronisation state (contains reference to remote peers db counter)
+}
+
+// interface type for handler of storage/retrieval related requests coming
+// via the bzz wire protocol
+// messages: UnsyncedKeys, DeliveryRequest, StoreRequest, RetrieveRequest
+// these are the dispatch targets for the corresponding cases in (*bzz).handle
+type StorageHandler interface {
+ HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
+ HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error
+ HandleStoreRequestMsg(req *storeRequestMsgData, p *peer)
+ HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
+}
+
+/*
+main entrypoint, wrappers starting a server that will run the bzz protocol
+use this constructor to attach the protocol ("class") to server caps
+This is done by node.Node#Register(func(node.ServiceContext) (Service, error))
+Service implements Protocols() which is an array of protocol constructors
+at node startup the protocols are initialised
+the Dev p2p layer then calls Run(p *p2p.Peer, rw p2p.MsgReadWriter) error
+on each peer connection
+The Run function of the Bzz protocol class creates a bzz instance
+which will represent the peer for the swarm hive and all peer-aware components
+*/
+func Bzz(cloud StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams) (p2p.Protocol, error) {
+
+ // a single global request db is created for all peer connections
+ // this is to persist delivery backlog and aid synchronisation
+ requestDb, err := storage.NewLDBDatabase(sy.RequestDbPath)
+ if err != nil {
+ return p2p.Protocol{}, fmt.Errorf("error setting up request db: %v", err)
+ }
+
+ return p2p.Protocol{
+ Name: "bzz",
+ Version: Version,
+ Length: ProtocolLength,
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ // each new peer connection gets its own bzz instance, but
+ // they all share the single requestDb created above
+ return run(requestDb, cloud, backend, hive, dbaccess, sp, sy, p, rw)
+ },
+ }, nil
+}
+
+/*
+the main protocol loop that
+ * does the handshake by exchanging statusMsg
+ * if peer is valid and accepted, registers with the hive
+ * then enters into a forever loop handling incoming messages
+ * storage and retrieval related queries coming via bzz are dispatched to StorageHandler
+ * peer-related messages are dispatched to the hive
+ * payment related messages are relayed to SWAP service
+ * on disconnect, unregister the peer in the hive (note RemovePeer in the post-disconnect hook)
+ * whenever the loop terminates, the peer will disconnect with Subprotocol error
+ * whenever handlers return an error the loop terminates
+*/
+func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook.Backend, hive *Hive, dbaccess *DbAccess, sp *bzzswap.SwapParams, sy *SyncParams, p *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+
+ self := &bzz{
+ storage: depo,
+ backend: backend,
+ hive: hive,
+ dbAccess: dbaccess,
+ requestDb: requestDb,
+ peer: p,
+ rw: rw,
+ errors: &errs.Errors{
+ Package: "BZZ",
+ Errors: errorToString,
+ },
+ swapParams: sp,
+ syncParams: sy,
+ swapEnabled: hive.swapEnabled,
+ syncEnabled: true,
+ }
+
+ // handle handshake
+ err = self.handleStatus()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ // if the handler loop exits, the peer is disconnecting
+ // deregister the peer in the hive
+ self.hive.removePeer(&peer{bzz: self})
+ if self.syncer != nil {
+ self.syncer.stop() // quits request db and delivery loops, save requests
+ }
+ if self.swap != nil {
+ self.swap.Stop() // quits chequebook autocash etc
+ }
+ }()
+
+ // the main forever loop that handles incoming requests
+ for {
+ if self.hive.blockRead {
+ // NOTE(review): busy-waits in 100ms steps while reads are blocked;
+ // blockRead looks like a test/simulation switch -- confirm
+ glog.V(logger.Warn).Infof("Cannot read network")
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ err = self.handle()
+ if err != nil {
+ return
+ }
+ }
+}
+
+// Drop disconnects the remote peer with a subprotocol error.
+// TODO: may need to implement protocol drop only? don't want to kick off the peer
+// if they are useful for other protocols
+func (self *bzz) Drop() {
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
+}
+
+// one cycle of the main forever loop that handles and dispatches incoming messages
+// returning a non-nil error terminates the protocol loop in run()
+func (self *bzz) handle() error {
+ msg, err := self.rw.ReadMsg()
+ // NOTE(review): msg is logged before the error check; on a read error
+ // this logs a zero-valued message
+ glog.V(logger.Debug).Infof("<- %v", msg)
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+
+ case statusMsg:
+ // no extra status message allowed. The one needed already handled by
+ // handleStatus
+ glog.V(logger.Debug).Infof("Status message: %v", msg)
+ return self.protoError(ErrExtraStatusMsg, "")
+
+ case storeRequestMsg:
+ // store requests are dispatched to netStore
+ var req storeRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ if len(req.SData) < 9 {
+ // NOTE(review): format string has two %v verbs but only one
+ // argument (msg) -- the data length argument is missing
+ return self.protoError(ErrDecode, "<- %v: Data too short (%v)", msg)
+ }
+ // last Active time is set only when receiving chunks
+ self.lastActive = time.Now()
+ glog.V(logger.Detail).Infof("incoming store request: %s", req.String())
+ // swap accounting is done within forwarding
+ self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})
+
+ case retrieveRequestMsg:
+ // retrieve Requests are dispatched to netStore
+ var req retrieveRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ req.from = &peer{bzz: self}
+ // if request is lookup and not to be delivered
+ if req.isLookup() {
+ glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from)
+ } else if req.Key == nil {
+ return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil")
+ } else {
+ // swap accounting is done within netStore
+ self.storage.HandleRetrieveRequestMsg(&req, &peer{bzz: self})
+ }
+ // direct response with peers, TODO: sort this out
+ self.hive.peers(&req)
+
+ case peersMsg:
+ // response to lookups and immediate response to retrieve requests
+ // dispatches new peer data to the hive that adds them to KADDB
+ var req peersMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ req.from = &peer{bzz: self}
+ glog.V(logger.Detail).Infof("<- peer addresses: %v", req)
+ self.hive.HandlePeersMsg(&req, &peer{bzz: self})
+
+ case syncRequestMsg:
+ var req syncRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- sync request: %v", req)
+ self.lastActive = time.Now()
+ // NOTE(review): the error returned by sync() is silently discarded;
+ // a failed syncer setup does not terminate the connection here
+ self.sync(req.SyncState)
+
+ case unsyncedKeysMsg:
+ // coming from parent node offering
+ var req unsyncedKeysMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String())
+ err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
+ self.lastActive = time.Now()
+ if err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+
+ case deliveryRequestMsg:
+ // response to syncKeysMsg hashes filtered not existing in db
+ // also relays the last synced state to the source
+ var req deliveryRequestMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<-msg %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- delivery request: %s", req.String())
+ err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
+ self.lastActive = time.Now()
+ if err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+
+ case paymentMsg:
+ // swap protocol message for payment, Units paid for, Cheque paid with
+ // NOTE(review): payment messages are silently dropped when SWAP is disabled
+ if self.swapEnabled {
+ var req paymentMsgData
+ if err := msg.Decode(&req); err != nil {
+ return self.protoError(ErrDecode, "<- %v: %v", msg, err)
+ }
+ glog.V(logger.Debug).Infof("<- payment: %s", req.String())
+ // NOTE(review): any result of Receive is discarded -- confirm it
+ // cannot fail in a way that should terminate the connection
+ self.swap.Receive(int(req.Units), req.Promise)
+ }
+
+ default:
+ // no other message is allowed
+ return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+// handleStatus performs the bzz handshake: it sends our statusMsg, reads and
+// validates the remote status (network id, version), sets remoteAddr, sets up
+// SWAP accounting if enabled, registers the peer with the hive and finally
+// sends the initial sync request.
+func (self *bzz) handleStatus() (err error) {
+
+ handshake := &statusMsgData{
+ Version: uint64(Version),
+ ID: "honey",
+ Addr: self.selfAddr(),
+ NetworkId: uint64(NetworkId),
+ Swap: &bzzswap.SwapProfile{
+ Profile: self.swapParams.Profile,
+ PayProfile: self.swapParams.PayProfile,
+ },
+ }
+
+ err = p2p.Send(self.rw, statusMsg, handshake)
+ if err != nil {
+ // NOTE(review): the send error is only logged via protoError and not
+ // returned; execution continues and err is overwritten by ReadMsg below
+ self.protoError(ErrNoStatusMsg, err.Error())
+ }
+
+ // read and handle remote status
+ var msg p2p.Msg
+ msg, err = self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+
+ if msg.Code != statusMsg {
+ // NOTE(review): the protoError result is not returned here -- a
+ // non-status first message is logged but processing continues
+ self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, statusMsg)
+ }
+
+ if msg.Size > ProtocolMaxMsgSize {
+ return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+
+ var status statusMsgData
+ if err := msg.Decode(&status); err != nil {
+ return self.protoError(ErrDecode, " %v: %v", msg, err)
+ }
+
+ if status.NetworkId != NetworkId {
+ return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
+ }
+
+ if Version != status.Version {
+ return self.protoError(ErrVersionMismatch, "%d (!= %d)", status.Version, Version)
+ }
+
+ // repair the advertised address with the observed IP if needed
+ self.remoteAddr = self.peerAddr(status.Addr)
+ glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())
+
+ if self.swapEnabled {
+ // set remote profile for accounting
+ self.swap, err = bzzswap.NewSwap(self.swapParams, status.Swap, self.backend, self)
+ if err != nil {
+ return self.protoError(ErrSwap, "%v", err)
+ }
+ }
+
+ glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)
+ err = self.hive.addPeer(&peer{bzz: self})
+ if err != nil {
+ return self.protoError(ErrUnwanted, "%v", err)
+ }
+
+ // hive sets syncstate so sync should start after node added
+ glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState)
+ self.syncRequest()
+
+ return nil
+}
+
+// sync configures and starts the syncer for this connection in response to an
+// incoming syncRequestMsg. It may only run once per connection; a nil state
+// explicitly disables synchronisation with this peer.
+func (self *bzz) sync(state *syncState) error {
+ // syncer setup
+ if self.syncer != nil {
+ return self.protoError(ErrSync, "sync request can only be sent once")
+ }
+
+ cnt := self.dbAccess.counter()
+ remoteaddr := self.remoteAddr.Addr
+ start, stop := self.hive.kad.KeyRange(remoteaddr)
+
+ // an explicitly received nil syncstate disables synchronisation
+ if state == nil {
+ self.syncEnabled = false
+ glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self)
+ state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
+ } else {
+ state.synced = make(chan bool)
+ state.SessionAt = cnt
+ // fresh session: derive the key range to sync from our kademlia table
+ if storage.IsZeroKey(state.Stop) && state.Synced {
+ state.Start = storage.Key(start[:])
+ state.Stop = storage.Key(stop[:])
+ }
+ glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state)
+ }
+ var err error
+ self.syncer, err = newSyncer(
+ self.requestDb,
+ storage.Key(remoteaddr[:]),
+ self.dbAccess,
+ self.unsyncedKeys, self.store,
+ self.syncParams, state, func() bool { return self.syncEnabled },
+ )
+ if err != nil {
+ return self.protoError(ErrSync, "%v", err)
+ }
+ glog.V(logger.Detail).Infof("syncer set for peer %v", self)
+ return nil
+}
+
+// String implements fmt.Stringer using the remote peer's address.
+// NOTE(review): remoteAddr is only set during handleStatus -- confirm this is
+// never called before the handshake completes (nil dereference otherwise).
+func (self *bzz) String() string {
+ return self.remoteAddr.String()
+}
+
+// repair reported address if IP missing
+// NOTE(review): errors from SplitHostPort and ParseIP are discarded; a
+// malformed remote address leaves base.IP nil.
+func (self *bzz) peerAddr(base *peerAddr) *peerAddr {
+ if base.IP.IsUnspecified() {
+ host, _, _ := net.SplitHostPort(self.peer.RemoteAddr().String())
+ base.IP = net.ParseIP(host)
+ }
+ return base
+}
+
+// returns self advertised node connection info (listening address w enodes)
+// IP will get repaired on the other end if missing
+// or resolved via ID by discovery at dialout
+// NOTE(review): SplitHostPort/Atoi errors are discarded; an unparsable
+// listen address yields a nil IP and port 0.
+func (self *bzz) selfAddr() *peerAddr {
+ id := self.hive.id
+ host, port, _ := net.SplitHostPort(self.hive.listenAddr())
+ intport, _ := strconv.Atoi(port)
+ addr := &peerAddr{
+ Addr: self.hive.addr,
+ ID: id[:],
+ IP: net.ParseIP(host),
+ Port: uint16(intport),
+ }
+ return addr
+}
+
+// outgoing messages
+// retrieve sends a retrieveRequestMsg to the peer
+func (self *bzz) retrieve(req *retrieveRequestMsgData) error {
+ return self.send(retrieveRequestMsg, req)
+}
+
+// store sends a storeRequestMsg to the peer
+func (self *bzz) store(req *storeRequestMsgData) error {
+ return self.send(storeRequestMsg, req)
+}
+
+// syncRequest sends our sync state to the peer after the handshake.
+// When syncing is disabled (or syncState is nil), SyncState stays nil in the
+// request, which the receiving side's sync() treats as "disable syncing".
+func (self *bzz) syncRequest() error {
+ req := &syncRequestMsgData{}
+ if self.hive.syncEnabled {
+ glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState)
+ req.SyncState = self.syncState
+ }
+ if self.syncState == nil {
+ glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState)
+ }
+ return self.send(syncRequestMsg, req)
+}
+
+// deliveryRequest sends a deliveryRequestMsg asking the peer to deliver
+// the given batch of sync requests
+func (self *bzz) deliveryRequest(reqs []*syncRequest) error {
+ req := &deliveryRequestMsgData{
+ Deliver: reqs,
+ }
+ return self.send(deliveryRequestMsg, req)
+}
+
+// batch of syncRequests to send off
+// unsyncedKeys offers keys to the peer together with our current sync state
+func (self *bzz) unsyncedKeys(reqs []*syncRequest, state *syncState) error {
+ req := &unsyncedKeysMsgData{
+ Unsynced: reqs,
+ State: state,
+ }
+ return self.send(unsyncedKeysMsg, req)
+}
+
+// Pay sends a paymentMsg for the given number of units.
+// NOTE(review): the type assertion panics if promise is not a
+// *chequebook.Cheque -- confirm callers only ever pass cheques.
+func (self *bzz) Pay(units int, promise swap.Promise) {
+ req := &paymentMsgData{uint(units), promise.(*chequebook.Cheque)}
+ self.payment(req)
+}
+
+// payment sends a paymentMsg to the peer
+func (self *bzz) payment(req *paymentMsgData) error {
+ return self.send(paymentMsg, req)
+}
+
+// peers sends a peersMsg to the peer
+func (self *bzz) peers(req *peersMsgData) error {
+ return self.send(peersMsg, req)
+}
+
+// protoError constructs a protocol error for the given code from the errors
+// table, logs it at Info level and returns it; the result is always non-nil.
+func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
+ err = self.errors.New(code, format, params...)
+ err.Log(glog.V(logger.Info))
+ return
+}
+
+// protoErrorDisconnect logs the error and, if it is fatal, disconnects the
+// peer with a subprotocol error.
+func (self *bzz) protoErrorDisconnect(err *errs.Error) {
+ err.Log(glog.V(logger.Info))
+ if err.Fatal() {
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
+ }
+}
+
+// send encodes and sends a message to the peer. Writes are refused while
+// hive.blockWrite is set, and any transport error drops the connection.
+func (self *bzz) send(msg uint64, data interface{}) error {
+ if self.hive.blockWrite {
+ return fmt.Errorf("network write blocked")
+ }
+ glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self)
+ err := p2p.Send(self.rw, msg, data)
+ if err != nil {
+ // any failure to write is treated as a dead connection
+ self.Drop()
+ }
+ return err
+}