diff options
-rw-r--r-- | core/chain_indexer.go | 396 | ||||
-rw-r--r-- | core/chain_indexer_test.go | 234 |
2 files changed, 630 insertions, 0 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go new file mode 100644 index 000000000..9a88a5b1b --- /dev/null +++ b/core/chain_indexer.go @@ -0,0 +1,396 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package core + +import ( + "encoding/binary" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +// ChainIndexerBackend defines the methods needed to process chain segments in +// the background and write the segment results into the database. These can be +// used to create filter blooms or CHTs. +type ChainIndexerBackend interface { + // Reset initiates the processing of a new chain segment, potentially terminating + // any partially completed operations (in case of a reorg). + Reset(section uint64) + + // Process crunches through the next header in the chain segment. The caller + // will ensure a sequential order of headers. + Process(header *types.Header) + + // Commit finalizes the section metadata and stores it into the database. This + // interface will usually be a batch writer. + Commit(db ethdb.Database) error +} + +// ChainIndexer does a post-processing job for equally sized sections of the +// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is +// connected to the blockchain through the event system by starting a +// ChainEventLoop in a goroutine. +// +// Further child ChainIndexers can be added which use the output of the parent +// section indexer. These child indexers receive new head notifications only +// after an entire section has been finished or in case of rollbacks that might +// affect already finished sections. +type ChainIndexer struct { + chainDb ethdb.Database // Chain database to index the data from + indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into + backend ChainIndexerBackend // Background processor generating the index data content + children []*ChainIndexer // Child indexers to cascade chain updates to + + active uint32 // Flag whether the event loop was started + update chan struct{} // Notification channel that headers should be processed + quit chan chan error // Quit channel to tear down running goroutines + + sectionSize uint64 // Number of blocks in a single chain segment to process + confirmsReq uint64 // Number of confirmations before processing a completed segment + + storedSections uint64 // Number of sections successfully indexed into the database + knownSections uint64 // Number of sections known to be complete (block wise) + cascadedHead uint64 // Block number of the last completed section cascaded to subindexers + + throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources + + log log.Logger + lock sync.RWMutex +} + +// NewChainIndexer creates a new chain indexer to do background processing on +// chain segments of a given size after certain number of confirmations passed. +// The throttling parameter might be used to prevent database thrashing. +func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { + c := &ChainIndexer{ + chainDb: chainDb, + indexDb: indexDb, + backend: backend, + update: make(chan struct{}, 1), + quit: make(chan chan error), + sectionSize: section, + confirmsReq: confirm, + throttling: throttling, + log: log.New("type", kind), + } + // Initialize database dependent fields and start the updater + c.loadValidSections() + go c.updateLoop() + + return c +} + +// Start creates a goroutine to feed chain head events into the indexer for +// cascading background processing. +func (c *ChainIndexer) Start(currentHeader *types.Header, eventMux *event.TypeMux) { + go c.eventLoop(currentHeader, eventMux) +} + +// Close tears down all goroutines belonging to the indexer and returns any error +// that might have occurred internally. +func (c *ChainIndexer) Close() error { + var errs []error + + // Tear down the primary update loop + errc := make(chan error) + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + // If needed, tear down the secondary event loop + if atomic.LoadUint32(&c.active) != 0 { + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + // Return any failures + switch { + case len(errs) == 0: + return nil + + case len(errs) == 1: + return errs[0] + + default: + return fmt.Errorf("%v", errs) + } +} + +// eventLoop is a secondary - optional - event loop of the indexer which is only +// started for the outermost indexer to push chain head events into a processing +// queue. +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { + // Mark the chain indexer as active, requiring an additional teardown + atomic.StoreUint32(&c.active, 1) + + // Subscribe to chain head events + sub := eventMux.Subscribe(ChainEvent{}) + defer sub.Unsubscribe() + + // Fire the initial new head event to start any outstanding processing + c.newHead(currentHeader.Number.Uint64(), false) + + var ( + prevHeader = currentHeader + prevHash = currentHeader.Hash() + ) + for { + select { + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil + return + + case ev, ok := <-sub.Chan(): + // Received a new event, ensure it's not nil (closing) and update + if !ok { + errc := <-c.quit + errc <- nil + return + } + header := ev.Data.(ChainEvent).Block.Header() + if header.ParentHash != prevHash { + c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) + } + c.newHead(header.Number.Uint64(), false) + + prevHeader, prevHash = header, header.Hash() + } + } +} + +// newHead notifies the indexer about new chain heads and/or reorgs. +func (c *ChainIndexer) newHead(head uint64, reorg bool) { + c.lock.Lock() + defer c.lock.Unlock() + + // If a reorg happened, invalidate all sections until that point + if reorg { + // Revert the known section number to the reorg point + changed := head / c.sectionSize + if changed < c.knownSections { + c.knownSections = changed + } + // Revert the stored sections from the database to the reorg point + if changed < c.storedSections { + c.setValidSections(changed) + } + // Update the new head number to te finalized section end and notify children + head = changed * c.sectionSize + + if head < c.cascadedHead { + c.cascadedHead = head + for _, child := range c.children { + child.newHead(c.cascadedHead, true) + } + } + return + } + // No reorg, calculate the number of newly known sections and update if high enough + var sections uint64 + if head >= c.confirmsReq { + sections = (head + 1 - c.confirmsReq) / c.sectionSize + if sections > c.knownSections { + c.knownSections = sections + + select { + case c.update <- struct{}{}: + default: + } + } + } +} + +// updateLoop is the main event loop of the indexer which pushes chain segments +// down into the processing backend. +func (c *ChainIndexer) updateLoop() { + var updated time.Time + + for { + select { + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil + return + + case <-c.update: + // Section headers completed (or rolled back), update the index + c.lock.Lock() + if c.knownSections > c.storedSections { + // Periodically print an upgrade log message to the user + if time.Since(updated) > 8*time.Second { + if c.knownSections > c.storedSections+1 { + c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) + } + updated = time.Now() + } + // Cache the current section count and head to allow unlocking the mutex + section := c.storedSections + var oldHead common.Hash + if section > 0 { + oldHead = c.sectionHead(section - 1) + } + // Process the newly defined section in the background + c.lock.Unlock() + newHead, err := c.processSection(section, oldHead) + c.lock.Lock() + + // If processing succeeded and no reorgs occcurred, mark the section completed + if err == nil && oldHead == c.sectionHead(section-1) { + c.setSectionHead(section, newHead) + c.setValidSections(section + 1) + + c.cascadedHead = c.storedSections*c.sectionSize - 1 + for _, child := range c.children { + c.log.Trace("Cascading chain index update", "head", c.cascadedHead) + child.newHead(c.cascadedHead, false) + } + } else { + // If processing failed, don't retry until further notification + c.log.Debug("Chain index processing failed", "section", section, "err", err) + c.knownSections = c.storedSections + } + } + // If there are still further sections to process, reschedule + if c.knownSections > c.storedSections { + time.AfterFunc(c.throttling, func() { + select { + case c.update <- struct{}{}: + default: + } + }) + } + c.lock.Unlock() + } + } +} + +// processSection processes an entire section by calling backend functions while +// ensuring the continuity of the passed headers. Since the chain mutex is not +// held while processing, the continuity can be broken by a long reorg, in which +// case the function returns with an error. +func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { + c.log.Trace("Processing new chain section", "section", section) + + // Reset and partial processing + c.backend.Reset(section) + + for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { + hash := GetCanonicalHash(c.chainDb, number) + if hash == (common.Hash{}) { + return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) + } + header := GetHeader(c.chainDb, hash, number) + if header == nil { + return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) + } else if header.ParentHash != lastHead { + return common.Hash{}, fmt.Errorf("chain reorged during section processing") + } + c.backend.Process(header) + lastHead = header.Hash() + } + if err := c.backend.Commit(c.chainDb); err != nil { + return common.Hash{}, err + } + return lastHead, nil +} + +// Sections returns the number of processed sections maintained by the indexer +// and also the information about the last header indexed for potential canonical +// verifications. +func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) +} + +// AddChildIndexer adds a child ChainIndexer that can use the output of this one +func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { + c.lock.Lock() + defer c.lock.Unlock() + + c.children = append(c.children, indexer) + + // Cascade any pending updates to new children too + if c.storedSections > 0 { + indexer.newHead(c.storedSections*c.sectionSize-1, false) + } +} + +// loadValidSections reads the number of valid sections from the index database +// and caches is into the local state. +func (c *ChainIndexer) loadValidSections() { + data, _ := c.indexDb.Get([]byte("count")) + if len(data) == 8 { + c.storedSections = binary.BigEndian.Uint64(data[:]) + } +} + +// setValidSections writes the number of valid sections to the index database +func (c *ChainIndexer) setValidSections(sections uint64) { + // Set the current number of valid sections in the database + var data [8]byte + binary.BigEndian.PutUint64(data[:], sections) + c.indexDb.Put([]byte("count"), data[:]) + + // Remove any reorged sections, caching the valids in the mean time + for c.storedSections > sections { + c.storedSections-- + c.removeSectionHead(c.storedSections) + } + c.storedSections = sections // needed if new > old +} + +// sectionHead retrieves the last block hash of a processed section from the +// index database. +func (c *ChainIndexer) sectionHead(section uint64) common.Hash { + var data [8]byte + binary.BigEndian.PutUint64(data[:], section) + + hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) + if len(hash) == len(common.Hash{}) { + return common.BytesToHash(hash) + } + return common.Hash{} +} + +// setSectionHead writes the last block hash of a processed section to the index +// database. +func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) { + var data [8]byte + binary.BigEndian.PutUint64(data[:], section) + + c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes()) +} + +// removeSectionHead removes the reference to a processed section from the index +// database. +func (c *ChainIndexer) removeSectionHead(section uint64) { + var data [8]byte + binary.BigEndian.PutUint64(data[:], section) + + c.indexDb.Delete(append([]byte("shead"), data[:]...)) +} diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go new file mode 100644 index 000000000..780e46e43 --- /dev/null +++ b/core/chain_indexer_test.go @@ -0,0 +1,234 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package core + +import ( + "fmt" + "math/big" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" +) + +// Runs multiple tests with randomized parameters. +func TestChainIndexerSingle(t *testing.T) { + for i := 0; i < 10; i++ { + testChainIndexer(t, 1) + } +} + +// Runs multiple tests with randomized parameters and different number of +// chain backends. +func TestChainIndexerWithChildren(t *testing.T) { + for i := 2; i < 8; i++ { + testChainIndexer(t, i) + } +} + +// testChainIndexer runs a test with either a single chain indexer or a chain of +// multiple backends. The section size and required confirmation count parameters +// are randomized. +func testChainIndexer(t *testing.T, count int) { + db, _ := ethdb.NewMemDatabase() + defer db.Close() + + // Create a chain of indexers and ensure they all report empty + backends := make([]*testChainIndexBackend, count) + for i := 0; i < count; i++ { + var ( + sectionSize = uint64(rand.Intn(100) + 1) + confirmsReq = uint64(rand.Intn(10)) + ) + backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)} + backends[i].indexer = NewChainIndexer(db, ethdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i)) + defer backends[i].indexer.Close() + + if sections, _, _ := backends[i].indexer.Sections(); sections != 0 { + t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0) + } + if i > 0 { + backends[i-1].indexer.AddChildIndexer(backends[i].indexer) + } + } + // notify pings the root indexer about a new head or reorg, then expect + // processed blocks if a section is processable + notify := func(headNum, failNum uint64, reorg bool) { + backends[0].indexer.newHead(headNum, reorg) + if reorg { + for _, backend := range backends { + headNum = backend.reorg(headNum) + backend.assertSections() + } + return + } + var cascade bool + for _, backend := range backends { + headNum, cascade = backend.assertBlocks(headNum, failNum) + if !cascade { + break + } + backend.assertSections() + } + } + // inject inserts a new random canonical header into the database directly + inject := func(number uint64) { + header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()} + if number > 0 { + header.ParentHash = GetCanonicalHash(db, number-1) + } + WriteHeader(db, header) + WriteCanonicalHash(db, header.Hash(), number) + } + // Start indexer with an already existing chain + for i := uint64(0); i <= 100; i++ { + inject(i) + } + notify(100, 100, false) + + // Add new blocks one by one + for i := uint64(101); i <= 1000; i++ { + inject(i) + notify(i, i, false) + } + // Do a reorg + notify(500, 500, true) + + // Create new fork + for i := uint64(501); i <= 1000; i++ { + inject(i) + notify(i, i, false) + } + for i := uint64(1001); i <= 1500; i++ { + inject(i) + } + // Failed processing scenario where less blocks are available than notified + notify(2000, 1500, false) + + // Notify about a reorg (which could have caused the missing blocks if happened during processing) + notify(1500, 1500, true) + + // Create new fork + for i := uint64(1501); i <= 2000; i++ { + inject(i) + notify(i, i, false) + } +} + +// testChainIndexBackend implements ChainIndexerBackend +type testChainIndexBackend struct { + t *testing.T + indexer *ChainIndexer + section, headerCnt, stored uint64 + processCh chan uint64 +} + +// assertSections verifies if a chain indexer has the correct number of section. +func (b *testChainIndexBackend) assertSections() { + // Keep trying for 3 seconds if it does not match + var sections uint64 + for i := 0; i < 300; i++ { + sections, _, _ = b.indexer.Sections() + if sections == b.stored { + return + } + time.Sleep(10 * time.Millisecond) + } + b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored) +} + +// assertBlocks expects processing calls after new blocks have arrived. If the +// failNum < headNum then we are simulating a scenario where a reorg has happened +// after the processing has started and the processing of a section fails. +func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) { + var sections uint64 + if headNum >= b.indexer.confirmsReq { + sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize + if sections > b.stored { + // expect processed blocks + for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ { + if expectd > failNum { + // rolled back after processing started, no more process calls expected + // wait until updating is done to make sure that processing actually fails + var updating bool + for i := 0; i < 300; i++ { + b.indexer.lock.Lock() + updating = b.indexer.knownSections > b.indexer.storedSections + b.indexer.lock.Unlock() + if !updating { + break + } + time.Sleep(10 * time.Millisecond) + } + if updating { + b.t.Fatalf("update did not finish") + } + sections = expectd / b.indexer.sectionSize + break + } + select { + case <-time.After(10 * time.Second): + b.t.Fatalf("Expected processed block #%d, got nothing", expectd) + case processed := <-b.processCh: + if processed != expectd { + b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed) + } + } + } + b.stored = sections + } + } + if b.stored == 0 { + return 0, false + } + return b.stored*b.indexer.sectionSize - 1, true +} + +func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { + firstChanged := headNum / b.indexer.sectionSize + if firstChanged < b.stored { + b.stored = firstChanged + } + return b.stored * b.indexer.sectionSize +} + +func (b *testChainIndexBackend) Reset(section uint64) { + b.section = section + b.headerCnt = 0 +} + +func (b *testChainIndexBackend) Process(header *types.Header) { + b.headerCnt++ + if b.headerCnt > b.indexer.sectionSize { + b.t.Error("Processing too many headers") + } + //t.processCh <- header.Number.Uint64() + select { + case <-time.After(10 * time.Second): + b.t.Fatal("Unexpected call to Process") + case b.processCh <- header.Number.Uint64(): + } +} + +func (b *testChainIndexBackend) Commit(db ethdb.Database) error { + if b.headerCnt != b.indexer.sectionSize { + b.t.Error("Not enough headers processed") + } + return nil +} |