From 8150597b3878368ad090c9846cdf50dd812b2181 Mon Sep 17 00:00:00 2001
From: Wei-Ning Huang <w@dexon.org>
Date: Thu, 15 Nov 2018 13:30:50 +0800
Subject: core: refactor validator and fix light node sync (#25)

Remove custom Dexon validator by adding a new `ValidateWitnessData`
method into the validator interface. This allow us to properly detect
know blocks. This also allow other gdex "light" client to sync
compaction chain. Also, setup a standalone RPC node for handling RPC
reqeusts.
---
 cmd/utils/flags.go        |   4 ++
 core/block_validator.go   |  41 +++++-----------
 core/blockchain.go        | 121 ++++++++++------------------------------------
 core/blockchain_test.go   |   3 --
 core/chain_makers_test.go |   1 +
 core/tx_pool.go           |  35 +++++++++++++-
 core/tx_pool_test.go      |  34 ++++++-------
 core/types.go             |   3 ++
 core/types/block.go       |   1 +
 dex/backend.go            |   7 ++-
 dex/config.go             |  12 +++--
 dex/handler.go            | 103 ++++++++++++++++++++++++---------------
 eth/backend.go            |   2 +-
 p2p/server.go             |   2 +-
 test/run_test.sh          |  26 ++++++----
 15 files changed, 189 insertions(+), 206 deletions(-)

diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 6a1d84869..611db1975 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1171,6 +1171,10 @@ func SetDexConfig(ctx *cli.Context, stack *node.Node, cfg *dex.Config) {
 	if ctx.GlobalIsSet(NetworkIdFlag.Name) {
 		cfg.NetworkId = ctx.GlobalUint64(NetworkIdFlag.Name)
 	}
+	if ctx.GlobalIsSet(BlockProposerEnabledFlag.Name) {
+		cfg.BlockProposerEnabled = ctx.GlobalBool(BlockProposerEnabledFlag.Name)
+	}
+
 	if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheDatabaseFlag.Name) {
 		cfg.DatabaseCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100
 	}
diff --git a/core/block_validator.go b/core/block_validator.go
index 65f311f9f..09539790b 100644
--- a/core/block_validator.go
+++ b/core/block_validator.go
@@ -101,37 +101,20 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat
 	return nil
 }
 
-// BlockValidator implements Validator.
-type DexonBlockValidator struct {
-	config *params.ChainConfig // Chain configuration options
-	bc     *BlockChain         // Canonical block chain
-	engine consensus.Engine    // Consensus engine used for validating
-}
+func (v *BlockValidator) ValidateWitnessData(height uint64, data types.WitnessData) error {
+	currentBlock := v.bc.CurrentBlock()
+	if height > currentBlock.NumberU64() && height != 0 {
+		pendingBlock := v.bc.GetPendingBlockByNumber(height)
 
-// NewDexonBlockValidator returns a new block validator which is safe for re-use
-func NewDexonBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine) *DexonBlockValidator {
-	validator := &DexonBlockValidator{
-		config: config,
-		engine: engine,
-		bc:     blockchain,
+		if pendingBlock.Root() != data.Root {
+			return fmt.Errorf("invalid witness root %s vs %s",
+				pendingBlock.Root().String(), data.Root.String())
+		}
+		if pendingBlock.ReceiptHash() != data.ReceiptHash {
+			return fmt.Errorf("invalid witness receipt hash %s vs %s",
+				pendingBlock.ReceiptHash().String(), data.ReceiptHash.String())
+		}
 	}
-	return validator
-}
-
-// ValidateBody validates the given block's uncles and verifies the block
-// header's transaction and uncle roots. The headers are assumed to be already
-// validated at this point.
-func (v *DexonBlockValidator) ValidateBody(block *types.Block) error {
-	// TODO(Bojie): implement it
-	return nil
-}
-
-// ValidateState validates the various changes that happen after a state
-// transition, such as amount of used gas, the receipt roots and the state root
-// itself. ValidateState returns a database batch if the validation was a success
-// otherwise nil and an error is returned.
-func (v *DexonBlockValidator) ValidateState(block, parent *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
-	// TODO(Bojie): implement it
 	return nil
 }
 
diff --git a/core/blockchain.go b/core/blockchain.go
index 70394f0db..43d044875 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -237,79 +237,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 	return bc, nil
 }
 
