diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2018-08-16 04:25:46 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2018-08-16 04:25:46 +0800 |
commit | 2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2 (patch) | |
tree | 128c3fe2780f604f9eac5d7ff2292994ed2102f6 /light | |
parent | e8752f4e9f9be3d2932cd4835a5d72d17ac2338b (diff) | |
download | dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.tar dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.tar.gz dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.tar.bz2 dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.tar.lz dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.tar.xz dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.tar.zst dexon-2cdf6ee7e00d6127c372e7a28bb27a80ef495cb2.zip |
light: CHT and bloom trie indexers working in light mode (#16534)
This PR enables the indexers to work in light client mode by
downloading a part of these tries (the Merkle proofs of the last
values of the last known section) in order to be able to add new
values and recalculate subsequent hashes. It also adds CHT data to
NodeInfo.
Diffstat (limited to 'light')
-rw-r--r-- | light/lightchain.go | 14 | ||||
-rw-r--r-- | light/odr.go | 4 | ||||
-rw-r--r-- | light/postprocess.go | 170 |
3 files changed, 143 insertions, 45 deletions
diff --git a/light/lightchain.go b/light/lightchain.go index 30b9bd89a..b7e629e88 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -116,19 +116,19 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. } // addTrustedCheckpoint adds a trusted checkpoint to the blockchain -func (self *LightChain) addTrustedCheckpoint(cp trustedCheckpoint) { +func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) { if self.odr.ChtIndexer() != nil { - StoreChtRoot(self.chainDb, cp.sectionIdx, cp.sectionHead, cp.chtRoot) - self.odr.ChtIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + StoreChtRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot) + self.odr.ChtIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } if self.odr.BloomTrieIndexer() != nil { - StoreBloomTrieRoot(self.chainDb, cp.sectionIdx, cp.sectionHead, cp.bloomTrieRoot) - self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + StoreBloomTrieRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot) + self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } if self.odr.BloomIndexer() != nil { - self.odr.BloomIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead) + self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead) } - log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.sectionIdx+1)*CHTFrequencyClient-1, "hash", cp.sectionHead) + log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead) } func (self *LightChain) getProcInterrupt() bool { diff --git a/light/odr.go b/light/odr.go index 8f1e50b81..83c64055a 100644 --- a/light/odr.go +++ b/light/odr.go @@ -20,6 +20,7 @@ package light import ( "context" + "errors" "math/big" "github.com/ethereum/go-ethereum/common" @@ -33,6 +34,9 @@ import ( // service is not required. var NoOdr = context.Background() +// ErrNoPeers is returned if no peers capable of serving a queued request are available +var ErrNoPeers = errors.New("no suitable peers available") + // OdrBackend is an interface to a backend service that handles ODR retrievals type type OdrBackend interface { Database() ethdb.Database diff --git a/light/postprocess.go b/light/postprocess.go index 2090a9d04..0b25e1d88 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -17,8 +17,10 @@ package light import ( + "context" "encoding/binary" "errors" + "fmt" "math/big" "time" @@ -47,35 +49,35 @@ const ( HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated ) -// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with +// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with // the appropriate section index and head hash. It is used to start light syncing from this checkpoint // and avoid downloading the entire header chain while still being able to securely access old headers/logs. -type trustedCheckpoint struct { - name string - sectionIdx uint64 - sectionHead, chtRoot, bloomTrieRoot common.Hash +type TrustedCheckpoint struct { + name string + SectionIdx uint64 + SectionHead, CHTRoot, BloomRoot common.Hash } var ( - mainnetCheckpoint = trustedCheckpoint{ - name: "mainnet", - sectionIdx: 179, - sectionHead: common.HexToHash("ae778e455492db1183e566fa0c67f954d256fdd08618f6d5a393b0e24576d0ea"), - chtRoot: common.HexToHash("646b338f9ca74d936225338916be53710ec84020b89946004a8605f04c817f16"), - bloomTrieRoot: common.HexToHash("d0f978f5dbc86e5bf931d8dd5b2ecbebbda6dc78f8896af6a27b46a3ced0ac25"), + mainnetCheckpoint = TrustedCheckpoint{ + name: "mainnet", + SectionIdx: 179, + SectionHead: common.HexToHash("ae778e455492db1183e566fa0c67f954d256fdd08618f6d5a393b0e24576d0ea"), + CHTRoot: common.HexToHash("646b338f9ca74d936225338916be53710ec84020b89946004a8605f04c817f16"), + BloomRoot: common.HexToHash("d0f978f5dbc86e5bf931d8dd5b2ecbebbda6dc78f8896af6a27b46a3ced0ac25"), } - ropstenCheckpoint = trustedCheckpoint{ - name: "ropsten", - sectionIdx: 107, - sectionHead: common.HexToHash("e1988f95399debf45b873e065e5cd61b416ef2e2e5deec5a6f87c3127086e1ce"), - chtRoot: common.HexToHash("15cba18e4de0ab1e95e202625199ba30147aec8b0b70384b66ebea31ba6a18e0"), - bloomTrieRoot: common.HexToHash("e00fa6389b2e597d9df52172cd8e936879eed0fca4fa59db99e2c8ed682562f2"), + ropstenCheckpoint = TrustedCheckpoint{ + name: "ropsten", + SectionIdx: 107, + SectionHead: common.HexToHash("e1988f95399debf45b873e065e5cd61b416ef2e2e5deec5a6f87c3127086e1ce"), + CHTRoot: common.HexToHash("15cba18e4de0ab1e95e202625199ba30147aec8b0b70384b66ebea31ba6a18e0"), + BloomRoot: common.HexToHash("e00fa6389b2e597d9df52172cd8e936879eed0fca4fa59db99e2c8ed682562f2"), } ) // trustedCheckpoints associates each known checkpoint with the genesis hash of the chain it belongs to -var trustedCheckpoints = map[common.Hash]trustedCheckpoint{ +var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{ params.MainnetGenesisHash: mainnetCheckpoint, params.TestnetGenesisHash: ropstenCheckpoint, } @@ -119,7 +121,8 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common // ChtIndexerBackend implements core.ChainIndexerBackend type ChtIndexerBackend struct { - diskdb ethdb.Database + diskdb, trieTable ethdb.Database + odr OdrBackend triedb *trie.Database section, sectionSize uint64 lastHash common.Hash @@ -127,7 +130,7 @@ type ChtIndexerBackend struct { } // NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { +func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { var sectionSize, confirmReq uint64 if clientMode { sectionSize = CHTFrequencyClient @@ -137,28 +140,64 @@ func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { confirmReq = HelperTrieProcessConfirmations } idb := ethdb.NewTable(db, "chtIndex-") + trieTable := ethdb.NewTable(db, ChtTablePrefix) backend := &ChtIndexerBackend{ diskdb: db, - triedb: trie.NewDatabase(ethdb.NewTable(db, ChtTablePrefix)), + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), sectionSize: sectionSize, } return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht") } +// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the +// ODR backend in order to be able to add new entries and calculate subsequent root hashes +func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { + batch := c.trieTable.NewBatch() + r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1} + for { + err := c.odr.Retrieve(ctx, r) + switch err { + case nil: + r.Proof.Store(batch) + return batch.Write() + case ErrNoPeers: + // if there are no peers to serve, retry later + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 10): + // stay in the loop and try again + } + default: + return err + } + } +} + // Reset implements core.ChainIndexerBackend -func (c *ChtIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error { +func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { var root common.Hash if section > 0 { root = GetChtRoot(c.diskdb, section-1, lastSectionHead) } var err error c.trie, err = trie.New(root, c.triedb) + + if err != nil && c.odr != nil { + err = c.fetchMissingNodes(ctx, section, root) + if err == nil { + c.trie, err = trie.New(root, c.triedb) + } + } + c.section = section return err } // Process implements core.ChainIndexerBackend -func (c *ChtIndexerBackend) Process(header *types.Header) { +func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error { hash, num := header.Hash(), header.Number.Uint64() c.lastHash = hash @@ -170,6 +209,7 @@ func (c *ChtIndexerBackend) Process(header *types.Header) { binary.BigEndian.PutUint64(encNumber[:], num) data, _ := rlp.EncodeToBytes(ChtNode{hash, td}) c.trie.Update(encNumber[:], data) + return nil } // Commit implements core.ChainIndexerBackend @@ -181,16 +221,15 @@ func (c *ChtIndexerBackend) Commit() error { c.triedb.Commit(root, false) if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 { - log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", c.lastHash, "root", root) + log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root)) } StoreChtRoot(c.diskdb, c.section, c.lastHash, root) return nil } const ( - BloomTrieFrequency = 32768 - ethBloomBitsSection = 4096 - ethBloomBitsConfirmations = 256 + BloomTrieFrequency = 32768 + ethBloomBitsSection = 4096 ) var ( @@ -215,7 +254,8 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root // BloomTrieIndexerBackend implements core.ChainIndexerBackend type BloomTrieIndexerBackend struct { - diskdb ethdb.Database + diskdb, trieTable ethdb.Database + odr OdrBackend triedb *trie.Database section, parentSectionSize, bloomTrieRatio uint64 trie *trie.Trie @@ -223,44 +263,98 @@ type BloomTrieIndexerBackend struct { } // NewBloomTrieIndexer creates a BloomTrie chain indexer -func NewBloomTrieIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer { +func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer { + trieTable := ethdb.NewTable(db, BloomTrieTablePrefix) backend := &BloomTrieIndexerBackend{ - diskdb: db, - triedb: trie.NewDatabase(ethdb.NewTable(db, BloomTrieTablePrefix)), + diskdb: db, + odr: odr, + trieTable: trieTable, + triedb: trie.NewDatabase(trieTable), } idb := ethdb.NewTable(db, "bltIndex-") - var confirmReq uint64 if clientMode { backend.parentSectionSize = BloomTrieFrequency - confirmReq = HelperTrieConfirmations } else { backend.parentSectionSize = ethBloomBitsSection - confirmReq = HelperTrieProcessConfirmations } backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio) - return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, confirmReq-ethBloomBitsConfirmations, time.Millisecond*100, "bloomtrie") + return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie") +} + +// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the +// ODR backend in order to be able to add new entries and calculate subsequent root hashes +func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error { + indexCh := make(chan uint, types.BloomBitLength) + type res struct { + nodes *NodeSet + err error + } + resCh := make(chan res, types.BloomBitLength) + for i := 0; i < 20; i++ { + go func() { + for bitIndex := range indexCh { + r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}} + for { + if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers { + // if there are no peers to serve, retry later + select { + case <-ctx.Done(): + resCh <- res{nil, ctx.Err()} + return + case <-time.After(time.Second * 10): + // stay in the loop and try again + } + } else { + resCh <- res{r.Proofs, err} + break + } + } + } + }() + } + + for i := uint(0); i < types.BloomBitLength; i++ { + indexCh <- i + } + close(indexCh) + batch := b.trieTable.NewBatch() + for i := uint(0); i < types.BloomBitLength; i++ { + res := <-resCh + if res.err != nil { + return res.err + } + res.nodes.Store(batch) + } + return batch.Write() } // Reset implements core.ChainIndexerBackend -func (b *BloomTrieIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error { +func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { var root common.Hash if section > 0 { root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead) } var err error b.trie, err = trie.New(root, b.triedb) + if err != nil && b.odr != nil { + err = b.fetchMissingNodes(ctx, section, root) + if err == nil { + b.trie, err = trie.New(root, b.triedb) + } + } b.section = section return err } // Process implements core.ChainIndexerBackend -func (b *BloomTrieIndexerBackend) Process(header *types.Header) { +func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error { num := header.Number.Uint64() - b.section*BloomTrieFrequency if (num+1)%b.parentSectionSize == 0 { b.sectionHeads[num/b.parentSectionSize] = header.Hash() } + return nil } // Commit implements core.ChainIndexerBackend @@ -300,7 +394,7 @@ func (b *BloomTrieIndexerBackend) Commit() error { b.triedb.Commit(root, false) sectionHead := b.sectionHeads[b.bloomTrieRatio-1] - log.Info("Storing bloom trie", "section", b.section, "head", sectionHead, "root", root, "compression", float64(compSize)/float64(decompSize)) + log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize)) StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root) return nil |