aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorledgerwatch <akhounov@gmail.com>2018-09-30 04:17:06 +0800
committerFelix Lange <fjl@users.noreply.github.com>2018-09-30 04:17:06 +0800
commit3d782bc727fb58630b1d811b9ab95d626c68e40f (patch)
tree0a3ff6ca73bb9e408c0f48d06614dc5488e67f9a /eth
parent01d9f2980531814acc5c87e978966c26ac1bc4c2 (diff)
downloaddexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.tar
dexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.tar.gz
dexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.tar.bz2
dexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.tar.lz
dexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.tar.xz
dexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.tar.zst
dexon-3d782bc727fb58630b1d811b9ab95d626c68e40f.zip
eth: broadcast blocks to at least 4 peers (#17725)
Diffstat (limited to 'eth')
-rw-r--r--eth/handler.go12
-rw-r--r--eth/handler_test.go105
2 files changed, 109 insertions, 8 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 551781ef0..1f62d820e 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -49,6 +49,9 @@ const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
+
+ // minimim number of peers to broadcast new blocks to
+ minBroadcastPeers = 4
)
var (
@@ -705,7 +708,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
return
}
// Send the block to a subset of our peers
- transfer := peers[:int(math.Sqrt(float64(len(peers))))]
+ transferLen := int(math.Sqrt(float64(len(peers))))
+ if transferLen < minBroadcastPeers {
+ transferLen = minBroadcastPeers
+ }
+ if transferLen > len(peers) {
+ transferLen = len(peers)
+ }
+ transfer := peers[:transferLen]
for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
diff --git a/eth/handler_test.go b/eth/handler_test.go
index 0885a0448..7811cd480 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -17,6 +17,7 @@
package eth
import (
+ "fmt"
"math"
"math/big"
"math/rand"
@@ -466,14 +467,17 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
}
// Create a DAO aware protocol manager
var (
- evmux = new(event.TypeMux)
- pow = ethash.NewFaker()
- db = ethdb.NewMemDatabase()
- config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
- gspec = &core.Genesis{Config: config}
- genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
+ evmux = new(event.TypeMux)
+ pow = ethash.NewFaker()
+ db = ethdb.NewMemDatabase()
+ config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
+ gspec = &core.Genesis{Config: config}
+ genesis = gspec.MustCommit(db)
)
+ blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
+ if err != nil {
+ t.Fatalf("failed to create new blockchain: %v", err)
+ }
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
@@ -520,3 +524,90 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
}
}
}
+
+func TestBroadcastBlock(t *testing.T) {
+ var tests = []struct {
+ totalPeers int
+ broadcastExpected int
+ }{
+ {1, 1},
+ {2, 2},
+ {3, 3},
+ {4, 4},
+ {5, 4},
+ {9, 4},
+ {12, 4},
+ {16, 4},
+ {26, 5},
+ {100, 10},
+ }
+ for _, test := range tests {
+ testBroadcastBlock(t, test.totalPeers, test.broadcastExpected)
+ }
+}
+
+func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
+ var (
+ evmux = new(event.TypeMux)
+ pow = ethash.NewFaker()
+ db = ethdb.NewMemDatabase()
+ config = &params.ChainConfig{}
+ gspec = &core.Genesis{Config: config}
+ genesis = gspec.MustCommit(db)
+ )
+ blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
+ if err != nil {
+ t.Fatalf("failed to create new blockchain: %v", err)
+ }
+ pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
+ if err != nil {
+ t.Fatalf("failed to start test protocol manager: %v", err)
+ }
+ pm.Start(1000)
+ defer pm.Stop()
+ var peers []*testPeer
+ for i := 0; i < totalPeers; i++ {
+ peer, _ := newTestPeer(fmt.Sprintf("peer %d", i), eth63, pm, true)
+ defer peer.close()
+ peers = append(peers, peer)
+ }
+ chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 1, func(i int, gen *core.BlockGen) {})
+ pm.BroadcastBlock(chain[0], true /*propagate*/)
+
+ errCh := make(chan error, totalPeers)
+ doneCh := make(chan struct{}, totalPeers)
+ for _, peer := range peers {
+ go func(p *testPeer) {
+ if err := p2p.ExpectMsg(p.app, NewBlockMsg, &newBlockData{Block: chain[0], TD: big.NewInt(131136)}); err != nil {
+ errCh <- err
+ } else {
+ doneCh <- struct{}{}
+ }
+ }(peer)
+ }
+ timeoutCh := time.NewTimer(time.Millisecond * 100).C
+ var receivedCount int
+outer:
+ for {
+ select {
+ case err = <-errCh:
+ break outer
+ case <-doneCh:
+ receivedCount++
+ if receivedCount == totalPeers {
+ break outer
+ }
+ case <-timeoutCh:
+ break outer
+ }
+ }
+ for _, peer := range peers {
+ peer.app.Close()
+ }
+ if err != nil {
+ t.Errorf("error matching block by peer: %v", err)
+ }
+ if receivedCount != broadcastExpected {
+ t.Errorf("block broadcast to %d peers, expected %d", receivedCount, broadcastExpected)
+ }
+}