aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-08-15 02:25:41 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-08-25 22:48:47 +0800
commit47a7fe5d22fe2a6be783f6576070814fe951eaaf (patch)
tree61f2f691c6775fa5ae3547b8d769a709b7b3f04c /eth/handler.go
parentca88e18f59af84f34ad67da21fd27a6407eea87c (diff)
downloaddexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.gz
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.bz2
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.lz
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.xz
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.zst
dexon-47a7fe5d22fe2a6be783f6576070814fe951eaaf.zip
eth: port the synchronisation algo to eth/62
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go62
1 files changed, 56 insertions, 6 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 25ff0eef0..e7404e36a 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -201,7 +201,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
- if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil {
+ if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
+ p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
+ p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@@ -287,7 +289,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}
// Deliver them all to the downloader for queuing
- err := pm.downloader.DeliverHashes(p.id, hashes)
+ err := pm.downloader.DeliverHashes61(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
@@ -332,8 +334,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
block.ReceivedAt = msg.ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
- if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 {
- pm.downloader.DeliverBlocks(p.id, blocks)
+ if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
+ pm.downloader.DeliverBlocks61(p.id, blocks)
}
// Block header query, collect the requested headers and reply
@@ -401,6 +403,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHeaders(headers)
+ case p.version >= eth62 && msg.Code == BlockHeadersMsg:
+ // A batch of headers arrived to one of our previous requests
+ var headers []*types.Header
+ if err := msg.Decode(&headers); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Filter out any explicitly requested headers, deliver the rest to the downloader
+ filter := len(headers) == 1
+ if filter {
+ headers = pm.fetcher.FilterHeaders(headers, time.Now())
+ }
+ if len(headers) > 0 || !filter {
+ err := pm.downloader.DeliverHeaders(p.id, headers)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+ }
+
+ case p.version >= eth62 && msg.Code == BlockBodiesMsg:
+ // A batch of block bodies arrived to one of our previous requests
+ var request blockBodiesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver them all to the downloader for queuing
+ trasactions := make([][]*types.Transaction, len(request))
+ uncles := make([][]*types.Header, len(request))
+
+ for i, body := range request {
+ trasactions[i] = body.Transactions
+ uncles[i] = body.Uncles
+ }
+ // Filter out any explicitly requested bodies, deliver the rest to the downloader
+ if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
+ err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+ }
+
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
@@ -522,7 +564,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, block := range unknown {
- pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks)
+ if p.version < eth62 {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
+ } else {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
+ }
}
case msg.Code == NewBlockMsg:
@@ -612,7 +658,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Otherwise if the block is indeed in out own chain, announce it
if pm.chainman.HasBlock(hash) {
for _, peer := range peers {
- peer.SendNewBlockHashes([]common.Hash{hash})
+ if peer.version < eth62 {
+ peer.SendNewBlockHashes61([]common.Hash{hash})
+ } else {
+ peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
+ }
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}