From d8787230faa07e078a183765271313a7d2c6bdf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 16 Apr 2019 13:20:38 +0300 Subject: eth, les, light: enforce CHT checkpoints on fast-sync too --- eth/downloader/downloader.go | 20 +++-- eth/downloader/downloader_test.go | 42 +++++++++- eth/handler.go | 77 ++++++++++--------- eth/handler_test.go | 156 +++++++++++++++++++++++++------------- eth/peer.go | 2 +- eth/sync.go | 2 - 6 files changed, 196 insertions(+), 103 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 4db689f73..0ef833bb8 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -75,6 +75,7 @@ var ( errUnknownPeer = errors.New("peer is unknown or unhealthy") errBadPeer = errors.New("action from bad peer ignored") errStallingPeer = errors.New("peer is stalling") + errUnsyncedPeer = errors.New("unsynced peer") errNoPeers = errors.New("no peers to keep download active") errTimeout = errors.New("timeout") errEmptyHeaderSet = errors.New("empty header set by peer") @@ -99,10 +100,11 @@ type Downloader struct { mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle) mux *event.TypeMux // Event multiplexer to announce sync operation events - genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT) - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed - stateDB ethdb.Database + checkpoint uint64 // Checkpoint block number to enforce head against (e.g. fast sync) + genesis uint64 // Genesis block number to limit sync to (e.g. light client CHT) + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + stateDB ethdb.Database rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -205,15 +207,15 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { if lightchain == nil { lightchain = chain } - dl := &Downloader{ mode: mode, stateDB: stateDb, mux: mux, + checkpoint: checkpoint, queue: newQueue(), peers: newPeerSet(), rttEstimate: uint64(rttMaxEstimate), @@ -326,7 +328,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode case nil: case errBusy: - case errTimeout, errBadPeer, errStallingPeer, + case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer, errEmptyHeaderSet, errPeersUnavailable, errTooOld, errInvalidAncestor, errInvalidChain: log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) @@ -577,6 +579,10 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { return nil, errBadPeer } head := headers[0] + if d.mode == FastSync && head.Number.Uint64() < d.checkpoint { + p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash()) + return nil, errUnsyncedPeer + } p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1a42965d3..6db534219 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -73,7 +73,8 @@ func newTester() *downloadTester { } tester.stateDb = ethdb.NewMemDatabase() tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) + + tester.downloader = New(FullSync, 0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) return tester } @@ -1049,6 +1050,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop {errBadPeer, true}, // Peer was deemed bad for some reason, drop it {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errUnsyncedPeer, true}, // Peer was detected to be unsynced, drop it {errNoPeers, false}, // No peers to download from, soft race, no issue {errTimeout, true}, // No hashes received in due time, drop the peer {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end @@ -1567,3 +1569,39 @@ func TestRemoteHeaderRequestSpan(t *testing.T) { } } } + +// Tests that peers below a pre-configured checkpoint block are prevented from +// being fast-synced from, avoiding potential cheap eclipse attacks. +func TestCheckpointEnforcement62(t *testing.T) { testCheckpointEnforcement(t, 62, FullSync) } +func TestCheckpointEnforcement63Full(t *testing.T) { testCheckpointEnforcement(t, 63, FullSync) } +func TestCheckpointEnforcement63Fast(t *testing.T) { testCheckpointEnforcement(t, 63, FastSync) } +func TestCheckpointEnforcement64Full(t *testing.T) { testCheckpointEnforcement(t, 64, FullSync) } +func TestCheckpointEnforcement64Fast(t *testing.T) { testCheckpointEnforcement(t, 64, FastSync) } +func TestCheckpointEnforcement64Light(t *testing.T) { testCheckpointEnforcement(t, 64, LightSync) } + +func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + + // Create a new tester with a particular hard coded checkpoint block + tester := newTester() + defer tester.terminate() + + tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256 + chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1) + + // Attempt to sync with the peer and validate the result + tester.newPeer("peer", protocol, chain) + + var expect error + if mode == FastSync { + expect = errUnsyncedPeer + } + if err := tester.sync("peer", nil, mode); err != expect { + t.Fatalf("block sync error mismatch: have %v, want %v", err, expect) + } + if mode == FastSync { + assertOwnChain(t, tester, 1) + } else { + assertOwnChain(t, tester, chain.len()) + } +} diff --git a/eth/handler.go b/eth/handler.go index b42612a56..2c0ea1eb1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" @@ -55,7 +54,7 @@ const ( ) var ( - daoChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the DAO handshake challenge + syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -72,6 +71,9 @@ type ProtocolManager struct { fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + checkpointNumber uint64 // Block number for the sync progress validator to cross reference + checkpointHash common.Hash // Block hash for the sync progress validator to cross reference + txpool txPool blockchain *core.BlockChain chainconfig *params.ChainConfig @@ -126,6 +128,11 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne if mode == downloader.FastSync { manager.fastSync = uint32(1) } + // If we have trusted checkpoints, enforce them on the chain + if checkpoint, ok := params.TrustedCheckpoints[blockchain.Genesis().Hash()]; ok { + manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequencyClient - 1 + manager.checkpointHash = checkpoint.SectionHead + } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { @@ -165,7 +172,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) + manager.downloader = downloader.New(mode, manager.checkpointNumber, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) @@ -291,22 +298,22 @@ func (pm *ProtocolManager) handle(p *peer) error { // after this will be sent via broadcasts. pm.syncTransactions(p) - // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork - if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { - // Request the peer's DAO fork header for extra-data validation - if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil { + // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse) + if pm.checkpointHash != (common.Hash{}) { + // Request the peer's checkpoint header for chain height/weight validation + if err := p.RequestHeadersByNumber(pm.checkpointNumber, 1, 0, false); err != nil { return err } // Start a timer to disconnect if the peer doesn't reply in time - p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { - p.Log().Debug("Timed out DAO fork-check, dropping") + p.syncDrop = time.AfterFunc(syncChallengeTimeout, func() { + p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name()) pm.removePeer(p.id) }) // Make sure it's cleaned up if the peer dies off defer func() { - if p.forkDrop != nil { - p.forkDrop.Stop() - p.forkDrop = nil + if p.syncDrop != nil { + p.syncDrop.Stop() + p.syncDrop = nil } }() } @@ -438,41 +445,33 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&headers); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // If no headers were received, but we're expending a DAO fork check, maybe it's that - if len(headers) == 0 && p.forkDrop != nil { - // Possibly an empty reply to the fork header checks, sanity check TDs - verifyDAO := true - - // If we already have a DAO header, we can check the peer's TD against it. If - // the peer's ahead of this, it too must have a reply to the DAO check - if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { - if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { - verifyDAO = false - } - } - // If we're seemingly on the same chain, disable the drop timer - if verifyDAO { - p.Log().Debug("Seems to be on the same side of the DAO fork") - p.forkDrop.Stop() - p.forkDrop = nil - return nil + // If no headers were received, but we're expencting a checkpoint header, consider it that + if len(headers) == 0 && p.syncDrop != nil { + // Stop the timer either way, decide later to drop or not + p.syncDrop.Stop() + p.syncDrop = nil + + // If we're doing a fast sync, we must enforce the checkpoint block to avoid + // eclipse attacks. Unsynced nodes are welcome to connect after we're done + // joining the network + if atomic.LoadUint32(&pm.fastSync) == 1 { + p.Log().Warn("Dropping unsynced node during fast sync", "addr", p.RemoteAddr(), "type", p.Name()) + return errors.New("unsynced node cannot serve fast sync") } } // Filter out any explicitly requested headers, deliver the rest to the downloader filter := len(headers) == 1 if filter { - // If it's a potential DAO fork check, validate against the rules - if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 { - // Disable the fork drop timer - p.forkDrop.Stop() - p.forkDrop = nil + // If it's a potential sync progress check, validate the content and advertised chain weight + if p.syncDrop != nil && headers[0].Number.Uint64() == pm.checkpointNumber { + // Disable the sync drop timer + p.syncDrop.Stop() + p.syncDrop = nil // Validate the header and either drop the peer or continue - if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil { - p.Log().Debug("Verified to be on the other side of the DAO fork, dropping") - return err + if headers[0].Hash() != pm.checkpointHash { + return errors.New("checkpoint hash mismatch") } - p.Log().Debug("Verified to be on the same side of the DAO fork") return nil } // Otherwise if it's a whitelisted block, validate against the set diff --git a/eth/handler_test.go b/eth/handler_test.go index 9fffd9581..2b6b59165 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -449,79 +449,131 @@ func testGetReceipt(t *testing.T, protocol int) { } } -// Tests that post eth protocol handshake, DAO fork-enabled clients also execute -// a DAO "challenge" verifying each others' DAO fork headers to ensure they're on -// compatible chains. -func TestDAOChallengeNoVsNo(t *testing.T) { testDAOChallenge(t, false, false, false) } -func TestDAOChallengeNoVsPro(t *testing.T) { testDAOChallenge(t, false, true, false) } -func TestDAOChallengeProVsNo(t *testing.T) { testDAOChallenge(t, true, false, false) } -func TestDAOChallengeProVsPro(t *testing.T) { testDAOChallenge(t, true, true, false) } -func TestDAOChallengeNoVsTimeout(t *testing.T) { testDAOChallenge(t, false, false, true) } -func TestDAOChallengeProVsTimeout(t *testing.T) { testDAOChallenge(t, true, true, true) } - -func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool) { - // Reduce the DAO handshake challenge timeout - if timeout { - defer func(old time.Duration) { daoChallengeTimeout = old }(daoChallengeTimeout) - daoChallengeTimeout = 500 * time.Millisecond - } - // Create a DAO aware protocol manager +// Tests that post eth protocol handshake, clients perform a mutual checkpoint +// challenge to validate each other's chains. Hash mismatches, or missing ones +// during a fast sync should lead to the peer getting dropped. +func TestCheckpointChallenge(t *testing.T) { + tests := []struct { + syncmode downloader.SyncMode + checkpoint bool + timeout bool + empty bool + match bool + drop bool + }{ + // If checkpointing is not enabled locally, don't challenge and don't drop + {downloader.FullSync, false, false, false, false, false}, + {downloader.FastSync, false, false, false, false, false}, + {downloader.LightSync, false, false, false, false, false}, + + // If checkpointing is enabled locally and remote response is empty, only drop during fast sync + {downloader.FullSync, true, false, true, false, false}, + {downloader.FastSync, true, false, true, false, true}, // Special case, fast sync, unsynced peer + {downloader.LightSync, true, false, true, false, false}, + + // If checkpointing is enabled locally and remote response mismatches, always drop + {downloader.FullSync, true, false, false, false, true}, + {downloader.FastSync, true, false, false, false, true}, + {downloader.LightSync, true, false, false, false, true}, + + // If checkpointing is enabled locally and remote response matches, never drop + {downloader.FullSync, true, false, false, true, false}, + {downloader.FastSync, true, false, false, true, false}, + {downloader.LightSync, true, false, false, true, false}, + + // If checkpointing is enabled locally and remote times out, always drop + {downloader.FullSync, true, true, false, true, true}, + {downloader.FastSync, true, true, false, true, true}, + {downloader.LightSync, true, true, false, true, true}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("sync %v checkpoint %v timeout %v empty %v match %v", tt.syncmode, tt.checkpoint, tt.timeout, tt.empty, tt.match), func(t *testing.T) { + testCheckpointChallenge(t, tt.syncmode, tt.checkpoint, tt.timeout, tt.empty, tt.match, tt.drop) + }) + } +} + +func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpoint bool, timeout bool, empty bool, match bool, drop bool) { + // Reduce the checkpoint handshake challenge timeout + defer func(old time.Duration) { syncChallengeTimeout = old }(syncChallengeTimeout) + syncChallengeTimeout = 250 * time.Millisecond + + // Initialize a chain and generate a fake CHT if checkpointing is enabled var ( - evmux = new(event.TypeMux) - pow = ethash.NewFaker() db = ethdb.NewMemDatabase() - config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} - gspec = &core.Genesis{Config: config} - genesis = gspec.MustCommit(db) + config = new(params.ChainConfig) + genesis = (&core.Genesis{Config: config}).MustCommit(db) ) - blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil) + // If checkpointing is enabled, create and inject a fake CHT and the corresponding + // chllenge response. + var response *types.Header + if checkpoint { + index := uint64(rand.Intn(500)) + number := (index+1)*params.CHTFrequencyClient - 1 + response = &types.Header{Number: big.NewInt(int64(number)), Extra: []byte("valid")} + + cht := ¶ms.TrustedCheckpoint{ + SectionIndex: index, + SectionHead: response.Hash(), + } + params.TrustedCheckpoints[genesis.Hash()] = cht + defer delete(params.TrustedCheckpoints, genesis.Hash()) + } + // Create a checkpoint aware protocol manager + blockchain, err := core.NewBlockChain(db, nil, config, ethash.NewFaker(), vm.Config{}, nil) if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } - pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, nil) + pm, err := NewProtocolManager(config, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, nil) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } pm.Start(1000) defer pm.Stop() - // Connect a new peer and check that we receive the DAO challenge + // Connect a new peer and check that we receive the checkpoint challenge peer, _ := newTestPeer("peer", eth63, pm, true) defer peer.close() - challenge := &getBlockHeadersData{ - Origin: hashOrNumber{Number: config.DAOForkBlock.Uint64()}, - Amount: 1, - Skip: 0, - Reverse: false, - } - if err := p2p.ExpectMsg(peer.app, GetBlockHeadersMsg, challenge); err != nil { - t.Fatalf("challenge mismatch: %v", err) - } - // Create a block to reply to the challenge if no timeout is simulated - if !timeout { - blocks, _ := core.GenerateChain(¶ms.ChainConfig{}, genesis, ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) { - if remoteForked { - block.SetExtra(params.DAOForkBlockExtra) + if checkpoint { + challenge := &getBlockHeadersData{ + Origin: hashOrNumber{Number: response.Number.Uint64()}, + Amount: 1, + Skip: 0, + Reverse: false, + } + if err := p2p.ExpectMsg(peer.app, GetBlockHeadersMsg, challenge); err != nil { + t.Fatalf("challenge mismatch: %v", err) + } + // Create a block to reply to the challenge if no timeout is simulated + if !timeout { + if empty { + if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{}); err != nil { + t.Fatalf("failed to answer challenge: %v", err) + } + } else if match { + if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{response}); err != nil { + t.Fatalf("failed to answer challenge: %v", err) + } + } else { + if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{{Number: response.Number}}); err != nil { + t.Fatalf("failed to answer challenge: %v", err) + } } - }) - if err := p2p.Send(peer.app, BlockHeadersMsg, []*types.Header{blocks[0].Header()}); err != nil { - t.Fatalf("failed to answer challenge: %v", err) } - time.Sleep(100 * time.Millisecond) // Sleep to avoid the verification racing with the drops - } else { - // Otherwise wait until the test timeout passes - time.Sleep(daoChallengeTimeout + 500*time.Millisecond) } - // Verify that depending on fork side, the remote peer is maintained or dropped - if localForked == remoteForked && !timeout { - if peers := pm.peers.Len(); peers != 1 { - t.Fatalf("peer count mismatch: have %d, want %d", peers, 1) - } - } else { + // Wait until the test timeout passes to ensure proper cleanup + time.Sleep(syncChallengeTimeout + 100*time.Millisecond) + + // Verify that the remote peer is maintained or dropped + if drop { if peers := pm.peers.Len(); peers != 0 { t.Fatalf("peer count mismatch: have %d, want %d", peers, 0) } + } else { + if peers := pm.peers.Len(); peers != 1 { + t.Fatalf("peer count mismatch: have %d, want %d", peers, 1) + } } } diff --git a/eth/peer.go b/eth/peer.go index b5f450855..54c0ea0d5 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -79,7 +79,7 @@ type peer struct { rw p2p.MsgReadWriter version int // Protocol version negotiated - forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time + syncDrop *time.Timer // Timed connection dropper if sync progress isn't validated in time head common.Hash td *big.Int diff --git a/eth/sync.go b/eth/sync.go index bfcfb6716..3e23c5378 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -188,14 +188,12 @@ func (pm *ProtocolManager) synchronise(peer *peer) { atomic.StoreUint32(&pm.fastSync, 1) mode = downloader.FastSync } - if mode == downloader.FastSync { // Make sure the peer's total difficulty we are synchronizing is higher. if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { return } } - // Run the sync cycle, and disable fast sync if we've went past the pivot block if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return -- cgit v1.2.3