aboutsummaryrefslogtreecommitdiffstats
path: root/core/chain_indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r--core/chain_indexer.go69
1 files changed, 50 insertions, 19 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index f4c207dcc..837c908ab 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,13 +36,14 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64)
+ Reset(section uint64, lastSectionHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)
- // Commit finalizes the section metadata and stores it into the database.
+ // Commit finalizes the section metadata and stores it into the database. This
+ // interface will usually be a batch writer.
Commit() error
}
@@ -100,11 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}
+// AddKnownSectionHead marks a new section head as known/processed if it is newer
+// than the already known best section head
+func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if section < c.storedSections {
+ return
+ }
+ c.setSectionHead(section, shead)
+ c.setValidSections(section + 1)
+}
+
+// IndexerChain interface is used for connecting the indexer to a blockchain
+type IndexerChain interface {
+ CurrentHeader() *types.Header
+ SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
+}
+
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
-func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
- go c.eventLoop(currentHeader, chainEventer)
+func (c *ChainIndexer) Start(chain IndexerChain) {
+ ch := make(chan ChainEvent, 10)
+ sub := chain.SubscribeChainEvent(ch)
+ currentHeader := chain.CurrentHeader()
+
+ go c.eventLoop(currentHeader, ch, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -125,12 +149,14 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}
+
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
+
// Return any failures
switch {
case len(errs) == 0:
@@ -147,12 +173,10 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
- events := make(chan ChainEvent, 10)
- sub := chainEventer(events)
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
@@ -169,7 +193,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
errc <- nil
return
- case ev, ok := <-events:
+ case ev, ok := <-ch:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
@@ -178,7 +202,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
- c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
+ if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
+ c.newHead(h.Number.Uint64(), true)
+ }
}
c.newHead(header.Number.Uint64(), false)
@@ -233,9 +259,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
- updating bool
- updated time.Time
+ updated time.Time
+ updateMsg bool
)
+
for {
select {
case errc := <-c.quit:
@@ -250,7 +277,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
- updating = true
+ updateMsg = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
@@ -259,7 +286,7 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
- oldHead = c.sectionHead(section - 1)
+ oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
@@ -270,11 +297,11 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Lock()
// If processing succeeded and no reorgs occcurred, mark the section completed
- if err == nil && oldHead == c.sectionHead(section-1) {
+ if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
- if c.storedSections == c.knownSections && updating {
- updating = false
+ if c.storedSections == c.knownSections && updateMsg {
+ updateMsg = false
c.log.Info("Finished upgrading chain index")
}
@@ -311,7 +338,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
- c.backend.Reset(section)
+
+ if err := c.backend.Reset(section, lastHead); err != nil {
+ c.setValidSections(0)
+ return common.Hash{}, err
+ }
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
@@ -341,7 +372,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
- return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
+ return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
@@ -383,7 +414,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
// sectionHead retrieves the last block hash of a processed section from the
// index database.
-func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
+func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)