-func NewBlockChainWithDexonValidator(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) {
-	if cacheConfig == nil {
-		cacheConfig = &CacheConfig{
-			TrieNodeLimit: 256 * 1024 * 1024,
-			TrieTimeLimit: 5 * time.Minute,
-		}
-	}
-	bodyCache, _ := lru.New(bodyCacheLimit)
-	bodyRLPCache, _ := lru.New(bodyCacheLimit)
-	receiptsCache, _ := lru.New(receiptsCacheLimit)
-	blockCache, _ := lru.New(blockCacheLimit)
-	futureBlocks, _ := lru.New(maxFutureBlocks)
-	badBlocks, _ := lru.New(badBlockLimit)
-
-	bc := &BlockChain{
-		chainConfig:   chainConfig,
-		cacheConfig:   cacheConfig,
-		db:            db,
-		triegc:        prque.New(nil),
-		stateCache:    state.NewDatabase(db),
-		quit:          make(chan struct{}),
-		bodyCache:     bodyCache,
-		bodyRLPCache:  bodyRLPCache,
-		receiptsCache: receiptsCache,
-		blockCache:    blockCache,
-		futureBlocks:  futureBlocks,
-		engine:        engine,
-		vmConfig:      vmConfig,
-		badBlocks:     badBlocks,
-		pendingBlocks: make(map[uint64]struct {
-			block    *types.Block
-			receipts types.Receipts
-		}),
-		confirmedBlocks: make(map[uint32]map[coreCommon.Hash]*blockInfo),
-		addressNonce:    make(map[uint32]map[common.Address]uint64),
-		addressCost:     make(map[uint32]map[common.Address]*big.Int),
-		addressCounter:  make(map[uint32]map[common.Address]uint64),
-		chainLastHeight: make(map[uint32]uint64),
-	}
-	bc.SetValidator(NewDexonBlockValidator(chainConfig, bc, engine))
-	bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
-
-	var err error
-	bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
-	if err != nil {
-		return nil, err
-	}
-	bc.genesisBlock = bc.GetBlockByNumber(0)
-	if bc.genesisBlock == nil {
-		return nil, ErrNoGenesis
-	}
-	if err := bc.loadLastState(); err != nil {
-		return nil, err
-	}
-	// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
-	for hash := range BadHashes {
-		if header := bc.GetHeaderByHash(hash); header != nil {
-			// get the canonical block corresponding to the offending header's number
-			headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
-			// make sure the headerByNumber (if present) is in our current canonical chain
-			if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
-				log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
-				bc.SetHead(header.Number.Uint64() - 1)
-				log.Error("Chain rewind was successful, resuming normal operation")
-			}
-		}
-	}
-
-	// Take ownership of this particular state
-	go bc.update()
-	return bc, nil
-}
-
 func (bc *BlockChain) getProcInterrupt() bool {
 	return atomic.LoadInt32(&bc.procInterrupt) == 1
 }
@@ -1600,7 +1527,15 @@ func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes
 	return n, err
 }
 
-func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) {
+func (bc *BlockChain) processPendingBlock(
+	block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) {
+	// Pre-checks passed, start the full block imports
+	bc.wg.Add(1)
+	defer bc.wg.Done()
+
+	bc.chainmu.Lock()
+	defer bc.chainmu.Unlock()
+
 	// A queued approach to delivering events. This is generally
 	// faster than direct delivery and requires much less mutex
 	// acquiring.
@@ -1611,12 +1546,6 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
 		coalescedLogs []*types.Log
 	)
 
-	var witnessData types.WitnessData
-	if err := rlp.Decode(bytes.NewReader(witness.Data), &witnessData); err != nil {
-		log.Error("Witness rlp decode failed", "error", err)
-		panic(err)
-	}
-
 	// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
 	senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, block.Number()), []*types.Block{block})
 
@@ -1626,19 +1555,18 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
 	}
 	bstart := time.Now()
 
-	currentBlock := bc.CurrentBlock()
-	if witness.Height > currentBlock.NumberU64() && witness.Height != 0 {
-		if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root {
-			return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s",
-				bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String())
-		}
+	var witnessData types.WitnessData
+	if err := rlp.Decode(bytes.NewReader(witness.Data), &witnessData); err != nil {
+		log.Error("Witness rlp decode failed", "error", err)
+		panic(err)
+	}
 
-		if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash {
-			return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s",
-				bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String())
-		}
+	if err := bc.Validator().ValidateWitnessData(witness.Height, witnessData); err != nil {
+		return nil, nil, nil, fmt.Errorf("valiadte witness data error: %v", err)
 	}
 
