aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBojie Wu <bojie@dexon.org>2018-10-09 13:28:45 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:52 +0800
commit702f086745d19e51657502de7a94d39690be55f7 (patch)
treee223c6a37bc35705835edff1c37a0ffc8c3c44a3
parent699dbac51c7344c7659c6c63f3bc720160f20843 (diff)
downloaddexon-702f086745d19e51657502de7a94d39690be55f7.tar
dexon-702f086745d19e51657502de7a94d39690be55f7.tar.gz
dexon-702f086745d19e51657502de7a94d39690be55f7.tar.bz2
dexon-702f086745d19e51657502de7a94d39690be55f7.tar.lz
dexon-702f086745d19e51657502de7a94d39690be55f7.tar.xz
dexon-702f086745d19e51657502de7a94d39690be55f7.tar.zst
dexon-702f086745d19e51657502de7a94d39690be55f7.zip
app: using lock correctly to use map safely
-rw-r--r--core/blockchain.go142
-rw-r--r--dex/app.go140
2 files changed, 163 insertions, 119 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 9b0620679..0e86f2b59 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -142,14 +142,16 @@ type BlockChain struct {
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
- confirmedBlocks map[coreCommon.Hash]*blockInfo
- addressNonce map[common.Address]uint64
- addressCost map[common.Address]*big.Int
- addressCounter map[common.Address]uint64
- chainLastHeight map[uint32]uint64
-
- pendingBlockMu *sync.Mutex
- pendingBlocks map[uint64]struct {
+ confirmedBlocksMu sync.RWMutex
+ confirmedBlocks map[coreCommon.Hash]*blockInfo
+ addressNonce map[common.Address]uint64
+ addressCost map[common.Address]*big.Int
+ addressCounter map[common.Address]uint64
+ chainLastHeight map[uint32]uint64
+
+ pendingBlockMu sync.RWMutex
+ lastPendingHeight uint64
+ pendingBlocks map[uint64]struct {
block *types.Block
receipts types.Receipts
}
@@ -174,27 +176,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
badBlocks, _ := lru.New(badBlockLimit)
bc := &BlockChain{
- chainConfig: chainConfig,
- cacheConfig: cacheConfig,
- db: db,
- triegc: prque.New(nil),
- stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
- quit: make(chan struct{}),
- shouldPreserve: shouldPreserve,
- bodyCache: bodyCache,
- bodyRLPCache: bodyRLPCache,
- receiptsCache: receiptsCache,
- blockCache: blockCache,
- futureBlocks: futureBlocks,
- engine: engine,
- vmConfig: vmConfig,
- badBlocks: badBlocks,
- confirmedBlocks: make(map[coreCommon.Hash]*blockInfo),
+ chainConfig: chainConfig,
+ cacheConfig: cacheConfig,
+ db: db,
+ triegc: prque.New(nil),
+ stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
+ quit: make(chan struct{}),
+ bodyCache: bodyCache,
+ bodyRLPCache: bodyRLPCache,
+ receiptsCache: receiptsCache,
+ blockCache: blockCache,
+ futureBlocks: futureBlocks,
+ engine: engine,
+ vmConfig: vmConfig,
+ badBlocks: badBlocks,
pendingBlocks: make(map[uint64]struct {
block *types.Block
receipts types.Receipts
}),
- pendingBlockMu: &sync.Mutex{},
+ confirmedBlocks: make(map[coreCommon.Hash]*blockInfo),
addressNonce: make(map[common.Address]uint64),
addressCost: make(map[common.Address]*big.Int),
addressCounter: make(map[common.Address]uint64),
@@ -248,6 +248,9 @@ type blockInfo struct {
}
func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error {
+ bc.confirmedBlocksMu.Lock()
+ defer bc.confirmedBlocksMu.Unlock()
+
var transactions types.Transactions
err := rlp.Decode(bytes.NewReader(block.Payload), &transactions)
if err != nil {
@@ -300,20 +303,32 @@ func (bc *BlockChain) RemoveConfirmedBlock(hash coreCommon.Hash) {
}
func (bc *BlockChain) GetConfirmedBlockByHash(hash coreCommon.Hash) *coreTypes.Block {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
return bc.confirmedBlocks[hash].block
}
func (bc *BlockChain) GetLastNonceInConfirmedBlocks(address common.Address) (uint64, bool) {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
nonce, exist := bc.addressNonce[address]
return nonce, exist
}
func (bc *BlockChain) GetCostInConfirmedBlocks(address common.Address) (*big.Int, bool) {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
cost, exist := bc.addressCost[address]
return cost, exist
}
func (bc *BlockChain) GetChainLastConfirmedHeight(chainID uint32) uint64 {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
return bc.chainLastHeight[chainID]
}
@@ -1493,20 +1508,13 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
return 0, nil, nil, nil
}
-func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, error) {
+func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) {
n, events, logs, err := bc.processPendingBlock(block, witness)
bc.PostChainEvents(events, logs)
return n, err
}
-func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, []interface{}, []*types.Log, error) {
- // Pre-checks passed, start the full block imports
- bc.wg.Add(1)
- defer bc.wg.Done()
-
- bc.chainmu.Lock()
- defer bc.chainmu.Unlock()
-
+func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) {
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
@@ -1528,19 +1536,19 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
- return 0, nil, nil, fmt.Errorf("interrupt")
+ return nil, nil, nil, fmt.Errorf("interrupt")
}
bstart := time.Now()
currentBlock := bc.CurrentBlock()
if witness.Height > currentBlock.NumberU64() && witness.Height != 0 {
if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root {
- return 0, nil, nil, fmt.Errorf("invalid witness root %s vs %s",
+ return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s",
bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String())
}
if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash {
- return 0, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s",
+ return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s",
bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String())
}
}
@@ -1552,7 +1560,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
if !exist {
parentBlock = currentBlock
if parentBlock.NumberU64() != block.NumberU64()-1 {
- return 0, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
+ return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
}
} else {
parentBlock = parent.block
@@ -1560,7 +1568,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
block.RawHeader().ParentHash = parentBlock.Hash()
pendingState, err = state.New(parentBlock.Root(), bc.stateCache)
if err != nil {
- return 0, nil, nil, err
+ return nil, nil, nil, err
}
var (
@@ -1574,7 +1582,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
pendingState.Prepare(tx.Hash(), block.Hash(), i)
receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig)
if err != nil {
- return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce())
+ return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce())
}
receipts = append(receipts, receipt)
log.Debug("Apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value())
@@ -1583,28 +1591,25 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
header.GasUsed = *usedGas
newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts)
if err != nil {
- return 0, nil, nil, fmt.Errorf("finalize error: %v", err)
+ return nil, nil, nil, fmt.Errorf("finalize error: %v", err)
}
// Validate the state using the default validator
err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas)
if err != nil {
bc.reportBlock(block, receipts, err)
- return 0, nil, nil, fmt.Errorf("valiadte state error: %v", err)
+ return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err)
}
proctime := time.Since(bstart)
// commit state to refresh stateCache
_, err = pendingState.Commit(true)
if err != nil {
- return 0, nil, nil, fmt.Errorf("pendingState commit error: %v", err)
+ return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err)
}
// add into pending blocks
- bc.pendingBlocks[block.NumberU64()] = struct {
- block *types.Block
- receipts types.Receipts
- }{block: newPendingBlock, receipts: receipts}
+ bc.addPendingBlock(newPendingBlock, receipts)
// start insert available pending blocks into db
for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ {
@@ -1616,13 +1621,13 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
s, err := state.New(pendingIns.block.Root(), bc.stateCache)
if err != nil {
- return 0, events, coalescedLogs, err
+ return nil, events, coalescedLogs, err
}
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s)
if err != nil {
- return 0, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err)
+ return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err)
}
switch status {
@@ -1643,11 +1648,9 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
bc.gcproc += proctime
case SideStatTy:
- return 0, nil, nil, fmt.Errorf("insert pending block and fork found")
+ return nil, nil, nil, fmt.Errorf("insert pending block and fork found")
}
- bc.pendingBlockMu.Lock()
- delete(bc.pendingBlocks, pendingHeight)
- bc.pendingBlockMu.Unlock()
+ bc.removePendingBlock(pendingHeight)
stats.processed++
stats.usedGas += pendingIns.block.GasUsed()
@@ -1660,13 +1663,40 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
events = append(events, ChainHeadEvent{lastCanon})
}
- return 0, events, coalescedLogs, nil
+ root := newPendingBlock.Root()
+ return &root, events, coalescedLogs, nil
+}
+
+func (bc *BlockChain) removePendingBlock(height uint64) {
+ bc.pendingBlockMu.Lock()
+ defer bc.pendingBlockMu.Unlock()
+
+ delete(bc.pendingBlocks, height)
}
-func (bc *BlockChain) GetPendingBlockByHeight(height uint64) *types.Block {
+func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts) {
bc.pendingBlockMu.Lock()
defer bc.pendingBlockMu.Unlock()
- return bc.pendingBlocks[height].block
+
+ bc.pendingBlocks[block.NumberU64()] = struct {
+ block *types.Block
+ receipts types.Receipts
+ }{block: block, receipts: receipts}
+ bc.lastPendingHeight = block.NumberU64()
+}
+
+func (bc *BlockChain) GetLastPendingHeight() uint64 {
+ bc.pendingBlockMu.RLock()
+ defer bc.pendingBlockMu.RUnlock()
+
+ return bc.lastPendingHeight
+}
+
+func (bc *BlockChain) GetLastPendingBlock() *types.Block {
+ bc.pendingBlockMu.RLock()
+ defer bc.pendingBlockMu.RUnlock()
+
+ return bc.pendingBlocks[bc.lastPendingHeight].block
}
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
diff --git a/dex/app.go b/dex/app.go
index b924ab620..6c6d8da53 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -29,7 +29,6 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core"
- "github.com/dexon-foundation/dexon/core/state"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/core/vm"
"github.com/dexon-foundation/dexon/ethdb"
@@ -47,13 +46,15 @@ type DexconApp struct {
vmConfig vm.Config
notifyChan map[uint64]*notify
- notifyMu *sync.Mutex
+ notifyMu sync.Mutex
- lastPendingHeight uint64
- insertMu sync.Mutex
+ chainLatestRootMu sync.RWMutex
+ chainLatestRoot map[uint32]*common.Hash
- chainLocksInitMu *sync.Mutex
- chainLocks map[uint32]*sync.Mutex
+ insertMu sync.Mutex
+
+ chainLocksInitMu sync.Mutex
+ chainLocks map[uint32]*sync.RWMutex
}
type notify struct {
@@ -68,16 +69,15 @@ type witnessData struct {
func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance, chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp {
return &DexconApp{
- txPool: txPool,
- blockchain: blockchain,
- gov: gov,
- chainDB: chainDB,
- config: config,
- vmConfig: vmConfig,
- notifyChan: make(map[uint64]*notify),
- notifyMu: &sync.Mutex{},
- chainLocksInitMu: &sync.Mutex{},
- chainLocks: make(map[uint32]*sync.Mutex),
+ txPool: txPool,
+ blockchain: blockchain,
+ gov: gov,
+ chainDB: chainDB,
+ config: config,
+ vmConfig: vmConfig,
+ notifyChan: make(map[uint64]*notify),
+ chainLocks: make(map[uint32]*sync.RWMutex),
+ chainLatestRoot: make(map[uint32]*common.Hash),
}
}
@@ -105,7 +105,6 @@ func (d *DexconApp) notify(height uint64) {
delete(d.notifyChan, h)
}
}
- d.lastPendingHeight = height
}
func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.Int) bool {
@@ -115,8 +114,8 @@ func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.I
// PreparePayload is called when consensus core is preparing payload for block.
func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, err error) {
- d.chainLock(position.ChainID)
- defer d.chainUnlock(position.ChainID)
+ d.chainRLock(position.ChainID)
+ defer d.chainRUnlock(position.ChainID)
if position.Height != 0 {
// check if chain block height is sequential
@@ -127,21 +126,16 @@ func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte,
}
}
- // set state to the pending height
- var latestState *state.StateDB
- lastPendingBlock := d.blockchain.GetPendingBlockByHeight(d.lastPendingHeight)
- if d.lastPendingHeight == 0 || lastPendingBlock == nil {
- latestState, err = d.blockchain.State()
- if err != nil {
- log.Error("Get current state", "error", err)
- return nil, fmt.Errorf("get current state error %v", err)
- }
- } else {
- latestState, err = d.blockchain.StateAt(lastPendingBlock.Root())
- if err != nil {
- log.Error("Get pending state", "error", err)
- return nil, fmt.Errorf("get pending state error: %v", err)
- }
+ root := d.getChainLatestRoot(position.ChainID)
+ if root == nil {
+ currentRoot := d.blockchain.CurrentBlock().Root()
+ root = &currentRoot
+ }
+ // set state to the chain latest height
+ latestState, err := d.blockchain.StateAt(*root)
+ if err != nil {
+ log.Error("Get pending state", "error", err)
+ return nil, fmt.Errorf("get pending state error: %v", err)
}
txsMap, err := d.txPool.Pending()
@@ -220,16 +214,13 @@ addressMap:
// PrepareWitness will return the witness data no lower than consensusHeight.
func (d *DexconApp) PrepareWitness(consensusHeight uint64) (witness coreTypes.Witness, err error) {
var witnessBlock *types.Block
- if d.lastPendingHeight == 0 && consensusHeight == 0 {
+ lastPendingHeight := d.blockchain.GetLastPendingHeight()
+ if lastPendingHeight == 0 && consensusHeight == 0 {
witnessBlock = d.blockchain.CurrentBlock()
- } else if d.lastPendingHeight >= consensusHeight {
- d.insertMu.Lock()
- witnessBlock = d.blockchain.GetPendingBlockByHeight(d.lastPendingHeight)
- d.insertMu.Unlock()
+ } else if lastPendingHeight >= consensusHeight {
+ witnessBlock = d.blockchain.GetLastPendingBlock()
} else if h := <-d.addNotify(consensusHeight); h >= consensusHeight {
- d.insertMu.Lock()
- witnessBlock = d.blockchain.GetPendingBlockByHeight(h)
- d.insertMu.Unlock()
+ witnessBlock = d.blockchain.GetLastPendingBlock()
} else {
log.Error("need pending block")
return witness, fmt.Errorf("need pending block")
@@ -273,8 +264,8 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta
return coreTypes.VerifyRetryLater
}
- d.chainLock(block.Position.ChainID)
- defer d.chainUnlock(block.Position.ChainID)
+ d.chainRLock(block.Position.ChainID)
+ defer d.chainRUnlock(block.Position.ChainID)
if block.Position.Height != 0 {
// check if chain block height is sequential
@@ -285,21 +276,16 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta
}
}
- // set state to the pending height
- var latestState *state.StateDB
- lastPendingBlock := d.blockchain.GetPendingBlockByHeight(d.lastPendingHeight)
- if d.lastPendingHeight == 0 || lastPendingBlock == nil {
- latestState, err = d.blockchain.State()
- if err != nil {
- log.Error("Get current state", "error", err)
- return coreTypes.VerifyInvalidBlock
- }
- } else {
- latestState, err = d.blockchain.StateAt(lastPendingBlock.Root())
- if err != nil {
- log.Error("Get pending state", "error", err)
- return coreTypes.VerifyInvalidBlock
- }
+ root := d.getChainLatestRoot(block.Position.ChainID)
+ if root == nil {
+ currentRoot := d.blockchain.CurrentBlock().Root()
+ root = &currentRoot
+ }
+ // set state to the chain latest height
+ latestState, err := d.blockchain.StateAt(*root)
+ if err != nil {
+ log.Error("Get pending state", "error", err)
+ return coreTypes.VerifyInvalidBlock
}
var transactions types.Transactions
@@ -430,11 +416,12 @@ func (d *DexconApp) BlockDelivered(blockHash coreCommon.Hash, result coreTypes.F
Randomness: result.Randomness,
}, transactions, nil, nil)
- _, err = d.blockchain.ProcessPendingBlock(newBlock, &block.Witness)
+ root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness)
if err != nil {
log.Error("Insert chain", "error", err)
panic(err)
}
+ d.setChainLatestRoot(block.Position.ChainID, root)
log.Info("Insert pending block success", "height", result.Height)
d.blockchain.RemoveConfirmedBlock(blockHash)
@@ -473,17 +460,44 @@ func (d *DexconApp) validateNonce(txs types.Transactions) (map[common.Address]ui
return addressFirstNonce, nil
}
-func (d *DexconApp) chainLock(chainID uint32) {
+func (d *DexconApp) getChainLatestRoot(chainID uint32) *common.Hash {
+ d.chainLatestRootMu.RLock()
+ defer d.chainLatestRootMu.RUnlock()
+
+ return d.chainLatestRoot[chainID]
+}
+
+func (d *DexconApp) setChainLatestRoot(chainID uint32, root *common.Hash) {
+ d.chainLatestRootMu.Lock()
+ defer d.chainLatestRootMu.Unlock()
+
+ d.chainLatestRoot[chainID] = root
+}
+
+func (d *DexconApp) chainLockInit(chainID uint32) {
d.chainLocksInitMu.Lock()
+ defer d.chainLocksInitMu.Unlock()
+
_, exist := d.chainLocks[chainID]
if !exist {
- d.chainLocks[chainID] = &sync.Mutex{}
+ d.chainLocks[chainID] = &sync.RWMutex{}
}
- d.chainLocksInitMu.Unlock()
+}
+func (d *DexconApp) chainLock(chainID uint32) {
+ d.chainLockInit(chainID)
d.chainLocks[chainID].Lock()
}
func (d *DexconApp) chainUnlock(chainID uint32) {
d.chainLocks[chainID].Unlock()
}
+
+func (d *DexconApp) chainRLock(chainID uint32) {
+ d.chainLockInit(chainID)
+ d.chainLocks[chainID].RLock()
+}
+
+func (d *DexconApp) chainRUnlock(chainID uint32) {
+ d.chainLocks[chainID].RUnlock()
+}