diff options
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 400 |
1 files changed, 234 insertions, 166 deletions
@@ -24,7 +24,11 @@ const ( // The size of the output buffer for writing messages outputBufferSize = 50 // Current protocol version - ProtocolVersion = 28 + ProtocolVersion = 33 + // Current P2P version + P2PVersion = 0 + // Ethereum network version + NetVersion = 0 // Interval for ping/pong message pingPongTimer = 2 * time.Second ) @@ -70,7 +74,7 @@ func (d DiscReason) String() string { type Caps byte const ( - CapPeerDiscTy = 1 << iota + CapPeerDiscTy Caps = 1 << iota CapTxTy CapChainTy @@ -122,6 +126,7 @@ type Peer struct { // This flag is used by writeMessage to check if messages are allowed // to be send or not. If no version is known all messages are ignored. versionKnown bool + statusKnown bool // Last received pong message lastPong int64 @@ -179,6 +184,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { inbound: false, connected: 0, disconnect: 0, + port: 30303, caps: caps, version: ethereum.ClientIdentity().String(), } @@ -271,9 +277,19 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { default: // Anything but ack is allowed return } + } else { + /* + if !p.statusKnown { + switch msg.Type { + case ethwire.MsgStatusTy: // Ok + default: // Anything but ack is allowed + return + } + } + */ } - peerlogger.DebugDetailf("(%v) <= %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data) + peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg)) err := ethwire.WriteMessage(p.conn, msg) if err != nil { @@ -295,6 +311,14 @@ out: select { // Main message queue. All outbound messages are processed through here case msg := <-p.outputQueue: + if !p.statusKnown { + switch msg.Type { + case ethwire.MsgGetTxsTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockHashesTy, ethwire.MsgBlockTy: + peerlogger.Debugln("Blocked outgoing [eth] message to peer without the [eth] cap.") + break + } + } + p.writeMessage(msg) p.lastSend = time.Now() @@ -337,6 +361,29 @@ clean: } } +func formatMessage(msg *ethwire.Msg) (ret string) { + ret = fmt.Sprintf("%v %v", msg.Type, msg.Data) + + /* + XXX Commented out because I need the log level here to determine + if i should or shouldn't generate this message + */ + /* + switch msg.Type { + case ethwire.MsgPeersTy: + ret += fmt.Sprintf("(%d entries)", msg.Data.Len()) + case ethwire.MsgBlockTy: + b1, b2 := ethchain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1)) + ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4]) + case ethwire.MsgBlockHashesTy: + h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes() + ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2) + } + */ + + return +} + // Inbound handler. Inbound messages are received here and passed to the appropriate methods func (p *Peer) HandleInbound() { for atomic.LoadInt32(&p.disconnect) == 0 { @@ -349,16 +396,16 @@ func (p *Peer) HandleInbound() { peerlogger.Debugln(err) } for _, msg := range msgs { - peerlogger.DebugDetailf("(%v) => %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data) + peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg)) switch msg.Type { case ethwire.MsgHandshakeTy: // Version message p.handleHandshake(msg) - if p.caps.IsCap(CapPeerDiscTy) { - p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) - } + //if p.caps.IsCap(CapPeerDiscTy) { + p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) + //} case ethwire.MsgDiscTy: p.Stop() @@ -396,95 +443,111 @@ func (p *Peer) HandleInbound() { // Connect to the list of peers p.ethereum.ProcessPeerList(peers) - case ethwire.MsgGetTxsTy: - // Get the current transactions of the pool - txs := p.ethereum.TxPool().CurrentTransactions() - // Get the RlpData values from the txs - txsInterface := make([]interface{}, len(txs)) - for i, tx := range txs { - txsInterface[i] = tx.RlpData() - } - // Broadcast it back to the peer - p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) - case ethwire.MsgGetBlockHashesTy: - if msg.Data.Len() < 2 { - peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) - } + case ethwire.MsgStatusTy: + // Handle peer's status msg + p.handleStatus(msg) + } - hash := msg.Data.Get(0).Bytes() - amount := msg.Data.Get(1).Uint() + // TMP + if p.statusKnown { + switch msg.Type { + case ethwire.MsgGetTxsTy: + // Get the current transactions of the pool + txs := p.ethereum.TxPool().CurrentTransactions() + // Get the RlpData values from the txs + txsInterface := make([]interface{}, len(txs)) + for i, tx := range txs { + txsInterface[i] = tx.RlpData() + } + // Broadcast it back to the peer + p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) - hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount) + case ethwire.MsgGetBlockHashesTy: + if msg.Data.Len() < 2 { + peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) + } - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) + hash := msg.Data.Get(0).Bytes() + amount := msg.Data.Get(1).Uint() - case ethwire.MsgGetBlocksTy: - // Limit to max 300 blocks - max := int(math.Min(float64(msg.Data.Len()), 300.0)) - var blocks []interface{} + hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount) - for i := 0; i < max; i++ { - hash := msg.Data.Get(i).Bytes() - block := p.ethereum.BlockChain().GetBlock(hash) - if block != nil { - blocks = append(blocks, block.Value().Raw()) - } - } + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks)) + case ethwire.MsgGetBlocksTy: + // Limit to max 300 blocks + max := int(math.Min(float64(msg.Data.Len()), 300.0)) + var blocks []interface{} - case ethwire.MsgBlockHashesTy: - p.catchingUp = true + for i := 0; i < max; i++ { + hash := msg.Data.Get(i).Bytes() + block := p.ethereum.BlockChain().GetBlock(hash) + if block != nil { + blocks = append(blocks, block.Value().Raw()) + } + } - blockPool := p.ethereum.blockPool + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks)) - foundCommonHash := false + case ethwire.MsgBlockHashesTy: + p.catchingUp = true - it := msg.Data.NewIterator() - for it.Next() { - hash := it.Value().Bytes() + blockPool := p.ethereum.blockPool - if blockPool.HasCommonHash(hash) { - foundCommonHash = true + foundCommonHash := false - break - } + it := msg.Data.NewIterator() + for it.Next() { + hash := it.Value().Bytes() - blockPool.AddHash(hash) + if blockPool.HasCommonHash(hash) { + foundCommonHash = true - p.lastReceivedHash = hash + break + } - p.lastBlockReceived = time.Now() - } + blockPool.AddHash(hash) - if foundCommonHash { - p.FetchBlocks() - } else { - p.FetchHashes() - } + p.lastReceivedHash = hash - case ethwire.MsgBlockTy: - p.catchingUp = true + p.lastBlockReceived = time.Now() + } - blockPool := p.ethereum.blockPool + if foundCommonHash || msg.Data.Len() == 0 { + p.FetchBlocks() + } else { + p.FetchHashes() + } - it := msg.Data.NewIterator() + case ethwire.MsgBlockTy: + p.catchingUp = true - for it.Next() { - block := ethchain.NewBlockFromRlpValue(it.Value()) + blockPool := p.ethereum.blockPool - blockPool.SetBlock(block) + it := msg.Data.NewIterator() + for it.Next() { + block := ethchain.NewBlockFromRlpValue(it.Value()) + //fmt.Printf("%v %x - %x\n", block.Number, block.Hash()[0:4], block.PrevHash[0:4]) - p.lastBlockReceived = time.Now() - } + blockPool.SetBlock(block, p) - linked := blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { - p.ethereum.StateManager().Process(block, false) - }) + p.lastBlockReceived = time.Now() + } - if !linked { - p.FetchBlocks() + var err error + blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { + err = p.ethereum.StateManager().Process(block, false) + }) + + if err != nil { + peerlogger.Infoln(err) + } else { + // Don't trigger if there's just one block. + if blockPool.Len() != 0 && msg.Data.Len() > 1 { + p.FetchBlocks() + } + } } } } @@ -506,10 +569,10 @@ func (self *Peer) FetchHashes() { blockPool := self.ethereum.blockPool if self.td.Cmp(blockPool.td) >= 0 { - peerlogger.Debugf("Requesting hashes from %x\n", self.lastReceivedHash) + blockPool.td = self.td if !blockPool.HasLatestHash() { - self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(200)})) + self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(256)})) } } } @@ -580,18 +643,6 @@ func (p *Peer) Stop() { p.ethereum.RemovePeer(p) } -func (p *Peer) pushHandshake() error { - pubkey := p.ethereum.KeyManager().PublicKey() - msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ - uint32(ProtocolVersion), uint32(0), []byte(p.version), byte(p.caps), p.port, pubkey[1:], - p.ethereum.BlockChain().TD.Uint64(), p.ethereum.BlockChain().CurrentBlock.Hash(), - }) - - p.QueueMessage(msg) - - return nil -} - func (p *Peer) peersMessage() *ethwire.Msg { outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) // Serialise each peer @@ -611,13 +662,93 @@ func (p *Peer) pushPeers() { p.QueueMessage(p.peersMessage()) } +func (self *Peer) pushStatus() { + msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{ + uint32(ProtocolVersion), + uint32(NetVersion), + self.ethereum.BlockChain().TD, + self.ethereum.BlockChain().CurrentBlock.Hash(), + self.ethereum.BlockChain().Genesis().Hash(), + }) + + self.QueueMessage(msg) +} + +func (self *Peer) handleStatus(msg *ethwire.Msg) { + c := msg.Data + + var ( + protoVersion = c.Get(0).Uint() + netVersion = c.Get(1).Uint() + td = c.Get(2).BigInt() + bestHash = c.Get(3).Bytes() + genesis = c.Get(4).Bytes() + ) + + if bytes.Compare(self.ethereum.BlockChain().Genesis().Hash(), genesis) != 0 { + ethlogger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis) + return + } + + if netVersion != NetVersion { + ethlogger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion) + return + } + + if protoVersion != ProtocolVersion { + ethlogger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion) + return + } + + // Get the td and last hash + self.td = td + self.bestHash = bestHash + self.lastReceivedHash = bestHash + + self.statusKnown = true + + // Compare the total TD with the blockchain TD. If remote is higher + // fetch hashes from highest TD node. + if self.td.Cmp(self.ethereum.BlockChain().TD) > 0 { + self.ethereum.blockPool.AddHash(self.lastReceivedHash) + self.FetchHashes() + } + + ethlogger.Infof("Peer is [eth] capable. (TD = %v ~ %x) %d / %d", self.td, self.bestHash, protoVersion, netVersion) + +} + +func (p *Peer) pushHandshake() error { + pubkey := p.ethereum.KeyManager().PublicKey() + msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ + P2PVersion, []byte(p.version), []interface{}{"eth"}, p.port, pubkey[1:], + }) + + p.QueueMessage(msg) + + return nil +} + func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := msg.Data - // Set pubkey - p.pubkey = c.Get(5).Bytes() + var ( + p2pVersion = c.Get(0).Uint() + clientId = c.Get(1).Str() + caps = c.Get(2) + port = c.Get(3).Uint() + pub = c.Get(4).Bytes() + ) + + // Check correctness of p2p protocol version + if p2pVersion != P2PVersion { + peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion) + p.Stop() + return + } - if p.pubkey == nil { + // Handle the pub key (validation, uniqueness) + if len(pub) == 0 { peerlogger.Warnln("Pubkey required, not supplied in handshake.") p.Stop() return @@ -625,9 +756,8 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { usedPub := 0 // This peer is already added to the peerlist so we expect to find a double pubkey at least once - eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { - if bytes.Compare(p.pubkey, peer.pubkey) == 0 { + if bytes.Compare(pub, peer.pubkey) == 0 { usedPub++ } }) @@ -637,19 +767,11 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { p.Stop() return } - - if c.Get(0).Uint() != ProtocolVersion { - peerlogger.Debugf("Invalid peer version. Require protocol: %d. Received: %d\n", ProtocolVersion, c.Get(0).Uint()) - p.Stop() - return - } - - // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID, CAPS, PORT, PUBKEY] - p.versionKnown = true + p.pubkey = pub // If this is an inbound connection send an ack back if p.inbound { - p.port = uint16(c.Get(4).Uint()) + p.port = uint16(port) // Self connect detection pubkey := p.ethereum.KeyManager().PublicKey() @@ -660,40 +782,27 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { } } + p.SetVersion(clientId) - // Set the peer's caps - p.caps = Caps(c.Get(3).Byte()) - - // Get a reference to the peers version - versionString := c.Get(2).Str() - if len(versionString) > 0 { - p.SetVersion(c.Get(2).Str()) - } - - // Get the td and last hash - p.td = c.Get(6).BigInt() - p.bestHash = c.Get(7).Bytes() - p.lastReceivedHash = p.bestHash + p.versionKnown = true p.ethereum.PushPeer(p) p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) - ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash) - - /* - // Catch up with the connected peer - if !p.ethereum.IsUpToDate() { - peerlogger.Debugln("Already syncing up with a peer; sleeping") - time.Sleep(10 * time.Second) + capsIt := caps.NewIterator() + var capsStrs []string + for capsIt.Next() { + cap := capsIt.Value().Str() + switch cap { + case "eth": + p.pushStatus() } - */ - //p.SyncWithPeerToLastKnown() - if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 { - p.ethereum.blockPool.AddHash(p.lastReceivedHash) - p.FetchHashes() + capsStrs = append(capsStrs, cap) } + ethlogger.Infof("Added peer (%s) %d / %d (%v)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, capsStrs) + peerlogger.Debugln(p) } @@ -714,47 +823,6 @@ func (p *Peer) String() string { return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version, p.caps) } -func (p *Peer) SyncWithPeerToLastKnown() { - p.catchingUp = false - p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) -} - -func (p *Peer) FindCommonParentBlock() { - if p.catchingUp { - return - } - - p.catchingUp = true - if p.blocksRequested == 0 { - p.blocksRequested = 20 - } - blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested) - - var hashes []interface{} - for _, block := range blocks { - hashes = append(hashes, block.Hash()) - } - - msgInfo := append(hashes, uint64(len(hashes))) - - peerlogger.DebugDetailf("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String()) - - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo) - p.QueueMessage(msg) -} -func (p *Peer) CatchupWithPeer(blockHash []byte) { - if !p.catchingUp { - // Make sure nobody else is catching up when you want to do this - p.catchingUp = true - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(100)}) - p.QueueMessage(msg) - - peerlogger.DebugDetailf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr()) - - msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{}) - p.QueueMessage(msg) - } -} func (p *Peer) RlpData() []interface{} { return []interface{}{p.host, p.port, p.pubkey} |