diff options
author | Felix Lange <fjl@twurst.com> | 2014-11-22 04:48:49 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2014-11-22 04:52:45 +0800 |
commit | 59b63caf5e4de64ceb7dcdf01551a080f53b1672 (patch) | |
tree | a4e79590284c5afe4d6927b422a5092b074e7938 /p2p/protocol.go | |
parent | e4a601c6444afdc11ce0cb80d7fd83116de2c8b9 (diff) | |
download | dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.gz dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.bz2 dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.lz dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.xz dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.tar.zst dexon-59b63caf5e4de64ceb7dcdf01551a080f53b1672.zip |
p2p: API cleanup and PoC 7 compatibility
Whoa, one more big commit. I didn't manage to untangle the
changes while working towards compatibility.
Diffstat (limited to 'p2p/protocol.go')
-rw-r--r-- | p2p/protocol.go | 412 |
1 files changed, 194 insertions, 218 deletions
diff --git a/p2p/protocol.go b/p2p/protocol.go index d22ba70cb..169dcdb6e 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -3,249 +3,185 @@ package p2p import ( "bytes" "net" - "sort" "time" "github.com/ethereum/go-ethereum/ethutil" ) -// Protocol is implemented by P2P subprotocols. -type Protocol interface { - // Start is called when the protocol becomes active. - // It should read and write messages from rw. - // Messages must be fully consumed. - // - // The connection is closed when Start returns. It should return - // any protocol-level error (such as an I/O error) that is - // encountered. - Start(peer *Peer, rw MsgReadWriter) error +// Protocol represents a P2P subprotocol implementation. +type Protocol struct { + // Name should contain the official protocol name, + // often a three-letter word. + Name string - // Offset should return the number of message codes - // used by the protocol. - Offset() MsgCode -} + // Version should contain the version number of the protocol. + Version uint -type MsgReader interface { - ReadMsg() (Msg, error) -} - -type MsgWriter interface { - WriteMsg(Msg) error -} - -// MsgReadWriter is passed to protocols. Protocol implementations can -// use it to write messages back to a connected peer. -type MsgReadWriter interface { - MsgReader - MsgWriter -} + // Length should contain the number of message codes used + // by the protocol. + Length uint64 -type MsgHandler func(code MsgCode, data *ethutil.Value) error - -// MsgLoop reads messages off the given reader and -// calls the handler function for each decoded message until -// it returns an error or the peer connection is closed. -// -// If a message is larger than the given maximum size, RunProtocol -// returns an appropriate error.n -func MsgLoop(r MsgReader, maxsize uint32, handler MsgHandler) error { - for { - msg, err := r.ReadMsg() - if err != nil { - return err - } - if msg.Size > maxsize { - return NewPeerError(InvalidMsg, "size %d exceeds maximum size of %d", msg.Size, maxsize) - } - value, err := msg.Data() - if err != nil { - return err - } - if err := handler(msg.Code, value); err != nil { - return err - } - } -} - -// the ÐΞVp2p base protocol -type baseProtocol struct { - rw MsgReadWriter - peer *Peer + // Run is called in a new groutine when the protocol has been + // negotiated with a peer. It should read and write messages from + // rw. The Payload for each message must be fully consumed. + // + // The peer connection is closed when Start returns. It should return + // any protocol-level error (such as an I/O error) that is + // encountered. + Run func(peer *Peer, rw MsgReadWriter) error } -type bpMsg struct { - code MsgCode - data *ethutil.Value +func (p Protocol) cap() Cap { + return Cap{p.Name, p.Version} } const ( - p2pVersion = 0 - pingTimeout = 2 * time.Second - pingGracePeriod = 2 * time.Second + baseProtocolVersion = 2 + baseProtocolLength = uint64(16) + baseProtocolMaxMsgSize = 10 * 1024 * 1024 ) const ( - // message codes - handshakeMsg = iota - discMsg - pingMsg - pongMsg - getPeersMsg - peersMsg + // devp2p message codes + handshakeMsg = 0x00 + discMsg = 0x01 + pingMsg = 0x02 + pongMsg = 0x03 + getPeersMsg = 0x04 + peersMsg = 0x05 ) -const ( - baseProtocolOffset MsgCode = 16 - baseProtocolMaxMsgSize = 500 * 1024 -) - -type DiscReason byte +// handshake is the structure of a handshake list. +type handshake struct { + Version uint64 + ID string + Caps []Cap + ListenPort uint64 + NodeID []byte +} -const ( - // Values are given explicitly instead of by iota because these values are - // defined by the wire protocol spec; it is easier for humans to ensure - // correctness when values are explicit. - DiscRequested = 0x00 - DiscNetworkError = 0x01 - DiscProtocolError = 0x02 - DiscUselessPeer = 0x03 - DiscTooManyPeers = 0x04 - DiscAlreadyConnected = 0x05 - DiscIncompatibleVersion = 0x06 - DiscInvalidIdentity = 0x07 - DiscQuitting = 0x08 - DiscUnexpectedIdentity = 0x09 - DiscSelf = 0x0a - DiscReadTimeout = 0x0b - DiscSubprotocolError = 0x10 -) +func (h *handshake) String() string { + return h.ID +} +func (h *handshake) Pubkey() []byte { + return h.NodeID +} -var discReasonToString = [DiscSubprotocolError + 1]string{ - DiscRequested: "Disconnect requested", - DiscNetworkError: "Network error", - DiscProtocolError: "Breach of protocol", - DiscUselessPeer: "Useless peer", - DiscTooManyPeers: "Too many peers", - DiscAlreadyConnected: "Already connected", - DiscIncompatibleVersion: "Incompatible P2P protocol version", - DiscInvalidIdentity: "Invalid node identity", - DiscQuitting: "Client quitting", - DiscUnexpectedIdentity: "Unexpected identity", - DiscSelf: "Connected to self", - DiscReadTimeout: "Read timeout", - DiscSubprotocolError: "Subprotocol error", +// Cap is the structure of a peer capability. +type Cap struct { + Name string + Version uint } -func (d DiscReason) String() string { - if len(discReasonToString) < int(d) { - return "Unknown" - } - return discReasonToString[d] +func (cap Cap) RlpData() interface{} { + return []interface{}{cap.Name, cap.Version} } -func (bp *baseProtocol) Offset() MsgCode { - return baseProtocolOffset +type capsByName []Cap + +func (cs capsByName) Len() int { return len(cs) } +func (cs capsByName) Less(i, j int) bool { return cs[i].Name < cs[j].Name } +func (cs capsByName) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } + +type baseProtocol struct { + rw MsgReadWriter + peer *Peer } -func (bp *baseProtocol) Start(peer *Peer, rw MsgReadWriter) error { - bp.peer, bp.rw = peer, rw +func runBaseProtocol(peer *Peer, rw MsgReadWriter) error { + bp := &baseProtocol{rw, peer} - // Do the handshake. - // TODO: disconnect is valid before handshake, too. - rw.WriteMsg(bp.peer.server.handshakeMsg()) + // do handshake + if err := rw.WriteMsg(bp.handshakeMsg()); err != nil { + return err + } msg, err := rw.ReadMsg() if err != nil { return err } if msg.Code != handshakeMsg { - return NewPeerError(ProtocolBreach, " first message must be handshake") + return newPeerError(errProtocolBreach, "first message must be handshake, got %x", msg.Code) } data, err := msg.Data() if err != nil { - return NewPeerError(InvalidMsg, "%v", err) + return newPeerError(errInvalidMsg, "%v", err) } if err := bp.handleHandshake(data); err != nil { return err } - msgin := make(chan bpMsg) - done := make(chan error, 1) + // run main loop + quit := make(chan error, 1) go func() { - done <- MsgLoop(rw, baseProtocolMaxMsgSize, - func(code MsgCode, data *ethutil.Value) error { - msgin <- bpMsg{code, data} - return nil - }) + quit <- MsgLoop(rw, baseProtocolMaxMsgSize, bp.handle) }() - return bp.loop(msgin, done) + return bp.loop(quit) } -func (bp *baseProtocol) loop(msgin <-chan bpMsg, quit <-chan error) error { - logger.Debugf("pingpong keepalive started at %v\n", time.Now()) - messenger := bp.rw.(*proto).messenger - pingTimer := time.NewTimer(pingTimeout) - pinged := true +var pingTimeout = 2 * time.Second + +func (bp *baseProtocol) loop(quit <-chan error) error { + ping := time.NewTimer(pingTimeout) + activity := bp.peer.activity.Subscribe(time.Time{}) + lastActive := time.Time{} + defer ping.Stop() + defer activity.Unsubscribe() - for { + getPeersTick := time.NewTicker(10 * time.Second) + defer getPeersTick.Stop() + err := bp.rw.EncodeMsg(getPeersMsg) + + for err == nil { select { - case msg := <-msgin: - if err := bp.handle(msg.code, msg.data); err != nil { - return err - } - case err := <-quit: + case err = <-quit: return err - case <-messenger.pulse: - pingTimer.Reset(pingTimeout) - pinged = false - case <-pingTimer.C: - if pinged { - return NewPeerError(PingTimeout, "") + case <-getPeersTick.C: + err = bp.rw.EncodeMsg(getPeersMsg) + case event := <-activity.Chan(): + ping.Reset(pingTimeout) + lastActive = event.(time.Time) + case t := <-ping.C: + if lastActive.Add(pingTimeout * 2).Before(t) { + err = newPeerError(errPingTimeout, "") + } else if lastActive.Add(pingTimeout).Before(t) { + err = bp.rw.EncodeMsg(pingMsg) } - logger.Debugf("pinging at %v\n", time.Now()) - if err := bp.rw.WriteMsg(NewMsg(pingMsg)); err != nil { - return NewPeerError(WriteError, "%v", err) - } - pinged = true - pingTimer.Reset(pingTimeout) } } + return err } -func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error { +func (bp *baseProtocol) handle(code uint64, data *ethutil.Value) error { switch code { case handshakeMsg: - return NewPeerError(ProtocolBreach, " extra handshake received") + return newPeerError(errProtocolBreach, "extra handshake received") case discMsg: - logger.Infof("Disconnect requested from peer %v, reason", DiscReason(data.Get(0).Uint())) - bp.peer.server.PeerDisconnect() <- DisconnectRequest{ - addr: bp.peer.Address, - reason: DiscRequested, - } + bp.peer.Disconnect(DiscReason(data.Get(0).Uint())) + return nil case pingMsg: - return bp.rw.WriteMsg(NewMsg(pongMsg)) + return bp.rw.EncodeMsg(pongMsg) case pongMsg: - // reply for ping case getPeersMsg: - // Peer asked for list of connected peers. - peersRLP := bp.peer.server.encodedPeerList() - if peersRLP != nil { - msg := Msg{ - Code: peersMsg, - Size: uint32(len(peersRLP)), - Payload: bytes.NewReader(peersRLP), - } - return bp.rw.WriteMsg(msg) + peers := bp.peerList() + // this is dangerous. the spec says that we should _delay_ + // sending the response if no new information is available. + // this means that would need to send a response later when + // new peers become available. + // + // TODO: add event mechanism to notify baseProtocol for new peers + if len(peers) > 0 { + return bp.rw.EncodeMsg(peersMsg, peers) } case peersMsg: bp.handlePeers(data) default: - return NewPeerError(InvalidMsgCode, "unknown message code %v", code) + return newPeerError(errInvalidMsgCode, "unknown message code %v", code) } return nil } @@ -253,62 +189,102 @@ func (bp *baseProtocol) handle(code MsgCode, data *ethutil.Value) error { func (bp *baseProtocol) handlePeers(data *ethutil.Value) { it := data.NewIterator() for it.Next() { - ip := net.IP(it.Value().Get(0).Bytes()) - port := it.Value().Get(1).Uint() - address := &net.TCPAddr{IP: ip, Port: int(port)} - go bp.peer.server.PeerConnect(address) + addr := &peerAddr{ + IP: net.IP(it.Value().Get(0).Bytes()), + Port: it.Value().Get(1).Uint(), + Pubkey: it.Value().Get(2).Bytes(), + } + bp.peer.Debugf("received peer suggestion: %v", addr) + bp.peer.newPeerAddr <- addr } } func (bp *baseProtocol) handleHandshake(c *ethutil.Value) error { - var ( - remoteVersion = c.Get(0).Uint() - id = c.Get(1).Str() - caps = c.Get(2) - port = c.Get(3).Uint() - pubkey = c.Get(4).Bytes() - ) - // Check correctness of p2p protocol version - if remoteVersion != p2pVersion { - return NewPeerError(P2PVersionMismatch, "Require protocol %d, received %d\n", p2pVersion, remoteVersion) + hs := handshake{ + Version: c.Get(0).Uint(), + ID: c.Get(1).Str(), + Caps: nil, // decoded below + ListenPort: c.Get(3).Uint(), + NodeID: c.Get(4).Bytes(), } - - // Handle the pub key (validation, uniqueness) - if len(pubkey) == 0 { - return NewPeerError(PubkeyMissing, "not supplied in handshake.") + if hs.Version != baseProtocolVersion { + return newPeerError(errP2PVersionMismatch, "Require protocol %d, received %d\n", + baseProtocolVersion, hs.Version) } - - if len(pubkey) != 64 { - return NewPeerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8) + if len(hs.NodeID) == 0 { + return newPeerError(errPubkeyMissing, "") + } + if len(hs.NodeID) != 64 { + return newPeerError(errPubkeyInvalid, "require 512 bit, got %v", len(hs.NodeID)*8) + } + if da := bp.peer.dialAddr; da != nil { + // verify that the peer we wanted to connect to + // actually holds the target public key. + if da.Pubkey != nil && !bytes.Equal(da.Pubkey, hs.NodeID) { + return newPeerError(errPubkeyForbidden, "dial address pubkey mismatch") + } + } + pa := newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) + if err := bp.peer.pubkeyHook(pa); err != nil { + return newPeerError(errPubkeyForbidden, "%v", err) + } + capsIt := c.Get(2).NewIterator() + for capsIt.Next() { + cap := capsIt.Value() + name := cap.Get(0).Str() + if name != "" { + hs.Caps = append(hs.Caps, Cap{Name: name, Version: uint(cap.Get(1).Uint())}) + } } - // self connect detection - if bytes.Compare(bp.peer.server.ClientIdentity().Pubkey()[1:], pubkey) == 0 { - return NewPeerError(PubkeyForbidden, "not allowed to connect to self") + var addr *peerAddr + if hs.ListenPort != 0 { + addr = newPeerAddr(bp.peer.conn.RemoteAddr(), hs.NodeID) + addr.Port = hs.ListenPort } + bp.peer.setHandshakeInfo(&hs, addr, hs.Caps) + bp.peer.startSubprotocols(hs.Caps) + return nil +} - // register pubkey on server. this also sets the pubkey on the peer (need lock) - if err := bp.peer.server.RegisterPubkey(bp.peer, pubkey); err != nil { - return NewPeerError(PubkeyForbidden, err.Error()) +func (bp *baseProtocol) handshakeMsg() Msg { + var ( + port uint64 + caps []interface{} + ) + if bp.peer.ourListenAddr != nil { + port = bp.peer.ourListenAddr.Port } + for _, proto := range bp.peer.protocols { + caps = append(caps, proto.cap()) + } + return NewMsg(handshakeMsg, + baseProtocolVersion, + bp.peer.ourID.String(), + caps, + port, + bp.peer.ourID.Pubkey()[1:], + ) +} - // check port - if bp.peer.Inbound { - uint16port := uint16(port) - if bp.peer.Port > 0 && bp.peer.Port != uint16port { - return NewPeerError(PortMismatch, "port mismatch: %v != %v", bp.peer.Port, port) - } else { - bp.peer.Port = uint16port +func (bp *baseProtocol) peerList() []ethutil.RlpEncodable { + peers := bp.peer.otherPeers() + ds := make([]ethutil.RlpEncodable, 0, len(peers)) + for _, p := range peers { + p.infolock.Lock() + addr := p.listenAddr + p.infolock.Unlock() + // filter out this peer and peers that are not listening or + // have not completed the handshake. + // TODO: track previously sent peers and exclude them as well. + if p == bp.peer || addr == nil { + continue } + ds = append(ds, addr) } - - capsIt := caps.NewIterator() - for capsIt.Next() { - cap := capsIt.Value().Str() - bp.peer.Caps = append(bp.peer.Caps, cap) + ourAddr := bp.peer.ourListenAddr + if ourAddr != nil && !ourAddr.IP.IsLoopback() && !ourAddr.IP.IsUnspecified() { + ds = append(ds, ourAddr) } - sort.Strings(bp.peer.Caps) - bp.rw.(*proto).messenger.setRemoteProtocols(bp.peer.Caps) - bp.peer.Id = id - return nil + return ds } |