aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/blockchain.go27
-rw-r--r--core/blockchain_test.go206
2 files changed, 224 insertions, 9 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index e40fc39fa..117be8c72 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1401,11 +1401,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
commonBlock *types.Block
deletedTxs types.Transactions
deletedLogs []*types.Log
+ rebirthLogs []*types.Log
// collectLogs collects the logs that were generated during the
// processing of the block that corresponds with the given hash.
- // These logs are later announced as deleted.
- collectLogs = func(hash common.Hash) {
- // Coalesce logs and set 'Removed'.
+ // These logs are later announced as deleted or reborn
+ collectLogs = func(hash common.Hash, removed bool) {
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
@@ -1413,9 +1413,13 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
receipts := rawdb.ReadReceipts(bc.db, hash, *number)
for _, receipt := range receipts {
for _, log := range receipt.Logs {
- del := *log
- del.Removed = true
- deletedLogs = append(deletedLogs, &del)
+ l := *log
+ if removed {
+ l.Removed = true
+ deletedLogs = append(deletedLogs, &l)
+ } else {
+ rebirthLogs = append(rebirthLogs, &l)
+ }
}
}
}
@@ -1428,7 +1432,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
- collectLogs(oldBlock.Hash())
+ collectLogs(oldBlock.Hash(), true)
}
} else {
// reduce new chain and append new chain blocks for inserting later on
@@ -1452,7 +1456,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
- collectLogs(oldBlock.Hash())
+ collectLogs(oldBlock.Hash(), true)
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil {
@@ -1478,6 +1482,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
+ // collect reborn logs due to chain reorg(except head block)
+ if i != 0 {
+ collectLogs(newChain[i].Hash(), false)
+ }
// write lookup entries for hash based transaction/receipt searches
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
addedTxs = append(addedTxs, newChain[i].Transactions()...)
@@ -1495,6 +1503,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
+ if len(rebirthLogs) > 0 {
+ go bc.logsFeed.Send(rebirthLogs)
+ }
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 504ad0eaf..7c76f1fc4 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -884,7 +884,6 @@ func TestChainTxReorgs(t *testing.T) {
}
func TestLogReorgs(t *testing.T) {
-
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
@@ -930,6 +929,211 @@ func TestLogReorgs(t *testing.T) {
}
}
+func TestLogRebirth(t *testing.T) {
+ var (
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ db = ethdb.NewMemDatabase()
+ // this code generates a log
+ code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
+ gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
+ genesis = gspec.MustCommit(db)
+ signer = types.NewEIP155Signer(gspec.Config.ChainID)
+ newLogCh = make(chan bool)
+ )
+
+ // listenNewLog checks whether the received logs number is equal with expected.
+ listenNewLog := func(sink chan []*types.Log, expect int) {
+ cnt := 0
+ for {
+ select {
+ case logs := <-sink:
+ cnt += len(logs)
+ case <-time.NewTimer(5 * time.Second).C:
+ // new logs timeout
+ newLogCh <- false
+ return
+ }
+ if cnt == expect {
+ break
+ } else if cnt > expect {
+ // redundant logs received
+ newLogCh <- false
+ return
+ }
+ }
+ select {
+ case <-sink:
+ // redundant logs received
+ newLogCh <- false
+ case <-time.NewTimer(100 * time.Millisecond).C:
+ newLogCh <- true
+ }
+ }
+
+ blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+ defer blockchain.Stop()
+
+ logsCh := make(chan []*types.Log)
+ blockchain.SubscribeLogsEvent(logsCh)
+
+ rmLogsCh := make(chan RemovedLogsEvent)
+ blockchain.SubscribeRemovedLogsEvent(rmLogsCh)
+
+ chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+ if i == 1 {
+ tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
+ if err != nil {
+ t.Fatalf("failed to create tx: %v", err)
+ }
+ gen.AddTx(tx)
+ }
+ })
+
+ // Spawn a goroutine to receive log events
+ go listenNewLog(logsCh, 1)
+ if _, err := blockchain.InsertChain(chain); err != nil {
+ t.Fatalf("failed to insert chain: %v", err)
+ }
+ if !<-newLogCh {
+ t.Fatalf("failed to receive new log event")
+ }
+
+ // Generate long reorg chain
+ forkChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+ if i == 1 {
+ tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
+ if err != nil {
+ t.Fatalf("failed to create tx: %v", err)
+ }
+ gen.AddTx(tx)
+ // Higher block difficulty
+ gen.OffsetTime(-9)
+ }
+ })
+
+ // Spawn a goroutine to receive log events
+ go listenNewLog(logsCh, 1)
+ if _, err := blockchain.InsertChain(forkChain); err != nil {
+ t.Fatalf("failed to insert forked chain: %v", err)
+ }
+ if !<-newLogCh {
+ t.Fatalf("failed to receive new log event")
+ }
+ // Ensure removedLog events received
+ select {
+ case ev := <-rmLogsCh:
+ if len(ev.Logs) == 0 {
+ t.Error("expected logs")
+ }
+ case <-time.NewTimer(1 * time.Second).C:
+ t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
+ }
+
+ newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
+ go listenNewLog(logsCh, 1)
+ if _, err := blockchain.InsertChain(newBlocks); err != nil {
+ t.Fatalf("failed to insert forked chain: %v", err)
+ }
+ // Rebirth logs should omit a newLogEvent
+ if !<-newLogCh {
+ t.Fatalf("failed to receive new log event")
+ }
+ // Ensure removedLog events received
+ select {
+ case ev := <-rmLogsCh:
+ if len(ev.Logs) == 0 {
+ t.Error("expected logs")
+ }
+ case <-time.NewTimer(1 * time.Second).C:
+ t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
+ }
+}
+
+func TestSideLogRebirth(t *testing.T) {
+ var (
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ db = ethdb.NewMemDatabase()
+ // this code generates a log
+ code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
+ gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
+ genesis = gspec.MustCommit(db)
+ signer = types.NewEIP155Signer(gspec.Config.ChainID)
+ newLogCh = make(chan bool)
+ )
+
+ // listenNewLog checks whether the received logs number is equal with expected.
+ listenNewLog := func(sink chan []*types.Log, expect int) {
+ cnt := 0
+ for {
+ select {
+ case logs := <-sink:
+ cnt += len(logs)
+ case <-time.NewTimer(5 * time.Second).C:
+ // new logs timeout
+ newLogCh <- false
+ return
+ }
+ if cnt == expect {
+ break
+ } else if cnt > expect {
+ // redundant logs received
+ newLogCh <- false
+ return
+ }
+ }
+ select {
+ case <-sink:
+ // redundant logs received
+ newLogCh <- false
+ case <-time.NewTimer(100 * time.Millisecond).C:
+ newLogCh <- true
+ }
+ }
+
+ blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+ defer blockchain.Stop()
+
+ logsCh := make(chan []*types.Log)
+ blockchain.SubscribeLogsEvent(logsCh)
+
+ chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+ if i == 1 {
+ // Higher block difficulty
+ gen.OffsetTime(-9)
+ }
+ })
+ if _, err := blockchain.InsertChain(chain); err != nil {
+ t.Fatalf("failed to insert forked chain: %v", err)
+ }
+
+ // Generate side chain with lower difficulty
+ sideChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+ if i == 1 {
+ tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
+ if err != nil {
+ t.Fatalf("failed to create tx: %v", err)
+ }
+ gen.AddTx(tx)
+ }
+ })
+ if _, err := blockchain.InsertChain(sideChain); err != nil {
+ t.Fatalf("failed to insert forked chain: %v", err)
+ }
+
+ // Generate a new block based on side chain
+ newBlocks, _ := GenerateChain(params.TestChainConfig, sideChain[len(sideChain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
+ go listenNewLog(logsCh, 1)
+ if _, err := blockchain.InsertChain(newBlocks); err != nil {
+ t.Fatalf("failed to insert forked chain: %v", err)
+ }
+ // Rebirth logs should omit a newLogEvent
+ if !<-newLogCh {
+ t.Fatalf("failed to receive new log event")
+ }
+}
+
func TestReorgSideEvent(t *testing.T) {
var (
db = ethdb.NewMemDatabase()