diff options
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 243 |
1 files changed, 129 insertions, 114 deletions
@@ -42,6 +42,9 @@ type Peer struct { // Indicates whether a MsgGetPeersTy was requested of the peer // this to prevent receiving false peers. requestedPeerList bool + + // Determines whether this is a seed peer + seed bool } func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { @@ -81,9 +84,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum) *Peer { atomic.StoreInt32(&p.connected, 1) atomic.StoreInt32(&p.disconnect, 0) - log.Println("Connected to peer ::", conn.RemoteAddr()) - - p.Start() + p.Start(false) }() return p @@ -115,6 +116,14 @@ func (p *Peer) writeMessage(msg *ethwire.Msg) { p.Stop() return } + + // XXX TMP CODE FOR TESTNET + switch msg.Type { + case ethwire.MsgPeersTy: + if p.seed { + p.Stop() + } + } } // Outbound message handler. Outbound messages are handled here @@ -133,7 +142,7 @@ out: case <-tickleTimer.C: p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) - // Break out of the for loop if a quit message is posted + // Break out of the for loop if a quit message is posted case <-p.quit: break out } @@ -157,113 +166,118 @@ func (p *Peer) HandleInbound() { out: for atomic.LoadInt32(&p.disconnect) == 0 { // Wait for a message from the peer - msg, err := ethwire.ReadMessage(p.conn) - if err != nil { - log.Println(err) + msgs, err := ethwire.ReadMessages(p.conn) + for _, msg := range msgs { + if err != nil { + log.Println(err) - break out - } - - if ethutil.Config.Debug { - log.Printf("Received %s\n", msg.Type.String()) - } + break out + } - switch msg.Type { - case ethwire.MsgHandshakeTy: - // Version message - p.handleHandshake(msg) - case ethwire.MsgDiscTy: - p.Stop() - case ethwire.MsgPingTy: - // Respond back with pong - p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, "")) - case ethwire.MsgPongTy: - // If we received a pong back from a peer we set the - // last pong so the peer handler knows this peer is still - // active. - p.lastPong = time.Now().Unix() - case ethwire.MsgBlockTy: - // Get all blocks and process them (TODO reverse order?) - msg.Data = msg.Data.Get(0) - for i := msg.Data.Length() - 1; i >= 0; i-- { - block := ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) - err := p.ethereum.BlockManager.ProcessBlock(block) - - if err != nil { - log.Println(err) + switch msg.Type { + case ethwire.MsgHandshakeTy: + // Version message + p.handleHandshake(msg) + case ethwire.MsgDiscTy: + p.Stop() + case ethwire.MsgPingTy: + // Respond back with pong + p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, "")) + case ethwire.MsgPongTy: + // If we received a pong back from a peer we set the + // last pong so the peer handler knows this peer is still + // active. + p.lastPong = time.Now().Unix() + case ethwire.MsgBlockTy: + // Get all blocks and process them + msg.Data = msg.Data + for i := msg.Data.Length() - 1; i >= 0; i-- { + block := ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) + err := p.ethereum.BlockManager.ProcessBlock(block) + + if err != nil { + log.Println(err) + } } - } - case ethwire.MsgTxTy: - // If the message was a transaction queue the transaction - // in the TxPool where it will undergo validation and - // processing when a new block is found - for i := 0; i < msg.Data.Length(); i++ { - p.ethereum.TxPool.QueueTransaction(ethchain.NewTransactionFromRlpValue(msg.Data.Get(i))) - } - case ethwire.MsgGetPeersTy: - // Flag this peer as a 'requested of new peers' this to - // prevent malicious peers being forced. - p.requestedPeerList = true - // Peer asked for list of connected peers - p.pushPeers() - case ethwire.MsgPeersTy: - // Received a list of peers (probably because MsgGetPeersTy was send) - // Only act on message if we actually requested for a peers list - if p.requestedPeerList { - data := ethutil.Conv(msg.Data) - // Create new list of possible peers for the ethereum to process - peers := make([]string, data.Length()) - // Parse each possible peer - for i := 0; i < data.Length(); i++ { - peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint())) + case ethwire.MsgTxTy: + // If the message was a transaction queue the transaction + // in the TxPool where it will undergo validation and + // processing when a new block is found + for i := 0; i < msg.Data.Length(); i++ { + p.ethereum.TxPool.QueueTransaction(ethchain.NewTransactionFromRlpValue(msg.Data.Get(i))) } + case ethwire.MsgGetPeersTy: + // Flag this peer as a 'requested of new peers' this to + // prevent malicious peers being forced. + p.requestedPeerList = true + // Peer asked for list of connected peers + p.pushPeers() + case ethwire.MsgPeersTy: + // Received a list of peers (probably because MsgGetPeersTy was send) + // Only act on message if we actually requested for a peers list + if p.requestedPeerList { + data := ethutil.Conv(msg.Data) + // Create new list of possible peers for the ethereum to process + peers := make([]string, data.Length()) + // Parse each possible peer + for i := 0; i < data.Length(); i++ { + peers[i] = data.Get(i).AsString() + strconv.Itoa(int(data.Get(i).AsUint())) + } + + // Connect to the list of peers + p.ethereum.ProcessPeerList(peers) + // Mark unrequested again + p.requestedPeerList = false - // Connect to the list of peers - p.ethereum.ProcessPeerList(peers) - // Mark unrequested again - p.requestedPeerList = false - } - case ethwire.MsgGetChainTy: - var parent *ethchain.Block - // FIXME - msg.Data = msg.Data.Get(0) - // Length minus one since the very last element in the array is a count - l := msg.Data.Length() - 1 - // Amount of parents in the canonical chain - amountOfBlocks := msg.Data.Get(l).AsUint() - // Check each SHA block hash from the message and determine whether - // the SHA is in the database - for i := 0; i < l; i++ { - if data := msg.Data.Get(i).AsBytes(); p.ethereum.BlockManager.BlockChain().HasBlock(data) { - parent = p.ethereum.BlockManager.BlockChain().GetBlock(data) + } + case ethwire.MsgGetChainTy: + var parent *ethchain.Block + // Length minus one since the very last element in the array is a count + l := msg.Data.Length() - 1 + // Ignore empty get chains + if l <= 1 { break } - } - // If a parent is found send back a reply - if parent != nil { - chain := p.ethereum.BlockManager.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain)) - } else { - // If no blocks are found we send back a reply with msg not in chain - // and the last hash from get chain - lastHash := msg.Data.Get(l) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, lastHash.AsRaw())) - } - case ethwire.MsgNotInChainTy: - log.Println("Not in chain, not yet implemented") - // TODO + // Amount of parents in the canonical chain + amountOfBlocks := msg.Data.Get(l).AsUint() + // Check each SHA block hash from the message and determine whether + // the SHA is in the database + for i := 0; i < l; i++ { + if data := msg.Data.Get(i).AsBytes(); p.ethereum.BlockManager.BlockChain().HasBlock(data) { + parent = p.ethereum.BlockManager.BlockChain().GetBlock(data) + break + } + } + + // If a parent is found send back a reply + if parent != nil { + chain := p.ethereum.BlockManager.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks) + p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, append(chain, amountOfBlocks))) + } else { + // If no blocks are found we send back a reply with msg not in chain + // and the last hash from get chain + lastHash := msg.Data.Get(l - 1) + log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw()) + p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.AsRaw()})) + } + case ethwire.MsgNotInChainTy: + log.Printf("Not in chain %x\n", msg.Data) + // TODO - // Unofficial but fun nonetheless - case ethwire.MsgTalkTy: - log.Printf("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.Get(0).AsString()) + // Unofficial but fun nonetheless + case ethwire.MsgTalkTy: + log.Printf("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.AsString()) + } } } p.Stop() } -func (p *Peer) Start() { +func (p *Peer) Start(seed bool) { + p.seed = seed + if !p.inbound { err := p.pushHandshake() if err != nil { @@ -277,6 +291,7 @@ func (p *Peer) Start() { go p.HandleOutbound() // Run the inbound handler in a new goroutine go p.HandleInbound() + } func (p *Peer) Stop() { @@ -294,9 +309,9 @@ func (p *Peer) Stop() { } func (p *Peer) pushHandshake() error { - msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, ethutil.Encode([]interface{}{ - 1, 0, p.ethereum.Nonce, - })) + msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ + uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", + }) p.QueueMessage(msg) @@ -305,6 +320,7 @@ func (p *Peer) pushHandshake() error { // Pushes the list of outbound peers to the client when requested func (p *Peer) pushPeers() { + outPeers := make([]interface{}, len(p.ethereum.OutboundPeers())) // Serialise each peer for i, peer := range p.ethereum.OutboundPeers() { @@ -312,7 +328,7 @@ func (p *Peer) pushPeers() { } // Send message to the peer with the known list of connected clients - msg := ethwire.NewMessage(ethwire.MsgPeersTy, ethutil.Encode(outPeers)) + msg := ethwire.NewMessage(ethwire.MsgPeersTy, outPeers) p.QueueMessage(msg) } @@ -320,29 +336,28 @@ func (p *Peer) pushPeers() { func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := msg.Data // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID] - if c.Get(2).AsUint() == p.ethereum.Nonce { - //if msg.Nonce == p.ethereum.Nonce { - log.Println("Peer connected to self, disconnecting") - - p.Stop() - - return - } - p.versionKnown = true + var istr string // If this is an inbound connection send an ack back if p.inbound { - err := p.pushHandshake() - if err != nil { - log.Println("Peer can't send ack back") + /* + err := p.pushHandshake() + if err != nil { + log.Println("Peer can't send ack back") - p.Stop() - } + p.Stop() + } + */ + istr = "inbound" } else { msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{p.ethereum.BlockManager.BlockChain().CurrentBlock.Hash(), uint64(100)}) p.QueueMessage(msg) + + istr = "outbound" } + + log.Printf("peer connect (%s) %v %s\n", istr, p.conn.RemoteAddr(), c.Get(2).AsString()) } func (p *Peer) RlpEncode() []byte { |