aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/chain_indexer.go30
-rw-r--r--core/chain_indexer_test.go6
2 files changed, 26 insertions, 10 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index 0b927116d..11a7c96fa 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -17,6 +17,7 @@
package core
import (
+ "context"
"encoding/binary"
"fmt"
"sync"
@@ -37,11 +38,11 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64, prevHead common.Hash) error
+ Reset(ctx context.Context, section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
- Process(header *types.Header)
+ Process(ctx context.Context, header *types.Header) error
// Commit finalizes the section metadata and stores it into the database.
Commit() error
@@ -71,9 +72,11 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
- active uint32 // Flag whether the event loop was started
- update chan struct{} // Notification channel that headers should be processed
- quit chan chan error // Quit channel to tear down running goroutines
+ active uint32 // Flag whether the event loop was started
+ update chan struct{} // Notification channel that headers should be processed
+ quit chan chan error // Quit channel to tear down running goroutines
+ ctx context.Context
+ ctxCancel func()
sectionSize uint64 // Number of blocks in a single chain segment to process
confirmsReq uint64 // Number of confirmations before processing a completed segment
@@ -105,6 +108,8 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
}
// Initialize database dependent fields and start the updater
c.loadValidSections()
+ c.ctx, c.ctxCancel = context.WithCancel(context.Background())
+
go c.updateLoop()
return c
@@ -138,6 +143,8 @@ func (c *ChainIndexer) Start(chain ChainIndexerChain) {
func (c *ChainIndexer) Close() error {
var errs []error
+ c.ctxCancel()
+
// Tear down the primary update loop
errc := make(chan error)
c.quit <- errc
@@ -297,6 +304,12 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
if err != nil {
+ select {
+ case <-c.ctx.Done():
+ <-c.quit <- nil
+ return
+ default:
+ }
c.log.Error("Section processing failed", "error", err)
}
c.lock.Lock()
@@ -344,7 +357,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
// Reset and partial processing
- if err := c.backend.Reset(section, lastHead); err != nil {
+ if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}
@@ -360,11 +373,12 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
} else if header.ParentHash != lastHead {
return common.Hash{}, fmt.Errorf("chain reorged during section processing")
}
- c.backend.Process(header)
+ if err := c.backend.Process(c.ctx, header); err != nil {
+ return common.Hash{}, err
+ }
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
- c.log.Error("Section commit failed", "error", err)
return common.Hash{}, err
}
return lastHead, nil
diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go
index 550caf556..a029dec62 100644
--- a/core/chain_indexer_test.go
+++ b/core/chain_indexer_test.go
@@ -17,6 +17,7 @@
package core
import (
+ "context"
"fmt"
"math/big"
"math/rand"
@@ -210,13 +211,13 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}
-func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error {
+func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}
-func (b *testChainIndexBackend) Process(header *types.Header) {
+func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error {
b.headerCnt++
if b.headerCnt > b.indexer.sectionSize {
b.t.Error("Processing too many headers")
@@ -227,6 +228,7 @@ func (b *testChainIndexBackend) Process(header *types.Header) {
b.t.Fatal("Unexpected call to Process")
case b.processCh <- header.Number.Uint64():
}
+ return nil
}
func (b *testChainIndexBackend) Commit() error {