aboutsummaryrefslogtreecommitdiffstats
path: root/core/chain_indexer.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2017-10-25 17:18:44 +0800
committerGitHub <noreply@github.com>2017-10-25 17:18:44 +0800
commit0095531a58772b1f5bd1547169790dbde84ec78a (patch)
tree91b9e56dbcb9afe4058c0f41f33ebc7812201abc /core/chain_indexer.go
parentca376ead88a5a26626a90abdb62f4de7f6313822 (diff)
downloadgo-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar
go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.gz
go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.bz2
go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.lz
go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.xz
go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.tar.zst
go-tangerine-0095531a58772b1f5bd1547169790dbde84ec78a.zip
core, eth, les: fix messy code (#15367)
* core, eth, les: fix messy code * les: fixed tx status test and rlp encoding * core: add a workaround for light sync
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r--core/chain_indexer.go49
1 files changed, 25 insertions, 24 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index 837c908ab..7e7500dc8 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,17 +36,25 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64, lastSectionHead common.Hash) error
+ Reset(section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)
- // Commit finalizes the section metadata and stores it into the database. This
- // interface will usually be a batch writer.
+ // Commit finalizes the section metadata and stores it into the database.
Commit() error
}
+// ChainIndexerChain interface is used for connecting the indexer to a blockchain
+type ChainIndexerChain interface {
+ // CurrentHeader retrieves the latest locally known header.
+ CurrentHeader() *types.Header
+
+ // SubscribeChainEvent subscribes to new head header notifications.
+ SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
+}
+
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
@@ -114,21 +122,14 @@ func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
c.setValidSections(section + 1)
}
-// IndexerChain interface is used for connecting the indexer to a blockchain
-type IndexerChain interface {
- CurrentHeader() *types.Header
- SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
-}
-
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
-func (c *ChainIndexer) Start(chain IndexerChain) {
- ch := make(chan ChainEvent, 10)
- sub := chain.SubscribeChainEvent(ch)
- currentHeader := chain.CurrentHeader()
+func (c *ChainIndexer) Start(chain ChainIndexerChain) {
+ events := make(chan ChainEvent, 10)
+ sub := chain.SubscribeChainEvent(events)
- go c.eventLoop(currentHeader, ch, sub)
+ go c.eventLoop(chain.CurrentHeader(), events, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -149,14 +150,12 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}
-
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
-
// Return any failures
switch {
case len(errs) == 0:
@@ -173,7 +172,7 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
@@ -193,7 +192,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
errc <- nil
return
- case ev, ok := <-ch:
+ case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
@@ -202,6 +201,8 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
+ // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then)
+ // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
@@ -259,8 +260,8 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
- updated time.Time
- updateMsg bool
+ updating bool
+ updated time.Time
)
for {
@@ -277,7 +278,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
- updateMsg = true
+ updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
@@ -300,8 +301,8 @@ func (c *ChainIndexer) updateLoop() {
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
- if c.storedSections == c.knownSections && updateMsg {
- updateMsg = false
+ if c.storedSections == c.knownSections && updating {
+ updating = false
c.log.Info("Finished upgrading chain index")
}
@@ -412,7 +413,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
c.storedSections = sections // needed if new > old
}
-// sectionHead retrieves the last block hash of a processed section from the
+// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte