From 0095531a58772b1f5bd1547169790dbde84ec78a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 25 Oct 2017 12:18:44 +0300 Subject: core, eth, les: fix messy code (#15367) * core, eth, les: fix messy code * les: fixed tx status test and rlp encoding * core: add a workaround for light sync --- core/chain_indexer.go | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) (limited to 'core/chain_indexer.go') diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 837c908ab..7e7500dc8 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -36,17 +36,25 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64, lastSectionHead common.Hash) error + Reset(section uint64, prevHead common.Hash) error // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. Process(header *types.Header) - // Commit finalizes the section metadata and stores it into the database. This - // interface will usually be a batch writer. + // Commit finalizes the section metadata and stores it into the database. Commit() error } +// ChainIndexerChain interface is used for connecting the indexer to a blockchain +type ChainIndexerChain interface { + // CurrentHeader retrieves the latest locally known header. + CurrentHeader() *types.Header + + // SubscribeChainEvent subscribes to new head header notifications. + SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription +} + // ChainIndexer does a post-processing job for equally sized sections of the // canonical chain (like BlooomBits and CHT structures). A ChainIndexer is // connected to the blockchain through the event system by starting a @@ -114,21 +122,14 @@ func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) { c.setValidSections(section + 1) } -// IndexerChain interface is used for connecting the indexer to a blockchain -type IndexerChain interface { - CurrentHeader() *types.Header - SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription -} - // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. -func (c *ChainIndexer) Start(chain IndexerChain) { - ch := make(chan ChainEvent, 10) - sub := chain.SubscribeChainEvent(ch) - currentHeader := chain.CurrentHeader() +func (c *ChainIndexer) Start(chain ChainIndexerChain) { + events := make(chan ChainEvent, 10) + sub := chain.SubscribeChainEvent(events) - go c.eventLoop(currentHeader, ch, sub) + go c.eventLoop(chain.CurrentHeader(), events, sub) } // Close tears down all goroutines belonging to the indexer and returns any error @@ -149,14 +150,12 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } } - // Close all children for _, child := range c.children { if err := child.Close(); err != nil { errs = append(errs, err) } } - // Return any failures switch { case len(errs) == 0: @@ -173,7 +172,7 @@ func (c *ChainIndexer) Close() error { // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) { +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) @@ -193,7 +192,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent errc <- nil return - case ev, ok := <-ch: + case ev, ok := <-events: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit @@ -202,6 +201,8 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent } header := ev.Block.Header() if header.ParentHash != prevHash { + // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then) + // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly? if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { c.newHead(h.Number.Uint64(), true) } @@ -259,8 +260,8 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) { // down into the processing backend. func (c *ChainIndexer) updateLoop() { var ( - updated time.Time - updateMsg bool + updating bool + updated time.Time ) for { @@ -277,7 +278,7 @@ func (c *ChainIndexer) updateLoop() { // Periodically print an upgrade log message to the user if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { - updateMsg = true + updating = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() @@ -300,8 +301,8 @@ func (c *ChainIndexer) updateLoop() { if err == nil && oldHead == c.SectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) - if c.storedSections == c.knownSections && updateMsg { - updateMsg = false + if c.storedSections == c.knownSections && updating { + updating = false c.log.Info("Finished upgrading chain index") } @@ -412,7 +413,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) { c.storedSections = sections // needed if new > old } -// sectionHead retrieves the last block hash of a processed section from the +// SectionHead retrieves the last block hash of a processed section from the // index database. func (c *ChainIndexer) SectionHead(section uint64) common.Hash { var data [8]byte -- cgit v1.2.3