From f1b00cffc828105c17c0ecacb2074874b752a9a0 Mon Sep 17 00:00:00 2001
From: rjl493456442 <garyrong0905@gmail.com>
Date: Mon, 17 Dec 2018 15:23:54 +0800
Subject: core: re-omit new log event when logs rebirth

---
 core/blockchain.go      |  27 +++++--
 core/blockchain_test.go | 206 +++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 224 insertions(+), 9 deletions(-)

(limited to 'core')

diff --git a/core/blockchain.go b/core/blockchain.go
index e40fc39fa..117be8c72 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1401,11 +1401,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		commonBlock *types.Block
 		deletedTxs  types.Transactions
 		deletedLogs []*types.Log
+		rebirthLogs []*types.Log
 		// collectLogs collects the logs that were generated during the
 		// processing of the block that corresponds with the given hash.
-		// These logs are later announced as deleted.
-		collectLogs = func(hash common.Hash) {
-			// Coalesce logs and set 'Removed'.
+		// These logs are later announced as deleted or reborn
+		collectLogs = func(hash common.Hash, removed bool) {
 			number := bc.hc.GetBlockNumber(hash)
 			if number == nil {
 				return
@@ -1413,9 +1413,13 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 			receipts := rawdb.ReadReceipts(bc.db, hash, *number)
 			for _, receipt := range receipts {
 				for _, log := range receipt.Logs {
-					del := *log
-					del.Removed = true
-					deletedLogs = append(deletedLogs, &del)
+					l := *log
+					if removed {
+						l.Removed = true
+						deletedLogs = append(deletedLogs, &l)
+					} else {
+						rebirthLogs = append(rebirthLogs, &l)
+					}
 				}
 			}
 		}
@@ -1428,7 +1432,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 			oldChain = append(oldChain, oldBlock)
 			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
 
-			collectLogs(oldBlock.Hash())
+			collectLogs(oldBlock.Hash(), true)
 		}
 	} else {
 		// reduce new chain and append new chain blocks for inserting later on
@@ -1452,7 +1456,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		oldChain = append(oldChain, oldBlock)
 		newChain = append(newChain, newBlock)
 		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
-		collectLogs(oldBlock.Hash())
+		collectLogs(oldBlock.Hash(), true)
 
 		oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
 		if oldBlock == nil {
@@ -1478,6 +1482,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	for i := len(newChain) - 1; i >= 0; i-- {
 		// insert the block in the canonical way, re-writing history
 		bc.insert(newChain[i])
+		// collect reborn logs due to chain reorg(except head block)
+		if i != 0 {
+			collectLogs(newChain[i].Hash(), false)
+		}
 		// write lookup entries for hash based transaction/receipt searches
 		rawdb.WriteTxLookupEntries(bc.db, newChain[i])
 		addedTxs = append(addedTxs, newChain[i].Transactions()...)
@@ -1495,6 +1503,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	if len(deletedLogs) > 0 {
 		go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
 	}
+	if len(rebirthLogs) > 0 {
+		go bc.logsFeed.Send(rebirthLogs)
+	}
 	if len(oldChain) > 0 {
 		go func() {
 			for _, block := range oldChain {
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 504ad0eaf..7c76f1fc4 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -884,7 +884,6 @@ func TestChainTxReorgs(t *testing.T) {
 }
 
 func TestLogReorgs(t *testing.T) {
-
 	var (
 		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
 		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
@@ -930,6 +929,211 @@ func TestLogReorgs(t *testing.T) {
 	}
 }
 
+func TestLogRebirth(t *testing.T) {
+	var (
+		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
+		db      = ethdb.NewMemDatabase()
+		// this code generates a log
+		code     = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
+		gspec    = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
+		genesis  = gspec.MustCommit(db)
+		signer   = types.NewEIP155Signer(gspec.Config.ChainID)
+		newLogCh = make(chan bool)
+	)
+
+	// listenNewLog checks whether the received logs number is equal with expected.
+	listenNewLog := func(sink chan []*types.Log, expect int) {
+		cnt := 0
+		for {
+			select {
+			case logs := <-sink:
+				cnt += len(logs)
+			case <-time.NewTimer(5 * time.Second).C:
+				// new logs timeout
+				newLogCh <- false
+				return
+			}
+			if cnt == expect {
+				break
+			} else if cnt > expect {
+				// redundant logs received
+				newLogCh <- false
+				return
+			}
+		}
+		select {
+		case <-sink:
+			// redundant logs received
+			newLogCh <- false
+		case <-time.NewTimer(100 * time.Millisecond).C:
+			newLogCh <- true
+		}
+	}
+
+	blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+	defer blockchain.Stop()
+
+	logsCh := make(chan []*types.Log)
+	blockchain.SubscribeLogsEvent(logsCh)
+
+	rmLogsCh := make(chan RemovedLogsEvent)
+	blockchain.SubscribeRemovedLogsEvent(rmLogsCh)
+
+	chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+		if i == 1 {
+			tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
+			if err != nil {
+				t.Fatalf("failed to create tx: %v", err)
+			}
+			gen.AddTx(tx)
+		}
+	})
+
+	// Spawn a goroutine to receive log events
+	go listenNewLog(logsCh, 1)
+	if _, err := blockchain.InsertChain(chain); err != nil {
+		t.Fatalf("failed to insert chain: %v", err)
+	}
+	if !<-newLogCh {
+		t.Fatalf("failed to receive new log event")
+	}
+
+	// Generate long reorg chain
+	forkChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+		if i == 1 {
+			tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
+			if err != nil {
+				t.Fatalf("failed to create tx: %v", err)
+			}
+			gen.AddTx(tx)
+			// Higher block difficulty
+			gen.OffsetTime(-9)
+		}
+	})
+
+	// Spawn a goroutine to receive log events
+	go listenNewLog(logsCh, 1)
+	if _, err := blockchain.InsertChain(forkChain); err != nil {
+		t.Fatalf("failed to insert forked chain: %v", err)
+	}
+	if !<-newLogCh {
+		t.Fatalf("failed to receive new log event")
+	}
+	// Ensure removedLog events received
+	select {
+	case ev := <-rmLogsCh:
+		if len(ev.Logs) == 0 {
+			t.Error("expected logs")
+		}
+	case <-time.NewTimer(1 * time.Second).C:
+		t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
+	}
+
+	newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
+	go listenNewLog(logsCh, 1)
+	if _, err := blockchain.InsertChain(newBlocks); err != nil {
+		t.Fatalf("failed to insert forked chain: %v", err)
+	}
+	// Rebirth logs should omit a newLogEvent
+	if !<-newLogCh {
+		t.Fatalf("failed to receive new log event")
+	}
+	// Ensure removedLog events received
+	select {
+	case ev := <-rmLogsCh:
+		if len(ev.Logs) == 0 {
+			t.Error("expected logs")
+		}
+	case <-time.NewTimer(1 * time.Second).C:
+		t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
+	}
+}
+
+func TestSideLogRebirth(t *testing.T) {
+	var (
+		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
+		db      = ethdb.NewMemDatabase()
+		// this code generates a log
+		code     = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
+		gspec    = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
+		genesis  = gspec.MustCommit(db)
+		signer   = types.NewEIP155Signer(gspec.Config.ChainID)
+		newLogCh = make(chan bool)
+	)
+
+	// listenNewLog checks whether the received logs number is equal with expected.
+	listenNewLog := func(sink chan []*types.Log, expect int) {
+		cnt := 0
+		for {
+			select {
+			case logs := <-sink:
+				cnt += len(logs)
+			case <-time.NewTimer(5 * time.Second).C:
+				// new logs timeout
+				newLogCh <- false
+				return
+			}
+			if cnt == expect {
+				break
+			} else if cnt > expect {
+				// redundant logs received
+				newLogCh <- false
+				return
+			}
+		}
+		select {
+		case <-sink:
+			// redundant logs received
+			newLogCh <- false
+		case <-time.NewTimer(100 * time.Millisecond).C:
+			newLogCh <- true
+		}
+	}
+
+	blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
+	defer blockchain.Stop()
+
+	logsCh := make(chan []*types.Log)
+	blockchain.SubscribeLogsEvent(logsCh)
+
+	chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+		if i == 1 {
+			// Higher block difficulty
+			gen.OffsetTime(-9)
+		}
+	})
+	if _, err := blockchain.InsertChain(chain); err != nil {
+		t.Fatalf("failed to insert forked chain: %v", err)
+	}
+
+	// Generate side chain with lower difficulty
+	sideChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
+		if i == 1 {
+			tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
+			if err != nil {
+				t.Fatalf("failed to create tx: %v", err)
+			}
+			gen.AddTx(tx)
+		}
+	})
+	if _, err := blockchain.InsertChain(sideChain); err != nil {
+		t.Fatalf("failed to insert forked chain: %v", err)
+	}
+
+	// Generate a new block based on side chain
+	newBlocks, _ := GenerateChain(params.TestChainConfig, sideChain[len(sideChain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
+	go listenNewLog(logsCh, 1)
+	if _, err := blockchain.InsertChain(newBlocks); err != nil {
+		t.Fatalf("failed to insert forked chain: %v", err)
+	}
+	// Rebirth logs should omit a newLogEvent
+	if !<-newLogCh {
+		t.Fatalf("failed to receive new log event")
+	}
+}
+
 func TestReorgSideEvent(t *testing.T) {
 	var (
 		db      = ethdb.NewMemDatabase()
-- 
cgit v1.2.3