aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-29 17:44:00 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-07-01 00:00:01 +0800
commit6fc85f1ec221f976399af071a75ad7264b0ee905 (patch)
tree6ef805673f49ae9662f5c724f917f88bbba9a023 /eth/handler.go
parent2c8ed76e01161d9fe4e69064404cd888b4e327f3 (diff)
downloaddexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar
dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.gz
dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.bz2
dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.lz
dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.xz
dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.zst
dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.zip
eth: clean up peer struct a bit, fix double txn bcast
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go29
1 files changed, 16 insertions, 13 deletions
diff --git a/eth/handler.go b/eth/handler.go
index d25118337..d0456446d 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -158,9 +158,7 @@ func (pm *ProtocolManager) Stop() {
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
- td, current, genesis := pm.chainman.Status()
-
- return newPeer(pv, nv, genesis, current, td, p, rw)
+ return newPeer(pv, nv, p, rw)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@@ -169,7 +167,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
glog.V(logger.Debug).Infof("%v: peer connected", p)
// Execute the Ethereum handshake
- if err := p.handleStatus(); err != nil {
+ td, head, genesis := pm.chainman.Status()
+ if err := p.Handshake(td, head, genesis); err != nil {
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
return err
}
@@ -182,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
- if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
+ if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@@ -225,9 +224,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
propTxnInPacketsMeter.Mark(1)
for i, tx := range txs {
+ // Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
+ p.MarkTransaction(tx.Hash())
+
+ // Log it's arrival for later analysis
propTxnInTrafficMeter.Mark(tx.Size().Int64())
jsonlogger.LogJson(&logger.EthTxReceived{
TxHash: tx.Hash().Hex(),
@@ -255,7 +258,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// returns either requested hashes or nothing (i.e. not found)
- return p.sendBlockHashes(hashes)
+ return p.SendBlockHashes(hashes)
case BlockHashesMsg:
// A batch of hashes arrived to one of our previous requests
@@ -314,7 +317,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
glog.Infof("%v: no blocks found for requested hashes %s", p, list)
}
- return p.sendBlocks(blocks)
+ return p.SendBlocks(blocks)
case BlocksMsg:
// Decode the arrived block message
@@ -349,7 +352,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Mark the hashes as present at the remote node
for _, hash := range hashes {
- p.blockHashes.Add(hash)
+ p.MarkBlock(hash)
p.SetHead(hash)
}
// Schedule all the unknown hashes for retrieval
@@ -360,7 +363,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, hash := range unknown {
- pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
+ pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks)
}
case NewBlockMsg:
@@ -387,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
RemoteId: p.ID().String(),
})
// Mark the peer as owning the block and schedule it for import
- p.blockHashes.Add(request.Block.Hash())
+ p.MarkBlock(request.Block.Hash())
p.SetHead(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)
@@ -412,14 +415,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
if propagate {
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
- peer.sendNewBlock(block)
+ peer.SendNewBlock(block)
}
glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))
}
// Otherwise if the block is indeed in out own chain, announce it
if pm.chainman.HasBlock(hash) {
for _, peer := range peers {
- peer.sendNewBlockHashes([]common.Hash{hash})
+ peer.SendNewBlockHashes([]common.Hash{hash})
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}
@@ -432,7 +435,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
- peer.sendTransactions(types.Transactions{tx})
+ peer.SendTransactions(types.Transactions{tx})
}
glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
}