From 91d3c571ef5e5628917c5a27d16bc0d8d7083224 Mon Sep 17 00:00:00 2001 From: Sonic Date: Tue, 20 Nov 2018 12:12:39 +0800 Subject: core: implement insert chain, headerchain logic --- core/blockchain.go | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++ core/governance.go | 102 ++++++++++++++++++++++++ core/headerchain.go | 193 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 521 insertions(+) create mode 100644 core/governance.go diff --git a/core/blockchain.go b/core/blockchain.go index 8bf680c38..293bb88f4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -29,6 +29,7 @@ import ( "time" coreCommon "github.com/dexon-foundation/dexon-consensus/common" + dexCore "github.com/dexon-foundation/dexon-consensus/core" coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/hashicorp/golang-lru" @@ -145,6 +146,8 @@ type BlockChain struct { roundHeightMap sync.Map + verifierCache *dexCore.TSigVerifierCache + confirmedBlockInitMu sync.Mutex confirmedBlocks map[uint32]map[coreCommon.Hash]*blockInfo addressNonce map[uint32]map[common.Address]uint64 @@ -231,6 +234,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } + gov := NewGovernance(bc) + bc.verifierCache = dexCore.NewTSigVerifierCache(gov, 5) + // Take ownership of this particular state go bc.update() return bc, nil @@ -1007,6 +1013,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ bytes += batch.ValueSize() batch.Reset() } + if _, ok := bc.GetRoundHeight(block.Round()); !ok { + bc.storeRoundHeight(block.Round(), block.NumberU64()) + } } if batch.ValueSize() > 0 { bytes += batch.ValueSize() @@ -1527,6 +1536,205 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i return 0, nil, nil, nil } +// InsertChain2 attempts to insert the given batch of blocks in to the canonical +// chain or, otherwise, create a fork. If an error is returned it will return +// the index number of the failing block as well an error describing what went +// wrong. +// +// After insertion is done, all accumulated events will be fired. +func (bc *BlockChain) InsertChain2(chain types.Blocks) (int, error) { + n, events, logs, err := bc.insertChain2(chain) + bc.PostChainEvents(events, logs) + return n, err +} + +// insertChain2 will execute the actual chain insertion and event aggregation. The +// only reason this method exists as a separate one is to make locking cleaner +// with deferred statements. +func (bc *BlockChain) insertChain2(chain types.Blocks) (int, []interface{}, []*types.Log, error) { + // Sanity check that we have something meaningful to import + if len(chain) == 0 { + return 0, nil, nil, nil + } + // Do a sanity check that the provided chain is actually ordered and linked + for i := 1; i < len(chain); i++ { + if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { + // Chain broke ancestry, log a message (programming error) and skip insertion + log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), + "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) + + return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), + chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) + } + } + // Pre-checks passed, start the full block imports + bc.wg.Add(1) + defer bc.wg.Done() + + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + + // A queued approach to delivering events. This is generally + // faster than direct delivery and requires much less mutex + // acquiring. + var ( + stats = insertStats{startTime: mclock.Now()} + events = make([]interface{}, 0, len(chain)) + lastCanon *types.Block + coalescedLogs []*types.Log + ) + + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) + senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + + // Iterate over the blocks and insert when the verifier permits + for i, block := range chain { + // If the chain is terminating, stop processing blocks + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + break + } + // If the header is a banned one, straight out abort + if BadHashes[block.Hash()] { + bc.reportBlock(block, nil, ErrBlacklistedHash) + return i, events, coalescedLogs, ErrBlacklistedHash + } + // Wait for the block's verification to complete + bstart := time.Now() + + err := bc.hc.verifyTSig(block.Header(), bc.verifierCache) + if err == nil { + err = bc.Validator().ValidateBody(block) + } + switch { + case err == ErrKnownBlock: + // Block and state both already known. However if the current block is below + // this number we did a rollback and we should reimport it nonetheless. + if bc.CurrentBlock().NumberU64() >= block.NumberU64() { + stats.ignored++ + continue + } + + case err == consensus.ErrFutureBlock: + // Allow up to MaxFuture second in the future blocks. If this limit is exceeded + // the chain is discarded and processed at a later time if given. + max := time.Now().Unix() + maxTimeFutureBlocks + if block.Time() > uint64(max) { + return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) + } + bc.futureBlocks.Add(block.Hash(), block) + stats.queued++ + continue + + case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): + bc.futureBlocks.Add(block.Hash(), block) + stats.queued++ + continue + + case err == consensus.ErrPrunedAncestor: + // Block competing with the canonical chain, store in the db, but don't process + // until the competitor TD goes above the canonical TD + currentBlock := bc.CurrentBlock() + localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) + if localTd.Cmp(externTd) > 0 { + if err = bc.WriteBlockWithoutState(block, externTd); err != nil { + return i, events, coalescedLogs, err + } + continue + } + // Competitor chain beat canonical, gather all blocks from the common ancestor + var winner []*types.Block + + parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) + for !bc.HasState(parent.Root()) { + winner = append(winner, parent) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + } + for j := 0; j < len(winner)/2; j++ { + winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] + } + // Import all the pruned blocks to make the state available + bc.chainmu.Unlock() + _, evs, logs, err := bc.insertChain2(winner) + bc.chainmu.Lock() + events, coalescedLogs = evs, logs + + if err != nil { + return i, events, coalescedLogs, err + } + + case err != nil: + bc.reportBlock(block, nil, err) + return i, events, coalescedLogs, err + } + // Create a new statedb using the parent block and report an + // error if it fails. + var parent *types.Block + if i == 0 { + parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) + } else { + parent = chain[i-1] + } + state, err := state.New(parent.Root(), bc.stateCache) + if err != nil { + return i, events, coalescedLogs, err + } + // Process block using the parent state as reference point. + receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) + if err != nil { + bc.reportBlock(block, receipts, err) + return i, events, coalescedLogs, err + } + // Validate the state using the default validator + err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) + if err != nil { + bc.reportBlock(block, receipts, err) + return i, events, coalescedLogs, err + } + proctime := time.Since(bstart) + + // Write the block to the chain and get the status. + status, err := bc.WriteBlockWithState(block, receipts, state) + if err != nil { + return i, events, coalescedLogs, err + } + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + + coalescedLogs = append(coalescedLogs, logs...) + blockInsertTimer.UpdateSince(bstart) + events = append(events, ChainEvent{block, block.Hash(), logs}) + lastCanon = block + + // Only count canonical blocks for GC processing time + bc.gcproc += proctime + + case SideStatTy: + log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", + common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) + + blockInsertTimer.UpdateSince(bstart) + events = append(events, ChainSideEvent{block}) + } + stats.processed++ + stats.usedGas += usedGas + + cache, _ := bc.stateCache.TrieDB().Size() + stats.report(chain, i, cache) + if _, ok := bc.GetRoundHeight(block.Round()); !ok { + bc.storeRoundHeight(block.Round(), block.NumberU64()) + } + } + // Append a single chain head event if we've progressed the chain + if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { + events = append(events, ChainHeadEvent{lastCanon}) + } + return 0, events, coalescedLogs, nil +} + func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { n, events, logs, err := bc.processPendingBlock(block, witness) bc.PostChainEvents(events, logs) @@ -2014,6 +2222,24 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i return bc.hc.InsertHeaderChain(chain, whFunc, start) } +func (bc *BlockChain) InsertHeaderChain2(chain []*types.HeaderWithGovState, verifierCache *dexCore.TSigVerifierCache) (int, error) { + start := time.Now() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + + bc.wg.Add(1) + defer bc.wg.Done() + + whFunc := func(header *types.HeaderWithGovState) error { + bc.mu.Lock() + defer bc.mu.Unlock() + _, err := bc.hc.WriteHeader2(header) + return err + } + + return bc.hc.InsertHeaderChain2(chain, whFunc, start) +} + // writeHeader writes a header into the local chain, given that its parent is // already known. If the total difficulty of the newly inserted header becomes // greater than the current known TD, the canonical chain is re-routed. diff --git a/core/governance.go b/core/governance.go new file mode 100644 index 000000000..261db1e75 --- /dev/null +++ b/core/governance.go @@ -0,0 +1,102 @@ +package core + +import ( + "math/big" + "time" + + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + dkgTypes "github.com/dexon-foundation/dexon-consensus/core/types/dkg" + + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/rlp" +) + +type Governance struct { + bc *BlockChain +} + +func NewGovernance(bc *BlockChain) *Governance { + return &Governance{bc: bc} +} + +func (g *Governance) getHeadHelper() *vm.GovernanceStateHelper { + headState, err := g.bc.State() + if err != nil { + log.Error("get head state fail", "err", err) + panic(err) + } + return &vm.GovernanceStateHelper{headState} +} + +func (g *Governance) getConfigHelper(round uint64) *vm.GovernanceStateHelper { + if round < dexCore.ConfigRoundShift { + round = 0 + } else { + round -= dexCore.ConfigRoundShift + } + return g.getHelperAtRound(round) +} + +func (g *Governance) getHelperAtRound(round uint64) *vm.GovernanceStateHelper { + headHelper := g.getHeadHelper() + height := headHelper.RoundHeight(big.NewInt(int64(round))).Uint64() + header := g.bc.GetHeaderByNumber(height) + s, err := g.bc.StateAt(header.Root) + if err != nil { + log.Error("get state fail", "err", err) + panic(err) + } + return &vm.GovernanceStateHelper{s} +} + +func (g *Governance) Configuration(round uint64) *coreTypes.Config { + helper := g.getConfigHelper(round) + c := helper.Configuration() + return &coreTypes.Config{ + NumChains: c.NumChains, + LambdaBA: time.Duration(c.LambdaBA) * time.Millisecond, + LambdaDKG: time.Duration(c.LambdaDKG) * time.Millisecond, + K: int(c.K), + PhiRatio: c.PhiRatio, + NotarySetSize: c.NotarySetSize, + DKGSetSize: c.DKGSetSize, + RoundInterval: time.Duration(c.RoundInterval) * time.Millisecond, + MinBlockInterval: time.Duration(c.MinBlockInterval) * time.Millisecond, + } +} + +func (g *Governance) DKGComplaints(round uint64) []*dkgTypes.Complaint { + headHelper := g.getHeadHelper() + var dkgComplaints []*dkgTypes.Complaint + for _, pk := range headHelper.DKGComplaints(big.NewInt(int64(round))) { + x := new(dkgTypes.Complaint) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgComplaints = append(dkgComplaints, x) + } + return dkgComplaints +} + +func (g *Governance) DKGMasterPublicKeys(round uint64) []*dkgTypes.MasterPublicKey { + headHelper := g.getHeadHelper() + var dkgMasterPKs []*dkgTypes.MasterPublicKey + for _, pk := range headHelper.DKGMasterPublicKeys(big.NewInt(int64(round))) { + x := new(dkgTypes.MasterPublicKey) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgMasterPKs = append(dkgMasterPKs, x) + } + return dkgMasterPKs +} + +func (g *Governance) IsDKGFinal(round uint64) bool { + headHelper := g.getHeadHelper() + configHelper := g.getConfigHelper(round) + threshold := 2*configHelper.DKGSetSize().Uint64()/3 + 1 + count := headHelper.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64() + return count >= threshold +} diff --git a/core/headerchain.go b/core/headerchain.go index 76ca1c1dd..a57e509ca 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -26,13 +26,20 @@ import ( "sync/atomic" "time" + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreCrypto "github.com/dexon-foundation/dexon-consensus/core/crypto" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/consensus" "github.com/dexon-foundation/dexon/core/rawdb" "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/params" + "github.com/dexon-foundation/dexon/rlp" + "github.com/dexon-foundation/dexon/trie" "github.com/hashicorp/golang-lru" ) @@ -297,6 +304,192 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCa return 0, nil } +func (hc *HeaderChain) WriteHeader2(header *types.HeaderWithGovState) (status WriteStatus, err error) { + // Cache some values to prevent constant recalculation + var ( + hash = header.Hash() + number = header.Number.Uint64() + ) + // Calculate the total difficulty of the header + ptd := hc.GetTd(header.ParentHash, number-1) + if ptd == nil { + return NonStatTy, consensus.ErrUnknownAncestor + } + localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64()) + externTd := new(big.Int).Add(header.Difficulty, ptd) + + // Irrelevant of the canonical status, write the td and header to the database + if err := hc.WriteTd(hash, number, externTd); err != nil { + log.Crit("Failed to write header total difficulty", "err", err) + } + rawdb.WriteHeader(hc.chainDb, header.Header) + + // If the total difficulty is higher than our known, add it to the canonical chain + // Second clause in the if statement reduces the vulnerability to selfish mining. + // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf + if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { + // Delete any canonical number assignments above the new head + batch := hc.chainDb.NewBatch() + for i := number + 1; ; i++ { + hash := rawdb.ReadCanonicalHash(hc.chainDb, i) + if hash == (common.Hash{}) { + break + } + rawdb.DeleteCanonicalHash(batch, i) + } + batch.Write() + + // Overwrite any stale canonical number assignments + var ( + headHash = header.ParentHash + headNumber = header.Number.Uint64() - 1 + headHeader = hc.GetHeader(headHash, headNumber) + ) + for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { + rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber) + + headHash = headHeader.ParentHash + headNumber = headHeader.Number.Uint64() - 1 + headHeader = hc.GetHeader(headHash, headNumber) + } + // Extend the canonical chain with the new header + rawdb.WriteCanonicalHash(hc.chainDb, hash, number) + rawdb.WriteHeadHeaderHash(hc.chainDb, hash) + + hc.currentHeaderHash = hash + hc.currentHeader.Store(types.CopyHeader(header.Header)) + + status = CanonStatTy + } else { + status = SideStatTy + } + + hc.headerCache.Add(hash, header.Header) + hc.numberCache.Add(hash, number) + + // Store the govState + if govState := header.GovState; govState != nil { + batch := hc.chainDb.NewBatch() + for _, node := range govState.Proof { + batch.Put(crypto.Keccak256(node), node) + } + if err := batch.Write(); err != nil { + panic(fmt.Errorf("DB write error: %v", err)) + } + + triedb := trie.NewDatabase(hc.chainDb) + t, err := trie.New(common.Hash{}, triedb) + if err != nil { + panic(err) + } + + for _, kv := range govState.Storage { + t.TryUpdate(kv[0], kv[1]) + } + t.Commit(nil) + triedb.Commit(t.Hash(), false) + } + return +} + +type Wh2Callback func(*types.HeaderWithGovState) error + +func (hc *HeaderChain) ValidateHeaderChain2(chain []*types.HeaderWithGovState, verifierCache *dexCore.TSigVerifierCache) (int, error) { + // Do a sanity check that the provided chain is actually ordered and linked + for i := 1; i < len(chain); i++ { + if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() { + // Chain broke ancestry, log a message (programming error) and skip insertion + log.Error("Non contiguous header insert", "number", chain[i].Number, "hash", chain[i].Hash(), + "parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", chain[i-1].Hash()) + + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].Number, + chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4]) + } + } + + // Iterate over the headers and ensure they all check out + for i, header := range chain { + // If the chain is terminating, stop processing blocks + if hc.procInterrupt() { + log.Debug("Premature abort during headers verification") + return 0, errors.New("aborted") + } + + if err := hc.verifyTSig(header.Header, verifierCache); err != nil { + return i, err + } + } + return 0, nil +} + +func (hc *HeaderChain) verifyTSig(header *types.Header, verifierCache *dexCore.TSigVerifierCache) error { + // If the header is a banned one, straight out abort + if BadHashes[header.Hash()] { + return ErrBlacklistedHash + } + + if header.Round == 0 { + return nil + } + + var dexconMeta coreTypes.Block + if err := rlp.DecodeBytes(header.DexconMeta, &dexconMeta); err != nil { + return err + } + + v, ok, err := verifierCache.UpdateAndGet(header.Round) + if err != nil { + panic(err) + } + + if !ok { + panic(fmt.Errorf("DKG of round %d is not finished", header.Round)) + } + + if !v.VerifySignature(dexconMeta.Hash, coreCrypto.Signature{ + Type: "bls", + Signature: header.Randomness}) { + return fmt.Errorf("signature invalid") + } + return nil +} + +// InsertHeaderChain2 attempts to insert the given header chain in to the local +// chain, possibly creating a reorg. If an error is returned, it will return the +// index number of the failing header as well an error describing what went wrong. +// +// The verify parameter can be used to fine tune whether nonce verification +// should be done or not. The reason behind the optional check is because some +// of the header retrieval mechanisms already need to verfy nonces, as well as +// because nonces can be verified sparsely, not needing to check each. +func (hc *HeaderChain) InsertHeaderChain2(chain []*types.HeaderWithGovState, writeHeader Wh2Callback, start time.Time) (int, error) { + // Collect some import statistics to report on + stats := struct{ processed, ignored int }{} + // All headers passed verification, import them into the database + for i, header := range chain { + // Short circuit insertion if shutting down + if hc.procInterrupt() { + log.Debug("Premature abort during headers import") + return i, errors.New("aborted") + } + // If the header's already known, skip it, otherwise store + if hc.HasHeader(header.Hash(), header.Number.Uint64()) { + stats.ignored++ + continue + } + if err := writeHeader(header); err != nil { + return i, err + } + stats.processed++ + } + // Report some public statistics so the user has a clue what's going on + last := chain[len(chain)-1] + log.Info("Imported new block headers", "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), + "number", last.Number, "hash", last.Hash(), "ignored", stats.ignored) + + return 0, nil +} + // GetBlockHashesFromHash retrieves a number of block hashes starting at a given // hash, fetching towards the genesis block. func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash { -- cgit v1.2.3