+	currentBlock := bc.CurrentBlock()
+
 	var parentBlock *types.Block
 	var pendingState *state.StateDB
 	var err error
@@ -1680,12 +1608,6 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
 		return nil, nil, nil, fmt.Errorf("finalize error: %v", err)
 	}
 
-	// Validate the state using the default validator
-	err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas)
-	if err != nil {
-		bc.reportBlock(block, receipts, err)
-		return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err)
-	}
 	proctime := time.Since(bstart)
 
 	// commit state to refresh stateCache
@@ -1790,6 +1712,13 @@ func (bc *BlockChain) GetPendingBlock() *types.Block {
 	return bc.pendingBlocks[bc.lastPendingHeight].block
 }
 
+func (bc *BlockChain) GetPendingBlockByNumber(number uint64) *types.Block {
+	bc.pendingBlockMu.RLock()
+	defer bc.pendingBlockMu.RUnlock()
+
+	return bc.pendingBlocks[number].block
+}
+
 func (bc *BlockChain) GetPending() (*types.Block, *state.StateDB) {
 	block := bc.GetPendingBlock()
 	s, err := state.New(block.Header().Root, bc.stateCache)
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 786c47064..11bb1317a 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -1074,9 +1074,6 @@ func TestEIP155Transition(t *testing.T) {
 		}
 	)
 
-	dexConf := new(params.DexconConfig)
-	dexConf.BlockReward = new(big.Int)
-	gspec.Config.Dexcon = dexConf
 	genesis := gspec.MustCommit(db)
 
 	blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go
index cb01ae0c9..0fb995f5f 100644
--- a/core/chain_makers_test.go
+++ b/core/chain_makers_test.go
@@ -148,6 +148,7 @@ func ExampleGenerateChainWithRoundChange() {
 			},
 			addr1: {
 				Balance: big.NewInt(1000000),
+				Staked:  big.NewInt(0),
 			},
 		},
 	}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 622c8ce9d..54bad9eae 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -36,6 +36,9 @@ import (
 )
 
 const (
+	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+	chainHeadChanSize = 10
+
 	// blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent.
 	blockConfirmedChanSize = 10
 )
@@ -118,6 +121,7 @@ type blockChain interface {
 	GetBlock(hash common.Hash, number uint64) *types.Block
 	StateAt(root common.Hash) (*state.StateDB, error)
 
+	SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
 	SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription
 }
 
@@ -209,6 +213,8 @@ type TxPool struct {
 	gasPrice          *big.Int
 	txFeed            event.Feed
 	scope             event.SubscriptionScope
+	chainHeadCh       chan ChainHeadEvent
+	chainHeadSub      event.Subscription
 	blockConfirmedCh  chan BlockConfirmedEvent
 	blockConfirmedSub event.Subscription
 	signer            types.Signer
@@ -229,12 +235,13 @@ type TxPool struct {
 
 	wg sync.WaitGroup // for shutdown sync
 
-	homestead bool
+	homestead       bool
+	isBlockProposer bool
 }
 
 // NewTxPool creates a new transaction pool to gather, sort and filter inbound
 // transactions from the network.
-func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, isBlockProposer bool) *TxPool {
 	// Sanitize the input to ensure no vulnerable gas prices are set
 	config = (&config).sanitize()
 
@@ -248,8 +255,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
 		queue:            make(map[common.Address]*txList),
 		beats:            make(map[common.Address]time.Time),
 		all:              newTxLookup(),
+		chainHeadCh:      make(chan ChainHeadEvent, chainHeadChanSize),
 		blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize),
 		gasPrice:         new(big.Int).SetUint64(config.PriceLimit),
+		isBlockProposer:  isBlockProposer,
 	}
 	pool.locals = newAccountSet(pool.signer)
 	for _, addr := range config.Locals {
@@ -272,6 +281,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
 	}
 	// Subscribe events from blockchain
 	pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh)
+	pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
 
 	// Start the event loop and return
 	pool.wg.Add(1)
