aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/chain_indexer.go30
-rw-r--r--core/chain_indexer_test.go6
-rw-r--r--eth/backend.go2
-rw-r--r--eth/bloombits.go23
-rw-r--r--les/backend.go39
-rw-r--r--les/distributor.go4
-rw-r--r--les/handler.go34
-rw-r--r--les/helper_test.go6
-rw-r--r--les/odr.go18
-rw-r--r--les/odr_test.go3
-rw-r--r--les/request_test.go3
-rw-r--r--les/retrieve.go3
-rw-r--r--les/server.go4
-rw-r--r--light/lightchain.go14
-rw-r--r--light/odr.go4
-rw-r--r--light/postprocess.go170
16 files changed, 251 insertions, 112 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index 0b927116d..11a7c96fa 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -17,6 +17,7 @@
package core
import (
+ "context"
"encoding/binary"
"fmt"
"sync"
@@ -37,11 +38,11 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64, prevHead common.Hash) error
+ Reset(ctx context.Context, section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
- Process(header *types.Header)
+ Process(ctx context.Context, header *types.Header) error
// Commit finalizes the section metadata and stores it into the database.
Commit() error
@@ -71,9 +72,11 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
- active uint32 // Flag whether the event loop was started
- update chan struct{} // Notification channel that headers should be processed
- quit chan chan error // Quit channel to tear down running goroutines
+ active uint32 // Flag whether the event loop was started
+ update chan struct{} // Notification channel that headers should be processed
+ quit chan chan error // Quit channel to tear down running goroutines
+ ctx context.Context
+ ctxCancel func()
sectionSize uint64 // Number of blocks in a single chain segment to process
confirmsReq uint64 // Number of confirmations before processing a completed segment
@@ -105,6 +108,8 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
}
// Initialize database dependent fields and start the updater
c.loadValidSections()
+ c.ctx, c.ctxCancel = context.WithCancel(context.Background())
+
go c.updateLoop()
return c
@@ -138,6 +143,8 @@ func (c *ChainIndexer) Start(chain ChainIndexerChain) {
func (c *ChainIndexer) Close() error {
var errs []error
+ c.ctxCancel()
+
// Tear down the primary update loop
errc := make(chan error)
c.quit <- errc
@@ -297,6 +304,12 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
if err != nil {
+ select {
+ case <-c.ctx.Done():
+ <-c.quit <- nil
+ return
+ default:
+ }
c.log.Error("Section processing failed", "error", err)
}
c.lock.Lock()
@@ -344,7 +357,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
// Reset and partial processing
- if err := c.backend.Reset(section, lastHead); err != nil {
+ if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}
@@ -360,11 +373,12 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
} else if header.ParentHash != lastHead {
return common.Hash{}, fmt.Errorf("chain reorged during section processing")
}
- c.backend.Process(header)
+ if err := c.backend.Process(c.ctx, header); err != nil {
+ return common.Hash{}, err
+ }
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
- c.log.Error("Section commit failed", "error", err)
return common.Hash{}, err
}
return lastHead, nil
diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go
index 550caf556..a029dec62 100644
--- a/core/chain_indexer_test.go
+++ b/core/chain_indexer_test.go
@@ -17,6 +17,7 @@
package core
import (
+ "context"
"fmt"
"math/big"
"math/rand"
@@ -210,13 +211,13 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}
-func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error {
+func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}
-func (b *testChainIndexBackend) Process(header *types.Header) {
+func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error {
b.headerCnt++
if b.headerCnt > b.indexer.sectionSize {
b.t.Error("Processing too many headers")
@@ -227,6 +228,7 @@ func (b *testChainIndexBackend) Process(header *types.Header) {
b.t.Fatal("Unexpected call to Process")
case b.processCh <- header.Number.Uint64():
}
+ return nil
}
func (b *testChainIndexBackend) Commit() error {
diff --git a/eth/backend.go b/eth/backend.go
index 865534b19..6549cb8a3 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -130,7 +130,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
- bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
diff --git a/eth/bloombits.go b/eth/bloombits.go
index 954239d14..eb18565e2 100644
--- a/eth/bloombits.go
+++ b/eth/bloombits.go
@@ -17,6 +17,7 @@
package eth
import (
+ "context"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -92,30 +93,28 @@ const (
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
- size uint64 // section size to generate bloombits for
-
- db ethdb.Database // database instance to write index data and metadata into
- gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
-
- section uint64 // Section is the section number being processed currently
- head common.Hash // Head is the hash of the last header processed
+ size uint64 // section size to generate bloombits for
+ db ethdb.Database // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
+ section uint64 // Section is the section number being processed currently
+ head common.Hash // Head is the hash of the last header processed
}
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
-func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
+func NewBloomIndexer(db ethdb.Database, size, confReq uint64) *core.ChainIndexer {
backend := &BloomIndexer{
db: db,
size: size,
}
table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
- return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
+ return core.NewChainIndexer(db, table, backend, size, confReq, bloomThrottling, "bloombits")
}
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
// section.
-func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error {
+func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
gen, err := bloombits.NewGenerator(uint(b.size))
b.gen, b.section, b.head = gen, section, common.Hash{}
return err
@@ -123,16 +122,16 @@ func (b *BloomIndexer) Reset(section uint64, lastSectionHead common.Hash) error
// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
-func (b *BloomIndexer) Process(header *types.Header) {
+func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
b.head = header.Hash()
+ return nil
}
// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *BloomIndexer) Commit() error {
batch := b.db.NewBatch()
-
for i := 0; i < types.BloomBitLength; i++ {
bits, err := b.gen.Bitset(uint(i))
if err != nil {
diff --git a/les/backend.go b/les/backend.go
index 178bc1e0e..9b8cc1828 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -95,29 +95,35 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
quitSync := make(chan struct{})
leth := &LightEthereum{
- config: config,
- chainConfig: chainConfig,
- chainDb: chainDb,
- eventMux: ctx.EventMux,
- peers: peers,
- reqDist: newRequestDistributor(peers, quitSync),
- accountManager: ctx.AccountManager,
- engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, chainDb),
- shutdownChan: make(chan bool),
- networkId: config.NetworkId,
- bloomRequests: make(chan chan *bloombits.Retrieval),
- bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency),
- chtIndexer: light.NewChtIndexer(chainDb, true),
- bloomTrieIndexer: light.NewBloomTrieIndexer(chainDb, true),
+ config: config,
+ chainConfig: chainConfig,
+ chainDb: chainDb,
+ eventMux: ctx.EventMux,
+ peers: peers,
+ reqDist: newRequestDistributor(peers, quitSync),
+ accountManager: ctx.AccountManager,
+ engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, chainDb),
+ shutdownChan: make(chan bool),
+ networkId: config.NetworkId,
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations),
}
leth.relay = NewLesTxRelay(peers, leth.reqDist)
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
- leth.odr = NewLesOdr(chainDb, leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer, leth.retriever)
+ leth.odr = NewLesOdr(chainDb, leth.retriever)
+ leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr)
+ leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr)
+ leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)
+ // Note: NewLightChain adds the trusted checkpoint so it needs an ODR with
+ // indexers already set but not started yet
if leth.blockchain, err = light.NewLightChain(leth.odr, leth.chainConfig, leth.engine); err != nil {
return nil, err
}
+ // Note: AddChildIndexer starts the update process for the child
+ leth.bloomIndexer.AddChildIndexer(leth.bloomTrieIndexer)
+ leth.chtIndexer.Start(leth.blockchain)
leth.bloomIndexer.Start(leth.blockchain)
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
@@ -242,9 +248,6 @@ func (s *LightEthereum) Stop() error {
if s.chtIndexer != nil {
s.chtIndexer.Close()
}
- if s.bloomTrieIndexer != nil {
- s.bloomTrieIndexer.Close()
- }
s.blockchain.Stop()
s.protocolManager.Stop()
s.txPool.Stop()
diff --git a/les/distributor.go b/les/distributor.go
index 159fa4c73..d3f6b21d1 100644
--- a/les/distributor.go
+++ b/les/distributor.go
@@ -20,14 +20,10 @@ package les
import (
"container/list"
- "errors"
"sync"
"time"
)
-// ErrNoPeers is returned if no peers capable of serving a queued request are available
-var ErrNoPeers = errors.New("no suitable peers available")
-
// requestDistributor implements a mechanism that distributes requests to
// suitable peers, obeying flow control rules and prioritizing them in creation
// order (even when a resend is necessary).
diff --git a/les/handler.go b/les/handler.go
index 91a235bf0..ccb4a8844 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -1206,11 +1206,12 @@ func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus {
// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
// known about the host peer.
type NodeInfo struct {
- Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
- Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
- Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
- Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
- Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
+ Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
+ Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
+ Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
+ Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
+ Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
+ CHT light.TrustedCheckpoint `json:"cht"` // Trused CHT checkpoint for fast catchup
}
// NodeInfo retrieves some protocol metadata about the running host node.
@@ -1218,12 +1219,31 @@ func (self *ProtocolManager) NodeInfo() *NodeInfo {
head := self.blockchain.CurrentHeader()
hash := head.Hash()
+ var cht light.TrustedCheckpoint
+
+ sections, _, sectionHead := self.odr.ChtIndexer().Sections()
+ sections2, _, sectionHead2 := self.odr.BloomTrieIndexer().Sections()
+ if sections2 < sections {
+ sections = sections2
+ sectionHead = sectionHead2
+ }
+ if sections > 0 {
+ sectionIndex := sections - 1
+ cht = light.TrustedCheckpoint{
+ SectionIdx: sectionIndex,
+ SectionHead: sectionHead,
+ CHTRoot: light.GetChtRoot(self.chainDb, sectionIndex, sectionHead),
+ BloomRoot: light.GetBloomTrieRoot(self.chainDb, sectionIndex, sectionHead),
+ }
+ }
+
return &NodeInfo{
Network: self.networkId,
Difficulty: self.blockchain.GetTd(hash, head.Number.Uint64()),
Genesis: self.blockchain.Genesis().Hash(),
Config: self.blockchain.Config(),
Head: hash,
+ CHT: cht,
}
}
@@ -1258,7 +1278,7 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
- return ErrNoPeers
+ return light.ErrNoPeers
}
return nil
}
@@ -1282,7 +1302,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
- return ErrNoPeers
+ return light.ErrNoPeers
}
return nil
}
diff --git a/les/helper_test.go b/les/helper_test.go
index 8fd01a39e..50c97e06e 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -156,12 +156,12 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
} else {
blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
- chtIndexer := light.NewChtIndexer(db, false)
+ chtIndexer := light.NewChtIndexer(db, false, nil)
chtIndexer.Start(blockchain)
- bbtIndexer := light.NewBloomTrieIndexer(db, false)
+ bbtIndexer := light.NewBloomTrieIndexer(db, false, nil)
- bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks)
+ bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations)
bloomIndexer.AddChildIndexer(bbtIndexer)
bloomIndexer.Start(blockchain)
diff --git a/les/odr.go b/les/odr.go
index f8412aaad..2ad28d5d9 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -33,14 +33,11 @@ type LesOdr struct {
stop chan struct{}
}
-func NewLesOdr(db ethdb.Database, chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer, retriever *retrieveManager) *LesOdr {
+func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr {
return &LesOdr{
- db: db,
- chtIndexer: chtIndexer,
- bloomTrieIndexer: bloomTrieIndexer,
- bloomIndexer: bloomIndexer,
- retriever: retriever,
- stop: make(chan struct{}),
+ db: db,
+ retriever: retriever,
+ stop: make(chan struct{}),
}
}
@@ -54,6 +51,13 @@ func (odr *LesOdr) Database() ethdb.Database {
return odr.db
}
+// SetIndexers adds the necessary chain indexers to the ODR backend
+func (odr *LesOdr) SetIndexers(chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer) {
+ odr.chtIndexer = chtIndexer
+ odr.bloomTrieIndexer = bloomTrieIndexer
+ odr.bloomIndexer = bloomIndexer
+}
+
// ChtIndexer returns the CHT chain indexer
func (odr *LesOdr) ChtIndexer() *core.ChainIndexer {
return odr.chtIndexer
diff --git a/les/odr_test.go b/les/odr_test.go
index 983f7262b..c7c25cbe4 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -167,7 +167,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
rm := newRetrieveManager(peers, dist, nil)
db := ethdb.NewMemDatabase()
ldb := ethdb.NewMemDatabase()
- odr := NewLesOdr(ldb, light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency), rm)
+ odr := NewLesOdr(ldb, rm)
+ odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
diff --git a/les/request_test.go b/les/request_test.go
index ba2f603d8..db576798b 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -89,7 +89,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
rm := newRetrieveManager(peers, dist, nil)
db := ethdb.NewMemDatabase()
ldb := ethdb.NewMemDatabase()
- odr := NewLesOdr(ldb, light.NewChtIndexer(db, true), light.NewBloomTrieIndexer(db, true), eth.NewBloomIndexer(db, light.BloomTrieFrequency), rm)
+ odr := NewLesOdr(ldb, rm)
+ odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
diff --git a/les/retrieve.go b/les/retrieve.go
index a9037a38e..8ae36d82c 100644
--- a/les/retrieve.go
+++ b/les/retrieve.go
@@ -27,6 +27,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/light"
)
var (
@@ -207,7 +208,7 @@ func (r *sentReq) stateRequesting() reqStateFn {
return r.stateNoMorePeers
}
// nothing to wait for, no more peers to ask, return with error
- r.stop(ErrNoPeers)
+ r.stop(light.ErrNoPeers)
// no need to go to stopped state because waiting() already returned false
return nil
}
diff --git a/les/server.go b/les/server.go
index fca6124c9..a934fbf26 100644
--- a/les/server.go
+++ b/les/server.go
@@ -67,8 +67,8 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
protocolManager: pm,
quitSync: quitSync,
lesTopics: lesTopics,
- chtIndexer: light.NewChtIndexer(eth.ChainDb(), false),
- bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false),
+ chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil),
+ bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil),
}
logger := log.New()
diff --git a/light/lightchain.go b/light/lightchain.go
index 30b9bd89a..b7e629e88 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -116,19 +116,19 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
}
// addTrustedCheckpoint adds a trusted checkpoint to the blockchain
-func (self *LightChain) addTrustedCheckpoint(cp trustedCheckpoint) {
+func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) {
if self.odr.ChtIndexer() != nil {
- StoreChtRoot(self.chainDb, cp.sectionIdx, cp.sectionHead, cp.chtRoot)
- self.odr.ChtIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead)
+ StoreChtRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.CHTRoot)
+ self.odr.ChtIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
}
if self.odr.BloomTrieIndexer() != nil {
- StoreBloomTrieRoot(self.chainDb, cp.sectionIdx, cp.sectionHead, cp.bloomTrieRoot)
- self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead)
+ StoreBloomTrieRoot(self.chainDb, cp.SectionIdx, cp.SectionHead, cp.BloomRoot)
+ self.odr.BloomTrieIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
}
if self.odr.BloomIndexer() != nil {
- self.odr.BloomIndexer().AddKnownSectionHead(cp.sectionIdx, cp.sectionHead)
+ self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
}
- log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.sectionIdx+1)*CHTFrequencyClient-1, "hash", cp.sectionHead)
+ log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead)
}
func (self *LightChain) getProcInterrupt() bool {
diff --git a/light/odr.go b/light/odr.go
index 8f1e50b81..83c64055a 100644
--- a/light/odr.go
+++ b/light/odr.go
@@ -20,6 +20,7 @@ package light
import (
"context"
+ "errors"
"math/big"
"github.com/ethereum/go-ethereum/common"
@@ -33,6 +34,9 @@ import (
// service is not required.
var NoOdr = context.Background()
+// ErrNoPeers is returned if no peers capable of serving a queued request are available
+var ErrNoPeers = errors.New("no suitable peers available")
+
// OdrBackend is an interface to a backend service that handles ODR retrievals type
type OdrBackend interface {
Database() ethdb.Database
diff --git a/light/postprocess.go b/light/postprocess.go
index 2090a9d04..0b25e1d88 100644
--- a/light/postprocess.go
+++ b/light/postprocess.go
@@ -17,8 +17,10 @@
package light
import (
+ "context"
"encoding/binary"
"errors"
+ "fmt"
"math/big"
"time"
@@ -47,35 +49,35 @@ const (
HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated
)
-// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with
+// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with
// the appropriate section index and head hash. It is used to start light syncing from this checkpoint
// and avoid downloading the entire header chain while still being able to securely access old headers/logs.
-type trustedCheckpoint struct {
- name string
- sectionIdx uint64
- sectionHead, chtRoot, bloomTrieRoot common.Hash
+type TrustedCheckpoint struct {
+ name string
+ SectionIdx uint64
+ SectionHead, CHTRoot, BloomRoot common.Hash
}
var (
- mainnetCheckpoint = trustedCheckpoint{
- name: "mainnet",
- sectionIdx: 179,
- sectionHead: common.HexToHash("ae778e455492db1183e566fa0c67f954d256fdd08618f6d5a393b0e24576d0ea"),
- chtRoot: common.HexToHash("646b338f9ca74d936225338916be53710ec84020b89946004a8605f04c817f16"),
- bloomTrieRoot: common.HexToHash("d0f978f5dbc86e5bf931d8dd5b2ecbebbda6dc78f8896af6a27b46a3ced0ac25"),
+ mainnetCheckpoint = TrustedCheckpoint{
+ name: "mainnet",
+ SectionIdx: 179,
+ SectionHead: common.HexToHash("ae778e455492db1183e566fa0c67f954d256fdd08618f6d5a393b0e24576d0ea"),
+ CHTRoot: common.HexToHash("646b338f9ca74d936225338916be53710ec84020b89946004a8605f04c817f16"),
+ BloomRoot: common.HexToHash("d0f978f5dbc86e5bf931d8dd5b2ecbebbda6dc78f8896af6a27b46a3ced0ac25"),
}
- ropstenCheckpoint = trustedCheckpoint{
- name: "ropsten",
- sectionIdx: 107,
- sectionHead: common.HexToHash("e1988f95399debf45b873e065e5cd61b416ef2e2e5deec5a6f87c3127086e1ce"),
- chtRoot: common.HexToHash("15cba18e4de0ab1e95e202625199ba30147aec8b0b70384b66ebea31ba6a18e0"),
- bloomTrieRoot: common.HexToHash("e00fa6389b2e597d9df52172cd8e936879eed0fca4fa59db99e2c8ed682562f2"),
+ ropstenCheckpoint = TrustedCheckpoint{
+ name: "ropsten",
+ SectionIdx: 107,
+ SectionHead: common.HexToHash("e1988f95399debf45b873e065e5cd61b416ef2e2e5deec5a6f87c3127086e1ce"),
+ CHTRoot: common.HexToHash("15cba18e4de0ab1e95e202625199ba30147aec8b0b70384b66ebea31ba6a18e0"),
+ BloomRoot: common.HexToHash("e00fa6389b2e597d9df52172cd8e936879eed0fca4fa59db99e2c8ed682562f2"),
}
)
// trustedCheckpoints associates each known checkpoint with the genesis hash of the chain it belongs to
-var trustedCheckpoints = map[common.Hash]trustedCheckpoint{
+var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{
params.MainnetGenesisHash: mainnetCheckpoint,
params.TestnetGenesisHash: ropstenCheckpoint,
}
@@ -119,7 +121,8 @@ func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common
// ChtIndexerBackend implements core.ChainIndexerBackend
type ChtIndexerBackend struct {
- diskdb ethdb.Database
+ diskdb, trieTable ethdb.Database
+ odr OdrBackend
triedb *trie.Database
section, sectionSize uint64
lastHash common.Hash
@@ -127,7 +130,7 @@ type ChtIndexerBackend struct {
}
// NewBloomTrieIndexer creates a BloomTrie chain indexer
-func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer {
+func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer {
var sectionSize, confirmReq uint64
if clientMode {
sectionSize = CHTFrequencyClient
@@ -137,28 +140,64 @@ func NewChtIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer {
confirmReq = HelperTrieProcessConfirmations
}
idb := ethdb.NewTable(db, "chtIndex-")
+ trieTable := ethdb.NewTable(db, ChtTablePrefix)
backend := &ChtIndexerBackend{
diskdb: db,
- triedb: trie.NewDatabase(ethdb.NewTable(db, ChtTablePrefix)),
+ odr: odr,
+ trieTable: trieTable,
+ triedb: trie.NewDatabase(trieTable),
sectionSize: sectionSize,
}
return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht")
}
+// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the
+// ODR backend in order to be able to add new entries and calculate subsequent root hashes
+func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
+ batch := c.trieTable.NewBatch()
+ r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1}
+ for {
+ err := c.odr.Retrieve(ctx, r)
+ switch err {
+ case nil:
+ r.Proof.Store(batch)
+ return batch.Write()
+ case ErrNoPeers:
+ // if there are no peers to serve, retry later
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(time.Second * 10):
+ // stay in the loop and try again
+ }
+ default:
+ return err
+ }
+ }
+}
+
// Reset implements core.ChainIndexerBackend
-func (c *ChtIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error {
+func (c *ChtIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
var root common.Hash
if section > 0 {
root = GetChtRoot(c.diskdb, section-1, lastSectionHead)
}
var err error
c.trie, err = trie.New(root, c.triedb)
+
+ if err != nil && c.odr != nil {
+ err = c.fetchMissingNodes(ctx, section, root)
+ if err == nil {
+ c.trie, err = trie.New(root, c.triedb)
+ }
+ }
+
c.section = section
return err
}
// Process implements core.ChainIndexerBackend
-func (c *ChtIndexerBackend) Process(header *types.Header) {
+func (c *ChtIndexerBackend) Process(ctx context.Context, header *types.Header) error {
hash, num := header.Hash(), header.Number.Uint64()
c.lastHash = hash
@@ -170,6 +209,7 @@ func (c *ChtIndexerBackend) Process(header *types.Header) {
binary.BigEndian.PutUint64(encNumber[:], num)
data, _ := rlp.EncodeToBytes(ChtNode{hash, td})
c.trie.Update(encNumber[:], data)
+ return nil
}
// Commit implements core.ChainIndexerBackend
@@ -181,16 +221,15 @@ func (c *ChtIndexerBackend) Commit() error {
c.triedb.Commit(root, false)
if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 {
- log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", c.lastHash, "root", root)
+ log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
}
StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
return nil
}
const (
- BloomTrieFrequency = 32768
- ethBloomBitsSection = 4096
- ethBloomBitsConfirmations = 256
+ BloomTrieFrequency = 32768
+ ethBloomBitsSection = 4096
)
var (
@@ -215,7 +254,8 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root
// BloomTrieIndexerBackend implements core.ChainIndexerBackend
type BloomTrieIndexerBackend struct {
- diskdb ethdb.Database
+ diskdb, trieTable ethdb.Database
+ odr OdrBackend
triedb *trie.Database
section, parentSectionSize, bloomTrieRatio uint64
trie *trie.Trie
@@ -223,44 +263,98 @@ type BloomTrieIndexerBackend struct {
}
// NewBloomTrieIndexer creates a BloomTrie chain indexer
-func NewBloomTrieIndexer(db ethdb.Database, clientMode bool) *core.ChainIndexer {
+func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer {
+ trieTable := ethdb.NewTable(db, BloomTrieTablePrefix)
backend := &BloomTrieIndexerBackend{
- diskdb: db,
- triedb: trie.NewDatabase(ethdb.NewTable(db, BloomTrieTablePrefix)),
+ diskdb: db,
+ odr: odr,
+ trieTable: trieTable,
+ triedb: trie.NewDatabase(trieTable),
}
idb := ethdb.NewTable(db, "bltIndex-")
- var confirmReq uint64
if clientMode {
backend.parentSectionSize = BloomTrieFrequency
- confirmReq = HelperTrieConfirmations
} else {
backend.parentSectionSize = ethBloomBitsSection
- confirmReq = HelperTrieProcessConfirmations
}
backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize
backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
- return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, confirmReq-ethBloomBitsConfirmations, time.Millisecond*100, "bloomtrie")
+ return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie")
+}
+
+// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the
+// ODR backend in order to be able to add new entries and calculate subsequent root hashes
+func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
+ indexCh := make(chan uint, types.BloomBitLength)
+ type res struct {
+ nodes *NodeSet
+ err error
+ }
+ resCh := make(chan res, types.BloomBitLength)
+ for i := 0; i < 20; i++ {
+ go func() {
+ for bitIndex := range indexCh {
+ r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}}
+ for {
+ if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers {
+ // if there are no peers to serve, retry later
+ select {
+ case <-ctx.Done():
+ resCh <- res{nil, ctx.Err()}
+ return
+ case <-time.After(time.Second * 10):
+ // stay in the loop and try again
+ }
+ } else {
+ resCh <- res{r.Proofs, err}
+ break
+ }
+ }
+ }
+ }()
+ }
+
+ for i := uint(0); i < types.BloomBitLength; i++ {
+ indexCh <- i
+ }
+ close(indexCh)
+ batch := b.trieTable.NewBatch()
+ for i := uint(0); i < types.BloomBitLength; i++ {
+ res := <-resCh
+ if res.err != nil {
+ return res.err
+ }
+ res.nodes.Store(batch)
+ }
+ return batch.Write()
}
// Reset implements core.ChainIndexerBackend
-func (b *BloomTrieIndexerBackend) Reset(section uint64, lastSectionHead common.Hash) error {
+func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
var root common.Hash
if section > 0 {
root = GetBloomTrieRoot(b.diskdb, section-1, lastSectionHead)
}
var err error
b.trie, err = trie.New(root, b.triedb)
+ if err != nil && b.odr != nil {
+ err = b.fetchMissingNodes(ctx, section, root)
+ if err == nil {
+ b.trie, err = trie.New(root, b.triedb)
+ }
+ }
b.section = section
return err
}
// Process implements core.ChainIndexerBackend
-func (b *BloomTrieIndexerBackend) Process(header *types.Header) {
+func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error {
num := header.Number.Uint64() - b.section*BloomTrieFrequency
if (num+1)%b.parentSectionSize == 0 {
b.sectionHeads[num/b.parentSectionSize] = header.Hash()
}
+ return nil
}
// Commit implements core.ChainIndexerBackend
@@ -300,7 +394,7 @@ func (b *BloomTrieIndexerBackend) Commit() error {
b.triedb.Commit(root, false)
sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
- log.Info("Storing bloom trie", "section", b.section, "head", sectionHead, "root", root, "compression", float64(compSize)/float64(decompSize))
+ log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
return nil