diff options
Diffstat (limited to 'whisper/whisperv6/peer_test.go')
-rw-r--r-- | whisper/whisperv6/peer_test.go | 186 |
1 files changed, 135 insertions, 51 deletions
diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go index 8a65cb714..ec985ae65 100644 --- a/whisper/whisperv6/peer_test.go +++ b/whisper/whisperv6/peer_test.go @@ -23,17 +23,19 @@ import ( mrand "math/rand" "net" "sync" + "sync/atomic" "testing" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/nat" ) -var keys []string = []string{ +var keys = []string{ "d49dcf37238dc8a7aac57dc61b9fee68f0a97f062968978b9fafa7d1033d03a9", "73fd6143c48e80ed3c56ea159fe7494a0b6b393a392227b422f4c3e8f1b54f98", "119dd32adb1daa7a4c7bf77f847fb28730785aa92947edf42fdd997b54de40dc", @@ -69,9 +71,8 @@ var keys []string = []string{ "7184c1701569e3a4c4d2ddce691edd983b81e42e09196d332e1ae2f1e062cff4", } -const NumNodes = 16 // must not exceed the number of keys (32) - type TestData struct { + started int64 counter [NumNodes]int mutex sync.RWMutex } @@ -80,24 +81,32 @@ type TestNode struct { shh *Whisper id *ecdsa.PrivateKey server *p2p.Server - filerId string + filerID string } +const NumNodes = 8 // must not exceed the number of keys (32) + var result TestData var nodes [NumNodes]*TestNode -var sharedKey []byte = []byte("some arbitrary data here") -var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0} -var expectedMessage []byte = []byte("per rectum ad astra") +var sharedKey = hexutil.MustDecode("0x03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31") +var wrongKey = hexutil.MustDecode("0xf91156714d7ec88d3edc1c652c2181dbb3044e8771c683f3b30d33c12b986b11") +var sharedTopic = TopicType{0xF, 0x1, 0x2, 0} +var wrongTopic = TopicType{0, 0, 0, 0} +var expectedMessage = []byte("per aspera ad astra") +var unexpectedMessage = []byte("per rectum ad astra") var masterBloomFilter []byte var masterPow = 0.00000001 -var round int = 1 +var round = 1 +var debugMode = false +var prevTime time.Time +var cntPrev int func TestSimulation(t *testing.T) { // create a chain of whisper nodes, // installs the filters with shared (predefined) parameters initialize(t) - // each node sends a number of random (undecryptable) messages + // each node sends one random (not decryptable) message for i := 0; i < NumNodes; i++ { sendMsg(t, false, i) } @@ -114,7 +123,6 @@ func TestSimulation(t *testing.T) { // send new pow and bloom exchange messages resetParams(t) - round++ // node #1 sends one expected (decryptable) message sendMsg(t, true, 1) @@ -122,11 +130,6 @@ func TestSimulation(t *testing.T) { // check if each node (except node #0) have received and decrypted exactly one message checkPropagation(t, false) - for i := 1; i < NumNodes; i++ { - time.Sleep(20 * time.Millisecond) - sendMsg(t, true, i) - } - // check if corresponding protocol-level messages were correctly decoded checkPowExchangeForNodeZero(t) checkBloomFilterExchange(t) @@ -144,10 +147,12 @@ func resetParams(t *testing.T) { for i := 0; i < NumNodes; i++ { nodes[i].shh.SetBloomFilter(masterBloomFilter) } + + round++ } func initBloom(t *testing.T) { - masterBloomFilter = make([]byte, bloomFilterSize) + masterBloomFilter = make([]byte, BloomFilterSize) _, err := mrand.Read(masterBloomFilter) if err != nil { t.Fatalf("rand failed: %s.", err) @@ -159,7 +164,7 @@ func initBloom(t *testing.T) { masterBloomFilter[i] = 0xFF } - if !bloomFilterMatch(masterBloomFilter, msgBloom) { + if !BloomFilterMatch(masterBloomFilter, msgBloom) { t.Fatalf("bloom mismatch on initBloom.") } } @@ -173,7 +178,7 @@ func initialize(t *testing.T) { for i := 0; i < NumNodes; i++ { var node TestNode - b := make([]byte, bloomFilterSize) + b := make([]byte, BloomFilterSize) copy(b, masterBloomFilter) node.shh = New(&DefaultConfig) node.shh.SetMinimumPoW(masterPow) @@ -186,7 +191,7 @@ func initialize(t *testing.T) { topics = append(topics, sharedTopic) f := Filter{KeySym: sharedKey} f.Topics = [][]byte{topics[0][:]} - node.filerId, err = node.shh.Subscribe(&f) + node.filerID, err = node.shh.Subscribe(&f) if err != nil { t.Fatalf("failed to install the filter: %s.", err) } @@ -199,9 +204,9 @@ func initialize(t *testing.T) { name := common.MakeName("whisper-go", "2.0") var peers []*discover.Node if i > 0 { - peerNodeId := nodes[i-1].id + peerNodeID := nodes[i-1].id peerPort := uint16(port - 1) - peerNode := discover.PubkeyID(&peerNodeId.PublicKey) + peerNode := discover.PubkeyID(&peerNodeID.PublicKey) peer := discover.NewNode(peerNode, ip, peerPort, peerPort) peers = append(peers, peer) } @@ -223,22 +228,27 @@ func initialize(t *testing.T) { nodes[i] = &node } - for i := 1; i < NumNodes; i++ { - go nodes[i].server.Start() + for i := 0; i < NumNodes; i++ { + go startServer(t, nodes[i].server) } - // we need to wait until the first node actually starts - err = nodes[0].server.Start() + waitForServersToStart(t) +} + +func startServer(t *testing.T, s *p2p.Server) { + err := s.Start() if err != nil { t.Fatalf("failed to start the fisrt server.") } + + atomic.AddInt64(&result.started, 1) } func stopServers() { for i := 0; i < NumNodes; i++ { n := nodes[i] if n != nil { - n.shh.Unsubscribe(n.filerId) + n.shh.Unsubscribe(n.filerID) n.shh.Stop() n.server.Stop() } @@ -250,8 +260,10 @@ func checkPropagation(t *testing.T, includingNodeZero bool) { return } - const cycle = 50 - const iterations = 200 + prevTime = time.Now() + // (cycle * iterations) should not exceed 50 seconds, since TTL=50 + const cycle = 200 // time in milliseconds + const iterations = 250 first := 0 if !includingNodeZero { @@ -260,35 +272,35 @@ func checkPropagation(t *testing.T, includingNodeZero bool) { for j := 0; j < iterations; j++ { for i := first; i < NumNodes; i++ { - f := nodes[i].shh.GetFilter(nodes[i].filerId) + f := nodes[i].shh.GetFilter(nodes[i].filerID) if f == nil { - t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerId, i, round) + t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerID, i, round) } mail := f.Retrieve() - if !validateMail(t, i, mail) { - return - } + validateMail(t, i, mail) if isTestComplete() { + checkTestStatus() return } } + checkTestStatus() time.Sleep(cycle * time.Millisecond) } - t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000) - if !includingNodeZero { - f := nodes[0].shh.GetFilter(nodes[0].filerId) + f := nodes[0].shh.GetFilter(nodes[0].filerID) if f != nil { t.Fatalf("node zero received a message with low PoW.") } } + + t.Fatalf("Test was not complete (%d round): timeout %d seconds. nodes=%v", round, iterations*cycle/1000, nodes) } -func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool { +func validateMail(t *testing.T, index int, mail []*ReceivedMessage) { var cnt int for _, m := range mail { if bytes.Equal(m.Payload, expectedMessage) { @@ -298,14 +310,13 @@ func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool { if cnt == 0 { // no messages received yet: nothing is wrong - return true + return } if cnt > 1 { t.Fatalf("node %d received %d.", index, cnt) - return false } - if cnt > 0 { + if cnt == 1 { result.mutex.Lock() defer result.mutex.Unlock() result.counter[index] += cnt @@ -313,7 +324,28 @@ func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool { t.Fatalf("node %d accumulated %d.", index, result.counter[index]) } } - return true +} + +func checkTestStatus() { + var cnt int + var arr [NumNodes]int + + for i := 0; i < NumNodes; i++ { + arr[i] = nodes[i].server.PeerCount() + envelopes := nodes[i].shh.Envelopes() + if len(envelopes) >= NumNodes { + cnt++ + } + } + + if debugMode { + if cntPrev != cnt { + fmt.Printf(" %v \t number of nodes that have received all msgs: %d, number of peers per node: %v \n", + time.Since(prevTime), cnt, arr) + prevTime = time.Now() + cntPrev = cnt + } + } } func isTestComplete() bool { @@ -328,7 +360,7 @@ func isTestComplete() bool { for i := 0; i < NumNodes; i++ { envelopes := nodes[i].shh.Envelopes() - if len(envelopes) < 2 { + if len(envelopes) < NumNodes+1 { return false } } @@ -343,9 +375,10 @@ func sendMsg(t *testing.T, expected bool, id int) { opt := MessageParams{KeySym: sharedKey, Topic: sharedTopic, Payload: expectedMessage, PoW: 0.00000001, WorkTime: 1} if !expected { - opt.KeySym[0]++ - opt.Topic[0]++ - opt.Payload = opt.Payload[1:] + opt.KeySym = wrongKey + opt.Topic = wrongTopic + opt.Payload = unexpectedMessage + opt.Payload[0] = byte(id) } msg, err := NewSentMessage(&opt) @@ -389,20 +422,37 @@ func TestPeerBasic(t *testing.T) { } func checkPowExchangeForNodeZero(t *testing.T) { + const iterations = 200 + for j := 0; j < iterations; j++ { + lastCycle := (j == iterations-1) + ok := checkPowExchangeForNodeZeroOnce(t, lastCycle) + if ok { + break + } + time.Sleep(50 * time.Millisecond) + } +} + +func checkPowExchangeForNodeZeroOnce(t *testing.T, mustPass bool) bool { cnt := 0 for i, node := range nodes { for peer := range node.shh.peers { if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) { cnt++ if peer.powRequirement != masterPow { - t.Fatalf("node %d: failed to set the new pow requirement.", i) + if mustPass { + t.Fatalf("node %d: failed to set the new pow requirement for node zero.", i) + } else { + return false + } } } } } if cnt == 0 { - t.Fatalf("no matching peers found.") + t.Fatalf("looking for node zero: no matching peers found.") } + return true } func checkPowExchange(t *testing.T) { @@ -418,13 +468,47 @@ func checkPowExchange(t *testing.T) { } } -func checkBloomFilterExchange(t *testing.T) { +func checkBloomFilterExchangeOnce(t *testing.T, mustPass bool) bool { for i, node := range nodes { for peer := range node.shh.peers { - if !bytes.Equal(peer.bloomFilter, masterBloomFilter) { - t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got", - i, round, masterBloomFilter, peer.bloomFilter) + peer.bloomMu.Lock() + equals := bytes.Equal(peer.bloomFilter, masterBloomFilter) + peer.bloomMu.Unlock() + if !equals { + if mustPass { + t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got", + i, round, masterBloomFilter, peer.bloomFilter) + } else { + return false + } } } } + + return true +} + +func checkBloomFilterExchange(t *testing.T) { + const iterations = 200 + for j := 0; j < iterations; j++ { + lastCycle := (j == iterations-1) + ok := checkBloomFilterExchangeOnce(t, lastCycle) + if ok { + break + } + time.Sleep(50 * time.Millisecond) + } +} + +func waitForServersToStart(t *testing.T) { + const iterations = 200 + var started int64 + for j := 0; j < iterations; j++ { + time.Sleep(50 * time.Millisecond) + started = atomic.LoadInt64(&result.started) + if started == NumNodes { + return + } + } + t.Fatalf("Failed to start all the servers, running: %d", started) } |