diff options
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 177 |
1 files changed, 101 insertions, 76 deletions
@@ -24,9 +24,11 @@ const ( // The size of the output buffer for writing messages outputBufferSize = 50 // Current protocol version - ProtocolVersion = 32 + ProtocolVersion = 33 // Current P2P version P2PVersion = 0 + // Ethereum network version + NetVersion = 0 // Interval for ping/pong message pingPongTimer = 2 * time.Second ) @@ -72,7 +74,7 @@ func (d DiscReason) String() string { type Caps byte const ( - CapPeerDiscTy = 1 << iota + CapPeerDiscTy Caps = 1 << iota CapTxTy CapChainTy @@ -309,6 +311,14 @@ out: select { // Main message queue. All outbound messages are processed through here case msg := <-p.outputQueue: + if !p.statusKnown { + switch msg.Type { + case ethwire.MsgGetTxsTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockHashesTy, ethwire.MsgBlockTy: + peerlogger.Debugln("Blocked outgoing [eth] message to peer without the [eth] cap.") + break + } + } + p.writeMessage(msg) p.lastSend = time.Now() @@ -435,100 +445,106 @@ func (p *Peer) HandleInbound() { case ethwire.MsgStatusTy: // Handle peer's status msg p.handleStatus(msg) - case ethwire.MsgGetTxsTy: - // Get the current transactions of the pool - txs := p.ethereum.TxPool().CurrentTransactions() - // Get the RlpData values from the txs - txsInterface := make([]interface{}, len(txs)) - for i, tx := range txs { - txsInterface[i] = tx.RlpData() - } - // Broadcast it back to the peer - p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) + } - case ethwire.MsgGetBlockHashesTy: - if msg.Data.Len() < 2 { - peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) - } + // TMP + if p.statusKnown { + switch msg.Type { + case ethwire.MsgGetTxsTy: + // Get the current transactions of the pool + txs := p.ethereum.TxPool().CurrentTransactions() + // Get the RlpData values from the txs + txsInterface := make([]interface{}, len(txs)) + for i, tx := range txs { + txsInterface[i] = tx.RlpData() + } + // Broadcast it back to the peer + p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface)) + + case ethwire.MsgGetBlockHashesTy: + if msg.Data.Len() < 2 { + peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) + } - hash := msg.Data.Get(0).Bytes() - amount := msg.Data.Get(1).Uint() + hash := msg.Data.Get(0).Bytes() + amount := msg.Data.Get(1).Uint() - hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount) + hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) - case ethwire.MsgGetBlocksTy: - // Limit to max 300 blocks - max := int(math.Min(float64(msg.Data.Len()), 300.0)) - var blocks []interface{} + case ethwire.MsgGetBlocksTy: + // Limit to max 300 blocks + max := int(math.Min(float64(msg.Data.Len()), 300.0)) + var blocks []interface{} - for i := 0; i < max; i++ { - hash := msg.Data.Get(i).Bytes() - block := p.ethereum.BlockChain().GetBlock(hash) - if block != nil { - blocks = append(blocks, block.Value().Raw()) + for i := 0; i < max; i++ { + hash := msg.Data.Get(i).Bytes() + block := p.ethereum.BlockChain().GetBlock(hash) + if block != nil { + blocks = append(blocks, block.Value().Raw()) + } } - } - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks)) + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks)) - case ethwire.MsgBlockHashesTy: - p.catchingUp = true + case ethwire.MsgBlockHashesTy: + p.catchingUp = true - blockPool := p.ethereum.blockPool + blockPool := p.ethereum.blockPool - foundCommonHash := false + foundCommonHash := false - it := msg.Data.NewIterator() - for it.Next() { - hash := it.Value().Bytes() + it := msg.Data.NewIterator() + for it.Next() { + hash := it.Value().Bytes() - if blockPool.HasCommonHash(hash) { - foundCommonHash = true + if blockPool.HasCommonHash(hash) { + foundCommonHash = true - break - } + break + } - blockPool.AddHash(hash) + blockPool.AddHash(hash) - p.lastReceivedHash = hash + p.lastReceivedHash = hash - p.lastBlockReceived = time.Now() - } - - if foundCommonHash { - p.FetchBlocks() - } else { - p.FetchHashes() - } + p.lastBlockReceived = time.Now() + } - case ethwire.MsgBlockTy: - p.catchingUp = true + if foundCommonHash { + p.FetchBlocks() + } else { + p.FetchHashes() + } - blockPool := p.ethereum.blockPool + case ethwire.MsgBlockTy: + p.catchingUp = true - it := msg.Data.NewIterator() - for it.Next() { - block := ethchain.NewBlockFromRlpValue(it.Value()) - //fmt.Printf("%v %x - %x\n", block.Number, block.Hash()[0:4], block.PrevHash[0:4]) + blockPool := p.ethereum.blockPool - blockPool.SetBlock(block, p) + it := msg.Data.NewIterator() + for it.Next() { + block := ethchain.NewBlockFromRlpValue(it.Value()) + //fmt.Printf("%v %x - %x\n", block.Number, block.Hash()[0:4], block.PrevHash[0:4]) - p.lastBlockReceived = time.Now() - } + blockPool.SetBlock(block, p) - var err error - blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { - err = p.ethereum.StateManager().Process(block, false) - }) + p.lastBlockReceived = time.Now() + } - if err != nil { - peerlogger.Infoln(err) - } else { - // Don't trigger if there's just one block. - if blockPool.Len() != 0 && msg.Data.Len() > 1 { - p.FetchBlocks() + var err error + blockPool.CheckLinkAndProcess(func(block *ethchain.Block) { + err = p.ethereum.StateManager().Process(block, false) + }) + + if err != nil { + peerlogger.Infoln(err) + } else { + // Don't trigger if there's just one block. + if blockPool.Len() != 0 && msg.Data.Len() > 1 { + p.FetchBlocks() + } } } } @@ -645,10 +661,9 @@ func (p *Peer) pushPeers() { } func (self *Peer) pushStatus() { - const netVersion = 0 msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{ uint32(ProtocolVersion), - uint32(netVersion), + uint32(NetVersion), self.ethereum.BlockChain().TD, self.ethereum.BlockChain().CurrentBlock.Hash(), self.ethereum.BlockChain().Genesis().Hash(), @@ -669,7 +684,17 @@ func (self *Peer) handleStatus(msg *ethwire.Msg) { ) if bytes.Compare(self.ethereum.BlockChain().Genesis().Hash(), genesis) != 0 { - ethlogger.Warnf("Invalid genisis hash %x. Disabling [ETH]\n", genesis) + ethlogger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis) + return + } + + if netVersion != NetVersion { + ethlogger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion) + return + } + + if protoVersion != ProtocolVersion { + ethlogger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion) return } @@ -687,7 +712,7 @@ func (self *Peer) handleStatus(msg *ethwire.Msg) { self.FetchHashes() } - ethlogger.Infof("Peer is [ETH] capable. (TD = %v ~ %x) %d / %d", self.td, self.bestHash, protoVersion, netVersion) + ethlogger.Infof("Peer is [eth] capable. (TD = %v ~ %x) %d / %d", self.td, self.bestHash, protoVersion, netVersion) } |