aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv6/peer_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv6/peer_test.go')
-rw-r--r--whisper/whisperv6/peer_test.go186
1 files changed, 135 insertions, 51 deletions
diff --git a/whisper/whisperv6/peer_test.go b/whisper/whisperv6/peer_test.go
index 8a65cb714..ec985ae65 100644
--- a/whisper/whisperv6/peer_test.go
+++ b/whisper/whisperv6/peer_test.go
@@ -23,17 +23,19 @@ import (
mrand "math/rand"
"net"
"sync"
+ "sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/nat"
)
-var keys []string = []string{
+var keys = []string{
"d49dcf37238dc8a7aac57dc61b9fee68f0a97f062968978b9fafa7d1033d03a9",
"73fd6143c48e80ed3c56ea159fe7494a0b6b393a392227b422f4c3e8f1b54f98",
"119dd32adb1daa7a4c7bf77f847fb28730785aa92947edf42fdd997b54de40dc",
@@ -69,9 +71,8 @@ var keys []string = []string{
"7184c1701569e3a4c4d2ddce691edd983b81e42e09196d332e1ae2f1e062cff4",
}
-const NumNodes = 16 // must not exceed the number of keys (32)
-
type TestData struct {
+ started int64
counter [NumNodes]int
mutex sync.RWMutex
}
@@ -80,24 +81,32 @@ type TestNode struct {
shh *Whisper
id *ecdsa.PrivateKey
server *p2p.Server
- filerId string
+ filerID string
}
+const NumNodes = 8 // must not exceed the number of keys (32)
+
var result TestData
var nodes [NumNodes]*TestNode
-var sharedKey []byte = []byte("some arbitrary data here")
-var sharedTopic TopicType = TopicType{0xF, 0x1, 0x2, 0}
-var expectedMessage []byte = []byte("per rectum ad astra")
+var sharedKey = hexutil.MustDecode("0x03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31")
+var wrongKey = hexutil.MustDecode("0xf91156714d7ec88d3edc1c652c2181dbb3044e8771c683f3b30d33c12b986b11")
+var sharedTopic = TopicType{0xF, 0x1, 0x2, 0}
+var wrongTopic = TopicType{0, 0, 0, 0}
+var expectedMessage = []byte("per aspera ad astra")
+var unexpectedMessage = []byte("per rectum ad astra")
var masterBloomFilter []byte
var masterPow = 0.00000001
-var round int = 1
+var round = 1
+var debugMode = false
+var prevTime time.Time
+var cntPrev int
func TestSimulation(t *testing.T) {
// create a chain of whisper nodes,
// installs the filters with shared (predefined) parameters
initialize(t)
- // each node sends a number of random (undecryptable) messages
+ // each node sends one random (not decryptable) message
for i := 0; i < NumNodes; i++ {
sendMsg(t, false, i)
}
@@ -114,7 +123,6 @@ func TestSimulation(t *testing.T) {
// send new pow and bloom exchange messages
resetParams(t)
- round++
// node #1 sends one expected (decryptable) message
sendMsg(t, true, 1)
@@ -122,11 +130,6 @@ func TestSimulation(t *testing.T) {
// check if each node (except node #0) have received and decrypted exactly one message
checkPropagation(t, false)
- for i := 1; i < NumNodes; i++ {
- time.Sleep(20 * time.Millisecond)
- sendMsg(t, true, i)
- }
-
// check if corresponding protocol-level messages were correctly decoded
checkPowExchangeForNodeZero(t)
checkBloomFilterExchange(t)
@@ -144,10 +147,12 @@ func resetParams(t *testing.T) {
for i := 0; i < NumNodes; i++ {
nodes[i].shh.SetBloomFilter(masterBloomFilter)
}
+
+ round++
}
func initBloom(t *testing.T) {
- masterBloomFilter = make([]byte, bloomFilterSize)
+ masterBloomFilter = make([]byte, BloomFilterSize)
_, err := mrand.Read(masterBloomFilter)
if err != nil {
t.Fatalf("rand failed: %s.", err)
@@ -159,7 +164,7 @@ func initBloom(t *testing.T) {
masterBloomFilter[i] = 0xFF
}
- if !bloomFilterMatch(masterBloomFilter, msgBloom) {
+ if !BloomFilterMatch(masterBloomFilter, msgBloom) {
t.Fatalf("bloom mismatch on initBloom.")
}
}
@@ -173,7 +178,7 @@ func initialize(t *testing.T) {
for i := 0; i < NumNodes; i++ {
var node TestNode
- b := make([]byte, bloomFilterSize)
+ b := make([]byte, BloomFilterSize)
copy(b, masterBloomFilter)
node.shh = New(&DefaultConfig)
node.shh.SetMinimumPoW(masterPow)
@@ -186,7 +191,7 @@ func initialize(t *testing.T) {
topics = append(topics, sharedTopic)
f := Filter{KeySym: sharedKey}
f.Topics = [][]byte{topics[0][:]}
- node.filerId, err = node.shh.Subscribe(&f)
+ node.filerID, err = node.shh.Subscribe(&f)
if err != nil {
t.Fatalf("failed to install the filter: %s.", err)
}
@@ -199,9 +204,9 @@ func initialize(t *testing.T) {
name := common.MakeName("whisper-go", "2.0")
var peers []*discover.Node
if i > 0 {
- peerNodeId := nodes[i-1].id
+ peerNodeID := nodes[i-1].id
peerPort := uint16(port - 1)
- peerNode := discover.PubkeyID(&peerNodeId.PublicKey)
+ peerNode := discover.PubkeyID(&peerNodeID.PublicKey)
peer := discover.NewNode(peerNode, ip, peerPort, peerPort)
peers = append(peers, peer)
}
@@ -223,22 +228,27 @@ func initialize(t *testing.T) {
nodes[i] = &node
}
- for i := 1; i < NumNodes; i++ {
- go nodes[i].server.Start()
+ for i := 0; i < NumNodes; i++ {
+ go startServer(t, nodes[i].server)
}
- // we need to wait until the first node actually starts
- err = nodes[0].server.Start()
+ waitForServersToStart(t)
+}
+
+func startServer(t *testing.T, s *p2p.Server) {
+ err := s.Start()
if err != nil {
t.Fatalf("failed to start the fisrt server.")
}
+
+ atomic.AddInt64(&result.started, 1)
}
func stopServers() {
for i := 0; i < NumNodes; i++ {
n := nodes[i]
if n != nil {
- n.shh.Unsubscribe(n.filerId)
+ n.shh.Unsubscribe(n.filerID)
n.shh.Stop()
n.server.Stop()
}
@@ -250,8 +260,10 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
return
}
- const cycle = 50
- const iterations = 200
+ prevTime = time.Now()
+ // (cycle * iterations) should not exceed 50 seconds, since TTL=50
+ const cycle = 200 // time in milliseconds
+ const iterations = 250
first := 0
if !includingNodeZero {
@@ -260,35 +272,35 @@ func checkPropagation(t *testing.T, includingNodeZero bool) {
for j := 0; j < iterations; j++ {
for i := first; i < NumNodes; i++ {
- f := nodes[i].shh.GetFilter(nodes[i].filerId)
+ f := nodes[i].shh.GetFilter(nodes[i].filerID)
if f == nil {
- t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerId, i, round)
+ t.Fatalf("failed to get filterId %s from node %d, round %d.", nodes[i].filerID, i, round)
}
mail := f.Retrieve()
- if !validateMail(t, i, mail) {
- return
- }
+ validateMail(t, i, mail)
if isTestComplete() {
+ checkTestStatus()
return
}
}
+ checkTestStatus()
time.Sleep(cycle * time.Millisecond)
}
- t.Fatalf("Test was not complete: timeout %d seconds.", iterations*cycle/1000)
-
if !includingNodeZero {
- f := nodes[0].shh.GetFilter(nodes[0].filerId)
+ f := nodes[0].shh.GetFilter(nodes[0].filerID)
if f != nil {
t.Fatalf("node zero received a message with low PoW.")
}
}
+
+ t.Fatalf("Test was not complete (%d round): timeout %d seconds. nodes=%v", round, iterations*cycle/1000, nodes)
}
-func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
+func validateMail(t *testing.T, index int, mail []*ReceivedMessage) {
var cnt int
for _, m := range mail {
if bytes.Equal(m.Payload, expectedMessage) {
@@ -298,14 +310,13 @@ func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
if cnt == 0 {
// no messages received yet: nothing is wrong
- return true
+ return
}
if cnt > 1 {
t.Fatalf("node %d received %d.", index, cnt)
- return false
}
- if cnt > 0 {
+ if cnt == 1 {
result.mutex.Lock()
defer result.mutex.Unlock()
result.counter[index] += cnt
@@ -313,7 +324,28 @@ func validateMail(t *testing.T, index int, mail []*ReceivedMessage) bool {
t.Fatalf("node %d accumulated %d.", index, result.counter[index])
}
}
- return true
+}
+
+func checkTestStatus() {
+ var cnt int
+ var arr [NumNodes]int
+
+ for i := 0; i < NumNodes; i++ {
+ arr[i] = nodes[i].server.PeerCount()
+ envelopes := nodes[i].shh.Envelopes()
+ if len(envelopes) >= NumNodes {
+ cnt++
+ }
+ }
+
+ if debugMode {
+ if cntPrev != cnt {
+ fmt.Printf(" %v \t number of nodes that have received all msgs: %d, number of peers per node: %v \n",
+ time.Since(prevTime), cnt, arr)
+ prevTime = time.Now()
+ cntPrev = cnt
+ }
+ }
}
func isTestComplete() bool {
@@ -328,7 +360,7 @@ func isTestComplete() bool {
for i := 0; i < NumNodes; i++ {
envelopes := nodes[i].shh.Envelopes()
- if len(envelopes) < 2 {
+ if len(envelopes) < NumNodes+1 {
return false
}
}
@@ -343,9 +375,10 @@ func sendMsg(t *testing.T, expected bool, id int) {
opt := MessageParams{KeySym: sharedKey, Topic: sharedTopic, Payload: expectedMessage, PoW: 0.00000001, WorkTime: 1}
if !expected {
- opt.KeySym[0]++
- opt.Topic[0]++
- opt.Payload = opt.Payload[1:]
+ opt.KeySym = wrongKey
+ opt.Topic = wrongTopic
+ opt.Payload = unexpectedMessage
+ opt.Payload[0] = byte(id)
}
msg, err := NewSentMessage(&opt)
@@ -389,20 +422,37 @@ func TestPeerBasic(t *testing.T) {
}
func checkPowExchangeForNodeZero(t *testing.T) {
+ const iterations = 200
+ for j := 0; j < iterations; j++ {
+ lastCycle := (j == iterations-1)
+ ok := checkPowExchangeForNodeZeroOnce(t, lastCycle)
+ if ok {
+ break
+ }
+ time.Sleep(50 * time.Millisecond)
+ }
+}
+
+func checkPowExchangeForNodeZeroOnce(t *testing.T, mustPass bool) bool {
cnt := 0
for i, node := range nodes {
for peer := range node.shh.peers {
if peer.peer.ID() == discover.PubkeyID(&nodes[0].id.PublicKey) {
cnt++
if peer.powRequirement != masterPow {
- t.Fatalf("node %d: failed to set the new pow requirement.", i)
+ if mustPass {
+ t.Fatalf("node %d: failed to set the new pow requirement for node zero.", i)
+ } else {
+ return false
+ }
}
}
}
}
if cnt == 0 {
- t.Fatalf("no matching peers found.")
+ t.Fatalf("looking for node zero: no matching peers found.")
}
+ return true
}
func checkPowExchange(t *testing.T) {
@@ -418,13 +468,47 @@ func checkPowExchange(t *testing.T) {
}
}
-func checkBloomFilterExchange(t *testing.T) {
+func checkBloomFilterExchangeOnce(t *testing.T, mustPass bool) bool {
for i, node := range nodes {
for peer := range node.shh.peers {
- if !bytes.Equal(peer.bloomFilter, masterBloomFilter) {
- t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got",
- i, round, masterBloomFilter, peer.bloomFilter)
+ peer.bloomMu.Lock()
+ equals := bytes.Equal(peer.bloomFilter, masterBloomFilter)
+ peer.bloomMu.Unlock()
+ if !equals {
+ if mustPass {
+ t.Fatalf("node %d: failed to exchange bloom filter requirement in round %d. \n%x expected \n%x got",
+ i, round, masterBloomFilter, peer.bloomFilter)
+ } else {
+ return false
+ }
}
}
}
+
+ return true
+}
+
+func checkBloomFilterExchange(t *testing.T) {
+ const iterations = 200
+ for j := 0; j < iterations; j++ {
+ lastCycle := (j == iterations-1)
+ ok := checkBloomFilterExchangeOnce(t, lastCycle)
+ if ok {
+ break
+ }
+ time.Sleep(50 * time.Millisecond)
+ }
+}
+
+func waitForServersToStart(t *testing.T) {
+ const iterations = 200
+ var started int64
+ for j := 0; j < iterations; j++ {
+ time.Sleep(50 * time.Millisecond)
+ started = atomic.LoadInt64(&result.started)
+ if started == NumNodes {
+ return
+ }
+ }
+ t.Fatalf("Failed to start all the servers, running: %d", started)
}