diff options
author | obscuren <geffobscura@gmail.com> | 2014-02-13 22:12:16 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-02-13 22:12:16 +0800 |
commit | c2fb9f06ad018d01ce335c82b3542de16045a32d (patch) | |
tree | b3315405b2a3b06816d0640deaabdd899242e445 /peer.go | |
parent | 67de76c217f4ff4f2111e5f578b35fb162d64916 (diff) | |
download | go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.tar go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.tar.gz go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.tar.bz2 go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.tar.lz go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.tar.xz go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.tar.zst go-tangerine-c2fb9f06ad018d01ce335c82b3542de16045a32d.zip |
Refactoring RlpValue => Value
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 123 |
1 files changed, 80 insertions, 43 deletions
@@ -1,7 +1,7 @@ package eth import ( - _ "bytes" + "bytes" "fmt" "github.com/ethereum/ethchain-go" "github.com/ethereum/ethutil-go" @@ -123,9 +123,14 @@ type Peer struct { // Indicated whether the node is catching up or not catchingUp bool + + Version string } func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { + data, _ := ethutil.Config.Db.Get([]byte("KeyRing")) + pubkey := ethutil.NewValueFromBytes(data).Get(2).Bytes() + return &Peer{ outputQueue: make(chan *ethwire.Msg, outputBufferSize), quit: make(chan bool), @@ -135,6 +140,7 @@ func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { disconnect: 0, connected: 1, port: 30303, + pubkey: pubkey, } } @@ -148,6 +154,7 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { connected: 0, disconnect: 0, caps: caps, + Version: fmt.Sprintf("/Ethereum(G) v%s/%s", ethutil.Config.Ver, runtime.GOOS), } // Set up the connection in another goroutine so we don't block the main thread @@ -267,7 +274,7 @@ func (p *Peer) HandleInbound() { } case ethwire.MsgDiscTy: p.Stop() - log.Println("Disconnect peer:", DiscReason(msg.Data.Get(0).AsUint())) + log.Println("Disconnect peer:", DiscReason(msg.Data.Get(0).Uint())) case ethwire.MsgPingTy: // Respond back with pong p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, "")) @@ -279,30 +286,47 @@ func (p *Peer) HandleInbound() { case ethwire.MsgBlockTy: // Get all blocks and process them msg.Data = msg.Data - var block *ethchain.Block - for i := msg.Data.Length() - 1; i >= 0; i-- { - block = ethchain.NewBlockFromRlpValue(ethutil.NewValue(msg.Data.Get(i).AsRaw())) - err := p.ethereum.BlockManager.ProcessBlock(block) + var block, lastBlock *ethchain.Block + var err error + for i := msg.Data.Len() - 1; i >= 0; i-- { + block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) + err = p.ethereum.BlockManager.ProcessBlock(block) if err != nil { log.Println(err) + break + } else { + lastBlock = block } } - // If we're catching up, try to catch up further. - if p.catchingUp && msg.Data.Length() > 1 { - if ethutil.Config.Debug { - blockInfo := p.ethereum.BlockManager.BlockChain().BlockInfo(block) - log.Printf("Synced to block height #%d\n", blockInfo.Number) + if err != nil { + // If the parent is unknown try to catch up with this peer + if ethchain.IsParentErr(err) { + log.Println("Attempting to catch up") + p.catchingUp = false + p.CatchupWithPeer() + } + if ethchain.IsValidationErr(err) { + // TODO + } + } else { + // XXX Do we want to catch up if there were errors? + // If we're catching up, try to catch up further. + if p.catchingUp && msg.Data.Len() > 1 { + if ethutil.Config.Debug && lastBlock != nil { + blockInfo := lastBlock.BlockInfo() + log.Printf("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash) + } + p.catchingUp = false + p.CatchupWithPeer() } - p.catchingUp = false - p.CatchupWithPeer() } case ethwire.MsgTxTy: // If the message was a transaction queue the transaction // in the TxPool where it will undergo validation and // processing when a new block is found - for i := 0; i < msg.Data.Length(); i++ { + for i := 0; i < msg.Data.Len(); i++ { p.ethereum.TxPool.QueueTransaction(ethchain.NewTransactionFromData(msg.Data.Get(i).Encode())) } case ethwire.MsgGetPeersTy: @@ -317,10 +341,10 @@ func (p *Peer) HandleInbound() { //if p.requestedPeerList { data := msg.Data // Create new list of possible peers for the ethereum to process - peers := make([]string, data.Length()) + peers := make([]string, data.Len()) // Parse each possible peer - for i := 0; i < data.Length(); i++ { - value := ethutil.NewValue(data.Get(i).AsRaw()) + for i := 0; i < data.Len(); i++ { + value := data.Get(i) peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint()) } @@ -333,7 +357,7 @@ func (p *Peer) HandleInbound() { case ethwire.MsgGetChainTy: var parent *ethchain.Block // Length minus one since the very last element in the array is a count - l := msg.Data.Length() - 1 + l := msg.Data.Len() - 1 // Ignore empty get chains if l == 0 { break @@ -345,7 +369,7 @@ func (p *Peer) HandleInbound() { // Check each SHA block hash from the message and determine whether // the SHA is in the database for i := 0; i < l; i++ { - if data := msg.Data.Get(i).AsBytes(); p.ethereum.BlockManager.BlockChain().HasBlock(data) { + if data := msg.Data.Get(i).Bytes(); p.ethereum.BlockManager.BlockChain().HasBlock(data) { parent = p.ethereum.BlockManager.BlockChain().GetBlock(data) break } @@ -359,8 +383,8 @@ func (p *Peer) HandleInbound() { // If no blocks are found we send back a reply with msg not in chain // and the last hash from get chain lastHash := msg.Data.Get(l - 1) - log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw()) - p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.AsRaw()})) + //log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw()) + p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()})) } case ethwire.MsgNotInChainTy: log.Printf("Not in chain %x\n", msg.Data) @@ -368,7 +392,7 @@ func (p *Peer) HandleInbound() { // Unofficial but fun nonetheless case ethwire.MsgTalkTy: - log.Printf("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.AsString()) + log.Printf("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.Str()) } } } @@ -441,9 +465,8 @@ func (p *Peer) pushHandshake() error { data, _ := ethutil.Config.Db.Get([]byte("KeyRing")) pubkey := ethutil.NewValueFromBytes(data).Get(2).Bytes() - clientId := fmt.Sprintf("/Ethereum(G) v%s/%s", ethutil.Config.Ver, runtime.GOOS) msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{ - uint32(4), uint32(0), clientId, byte(p.caps), p.port, pubkey, + uint32(4), uint32(0), p.Version, byte(p.caps), p.port, pubkey, }) p.QueueMessage(msg) @@ -470,7 +493,7 @@ func (p *Peer) pushPeers() { func (p *Peer) handleHandshake(msg *ethwire.Msg) { c := msg.Data - if c.Get(0).AsUint() != 4 { + if c.Get(0).Uint() != 4 { log.Println("Invalid peer version. Require protocol v4") p.Stop() return @@ -479,35 +502,49 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { // [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID, CAPS, PORT, PUBKEY] p.versionKnown = true - var istr string // If this is an inbound connection send an ack back if p.inbound { - p.pubkey = c.Get(5).AsBytes() - p.port = uint16(c.Get(4).AsUint()) + p.pubkey = c.Get(5).Bytes() + p.port = uint16(c.Get(4).Uint()) // Self connect detection - /* - data, _ := ethutil.Config.Db.Get([]byte("KeyRing")) - pubkey := ethutil.NewValueFromBytes(data).Get(2).Bytes() - if bytes.Compare(pubkey, p.pubkey) == 0 { - p.Stop() + data, _ := ethutil.Config.Db.Get([]byte("KeyRing")) + pubkey := ethutil.NewValueFromBytes(data).Get(2).Bytes() + if bytes.Compare(pubkey, p.pubkey) == 0 { + p.Stop() - return - } - */ + return + } + + } - p.CatchupWithPeer() + // Catch up with the connected peer + p.CatchupWithPeer() - istr = "inbound" - } else { - p.CatchupWithPeer() + // Set the peer's caps + p.caps = Caps(c.Get(3).Byte()) + // Get a reference to the peers version + p.Version = c.Get(2).Str() - istr = "outbound" + log.Println(p) +} + +func (p *Peer) String() string { + var strBoundType string + if p.inbound { + strBoundType = "inbound" + } else { + strBoundType = "outbound" + } + var strConnectType string + if atomic.LoadInt32(&p.disconnect) == 0 { + strConnectType = "connected" + } else { + strConnectType = "disconnected" } - p.caps = Caps(c.Get(3).AsByte()) + return fmt.Sprintf("peer [%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.Version, p.caps) - log.Printf("peer connect (%s) %v %s [%s]\n", istr, p.conn.RemoteAddr(), c.Get(2).AsString(), p.caps) } func (p *Peer) CatchupWithPeer() { |