@@ -304,8 +314,29 @@ func (pool *TxPool) loop() {
 	// Keep waiting for and reacting to the various events
 	for {
 		select {
+		// Handle ChainHeadEvent
+		case ev := <-pool.chainHeadCh:
+			if pool.isBlockProposer {
+				break
+			}
+			if ev.Block != nil {
+				pool.mu.Lock()
+				if pool.chainconfig.IsHomestead(ev.Block.Number()) {
+					pool.homestead = true
+				}
+				pool.reset(head.Header(), ev.Block.Header())
+				head = ev.Block
+
+				pool.mu.Unlock()
+			}
+		// Be unsubscribed due to system stopped
+		case <-pool.chainHeadSub.Err():
+			return
 		// Handle BlockConfirmedEvent
 		case ev := <-pool.blockConfirmedCh:
+			if !pool.isBlockProposer {
+				break
+			}
 			if ev.Block != nil {
 				pool.mu.Lock()
 				if pool.chainconfig.IsHomestead(ev.Block.Number()) {
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 2cc6c7903..4c1d78f7f 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -87,7 +87,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
 	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
 	key, _ := crypto.GenerateKey()
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 
 	return pool, key
 }
@@ -197,7 +197,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
 	tx0 := transaction(0, 100000, key)
 	tx1 := transaction(1, 100000, key)
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	nonce := pool.State().GetNonce(address)
@@ -562,7 +562,7 @@ func TestTransactionPostponing(t *testing.T) {
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
 	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create two test accounts to produce different gap profiles with
@@ -781,7 +781,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
 	config.NoLocals = nolocals
 	config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them (last one will be the local)
@@ -869,7 +869,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	config.Lifetime = time.Second
 	config.NoLocals = nolocals
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create two test accounts to ensure remotes expire but locals do not
@@ -1022,7 +1022,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
 	config := testTxPoolConfig
 	config.GlobalSlots = config.AccountSlots * 10
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
@@ -1070,7 +1070,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
 	config.AccountQueue = 2
 	config.GlobalSlots = 8
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
@@ -1102,7 +1102,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
 	config := testTxPoolConfig
 	config.GlobalSlots = 1
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
@@ -1147,7 +1147,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
 	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Keep track of transaction events to ensure all executables get announced
@@ -1268,7 +1268,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
 	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
@@ -1334,7 +1334,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	config.GlobalSlots = 2
 	config.GlobalQueue = 2
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Keep track of transaction events to ensure all executables get announced
@@ -1440,7 +1440,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
 	config.GlobalSlots = 128
 	config.GlobalQueue = 0
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Keep track of transaction events to ensure all executables get announced
@@ -1502,7 +1502,7 @@ func TestTransactionReplacement(t *testing.T) {
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
 	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Keep track of transaction events to ensure all executables get announced
@@ -1601,7 +1601,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	config.Journal = journal
 	config.Rejournal = time.Second
 
-	pool := NewTxPool(config, params.TestChainConfig, blockchain)
+	pool := NewTxPool(config, params.TestChainConfig, blockchain, false)
 
 	// Create two test accounts to ensure remotes expire but locals do not
 	local, _ := crypto.GenerateKey()
@@ -1638,7 +1638,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
 	blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
-	pool = NewTxPool(config, params.TestChainConfig, blockchain)
+	pool = NewTxPool(config, params.TestChainConfig, blockchain, false)
 
 	pending, queued = pool.Stats()
 	if queued != 0 {
@@ -1664,7 +1664,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
 	blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
-	pool = NewTxPool(config, params.TestChainConfig, blockchain)
+	pool = NewTxPool(config, params.TestChainConfig, blockchain, false)
 
 	pending, queued = pool.Stats()
 	if pending != 0 {
@@ -1694,7 +1694,7 @@ func TestTransactionStatusCheck(t *testing.T) {
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase()))
 	blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false)
 	defer pool.Stop()
 
 	// Create the test accounts to check various transaction statuses with
diff --git a/core/types.go b/core/types.go
index 38b8e51dc..327031b01 100644
--- a/core/types.go
+++ b/core/types.go
@@ -33,6 +33,9 @@ type Validator interface {
 	// ValidateState validates the given statedb and optionally the receipts and
 	// gas used.
 	ValidateState(block, parent *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
+
+	// ValidateWitnessData validates the given witness result.
+	ValidateWitnessData(height uint64, data types.WitnessData) error
 }
 
 // Processor is an interface for processing blocks using a given initial state.
diff --git a/core/types/block.go b/core/types/block.go
index eb75dcb30..73b3ddfda 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -64,6 +64,7 @@ func (n *BlockNonce) UnmarshalText(input []byte) error {
 	return hexutil.UnmarshalFixedText("BlockNonce", input, n[:])
 }
 
+// WitnessData represents the witness data.
 type WitnessData struct {
 	Root        common.Hash
 	TxHash      common.Hash
diff --git a/dex/backend.go b/dex/backend.go
index d7bc4630e..740b8cd6f 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -134,8 +134,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
 		}
 		cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieCleanLimit: config.TrieCleanCache, TrieDirtyLimit: config.TrieDirtyCache, TrieTimeLimit: config.TrieTimeout}
 	)
-	dex.blockchain, err = core.NewBlockChainWithDexonValidator(chainDb, cacheConfig,
-		dex.chainConfig, dex.engine, vmConfig, nil)
+	dex.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, dex.chainConfig, dex.engine, vmConfig, nil)
 
 	// Rewind the chain in case of an incompatible config upgrade.
 	if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
@@ -148,7 +147,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
 	if config.TxPool.Journal != "" {
 		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
 	}
-	dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain)
+	dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain, config.BlockProposerEnabled)
 
 	dex.APIBackend = &DexAPIBackend{dex, nil}
 	gpoParams := config.GPO
@@ -166,7 +165,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
 
 	pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
 		config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
-		chainDb, dex.governance)
+		chainDb, config.BlockProposerEnabled, dex.governance)
 	if err != nil {
 		return nil, err
 	}
