diff options
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 180 |
1 files changed, 92 insertions, 88 deletions
@@ -25,6 +25,8 @@ const ( outputBufferSize = 50 // Current protocol version ProtocolVersion = 28 + // Current P2P version + P2PVersion = 0 // Interval for ping/pong message pingPongTimer = 2 * time.Second ) @@ -122,6 +124,7 @@ type Peer struct { // This flag is used by writeMessage to check if messages are allowed // to be send or not. If no version is known all messages are ignored. versionKnown bool + statusKnown bool // Last received pong message lastPong int64 @@ -271,6 +274,14 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { default: // Anything but ack is allowed return } + } else { + if !p.statusKnown { + switch msg.Type { + case ethwire.MsgStatusTy: // Ok + default: // Anything but ack is allowed + return + } + } } peerlogger.DebugDetailf("(%v) <= %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data) @@ -356,9 +367,9 @@ func (p *Peer) HandleInbound() { // Version message p.handleHandshake(msg) - if p.caps.IsCap(CapPeerDiscTy) { - p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) - } + //if p.caps.IsCap(CapPeerDiscTy) { + p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, "")) + //} case ethwire.MsgDiscTy: p.Stop() @@ -396,6 +407,10 @@ func (p *Peer) HandleInbound() { // Connect to the list of peers p.ethereum.ProcessPeerList(peers) + + case ethwire.MsgStatusTy: + // Handle peer's status msg + p.handleStatus(msg) case ethwire.MsgGetTxsTy: // Get the current transactions of the pool txs := p.ethereum.TxPool().CurrentTransactions() @@ -474,7 +489,7 @@ func (p *Peer) HandleInbound() { for it.Next() { block := ethchain.NewBlockFromRlpValue(it.Value()) - blockPool.SetBlock(block) + blockPool.SetBlock(block, p) p.lastBlockReceived = time.Now() } @@ -507,6 +522,7 @@ func (self *Peer) FetchHashes() { if self.td.Cmp(blockPool.td) >= 0 { peerlogger.Debugf("Requesting hashes from %x\n", self.lastReceivedHash) + blockPool.td = self.td if !blockPool.HasLatestHash() { self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(200)})) @@ -580,6 +596,7 @@ func (p *Peer) Stop() { p.ethereum.RemovePeer(p) } +/* func (p *Peer) pushHandshake() error { pubkey := p.ethereum.KeyManager().PublicKey() msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ @@ -591,6 +608,7 @@ func (p *Peer) pushHandshake() error { return nil } +*/ func (p *Peer) peersMessage() *ethwire.Msg { outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) @@ -611,13 +629,72 @@ func (p *Peer) pushPeers() { p.QueueMessage(p.peersMessage()) } +func (p *Peer) pushHandshake() error { + pubkey := p.ethereum.KeyManager().PublicKey() + msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ + uint32(0), []byte(p.version), []string{"eth"}, p.port, pubkey[1:], + }) + + p.QueueMessage(msg) + + return nil +} + +func (self *Peer) pushStatus() { + const netVersion = 0 + msg := ethwire.NewMessage(ethwire.MsgStatusTy, []interface{}{ + uint32(ProtocolVersion), + netVersion, + self.ethereum.BlockChain().TD.Uint64(), + self.ethereum.BlockChain().CurrentBlock.Hash(), + self.ethereum.BlockChain().Genesis().Hash(), + }) + + self.QueueMessage(msg) +} + +func (self *Peer) handleStatus(msg *ethwire.Msg) { + c := msg.Data + // Set the peer's caps + //p.caps = Caps(c.Get(3).Byte()) + + // Get the td and last hash + self.td = c.Get(6).BigInt() + self.bestHash = c.Get(7).Bytes() + self.lastReceivedHash = self.bestHash + + // Compare the total TD with the blockchain TD. If remote is higher + // fetch hashes from highest TD node. + if self.td.Cmp(self.ethereum.BlockChain().TD) > 0 { + self.ethereum.blockPool.AddHash(self.lastReceivedHash) + self.FetchHashes() + } + + ethlogger.Infof("Peer is [ETH] capable. (TD = %v ~ %x", self.td, self.bestHash) +} + func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := msg.Data - // Set pubkey - p.pubkey = c.Get(5).Bytes() + var ( + p2pVersion = c.Get(0).Uint() + clientId = c.Get(1).Str() + caps = c.Get(2).Raw() + port = c.Get(3).Uint() + pub = c.Get(4).Bytes() + ) + + fmt.Println("PEER CAPS", caps) + + // Check correctness of p2p protocol version + if p2pVersion != P2PVersion { + peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion) + p.Stop() + return + } - if p.pubkey == nil { + // Handle the pub key (validation, uniqueness) + if len(pub) == 0 { peerlogger.Warnln("Pubkey required, not supplied in handshake.") p.Stop() return @@ -625,9 +702,8 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { usedPub := 0 // This peer is already added to the peerlist so we expect to find a double pubkey at least once - eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { - if bytes.Compare(p.pubkey, peer.pubkey) == 0 { + if bytes.Compare(pub, peer.pubkey) == 0 { usedPub++ } }) @@ -637,19 +713,11 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { p.Stop() return } - - if c.Get(0).Uint() != ProtocolVersion { - peerlogger.Debugf("Invalid peer version. Require protocol: %d. Received: %d\n", ProtocolVersion, c.Get(0).Uint()) - p.Stop() - return - } - - // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID, CAPS, PORT, PUBKEY] - p.versionKnown = true + p.pubkey = pub // If this is an inbound connection send an ack back if p.inbound { - p.port = uint16(c.Get(4).Uint()) + p.port = uint16(port) // Self connect detection pubkey := p.ethereum.KeyManager().PublicKey() @@ -660,41 +728,18 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { } } + p.SetVersion(clientId) - // Set the peer's caps - p.caps = Caps(c.Get(3).Byte()) - - // Get a reference to the peers version - versionString := c.Get(2).Str() - if len(versionString) > 0 { - p.SetVersion(c.Get(2).Str()) - } - - // Get the td and last hash - p.td = c.Get(6).BigInt() - p.bestHash = c.Get(7).Bytes() - p.lastReceivedHash = p.bestHash + p.versionKnown = true p.ethereum.PushPeer(p) p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) - ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash) - - /* - // Catch up with the connected peer - if !p.ethereum.IsUpToDate() { - peerlogger.Debugln("Already syncing up with a peer; sleeping") - time.Sleep(10 * time.Second) - } - */ - //p.SyncWithPeerToLastKnown() - - if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 { - p.ethereum.blockPool.AddHash(p.lastReceivedHash) - p.FetchHashes() - } + ethlogger.Infof("Added peer (%s) %d / %d \n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers) peerlogger.Debugln(p) + + p.pushStatus() } func (p *Peer) String() string { @@ -714,47 +759,6 @@ func (p *Peer) String() string { return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version, p.caps) } -func (p *Peer) SyncWithPeerToLastKnown() { - p.catchingUp = false - p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) -} - -func (p *Peer) FindCommonParentBlock() { - if p.catchingUp { - return - } - - p.catchingUp = true - if p.blocksRequested == 0 { - p.blocksRequested = 20 - } - blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested) - - var hashes []interface{} - for _, block := range blocks { - hashes = append(hashes, block.Hash()) - } - - msgInfo := append(hashes, uint64(len(hashes))) - - peerlogger.DebugDetailf("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String()) - - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo) - p.QueueMessage(msg) -} -func (p *Peer) CatchupWithPeer(blockHash []byte) { - if !p.catchingUp { - // Make sure nobody else is catching up when you want to do this - p.catchingUp = true - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(100)}) - p.QueueMessage(msg) - - peerlogger.DebugDetailf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr()) - - msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{}) - p.QueueMessage(msg) - } -} func (p *Peer) RlpData() []interface{} { return []interface{}{p.host, p.port, p.pubkey} |