diff options
author | Bojie Wu <bojie@dexon.org> | 2018-10-09 13:28:45 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:52 +0800 |
commit | 702f086745d19e51657502de7a94d39690be55f7 (patch) | |
tree | e223c6a37bc35705835edff1c37a0ffc8c3c44a3 | |
parent | 699dbac51c7344c7659c6c63f3bc720160f20843 (diff) | |
download | dexon-702f086745d19e51657502de7a94d39690be55f7.tar dexon-702f086745d19e51657502de7a94d39690be55f7.tar.gz dexon-702f086745d19e51657502de7a94d39690be55f7.tar.bz2 dexon-702f086745d19e51657502de7a94d39690be55f7.tar.lz dexon-702f086745d19e51657502de7a94d39690be55f7.tar.xz dexon-702f086745d19e51657502de7a94d39690be55f7.tar.zst dexon-702f086745d19e51657502de7a94d39690be55f7.zip |
app: using lock correctly to use map safely
-rw-r--r-- | core/blockchain.go | 142 | ||||
-rw-r--r-- | dex/app.go | 140 |
2 files changed, 163 insertions, 119 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 9b0620679..0e86f2b59 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -142,14 +142,16 @@ type BlockChain struct { badBlocks *lru.Cache // Bad block cache shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. - confirmedBlocks map[coreCommon.Hash]*blockInfo - addressNonce map[common.Address]uint64 - addressCost map[common.Address]*big.Int - addressCounter map[common.Address]uint64 - chainLastHeight map[uint32]uint64 - - pendingBlockMu *sync.Mutex - pendingBlocks map[uint64]struct { + confirmedBlocksMu sync.RWMutex + confirmedBlocks map[coreCommon.Hash]*blockInfo + addressNonce map[common.Address]uint64 + addressCost map[common.Address]*big.Int + addressCounter map[common.Address]uint64 + chainLastHeight map[uint32]uint64 + + pendingBlockMu sync.RWMutex + lastPendingHeight uint64 + pendingBlocks map[uint64]struct { block *types.Block receipts types.Receipts } @@ -174,27 +176,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), - quit: make(chan struct{}), - shouldPreserve: shouldPreserve, - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - confirmedBlocks: make(map[coreCommon.Hash]*blockInfo), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(nil), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + quit: make(chan struct{}), + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + futureBlocks: futureBlocks, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, pendingBlocks: make(map[uint64]struct { block *types.Block receipts types.Receipts }), - pendingBlockMu: &sync.Mutex{}, + confirmedBlocks: make(map[coreCommon.Hash]*blockInfo), addressNonce: make(map[common.Address]uint64), addressCost: make(map[common.Address]*big.Int), addressCounter: make(map[common.Address]uint64), @@ -248,6 +248,9 @@ type blockInfo struct { } func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error { + bc.confirmedBlocksMu.Lock() + defer bc.confirmedBlocksMu.Unlock() + var transactions types.Transactions err := rlp.Decode(bytes.NewReader(block.Payload), &transactions) if err != nil { @@ -300,20 +303,32 @@ func (bc *BlockChain) RemoveConfirmedBlock(hash coreCommon.Hash) { } func (bc *BlockChain) GetConfirmedBlockByHash(hash coreCommon.Hash) *coreTypes.Block { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + return bc.confirmedBlocks[hash].block } func (bc *BlockChain) GetLastNonceInConfirmedBlocks(address common.Address) (uint64, bool) { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + nonce, exist := bc.addressNonce[address] return nonce, exist } func (bc *BlockChain) GetCostInConfirmedBlocks(address common.Address) (*big.Int, bool) { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + cost, exist := bc.addressCost[address] return cost, exist } func (bc *BlockChain) GetChainLastConfirmedHeight(chainID uint32) uint64 { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + return bc.chainLastHeight[chainID] } @@ -1493,20 +1508,13 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i return 0, nil, nil, nil } -func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, error) { +func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { n, events, logs, err := bc.processPendingBlock(block, witness) bc.PostChainEvents(events, logs) return n, err } -func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, []interface{}, []*types.Log, error) { - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - defer bc.wg.Done() - - bc.chainmu.Lock() - defer bc.chainmu.Unlock() - +func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -1528,19 +1536,19 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") - return 0, nil, nil, fmt.Errorf("interrupt") + return nil, nil, nil, fmt.Errorf("interrupt") } bstart := time.Now() currentBlock := bc.CurrentBlock() if witness.Height > currentBlock.NumberU64() && witness.Height != 0 { if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root { - return 0, nil, nil, fmt.Errorf("invalid witness root %s vs %s", + return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s", bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String()) } if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash { - return 0, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", + return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String()) } } @@ -1552,7 +1560,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes if !exist { parentBlock = currentBlock if parentBlock.NumberU64() != block.NumberU64()-1 { - return 0, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) + return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } } else { parentBlock = parent.block @@ -1560,7 +1568,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes block.RawHeader().ParentHash = parentBlock.Hash() pendingState, err = state.New(parentBlock.Root(), bc.stateCache) if err != nil { - return 0, nil, nil, err + return nil, nil, nil, err } var ( @@ -1574,7 +1582,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes pendingState.Prepare(tx.Hash(), block.Hash(), i) receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) if err != nil { - return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) + return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) } receipts = append(receipts, receipt) log.Debug("Apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value()) @@ -1583,28 +1591,25 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes header.GasUsed = *usedGas newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) if err != nil { - return 0, nil, nil, fmt.Errorf("finalize error: %v", err) + return nil, nil, nil, fmt.Errorf("finalize error: %v", err) } // Validate the state using the default validator err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) if err != nil { bc.reportBlock(block, receipts, err) - return 0, nil, nil, fmt.Errorf("valiadte state error: %v", err) + return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err) } proctime := time.Since(bstart) // commit state to refresh stateCache _, err = pendingState.Commit(true) if err != nil { - return 0, nil, nil, fmt.Errorf("pendingState commit error: %v", err) + return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err) } // add into pending blocks - bc.pendingBlocks[block.NumberU64()] = struct { - block *types.Block - receipts types.Receipts - }{block: newPendingBlock, receipts: receipts} + bc.addPendingBlock(newPendingBlock, receipts) // start insert available pending blocks into db for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ { @@ -1616,13 +1621,13 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes s, err := state.New(pendingIns.block.Root(), bc.stateCache) if err != nil { - return 0, events, coalescedLogs, err + return nil, events, coalescedLogs, err } // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s) if err != nil { - return 0, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) + return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) } switch status { @@ -1643,11 +1648,9 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes bc.gcproc += proctime case SideStatTy: - return 0, nil, nil, fmt.Errorf("insert pending block and fork found") + return nil, nil, nil, fmt.Errorf("insert pending block and fork found") } - bc.pendingBlockMu.Lock() - delete(bc.pendingBlocks, pendingHeight) - bc.pendingBlockMu.Unlock() + bc.removePendingBlock(pendingHeight) stats.processed++ stats.usedGas += pendingIns.block.GasUsed() @@ -1660,13 +1663,40 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes events = append(events, ChainHeadEvent{lastCanon}) } - return 0, events, coalescedLogs, nil + root := newPendingBlock.Root() + return &root, events, coalescedLogs, nil +} + +func (bc *BlockChain) removePendingBlock(height uint64) { + bc.pendingBlockMu.Lock() + defer bc.pendingBlockMu.Unlock() + + delete(bc.pendingBlocks, height) } -func (bc *BlockChain) GetPendingBlockByHeight(height uint64) *types.Block { +func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts) { bc.pendingBlockMu.Lock() defer bc.pendingBlockMu.Unlock() - return bc.pendingBlocks[height].block + + bc.pendingBlocks[block.NumberU64()] = struct { + block *types.Block + receipts types.Receipts + }{block: block, receipts: receipts} + bc.lastPendingHeight = block.NumberU64() +} + +func (bc *BlockChain) GetLastPendingHeight() uint64 { + bc.pendingBlockMu.RLock() + defer bc.pendingBlockMu.RUnlock() + + return bc.lastPendingHeight +} + +func (bc *BlockChain) GetLastPendingBlock() *types.Block { + bc.pendingBlockMu.RLock() + defer bc.pendingBlockMu.RUnlock() + + return bc.pendingBlocks[bc.lastPendingHeight].block } // reorg takes two blocks, an old chain and a new chain and will reconstruct the diff --git a/dex/app.go b/dex/app.go index b924ab620..6c6d8da53 100644 --- a/dex/app.go +++ b/dex/app.go @@ -29,7 +29,6 @@ import ( "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/core/vm" "github.com/dexon-foundation/dexon/ethdb" @@ -47,13 +46,15 @@ type DexconApp struct { vmConfig vm.Config notifyChan map[uint64]*notify - notifyMu *sync.Mutex + notifyMu sync.Mutex - lastPendingHeight uint64 - insertMu sync.Mutex + chainLatestRootMu sync.RWMutex + chainLatestRoot map[uint32]*common.Hash - chainLocksInitMu *sync.Mutex - chainLocks map[uint32]*sync.Mutex + insertMu sync.Mutex + + chainLocksInitMu sync.Mutex + chainLocks map[uint32]*sync.RWMutex } type notify struct { @@ -68,16 +69,15 @@ type witnessData struct { func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance, chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp { return &DexconApp{ - txPool: txPool, - blockchain: blockchain, - gov: gov, - chainDB: chainDB, - config: config, - vmConfig: vmConfig, - notifyChan: make(map[uint64]*notify), - notifyMu: &sync.Mutex{}, - chainLocksInitMu: &sync.Mutex{}, - chainLocks: make(map[uint32]*sync.Mutex), + txPool: txPool, + blockchain: blockchain, + gov: gov, + chainDB: chainDB, + config: config, + vmConfig: vmConfig, + notifyChan: make(map[uint64]*notify), + chainLocks: make(map[uint32]*sync.RWMutex), + chainLatestRoot: make(map[uint32]*common.Hash), } } @@ -105,7 +105,6 @@ func (d *DexconApp) notify(height uint64) { delete(d.notifyChan, h) } } - d.lastPendingHeight = height } func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.Int) bool { @@ -115,8 +114,8 @@ func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.I // PreparePayload is called when consensus core is preparing payload for block. func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, err error) { - d.chainLock(position.ChainID) - defer d.chainUnlock(position.ChainID) + d.chainRLock(position.ChainID) + defer d.chainRUnlock(position.ChainID) if position.Height != 0 { // check if chain block height is sequential @@ -127,21 +126,16 @@ func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, } } - // set state to the pending height - var latestState *state.StateDB - lastPendingBlock := d.blockchain.GetPendingBlockByHeight(d.lastPendingHeight) - if d.lastPendingHeight == 0 || lastPendingBlock == nil { - latestState, err = d.blockchain.State() - if err != nil { - log.Error("Get current state", "error", err) - return nil, fmt.Errorf("get current state error %v", err) - } - } else { - latestState, err = d.blockchain.StateAt(lastPendingBlock.Root()) - if err != nil { - log.Error("Get pending state", "error", err) - return nil, fmt.Errorf("get pending state error: %v", err) - } + root := d.getChainLatestRoot(position.ChainID) + if root == nil { + currentRoot := d.blockchain.CurrentBlock().Root() + root = ¤tRoot + } + // set state to the chain latest height + latestState, err := d.blockchain.StateAt(*root) + if err != nil { + log.Error("Get pending state", "error", err) + return nil, fmt.Errorf("get pending state error: %v", err) } txsMap, err := d.txPool.Pending() @@ -220,16 +214,13 @@ addressMap: // PrepareWitness will return the witness data no lower than consensusHeight. func (d *DexconApp) PrepareWitness(consensusHeight uint64) (witness coreTypes.Witness, err error) { var witnessBlock *types.Block - if d.lastPendingHeight == 0 && consensusHeight == 0 { + lastPendingHeight := d.blockchain.GetLastPendingHeight() + if lastPendingHeight == 0 && consensusHeight == 0 { witnessBlock = d.blockchain.CurrentBlock() - } else if d.lastPendingHeight >= consensusHeight { - d.insertMu.Lock() - witnessBlock = d.blockchain.GetPendingBlockByHeight(d.lastPendingHeight) - d.insertMu.Unlock() + } else if lastPendingHeight >= consensusHeight { + witnessBlock = d.blockchain.GetLastPendingBlock() } else if h := <-d.addNotify(consensusHeight); h >= consensusHeight { - d.insertMu.Lock() - witnessBlock = d.blockchain.GetPendingBlockByHeight(h) - d.insertMu.Unlock() + witnessBlock = d.blockchain.GetLastPendingBlock() } else { log.Error("need pending block") return witness, fmt.Errorf("need pending block") @@ -273,8 +264,8 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta return coreTypes.VerifyRetryLater } - d.chainLock(block.Position.ChainID) - defer d.chainUnlock(block.Position.ChainID) + d.chainRLock(block.Position.ChainID) + defer d.chainRUnlock(block.Position.ChainID) if block.Position.Height != 0 { // check if chain block height is sequential @@ -285,21 +276,16 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta } } - // set state to the pending height - var latestState *state.StateDB - lastPendingBlock := d.blockchain.GetPendingBlockByHeight(d.lastPendingHeight) - if d.lastPendingHeight == 0 || lastPendingBlock == nil { - latestState, err = d.blockchain.State() - if err != nil { - log.Error("Get current state", "error", err) - return coreTypes.VerifyInvalidBlock - } - } else { - latestState, err = d.blockchain.StateAt(lastPendingBlock.Root()) - if err != nil { - log.Error("Get pending state", "error", err) - return coreTypes.VerifyInvalidBlock - } + root := d.getChainLatestRoot(block.Position.ChainID) + if root == nil { + currentRoot := d.blockchain.CurrentBlock().Root() + root = ¤tRoot + } + // set state to the chain latest height + latestState, err := d.blockchain.StateAt(*root) + if err != nil { + log.Error("Get pending state", "error", err) + return coreTypes.VerifyInvalidBlock } var transactions types.Transactions @@ -430,11 +416,12 @@ func (d *DexconApp) BlockDelivered(blockHash coreCommon.Hash, result coreTypes.F Randomness: result.Randomness, }, transactions, nil, nil) - _, err = d.blockchain.ProcessPendingBlock(newBlock, &block.Witness) + root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness) if err != nil { log.Error("Insert chain", "error", err) panic(err) } + d.setChainLatestRoot(block.Position.ChainID, root) log.Info("Insert pending block success", "height", result.Height) d.blockchain.RemoveConfirmedBlock(blockHash) @@ -473,17 +460,44 @@ func (d *DexconApp) validateNonce(txs types.Transactions) (map[common.Address]ui return addressFirstNonce, nil } -func (d *DexconApp) chainLock(chainID uint32) { +func (d *DexconApp) getChainLatestRoot(chainID uint32) *common.Hash { + d.chainLatestRootMu.RLock() + defer d.chainLatestRootMu.RUnlock() + + return d.chainLatestRoot[chainID] +} + +func (d *DexconApp) setChainLatestRoot(chainID uint32, root *common.Hash) { + d.chainLatestRootMu.Lock() + defer d.chainLatestRootMu.Unlock() + + d.chainLatestRoot[chainID] = root +} + +func (d *DexconApp) chainLockInit(chainID uint32) { d.chainLocksInitMu.Lock() + defer d.chainLocksInitMu.Unlock() + _, exist := d.chainLocks[chainID] if !exist { - d.chainLocks[chainID] = &sync.Mutex{} + d.chainLocks[chainID] = &sync.RWMutex{} } - d.chainLocksInitMu.Unlock() +} +func (d *DexconApp) chainLock(chainID uint32) { + d.chainLockInit(chainID) d.chainLocks[chainID].Lock() } func (d *DexconApp) chainUnlock(chainID uint32) { d.chainLocks[chainID].Unlock() } + +func (d *DexconApp) chainRLock(chainID uint32) { + d.chainLockInit(chainID) + d.chainLocks[chainID].RLock() +} + +func (d *DexconApp) chainRUnlock(chainID uint32) { + d.chainLocks[chainID].RUnlock() +} |