diff --git a/dex/config.go b/dex/config.go
index 2b8669fde..e78a698a9 100644
--- a/dex/config.go
+++ b/dex/config.go
@@ -46,10 +46,11 @@ var DefaultConfig = Config{
 		Blocks:     20,
 		Percentile: 60,
 	},
-	DefaultGasPrice:   big.NewInt(params.GWei),
-	GasFloor:          8000000,
-	GasCeil:           8000000,
-	GasLimitTolerance: 1000000,
+	BlockProposerEnabled: false,
+	DefaultGasPrice:      big.NewInt(params.GWei),
+	GasFloor:             8000000,
+	GasCeil:              8000000,
+	GasLimitTolerance:    1000000,
 }
 
 func init() {
@@ -106,6 +107,9 @@ type Config struct {
 	// Gas Price Oracle options
 	GPO gasprice.Config
 
+	// BlockProposer options
+	BlockProposerEnabled bool
+
 	// Enables tracking of SHA3 preimages in the VM
 	EnablePreimageRecording bool
 
diff --git a/dex/handler.go b/dex/handler.go
index 2f8ed13fa..7bc9c297d 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -106,12 +106,11 @@ type ProtocolManager struct {
 
 	SubProtocols []p2p.Protocol
 
-	eventMux      *event.TypeMux
-	txsCh         chan core.NewTxsEvent
-	txsSub        event.Subscription
-	metasCh       chan newMetasEvent
-	metasSub      event.Subscription
-	minedBlockSub *event.TypeMuxSubscription
+	eventMux *event.TypeMux
+	txsCh    chan core.NewTxsEvent
+	txsSub   event.Subscription
+	metasCh  chan newMetasEvent
+	metasSub event.Subscription
 
 	// channels for fetcher, syncer, txsyncLoop
 	newPeerCh   chan *peer
@@ -132,6 +131,9 @@ type ProtocolManager struct {
 	// wait group is used for graceful shutdowns during downloading
 	// and processing
 	wg sync.WaitGroup
+
+	// Dexcon
+	isBlockProposer bool
 }
 
 // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -140,24 +142,25 @@ func NewProtocolManager(
 	config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
 	mux *event.TypeMux, txpool txPool, engine consensus.Engine,
 	blockchain *core.BlockChain, chaindb ethdb.Database,
-	gov governance) (*ProtocolManager, error) {
+	isBlockProposer bool, gov governance) (*ProtocolManager, error) {
 	tab := newNodeTable()
 	// Create the protocol manager with the base fields
 	manager := &ProtocolManager{
-		networkID:   networkID,
-		eventMux:    mux,
-		txpool:      txpool,
-		nodeTable:   tab,
-		gov:         gov,
-		blockchain:  blockchain,
-		cache:       newCache(128),
-		chainconfig: config,
-		newPeerCh:   make(chan *peer),
-		noMorePeers: make(chan struct{}),
-		txsyncCh:    make(chan *txsync),
-		metasyncCh:  make(chan *metasync),
-		quitSync:    make(chan struct{}),
-		receiveCh:   make(chan interface{}, 1024),
+		networkID:       networkID,
+		eventMux:        mux,
+		txpool:          txpool,
+		nodeTable:       tab,
+		gov:             gov,
+		blockchain:      blockchain,
+		cache:           newCache(128),
+		chainconfig:     config,
+		newPeerCh:       make(chan *peer),
+		noMorePeers:     make(chan struct{}),
+		txsyncCh:        make(chan *txsync),
+		metasyncCh:      make(chan *metasync),
+		quitSync:        make(chan struct{}),
+		receiveCh:       make(chan interface{}, 1024),
+		isBlockProposer: isBlockProposer,
 	}
 
 	// Figure out whether to allow fast sync or not
@@ -258,10 +261,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
 	pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
 	go pm.metaBroadcastLoop()
 
-	// broadcast mined blocks
-	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
-	go pm.minedBroadcastLoop()
-
 	// run the peer set loop
 	pm.chainHeadCh = make(chan core.ChainHeadEvent)
 	pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
@@ -306,8 +305,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta {
 func (pm *ProtocolManager) Stop() {
 	log.Info("Stopping Ethereum protocol")
 
-	pm.txsSub.Unsubscribe()        // quits txBroadcastLoop
-	pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+	pm.txsSub.Unsubscribe() // quits txBroadcastLoop
 	pm.chainHeadSub.Unsubscribe()
 
 	// Quit the sync loop.
@@ -650,6 +648,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 
 	case msg.Code == NewBlockHashesMsg:
+		// Ignore new block hash messages in block proposer mode.
+		if pm.isBlockProposer {
+			break
+		}
 		var announces newBlockHashesData
 		if err := msg.Decode(&announces); err != nil {
 			return errResp(ErrDecode, "%v: %v", msg, err)
@@ -670,6 +672,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 
 	case msg.Code == NewBlockMsg:
+		// Ignore new block messages in block proposer mode.
+		if pm.isBlockProposer {
+			break
+		}
 		// Retrieve and decode the propagated block
 		var request newBlockData
 		if err := msg.Decode(&request); err != nil {
@@ -732,7 +738,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			p.MarkNodeMeta(meta.Hash())
 		}
 		pm.nodeTable.Add(metas)
+
+	// Block proposer-only messages.
+
 	case msg.Code == LatticeBlockMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		var block coreTypes.Block
 		if err := msg.Decode(&block); err != nil {
 			return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -740,6 +752,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		pm.cache.addBlock(&block)
 		pm.receiveCh <- &block
 	case msg.Code == VoteMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		var vote coreTypes.Vote
 		if err := msg.Decode(&vote); err != nil {
 			return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -749,6 +764,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.receiveCh <- &vote
 	case msg.Code == AgreementMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		// DKG set is receiver
 		var agreement coreTypes.AgreementResult
 		if err := msg.Decode(&agreement); err != nil {
@@ -756,6 +774,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.receiveCh <- &agreement
 	case msg.Code == RandomnessMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		// Broadcast this to all peer
 		var randomness coreTypes.BlockRandomnessResult
 		if err := msg.Decode(&randomness); err != nil {
@@ -763,6 +784,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.receiveCh <- &randomness
 	case msg.Code == DKGPrivateShareMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		// Do not relay this msg
 		var ps dkgTypes.PrivateShare
 		if err := msg.Decode(&ps); err != nil {
@@ -770,6 +794,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.receiveCh <- &ps
 	case msg.Code == DKGPartialSignatureMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		// broadcast in DKG set
 		var psig dkgTypes.PartialSignature
 		if err := msg.Decode(&psig); err != nil {
@@ -777,6 +804,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 		pm.receiveCh <- &psig
 	case msg.Code == PullBlocksMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		var hashes coreCommon.Hashes
 		if err := msg.Decode(&hashes); err != nil {
 			return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -789,6 +819,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 			}
 		}
 	case msg.Code == PullVotesMsg:
+		if !pm.isBlockProposer {
+			break
+		}
 		var pos coreTypes.Position
 		if err := msg.Decode(&pos); err != nil {
 			return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -1003,17 +1036,6 @@ func (pm *ProtocolManager) BroadcastPullVotes(
 	}
 }
 
-// Mined broadcast loop
-func (pm *ProtocolManager) minedBroadcastLoop() {
-	// automatically stops if unsubscribe
-	for obj := range pm.minedBlockSub.Chan() {
-		if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
-			pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
-			pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
-		}
-	}
-}
-
 func (pm *ProtocolManager) txBroadcastLoop() {
 	for {
 		select {
@@ -1053,7 +1075,10 @@ func (pm *ProtocolManager) peerSetLoop() {
 
 	for {
 		select {
-		case <-pm.chainHeadCh:
+		case event := <-pm.chainHeadCh:
+			pm.BroadcastBlock(event.Block, true)  // First propagate block to peers
+			pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
+
 			newRound := pm.gov.LenCRS() - 1
 			log.Trace("new round", "round", newRound)
 			if newRound == round {
diff --git a/eth/backend.go b/eth/backend.go
index 04e4569f2..a6a558823 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -173,7 +173,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 	if config.TxPool.Journal != "" {
 		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
 	}
-	eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
+	eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain, false)
 
 	if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil {
 		return nil, err
diff --git a/p2p/server.go b/p2p/server.go
index 8cd2863b3..36b1721a5 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -33,13 +33,13 @@ import (
 	"github.com/dexon-foundation/dexon/crypto"
 	"github.com/dexon-foundation/dexon/event"
 	"github.com/dexon-foundation/dexon/log"
+	"github.com/dexon-foundation/dexon/p2p/discover"
 	"github.com/dexon-foundation/dexon/p2p/discv5"
 	"github.com/dexon-foundation/dexon/p2p/enode"
 	"github.com/dexon-foundation/dexon/p2p/enr"
 	"github.com/dexon-foundation/dexon/p2p/nat"
 	"github.com/dexon-foundation/dexon/p2p/netutil"
 	"github.com/dexon-foundation/dexon/rlp"
-	"github.com/ethereum/go-ethereum/p2p/discover"
 )
 
 const (
diff --git a/test/run_test.sh b/test/run_test.sh
index a7dd934bc..f535a609d 100755
--- a/test/run_test.sh
+++ b/test/run_test.sh
@@ -15,14 +15,20 @@ rm -f log-latest
 ln -s $logsdir log-latest
 
 # A standalone RPC server for accepting RPC requests.
-# datadir=$PWD/Dexon.rpc
-# rm -rf $datadir
-# $GDEX --datadir=$datadir init genesis.json
-# $GDEX --verbosity=4 --gcmode=archive --datadir=$datadir \
-#   --rpc --rpcapi=eth,net,web3,debug --rpcaddr=0.0.0.0 --rpcport=8543 \
-#   --ws --wsapi=eth,net,web3,debug --wsaddr=0.0.0.0 --wsport=8544  \
-#   --wsorigins='*' --rpcvhosts='*' --rpccorsdomain="*" \
-#   > $logsdir/gdex.rpc.log 2>&1 &
+datadir=$PWD/Dexon.rpc
+rm -rf $datadir
+$GDEX --datadir=$datadir init genesis.json
+$GDEX \
+  --testnet \
+  --verbosity=4 \
+  --gcmode=archive \
+  --datadir=$datadir \
+  --rpc --rpcapi=eth,net,web3,debug \
+  --rpcaddr=0.0.0.0 --rpcport=8545 \
+  --ws --wsapi=eth,net,web3,debug \
+  --wsaddr=0.0.0.0 --wsport=8546  \
+  --wsorigins='*' --rpcvhosts='*' --rpccorsdomain="*" \
+  > $logsdir/gdex.rpc.log 2>&1 &
 
 # Nodes
 for i in $(seq 0 3); do
@@ -37,9 +43,9 @@ for i in $(seq 0 3); do
     --datadir=$datadir --nodekey=test$i.nodekey \
     --port=$((30305 + $i)) \
     --rpc --rpcapi=eth,net,web3,debug \
-    --rpcaddr=0.0.0.0 --rpcport=$((8545 + $i * 2)) \
+    --rpcaddr=0.0.0.0 --rpcport=$((8547 + $i * 2)) \
     --ws --wsapi=eth,net,web3,debug \
-    --wsaddr=0.0.0.0 --wsport=$((8546 + $i * 2)) \
+    --wsaddr=0.0.0.0 --wsport=$((8548 + $i * 2)) \
     --wsorigins='*' --rpcvhosts='*' --rpccorsdomain="*" \
     --pprof --pprofaddr=localhost --pprofport=$((6060 + $i)) \
     > $logsdir/gdex.$i.log 2>&1 &
-- 
cgit v1.2.3