From f585f9eee8cb18423c23fe8b517b5b4cbe3b3755 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 29 Aug 2017 14:13:11 +0300 Subject: core, eth: clean up bloom filtering, add some tests --- core/chain_indexer.go | 61 +++++++++++++++------------------------------------ 1 file changed, 18 insertions(+), 43 deletions(-) (limited to 'core/chain_indexer.go') diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 56360b59a..f4c207dcc 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -36,14 +36,13 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64, lastSectionHead common.Hash) + Reset(section uint64) // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. Process(header *types.Header) - // Commit finalizes the section metadata and stores it into the database. This - // interface will usually be a batch writer. + // Commit finalizes the section metadata and stores it into the database. Commit() error } @@ -101,34 +100,11 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken return c } -// AddKnownSectionHead marks a new section head as known/processed if it is newer -// than the already known best section head -func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) { - c.lock.Lock() - defer c.lock.Unlock() - - if section < c.storedSections { - return - } - c.setSectionHead(section, shead) - c.setValidSections(section + 1) -} - -// IndexerChain interface is used for connecting the indexer to a blockchain -type IndexerChain interface { - CurrentHeader() *types.Header - SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription -} - // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. -func (c *ChainIndexer) Start(chain IndexerChain) { - ch := make(chan ChainEvent, 10) - sub := chain.SubscribeChainEvent(ch) - currentHeader := chain.CurrentHeader() - - go c.eventLoop(currentHeader, ch, sub) +func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { + go c.eventLoop(currentHeader, chainEventer) } // Close tears down all goroutines belonging to the indexer and returns any error @@ -149,14 +125,12 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } } - // Close all children for _, child := range c.children { if err := child.Close(); err != nil { errs = append(errs, err) } } - // Return any failures switch { case len(errs) == 0: @@ -173,10 +147,12 @@ func (c *ChainIndexer) Close() error { // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) { +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) + events := make(chan ChainEvent, 10) + sub := chainEventer(events) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing @@ -193,7 +169,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent errc <- nil return - case ev, ok := <-ch: + case ev, ok := <-events: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit @@ -257,10 +233,9 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) { // down into the processing backend. func (c *ChainIndexer) updateLoop() { var ( - updated time.Time - updateMsg bool + updating bool + updated time.Time ) - for { select { case errc := <-c.quit: @@ -275,7 +250,7 @@ func (c *ChainIndexer) updateLoop() { // Periodically print an upgrade log message to the user if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { - updateMsg = true + updating = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() @@ -284,7 +259,7 @@ func (c *ChainIndexer) updateLoop() { section := c.storedSections var oldHead common.Hash if section > 0 { - oldHead = c.SectionHead(section - 1) + oldHead = c.sectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() @@ -295,11 +270,11 @@ func (c *ChainIndexer) updateLoop() { c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed - if err == nil && oldHead == c.SectionHead(section-1) { + if err == nil && oldHead == c.sectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) - if c.storedSections == c.knownSections && updateMsg { - updateMsg = false + if c.storedSections == c.knownSections && updating { + updating = false c.log.Info("Finished upgrading chain index") } @@ -336,7 +311,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing - c.backend.Reset(section, lastHead) + c.backend.Reset(section) for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) @@ -366,7 +341,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) } // AddChildIndexer adds a child ChainIndexer that can use the output of this one @@ -408,7 +383,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) { // sectionHead retrieves the last block hash of a processed section from the // index database. -func (c *ChainIndexer) SectionHead(section uint64) common.Hash { +func (c *ChainIndexer) sectionHead(section uint64) common.Hash { var data [8]byte binary.BigEndian.PutUint64(data[:], section) -- cgit v1.2.3