aboutsummaryrefslogtreecommitdiffstats
path: root/core/chain_indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r--core/chain_indexer.go61
1 files changed, 18 insertions, 43 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index 56360b59a..f4c207dcc 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,14 +36,13 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64, lastSectionHead common.Hash)
+ Reset(section uint64)
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)
- // Commit finalizes the section metadata and stores it into the database. This
- // interface will usually be a batch writer.
+ // Commit finalizes the section metadata and stores it into the database.
Commit() error
}
@@ -101,34 +100,11 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}
-// AddKnownSectionHead marks a new section head as known/processed if it is newer
-// than the already known best section head
-func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
- c.lock.Lock()
- defer c.lock.Unlock()
-
- if section < c.storedSections {
- return
- }
- c.setSectionHead(section, shead)
- c.setValidSections(section + 1)
-}
-
-// IndexerChain interface is used for connecting the indexer to a blockchain
-type IndexerChain interface {
- CurrentHeader() *types.Header
- SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
-}
-
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
-func (c *ChainIndexer) Start(chain IndexerChain) {
- ch := make(chan ChainEvent, 10)
- sub := chain.SubscribeChainEvent(ch)
- currentHeader := chain.CurrentHeader()
-
- go c.eventLoop(currentHeader, ch, sub)
+func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
+ go c.eventLoop(currentHeader, chainEventer)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -149,14 +125,12 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}
-
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
-
// Return any failures
switch {
case len(errs) == 0:
@@ -173,10 +147,12 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
+ events := make(chan ChainEvent, 10)
+ sub := chainEventer(events)
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
@@ -193,7 +169,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
errc <- nil
return
- case ev, ok := <-ch:
+ case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
@@ -257,10 +233,9 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
- updated time.Time
- updateMsg bool
+ updating bool
+ updated time.Time
)
-
for {
select {
case errc := <-c.quit:
@@ -275,7 +250,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
- updateMsg = true
+ updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
@@ -284,7 +259,7 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
- oldHead = c.SectionHead(section - 1)
+ oldHead = c.sectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
@@ -295,11 +270,11 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Lock()
// If processing succeeded and no reorgs occcurred, mark the section completed
- if err == nil && oldHead == c.SectionHead(section-1) {
+ if err == nil && oldHead == c.sectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
- if c.storedSections == c.knownSections && updateMsg {
- updateMsg = false
+ if c.storedSections == c.knownSections && updating {
+ updating = false
c.log.Info("Finished upgrading chain index")
}
@@ -336,7 +311,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
- c.backend.Reset(section, lastHead)
+ c.backend.Reset(section)
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
@@ -366,7 +341,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
- return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
+ return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
@@ -408,7 +383,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
// sectionHead retrieves the last block hash of a processed section from the
// index database.
-func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
+func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)