diff options
-rw-r--r-- | core/chain_indexer.go | 30 | ||||
-rw-r--r-- | core/chain_indexer_test.go | 6 | ||||
-rw-r--r-- | eth/backend.go | 2 | ||||
-rw-r--r-- | eth/bloombits.go | 23 | ||||
-rw-r--r-- | les/backend.go | 39 | ||||
-rw-r--r-- | les/distributor.go | 4 | ||||
-rw-r--r-- | les/handler.go | 34 | ||||
-rw-r--r-- | les/helper_test.go | 6 | ||||
-rw-r--r-- | les/odr.go | 18 | ||||
-rw-r--r-- | les/odr_test.go | 3 | ||||
-rw-r--r-- | les/request_test.go | 3 | ||||
-rw-r--r-- | les/retrieve.go | 3 | ||||
-rw-r--r-- | les/server.go | 4 | ||||
-rw-r--r-- | light/lightchain.go | 14 | ||||
-rw-r--r-- | light/odr.go | 4 | ||||
-rw-r--r-- | light/postprocess.go | 170 |
16 files changed, 251 insertions, 112 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 0b927116d..11a7c96fa 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -17,6 +17,7 @@ package core import ( + "context" "encoding/binary" "fmt" "sync" @@ -37,11 +38,11 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64, prevHead common.Hash) error + Reset(ctx context.Context, section uint64, prevHead common.Hash) error // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. - Process(header *types.Header) + Process(ctx context.Context, header *types.Header) error // Commit finalizes the section metadata and stores it into the database. Commit() error @@ -71,9 +72,11 @@ type ChainIndexer struct { backend ChainIndexerBackend // Background processor generating the index data content children []*ChainIndexer // Child indexers to cascade chain updates to - active uint32 // Flag whether the event loop was started - update chan struct{} // Notification channel that headers should be processed - quit chan chan error // Quit channel to tear down running goroutines + active uint32 // Flag whether the event loop was started + update chan struct{} // Notification channel that headers should be processed + quit chan chan error // Quit channel to tear down running goroutines + ctx context.Context + ctxCancel func() sectionSize uint64 // Number of blocks in a single chain segment to process confirmsReq uint64 // Number of confirmations before processing a completed segment @@ -105,6 +108,8 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken } // Initialize database dependent fields and start the updater c.loadValidSections() + c.ctx, c.ctxCancel = context.WithCancel(context.Background()) + go c.updateLoop() return c @@ -138,6 +143,8 @@ func (c *ChainIndexer) Start(chain ChainIndexerChain) { func (c *ChainIndexer) Close() error { var errs []error + c.ctxCancel() + // Tear down the primary update loop errc := make(chan error) c.quit <- errc @@ -297,6 +304,12 @@ func (c *ChainIndexer) updateLoop() { c.lock.Unlock() newHead, err := c.processSection(section, oldHead) if err != nil { + select { + case <-c.ctx.Done(): + <-c.quit <- nil + return + default: + } c.log.Error("Section processing failed", "error", err) } c.lock.Lock() @@ -344,7 +357,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com // Reset and partial processing - if err := c.backend.Reset(section, lastHead); err != nil { + if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { c.setValidSections(0) return common.Hash{}, err } @@ -360,11 +373,12 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com } else if header.ParentHash != lastHead { return common.Hash{}, fmt.Errorf("chain reorged during section processing") } - c.backend.Process(header) + if err := c.backend.Process(c.ctx, header); err != nil { + return common.Hash{}, err + } lastHead = header.Hash() } if err := c.backend.Commit(); err != nil { - c.log.Error("Section commit failed", "error", err) return common.Hash{}, err } return lastHead, nil diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go index 550caf556..a029dec62 100644 --- a/core/chain_indexer_test.go +++ b/core/chain_indexer_test.go @@ -17,6 +17,7 @@ package core import ( + "context" "fmt" "math/big" "math/rand" @@ -210,13 +211,13 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { return b.stored * b.indexer.sectionSize } -func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error { +func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error { b.section = section b.headerCnt = 0 return nil } -func (b *testChainIndexBackend) Process(header *types.Header) { +func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error { b.headerCnt++ if b.headerCnt > b.indexer.sectionSize { b.t.Error("Processing too many headers") @@ -227,6 +228,7 @@ func (b *testChainIndexBackend) Process(header *types.Header) { b.t.Fatal("Unexpected call to Process") case b.processCh <- header.Number.Uint64(): } + return nil } func (b *testChainIndexBackend) Commit() error { diff --git a/eth/backend.go b/eth/backend.go index 865534b19..6549cb8a3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -130,7 +130,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { gasPrice: config.GasPrice, etherbase: config.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms), } log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) diff --git a/eth/bloombits.go b/eth/bloombits.go index 954239d14..eb18565e2 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -17,6 +17,7 @@ package eth import ( + "context" "time" "github.com/ethereum/go-ethereum/common" @@ -92,30 +93,28 @@ const ( // BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index // for the Ethereum header bloom filters, permitting blazing fast filtering. type BloomIndexer struct { - size uint64 // section size to generate bloombits for - - db ethdb.Database // database instance to write index data and metadata into - gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index - - section uint64 // Section is the section number being processed currently - head common.Hash // Head is the hash of the last header processed + size uint64 // section size to generate bloombits for + db ethdb.Database // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + section uint64 // Section is the section number being processed currently + head common.Hash // Head is the hash of the last header processed } // NewBloomIndexer returns a chain indexer that generates bloom bits data for the // canonical chain for fast logs filtering. -func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer { +func NewBloomIndexer(db ethdb.Database, size, confReq uint64) *core.ChainIndexer { backend := &BloomIndexer{ db: db, size: size, } table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) - return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits") + return core.NewChainIndexer(db, table, backend, size, confReq, bloomThrottling, "bloombits") } // Reset implements core.ChainIndexerBackend, starting a new bloombits index // section. -func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error { +func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { gen, err := bloombits.NewGenerator(uint(b.size)) b.gen, b.section, b.head = gen, section, common.Hash{} return err @@ -123,16 +122,16 @@ func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error // Process implements core.ChainIndexerBackend, adding a new header's bloom into // the index. -func (b *BloomIndexer) Process(header *types.Header) { +func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) b.head = header.Hash() + return nil } // Commit implements core.ChainIndexerBackend, finalizing the bloom section and // writing it out into the database. func (b *BloomIndexer) Commit() error { batch := b.db.NewBatch() - for i := 0; i < types.BloomBitLength; i++ { bits, err := b.gen.Bitset(uint(i)) if err != nil { diff --git a/les/backend.go b/les/backend.go index 178bc1e0e..9b8cc1828 100644 --- a/les/backend.go +++ b/les/backend.go @@ -95,29 +95,35 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { quitSync := make(chan struct{}) leth := &LightEthereum{ - config: config, - chainConfig: chainConfig, - chainDb: chainDb, - eventMux: ctx.EventMux, - peers: peers, - reqDist: newRequestDistributor(peers, quitSync), - accountManager: ctx.AccountManager, - engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, chainDb), - shutdownChan: make(chan bool), - networkId: config.NetworkId, - bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency), - chtIndexer: light.NewChtIndexer(chainDb, true), - bloomTrieIndexer: light.NewBloomTrieIndexer(chainDb, true), + config: config, + chainConfig: chainConfig, + chainDb: chainDb, + eventMux: ctx.EventMux, + peers: peers, + reqDist: newRequestDistributor(peers, quitSync), + accountManager: ctx.AccountManager, + engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, chainDb), + shutdownChan: make(chan bool), + networkId: config.NetworkId, + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations), } leth.relay = NewLesTxRelay(peers, leth.reqDist) leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) - leth.odr = NewLesOdr(chainDb, leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer, leth.retriever) + leth.odr = NewLesOdr(chainDb, leth.retriever) + leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr) + leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr) + leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) + // Note: NewLightChain adds the trusted checkpoint so it needs an ODR with + // indexers already set but not started yet if leth.blockchain, err = light.NewLightChain(leth.odr, leth.chainConfig, leth.engine); err != nil { return nil, err } + // Note: AddChildIndexer starts the update process for the child + leth.bloomIndexer.AddChildIndexer(leth.bloomTrieIndexer) + leth.chtIndexer.Start(leth.blockchain) leth.bloomIndexer.Start(leth.blockchain) // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { @@ -242,9 +248,6 @@ func (s *LightEthereum) Stop() error { if s.chtIndexer != nil { s.chtIndexer.Close() } - if s.bloomTrieIndexer != nil { - s.bloomTrieIndexer.Close() - } s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() diff --git a/les/distributor.go b/les/distributor.go index 159fa4c73..d3f6b21d1 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -20,14 +20,10 @@ package les import ( "container/list" - "errors" "sync" "time" ) -// ErrNoPeers is returned if no peers capable of serving a queued request are available -var ErrNoPeers = errors.New("no suitable peers available") - // requestDistributor implements a mechanism that distributes requests to // suitable peers, obeying flow control rules and prioritizing them in creation // order (even when a resend is necessary). diff --git a/les/handler.go b/les/handler.go index 91a235bf0..ccb4a8844 100644 --- a/les/handler.go +++ b/les/handler.go @@ -1206,11 +1206,12 @@ func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus { // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { - Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4) - Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain - Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block - Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules - Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block + Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4) + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block + CHT light.TrustedCheckpoint `json:"cht"` // Trused CHT checkpoint for fast catchup } // NodeInfo retrieves some protocol metadata about the running host node. @@ -1218,12 +1219,31 @@ func (self *ProtocolManager) NodeInfo() *NodeInfo { head := self.blockchain.CurrentHeader() hash := head.Hash() + var cht light.TrustedCheckpoint + + sections, _, sectionHead := self.odr.ChtIndexer().Sections() + sections2, _, sectionHead2 := self.odr.BloomTrieIndexer().Sections() + if sections2 < sections { + sections = sections2 + sectionHead = sectionHead2 + } + if sections > 0 { + sectionIndex := sections - 1 + cht = light.TrustedCheckpoint{ + SectionIdx: sectionIndex, + SectionHead: sectionHead, + CHTRoot: light.GetChtRoot(self.chainDb, sectionIndex, sectionHead), + BloomRoot: light.GetBloomTrieRoot(self.chainDb, sectionIndex, sectionHead), + } + } + return &NodeInfo{ Network: self.networkId, Difficulty: self.blockchain.GetTd(hash, head.Number.Uint64()), Genesis: self.blockchain.Genesis().Hash(), Config: self.blockchain.Config(), Head: hash, + CHT: cht, } } @@ -1258,7 +1278,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s } _, ok := <-pc.manager.reqDist.queue(rq) if !ok { - return ErrNoPeers + return light.ErrNoPeers } return nil } @@ -1282,7 +1302,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip } _, ok := <-pc.manager.reqDist.queue(rq) if !ok { - return ErrNoPeers + return light.ErrNoPeers } return nil } diff --git a/les/helper_test.go b/les/helper_test.go index 8fd01a39e..50c97e06e 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -156,12 +156,12 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor } else { blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) - chtIndexer := light.NewChtIndexer(db, false) + chtIndexer := light.NewChtIndexer(db, false, nil) chtIndexer.Start(blockchain) - bbtIndexer := light.NewBloomTrieIndexer(db, false) + bbtIndexer := light.NewBloomTrieIndexer(db, false, nil) - bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks) + bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations) bloomIndexer.AddChildIndexer(bbtIndexer) bloomIndexer.Start(blockchain) diff --git a/les/odr.go b/les/odr.go index f8412aaad..2ad28d5d9 100644 --- a/les/odr.go +++ b/les/odr.go @@ -33,14 +33,11 @@ type LesOdr struct { stop chan struct{} } -func NewLesOdr(db ethdb.Database, chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer, retriever *retrieveManager) *LesOdr { +func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr { return &LesOdr{ - db: db, - chtIndexer: chtIndexer, - bloomTrieIndexer: bloomTrieIndexer, - bloomIndexer: bloomIndexer, - retriever: retriever, - stop: make(chan struct{}), + db: db, + retriever: retriever, + stop: make(chan struct{}), } } @@ -54,6 +51,13 @@ func (odr *LesOdr) Database() ethdb.Database { return odr.db } +// SetIndexers adds the necessary chain indexers to the ODR backend +func (odr *LesOdr) SetIndexers(chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer) { + odr.chtIndexer = chtIndexer + odr.bloomTrieIndexer = bloomTrieIndexer + odr.bloomIndexer = bloomIndexer +} + // ChtIndexer returns the CHT chain indexer func (odr *LesOdr) ChtIndexer() *core.ChainIndexer { return odr.chtIndexer diff --git a/les/odr_test.go b/les/odr_test.go index 983f7262b..c7c25cbe4 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -167,7 +167,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { rm := newRetrieveManager(peers, dist, nil) db := ethdb.NewMemDatabase() ldb := ethdb.NewMemDatabase() - odr := NewLesOdr(ldb, light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency), rm) + odr := NewLesOdr(ldb, rm) + odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations)) pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) diff --git a/les/request_test.go b/les/request_test.go index ba2f603d8..db576798b 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -89,7 +89,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { rm := newRetrieveManager(peers, dist, nil) db := ethdb.NewMemDatabase() ldb := ethdb.NewMemDatabase() - odr := NewLesOdr(ldb, light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency), rm) + odr := NewLesOdr(ldb, rm) + odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations)) pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db) lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb) diff --git a/les/retrieve.go b/les/retrieve.go index a9037a38e..8ae36d82c 100644 --- a/les/retrieve.go +++ b/les/retrieve.go @@ -27,6 +27,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/light" ) var ( @@ -207,7 +208,7 @@ func (r *sentReq) stateRequesting() reqStateFn { return r.stateNoMorePeers } // nothing to wait for, no more peers to ask, return with error - r.stop(ErrNoPeers) + r.stop(light.ErrNoPeers) // no need to go to stopped state because waiting() already returned false return nil } diff --git a/les/server.go b/les/server.go index fca6124c9..a934fbf26 100644 --- a/les/server.go +++ b/les/server.go @@ -67,8 +67,8 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { protocolManager: pm, quitSync: quitSync, lesTopics: lesTopics, - chtIndexer: light.NewChtIndexer(eth.ChainDb(), false), - bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false), + chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil), + bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil), } logger := log.New() diff --git a/light/lightchain.go b/light/lightchain.go index 30b9bd89a..b7e629e88 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -116,19 +116,19 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. } // addTrustedCheckpoint adds a trusted checkpoint to the blockchain -func (self *LightChain) addTrustedCheckpoint(cp trustedCheckpoint) { +func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) { if self.odr.ChtIndexer() != nil { - StoreChtRoot(self.chainDb, cp.sectionIdx, cp.sectionHead, cp.chtRoot) - self.odr.ChtIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + StoreChtRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot) + self.odr.ChtIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } if self.odr.BloomTrieIndexer() != nil { - StoreBloomTrieRoot(self.chainDb, cp.sectionIdx, cp.sectionHead, cp.bloomTrieRoot) - self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + StoreBloomTrieRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot) + self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } if self.odr.BloomIndexer() != nil { - self.odr.BloomIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } - log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.sectionIdx+1)*CHTFrequencyClient-1, "hash", cp.sectionHead) + log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead) } func (self *LightChain) getProcInterrupt() bool { diff --git a/light/odr.go b/light/odr.go index 8f1e50b81..83c64055a 100644 --- a/light/odr.go +++ b/light/odr.go @@ -20,6 +20,7 @@ package light import ( "context" + "errors" "math/big" "github.com/ethereum/go-ethereum/common" @@ -33,6 +34,9 @@ import ( // service is not required. var NoOdr = context.Background() +// ErrNoPeers is returned if no peers capable of serving a queued request are available +var ErrNoPeers = errors.New("no suitable peers available") + // OdrBackend is an interface to a backend service that handles ODR retrievals type type OdrBackend interface { Database() ethdb.Database diff --git a/light/postprocess.go b/light/postprocess.go index 2090a9d04..0b25e1d88 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -17,8 +17,10 @@ package light import ( + "context" "encoding/binary" "errors" + "fmt" "math/big" "time" @@ -47,35 +49,35 @@ const ( HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated ) -// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with +// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with // the appropriate section index and head hash. It is used to start light syncing from this checkpoint // and avoid downloading the entire header chain while still being able to securely access old headers/logs. -type trustedCheckpoint struct { - name string - sectionIdx uint64 - sectionHead, chtRoot, bloomTrieRoot common.Hash +type TrustedCheckpoint struct { + name string + SectionIdx uint64 + SectionHead, CHTRoot, BloomRoot common.Hash } var ( - mainnetCheckpoint = trustedCheckpoint{ - name: "mainnet", - sectionIdx: 179, - sectionHead: common.HexToHash("ae778e455492db1183e566fa0c67f954d256fdd08618f6d5a393b0e24576d0ea"), - chtRoot: common.HexToHash("646b338f9ca74d936225338916be53710ec84020b89946004a8605f04c817f16"), - bloomTrieRoot: common.HexToHash("d0f978f5dbc86e5bf931d8dd5b2ecbebbda6dc78f8896af6a27b46a3ced0ac25"), + mainnetCheckpoint = TrustedCheckpoint{ + name: "mainnet", + SectionIdx: 179, + SectionHead: common.HexToHash("ae778e455492db1183e566fa0c67f954d256fdd08618f6d5a393b0e24576d0ea"), + CHTRoot: common.HexToHash("646b338f9ca74d936225338916be53710ec84020b89946004a8605f04c817f16"), + BloomRoot: common.HexToHash("d0f978f5dbc86e5bf931d8dd5b2ecbebbda6dc78f8896af6a27b46a3ced0ac25"), } - ropstenCheckpoint = trustedCheckpoint{ - name: "ropsten", - sectionIdx: 107, - sectionHead: common.HexToHash("e1988f95399debf45b873e065e5cd61b416ef2e2e5deec5a6f87c3127086e1ce"), - chtRoot: common.HexToHash("15cba18e4de0ab1e95e202625199ba30147aec8b0b70384b66ebea31ba6a18e0"), - bloomTrieRoot: common.HexToHash("e00fa6389b2e597d9df52172cd8e936879eed0fca4fa59db99e2c8ed682562f2"), + ropstenCheckpoint = TrustedCheckpoint{ + name: "ropsten", + SectionIdx: 107, + SectionHead: common.HexToHash("e1988f95399debf45b873e065e5cd61b416ef2e2e5deec5a6f87c3127086e1ce"), + CHTRoot: common.HexToHash("15cba18e4de0ab1e95e202625199ba30147aec8b0b70384b66ebea31ba6a18e0"), + BloomRoot: common.HexToHash("e00fa6389b2e597d9df52172cd8e936879eed0fca4fa59db99e2c8ed682562f2"), } ) // trustedCheckpoints associates each known checkpoint with the genesis hash of the chain it belongs to -var trustedCheckpoints = map[common.Hash]trustedCheckpoint{ +var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{ params.MainnetGenesisHash: mainnetCheckpoint, params.TestnetGenesisHash: ropstenCheckpoint, } @@ -119,7 +121,8 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common // ChtIndexerBackend implements core.ChainIndexerBackend type ChtIndexerBackend struct { - diskdb ethdb.Database + diskdb, trieTable ethdb.Database + odr OdrBackend triedb *trie.Database section, sectionSize uint64 lastHash common.Hash @@ -127,7 +130,7 @@ type ChtIndexerBackend struct { } // NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { +func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { var sectionSize, confirmReq uint64 if clientMode { sectionSize = CHTFrequencyClient @@ -137,28 +140,64 @@ func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { confirmReq = HelperTrieProcessConfirmations } idb := ethdb.NewTable(db, "chtIndex-") + trieTable := ethdb.NewTable(db, ChtTablePrefix) backend := &ChtIndexerBackend{ diskdb: db, - triedb: trie.NewDatabase(ethdb.NewTable(db, ChtTablePrefix)), + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), sectionSize: sectionSize, } return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht") } +// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the +// ODR backend in order to be able to add new entries and calculate subsequent root hashes +func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { + batch := c.trieTable.NewBatch() + r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1} + for { + err := c.odr.Retrieve(ctx, r) + switch err { + case nil: + r.Proof.Store(batch) + return batch.Write() + case ErrNoPeers: + // if there are no peers to serve, retry later + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 10): + // stay in the loop and try again + } + default: + return err + } + } +} + // Reset implements core.ChainIndexerBackend -func (c *ChtIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error { +func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { var root common.Hash if section > 0 { root = GetChtRoot(c.diskdb, section-1, lastSectionHead) } var err error c.trie, err = trie.New(root, c.triedb) + + if err != nil && c.odr != nil { + err = c.fetchMissingNodes(ctx, section, root) + if err == nil { + c.trie, err = trie.New(root, c.triedb) + } + } + c.section = section return err } // Process implements core.ChainIndexerBackend -func (c *ChtIndexerBackend) Process(header *types.Header) { +func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error { hash, num := header.Hash(), header.Number.Uint64() c.lastHash = hash @@ -170,6 +209,7 @@ func (c *ChtIndexerBackend) Process(header *types.Header) { binary.BigEndian.PutUint64(encNumber[:], num) data, _ := rlp.EncodeToBytes(ChtNode{hash, td}) c.trie.Update(encNumber[:], data) + return nil } // Commit implements core.ChainIndexerBackend @@ -181,16 +221,15 @@ func (c *ChtIndexerBackend) Commit() error { c.triedb.Commit(root, false) if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 { - log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", c.lastHash, "root", root) + log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root)) } StoreChtRoot(c.diskdb, c.section, c.lastHash, root) return nil } const ( - BloomTrieFrequency = 32768 - ethBloomBitsSection = 4096 - ethBloomBitsConfirmations = 256 + BloomTrieFrequency = 32768 + ethBloomBitsSection = 4096 ) var ( @@ -215,7 +254,8 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root // BloomTrieIndexerBackend implements core.ChainIndexerBackend type BloomTrieIndexerBackend struct { - diskdb ethdb.Database + diskdb, trieTable ethdb.Database + odr OdrBackend triedb *trie.Database section, parentSectionSize, bloomTrieRatio uint64 trie *trie.Trie @@ -223,44 +263,98 @@ type BloomTrieIndexerBackend struct { } // NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewBloomTrieIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { +func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { + trieTable := ethdb.NewTable(db, BloomTrieTablePrefix) backend := &BloomTrieIndexerBackend{ - diskdb: db, - triedb: trie.NewDatabase(ethdb.NewTable(db, BloomTrieTablePrefix)), + diskdb: db, + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), } idb := ethdb.NewTable(db, "bltIndex-") - var confirmReq uint64 if clientMode { backend.parentSectionSize = BloomTrieFrequency - confirmReq = HelperTrieConfirmations } else { backend.parentSectionSize = ethBloomBitsSection - confirmReq = HelperTrieProcessConfirmations } backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio) - return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, confirmReq-ethBloomBitsConfirmations, time.Millisecond*100, "bloomtrie") + return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie") +} + +// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the +// ODR backend in order to be able to add new entries and calculate subsequent root hashes +func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { + indexCh := make(chan uint, types.BloomBitLength) + type res struct { + nodes *NodeSet + err error + } + resCh := make(chan res, types.BloomBitLength) + for i := 0; i < 20; i++ { + go func() { + for bitIndex := range indexCh { + r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}} + for { + if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers { + // if there are no peers to serve, retry later + select { + case <-ctx.Done(): + resCh <- res{nil, ctx.Err()} + return + case <-time.After(time.Second * 10): + // stay in the loop and try again + } + } else { + resCh <- res{r.Proofs, err} + break + } + } + } + }() + } + + for i := uint(0); i < types.BloomBitLength; i++ { + indexCh <- i + } + close(indexCh) + batch := b.trieTable.NewBatch() + for i := uint(0); i < types.BloomBitLength; i++ { + res := <-resCh + if res.err != nil { + return res.err + } + res.nodes.Store(batch) + } + return batch.Write() } // Reset implements core.ChainIndexerBackend -func (b *BloomTrieIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error { +func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { var root common.Hash if section > 0 { root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead) } var err error b.trie, err = trie.New(root, b.triedb) + if err != nil && b.odr != nil { + err = b.fetchMissingNodes(ctx, section, root) + if err == nil { + b.trie, err = trie.New(root, b.triedb) + } + } b.section = section return err } // Process implements core.ChainIndexerBackend -func (b *BloomTrieIndexerBackend) Process(header *types.Header) { +func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error { num := header.Number.Uint64() - b.section*BloomTrieFrequency if (num+1)%b.parentSectionSize == 0 { b.sectionHeads[num/b.parentSectionSize] = header.Hash() } + return nil } // Commit implements core.ChainIndexerBackend @@ -300,7 +394,7 @@ func (b *BloomTrieIndexerBackend) Commit() error { b.triedb.Commit(root, false) sectionHead := b.sectionHeads[b.bloomTrieRatio-1] - log.Info("Storing bloom trie", "section", b.section, "head", sectionHead, "root", root, "compression", float64(compSize)/float64(decompSize)) + log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize)) StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root) return nil |