aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go196
1 files changed, 73 insertions, 123 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 47a36cc0b..9ad430976 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,6 +45,10 @@ const (
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
)
+var (
+ daoChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the DAO handshake challenge
+)
+
// errIncompatibleConfig is returned if the requested protocols and configs are
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
@@ -53,18 +57,16 @@ func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
-type hashFetcherFn func(common.Hash) error
-type blockFetcherFn func([]common.Hash) error
-
type ProtocolManager struct {
networkId int
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
synced uint32 // Flag whether we're considered synchronised (enables transaction processing)
- txpool txPool
- blockchain *core.BlockChain
- chaindb ethdb.Database
+ txpool txPool
+ blockchain *core.BlockChain
+ chaindb ethdb.Database
+ chainconfig *core.ChainConfig
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
@@ -99,6 +101,7 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
txpool: txpool,
blockchain: blockchain,
chaindb: chaindb,
+ chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
@@ -269,15 +272,36 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
- if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
- p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
- p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
+ err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
+ p.RequestHeadersByHash, p.RequestHeadersByNumber,
+ p.RequestBodies, p.RequestReceipts, p.RequestNodeData,
+ )
+ if err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
+ // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
+ if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
+ // Request the peer's DAO fork header for extra-data validation
+ if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
+ return err
+ }
+ // Start a timer to disconnect if the peer doesn't reply in time
+ p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
+ glog.V(logger.Warn).Infof("%v: timed out DAO fork-check, dropping", p)
+ pm.removePeer(p.id)
+ })
+ // Make sure it's cleaned up if the peer dies off
+ defer func() {
+ if p.forkDrop != nil {
+ p.forkDrop.Stop()
+ p.forkDrop = nil
+ }
+ }()
+ }
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
@@ -306,108 +330,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
- case p.version < eth62 && msg.Code == GetBlockHashesMsg:
- // Retrieve the number of hashes to return and from which origin hash
- var request getBlockHashesData
- if err := msg.Decode(&request); err != nil {
- return errResp(ErrDecode, "%v: %v", msg, err)
- }
- if request.Amount > uint64(downloader.MaxHashFetch) {
- request.Amount = uint64(downloader.MaxHashFetch)
- }
- // Retrieve the hashes from the block chain and return them
- hashes := pm.blockchain.GetBlockHashesFromHash(request.Hash, request.Amount)
- if len(hashes) == 0 {
- glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4])
- }
- return p.SendBlockHashes(hashes)
-
- case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
- // Retrieve and decode the number of hashes to return and from which origin number
- var request getBlockHashesFromNumberData
- if err := msg.Decode(&request); err != nil {
- return errResp(ErrDecode, "%v: %v", msg, err)
- }
- if request.Amount > uint64(downloader.MaxHashFetch) {
- request.Amount = uint64(downloader.MaxHashFetch)
- }
- // Calculate the last block that should be retrieved, and short circuit if unavailable
- last := pm.blockchain.GetBlockByNumber(request.Number + request.Amount - 1)
- if last == nil {
- last = pm.blockchain.CurrentBlock()
- request.Amount = last.NumberU64() - request.Number + 1
- }
- if last.NumberU64() < request.Number {
- return p.SendBlockHashes(nil)
- }
- // Retrieve the hashes from the last block backwards, reverse and return
- hashes := []common.Hash{last.Hash()}
- hashes = append(hashes, pm.blockchain.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...)
-
- for i := 0; i < len(hashes)/2; i++ {
- hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
- }
- return p.SendBlockHashes(hashes)
-
- case p.version < eth62 && msg.Code == BlockHashesMsg:
- // A batch of hashes arrived to one of our previous requests
- var hashes []common.Hash
- if err := msg.Decode(&hashes); err != nil {
- break
- }
- // Deliver them all to the downloader for queuing
- err := pm.downloader.DeliverHashes(p.id, hashes)
- if err != nil {
- glog.V(logger.Debug).Infoln(err)
- }
-
- case p.version < eth62 && msg.Code == GetBlocksMsg:
- // Decode the retrieval message
- msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
- if _, err := msgStream.List(); err != nil {
- return err
- }
- // Gather blocks until the fetch or network limits is reached
- var (
- hash common.Hash
- bytes common.StorageSize
- blocks []*types.Block
- )
- for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
- //Retrieve the hash of the next block
- err := msgStream.Decode(&hash)
- if err == rlp.EOL {
- break
- } else if err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- // Retrieve the requested block, stopping if enough was found
- if block := pm.blockchain.GetBlockByHash(hash); block != nil {
- blocks = append(blocks, block)
- bytes += block.Size()
- }
- }
- return p.SendBlocks(blocks)
-
- case p.version < eth62 && msg.Code == BlocksMsg:
- // Decode the arrived block message
- var blocks []*types.Block
- if err := msg.Decode(&blocks); err != nil {
- glog.V(logger.Detail).Infoln("Decode error", err)
- blocks = nil
- }
- // Update the receive timestamp of each block
- for _, block := range blocks {
- block.ReceivedAt = msg.ReceivedAt
- block.ReceivedFrom = p
- }
- // Filter out any explicitly requested blocks, deliver the rest to the downloader
- if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
- pm.downloader.DeliverBlocks(p.id, blocks)
- }
-
// Block header query, collect the requested headers and reply
- case p.version >= eth62 && msg.Code == GetBlockHeadersMsg:
+ case msg.Code == GetBlockHeadersMsg:
// Decode the complex header query
var query getBlockHeadersData
if err := msg.Decode(&query); err != nil {
@@ -475,15 +399,49 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHeaders(headers)
- case p.version >= eth62 && msg.Code == BlockHeadersMsg:
+ case msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ // If no headers were received, but we're expending a DAO fork check, maybe it's that
+ if len(headers) == 0 && p.forkDrop != nil {
+ // Possibly an empty reply to the fork header checks, sanity check TDs
+ verifyDAO := true
+
+ // If we already have a DAO header, we can check the peer's TD against it. If
+ // the peer's ahead of this, it too must have a reply to the DAO check
+ if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
+ if p.Td().Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
+ verifyDAO = false
+ }
+ }
+ // If we're seemingly on the same chain, disable the drop timer
+ if verifyDAO {
+ glog.V(logger.Debug).Infof("%v: seems to be on the same side of the DAO fork", p)
+ p.forkDrop.Stop()
+ p.forkDrop = nil
+ return nil
+ }
+ }
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
+ // If it's a potential DAO fork check, validate against the rules
+ if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
+ // Disable the fork drop timer
+ p.forkDrop.Stop()
+ p.forkDrop = nil
+
+ // Validate the header and either drop the peer or continue
+ if err := core.ValidateDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
+ glog.V(logger.Debug).Infof("%v: verified to be on the other side of the DAO fork, dropping", p)
+ return err
+ }
+ glog.V(logger.Debug).Infof("%v: verified to be on the same side of the DAO fork", p)
+ }
+ // Irrelevant of the fork checks, send the header to the fetcher just in case
headers = pm.fetcher.FilterHeaders(headers, time.Now())
}
if len(headers) > 0 || !filter {
@@ -493,7 +451,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
- case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
+ case msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
@@ -520,7 +478,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockBodiesRLP(bodies)
- case p.version >= eth62 && msg.Code == BlockBodiesMsg:
+ case msg.Code == BlockBodiesMsg:
// A batch of block bodies arrived to one of our previous requests
var request blockBodiesData
if err := msg.Decode(&request); err != nil {
@@ -671,11 +629,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, block := range unknown {
- if p.version < eth62 {
- pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
- } else {
- pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
- }
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
case msg.Code == NewBlockMsg:
@@ -757,11 +711,7 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Otherwise if the block is indeed in out own chain, announce it
if pm.blockchain.HasBlock(hash) {
for _, peer := range peers {
- if peer.version < eth62 {
- peer.SendNewBlockHashes61([]common.Hash{hash})
- } else {
- peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
- }
+ peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}