diff options
author | zelig <viktor.tron@gmail.com> | 2014-07-31 00:03:20 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-07-31 00:03:20 +0800 |
commit | 9831619881c5264c2449ce1b906108d892b6e1e1 (patch) | |
tree | f8775b1196fe6b31081a553945377f40bdc8d550 /peer.go | |
parent | 194c58858cd230a9a08b0eb14650720341a5580e (diff) | |
parent | 42d47ecfb09ac0b419db5722602d9b02e21f2457 (diff) | |
download | dexon-9831619881c5264c2449ce1b906108d892b6e1e1.tar dexon-9831619881c5264c2449ce1b906108d892b6e1e1.tar.gz dexon-9831619881c5264c2449ce1b906108d892b6e1e1.tar.bz2 dexon-9831619881c5264c2449ce1b906108d892b6e1e1.tar.lz dexon-9831619881c5264c2449ce1b906108d892b6e1e1.tar.xz dexon-9831619881c5264c2449ce1b906108d892b6e1e1.tar.zst dexon-9831619881c5264c2449ce1b906108d892b6e1e1.zip |
merge upstream
Diffstat (limited to 'peer.go')
-rw-r--r-- | peer.go | 161 |
1 files changed, 99 insertions, 62 deletions
@@ -4,15 +4,16 @@ import ( "bytes" "container/list" "fmt" - "github.com/ethereum/eth-go/ethchain" - "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethutil" - "github.com/ethereum/eth-go/ethwire" "net" "strconv" "strings" "sync/atomic" "time" + + "github.com/ethereum/eth-go/ethchain" + "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" ) var peerlogger = ethlog.NewLogger("PEER") @@ -21,7 +22,7 @@ const ( // The size of the output buffer for writing messages outputBufferSize = 50 // Current protocol version - ProtocolVersion = 23 + ProtocolVersion = 25 // Interval for ping/pong message pingPongTimer = 2 * time.Second ) @@ -121,10 +122,8 @@ type Peer struct { versionKnown bool // Last received pong message - lastPong int64 - // Indicates whether a MsgGetPeersTy was requested of the peer - // this to prevent receiving false peers. - requestedPeerList bool + lastPong int64 + lastBlockReceived time.Time host []byte port uint16 @@ -180,10 +179,9 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { // Set up the connection in another goroutine so we don't block the main thread go func() { - conn, err := net.DialTimeout("tcp", addr, 10*time.Second) - + conn, err := p.Connect(addr) if err != nil { - peerlogger.Debugln("Connection to peer failed", err) + peerlogger.Debugln("Connection to peer failed. Giving up.", err) p.Stop() return } @@ -199,6 +197,23 @@ func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { return p } +func (self *Peer) Connect(addr string) (conn net.Conn, err error) { + const maxTries = 3 + for attempts := 0; attempts < maxTries; attempts++ { + conn, err = net.DialTimeout("tcp", addr, 10*time.Second) + if err != nil { + //peerlogger.Debugf("Peer connection failed. Retrying (%d/%d) (%s)\n", attempts+1, maxTries, addr) + time.Sleep(time.Duration(attempts*20) * time.Second) + continue + } + + // Success + return + } + + return +} + // Getters func (p *Peer) PingTime() string { return p.pingTime.String() @@ -279,12 +294,14 @@ out: // Ping timer case <-pingTimer.C: - timeSince := time.Since(time.Unix(p.lastPong, 0)) - if !p.pingStartTime.IsZero() && p.lastPong != 0 && timeSince > (pingPongTimer+30*time.Second) { - peerlogger.Infof("Peer did not respond to latest pong fast enough, it took %s, disconnecting.\n", timeSince) - p.Stop() - return - } + /* + timeSince := time.Since(time.Unix(p.lastPong, 0)) + if !p.pingStartTime.IsZero() && p.lastPong != 0 && timeSince > (pingPongTimer+30*time.Second) { + peerlogger.Infof("Peer did not respond to latest pong fast enough, it took %s, disconnecting.\n", timeSince) + p.Stop() + return + } + */ p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, "")) p.pingStartTime = time.Now() @@ -339,7 +356,7 @@ func (p *Peer) HandleInbound() { } case ethwire.MsgDiscTy: p.Stop() - peerlogger.Infoln("Disconnect peer:", DiscReason(msg.Data.Get(0).Uint())) + peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint())) case ethwire.MsgPingTy: // Respond back with pong p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, "")) @@ -348,11 +365,17 @@ func (p *Peer) HandleInbound() { // last pong so the peer handler knows this peer is still // active. p.lastPong = time.Now().Unix() - p.pingTime = time.Now().Sub(p.pingStartTime) + p.pingTime = time.Since(p.pingStartTime) case ethwire.MsgBlockTy: // Get all blocks and process them - var block, lastBlock *ethchain.Block - var err error + //var block, lastBlock *ethchain.Block + //var err error + + var ( + block, lastBlock *ethchain.Block + blockChain = p.ethereum.BlockChain() + err error + ) // Make sure we are actually receiving anything if msg.Data.Len()-1 > 1 && p.diverted { @@ -368,11 +391,11 @@ func (p *Peer) HandleInbound() { for i := msg.Data.Len() - 1; i >= 0; i-- { block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) // Do we have this block on our chain? If so we can continue - if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) { + if !blockChain.HasBlock(block.Hash()) { // We don't have this block, but we do have a block with the same prevHash, diversion time! - if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) { + if blockChain.HasBlockWithPrevHash(block.PrevHash) { p.diverted = false - if !p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) { + if !blockChain.FindCanonicalChainFromMsg(msg, block.PrevHash) { p.SyncWithPeerToLastKnown() break nextMsg } @@ -397,10 +420,7 @@ func (p *Peer) HandleInbound() { for i := msg.Data.Len() - 1; i >= 0; i-- { block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) - //p.ethereum.StateManager().PrepareDefault(block) - //state := p.ethereum.StateManager().CurrentState() err = p.ethereum.StateManager().Process(block, false) - if err != nil { if ethutil.Config.Debug { peerlogger.Infof("Block %x failed\n", block.Hash()) @@ -411,6 +431,8 @@ func (p *Peer) HandleInbound() { } else { lastBlock = block } + + p.lastBlockReceived = time.Now() } if msg.Data.Len() <= 1 { @@ -422,19 +444,20 @@ func (p *Peer) HandleInbound() { if err != nil { // If the parent is unknown try to catch up with this peer if ethchain.IsParentErr(err) { - /* - b := ethchain.NewBlockFromRlpValue(msg.Data.Get(0)) + b := ethchain.NewBlockFromRlpValue(msg.Data.Get(0)) + + peerlogger.Infof("Attempting to catch (%x). Parent unknown\n", b.Hash()) + p.catchingUp = false - peerlogger.Infof("Attempting to catch (%x). Parent known\n", b.Hash()) - p.catchingUp = false + p.CatchupWithPeer(b.Hash()) - p.CatchupWithPeer(b.Hash()) + peerlogger.Infoln(b) - peerlogger.Infoln(b) + /* + peerlogger.Infoln("Attempting to catch. Parent known") + p.catchingUp = false + p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) */ - peerlogger.Infoln("Attempting to catch. Parent known") - p.catchingUp = false - p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash()) } else if ethchain.IsValidationErr(err) { fmt.Println("Err:", err) p.catchingUp = false @@ -463,30 +486,21 @@ func (p *Peer) HandleInbound() { p.ethereum.TxPool().QueueTransaction(tx) } case ethwire.MsgGetPeersTy: - // Flag this peer as a 'requested of new peers' this to - // prevent malicious peers being forced. - p.requestedPeerList = true // Peer asked for list of connected peers p.pushPeers() case ethwire.MsgPeersTy: // Received a list of peers (probably because MsgGetPeersTy was send) - // Only act on message if we actually requested for a peers list - if p.requestedPeerList { - data := msg.Data - // Create new list of possible peers for the ethereum to process - peers := make([]string, data.Len()) - // Parse each possible peer - for i := 0; i < data.Len(); i++ { - value := data.Get(i) - peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint()) - } - - // Connect to the list of peers - p.ethereum.ProcessPeerList(peers) - // Mark unrequested again - p.requestedPeerList = false - + data := msg.Data + // Create new list of possible peers for the ethereum to process + peers := make([]string, data.Len()) + // Parse each possible peer + for i := 0; i < data.Len(); i++ { + value := data.Get(i) + peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint()) } + + // Connect to the list of peers + p.ethereum.ProcessPeerList(peers) case ethwire.MsgGetChainTy: var parent *ethchain.Block // Length minus one since the very last element in the array is a count @@ -559,6 +573,25 @@ func (p *Peer) HandleInbound() { p.Stop() } +// General update method +func (self *Peer) update() { + serviceTimer := time.NewTicker(5 * time.Second) + +out: + for { + select { + case <-serviceTimer.C: + if time.Since(self.lastBlockReceived) > 10*time.Second { + self.catchingUp = false + } + case <-self.quit: + break out + } + } + + serviceTimer.Stop() +} + func (p *Peer) Start() { peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String()) servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String()) @@ -581,6 +614,8 @@ func (p *Peer) Start() { go p.HandleOutbound() // Run the inbound handler in a new goroutine go p.HandleInbound() + // Run the general update handler + go p.update() // Wait a few seconds for startup and then ask for an initial ping time.Sleep(2 * time.Second) @@ -698,11 +733,13 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { ethlogger.Infof("Added peer (%s) %d / %d\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers) - // Catch up with the connected peer - if !p.ethereum.IsUpToDate() { - peerlogger.Debugln("Already syncing up with a peer; sleeping") - time.Sleep(10 * time.Second) - } + /* + // Catch up with the connected peer + if !p.ethereum.IsUpToDate() { + peerlogger.Debugln("Already syncing up with a peer; sleeping") + time.Sleep(10 * time.Second) + } + */ p.SyncWithPeerToLastKnown() peerlogger.Debugln(p) @@ -757,7 +794,7 @@ func (p *Peer) CatchupWithPeer(blockHash []byte) { if !p.catchingUp { // Make sure nobody else is catching up when you want to do this p.catchingUp = true - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(30)}) + msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(100)}) p.QueueMessage(msg) peerlogger.DebugDetailf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr()) |