diff options
Diffstat (limited to 'p2p/discv5/sim_test.go')
-rw-r--r-- | p2p/discv5/sim_test.go | 464 |
1 files changed, 464 insertions, 0 deletions
diff --git a/p2p/discv5/sim_test.go b/p2p/discv5/sim_test.go new file mode 100644 index 000000000..2e232fbaa --- /dev/null +++ b/p2p/discv5/sim_test.go @@ -0,0 +1,464 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package discv5 + +import ( + "crypto/ecdsa" + "encoding/binary" + "fmt" + "math/rand" + "net" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// In this test, nodes try to randomly resolve each other. +func TestSimRandomResolve(t *testing.T) { + t.Skip("boring") + if runWithPlaygroundTime(t) { + return + } + + sim := newSimulation() + bootnode := sim.launchNode(false) + + // A new node joins every 10s. + launcher := time.NewTicker(10 * time.Second) + go func() { + for range launcher.C { + net := sim.launchNode(false) + go randomResolves(t, sim, net) + if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil { + panic(err) + } + fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16]) + } + }() + + time.Sleep(3 * time.Hour) + launcher.Stop() + sim.shutdown() + sim.printStats() +} + +func TestSimTopics(t *testing.T) { + t.Skip("NaCl test") + if runWithPlaygroundTime(t) { + return + } + + // glog.SetV(6) + // glog.SetToStderr(true) + + sim := newSimulation() + bootnode := sim.launchNode(false) + + go func() { + nets := make([]*Network, 1024) + for i, _ := range nets { + net := sim.launchNode(false) + nets[i] = net + if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil { + panic(err) + } + time.Sleep(time.Second * 5) + } + + for i, net := range nets { + if i < 256 { + stop := make(chan struct{}) + go net.RegisterTopic(testTopic, stop) + go func() { + //time.Sleep(time.Second * 36000) + time.Sleep(time.Second * 40000) + close(stop) + }() + time.Sleep(time.Millisecond * 100) + } + // time.Sleep(time.Second * 10) + //time.Sleep(time.Second) + /*if i%500 == 499 { + time.Sleep(time.Second * 9501) + } else { + time.Sleep(time.Second) + }*/ + } + }() + + // A new node joins every 10s. + /* launcher := time.NewTicker(5 * time.Second) + cnt := 0 + var printNet *Network + go func() { + for range launcher.C { + cnt++ + if cnt <= 1000 { + log := false //(cnt == 500) + net := sim.launchNode(log) + if log { + printNet = net + } + if cnt > 500 { + go net.RegisterTopic(testTopic, nil) + } + if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil { + panic(err) + } + } + //fmt.Printf("launched @ %v: %x\n", time.Now(), net.Self().ID[:16]) + } + }() + */ + time.Sleep(55000 * time.Second) + //launcher.Stop() + sim.shutdown() + //sim.printStats() + //printNet.log.printLogs() +} + +/*func testHierarchicalTopics(i int) []Topic { + digits := strconv.FormatInt(int64(256+i/4), 4) + res := make([]Topic, 5) + for i, _ := range res { + res[i] = Topic("foo" + digits[1:i+1]) + } + return res +}*/ + +func testHierarchicalTopics(i int) []Topic { + digits := strconv.FormatInt(int64(128+i/8), 2) + res := make([]Topic, 8) + for i, _ := range res { + res[i] = Topic("foo" + digits[1:i+1]) + } + return res +} + +func TestSimTopicHierarchy(t *testing.T) { + t.Skip("NaCl test") + if runWithPlaygroundTime(t) { + return + } + + // glog.SetV(6) + // glog.SetToStderr(true) + + sim := newSimulation() + bootnode := sim.launchNode(false) + + go func() { + nets := make([]*Network, 1024) + for i, _ := range nets { + net := sim.launchNode(false) + nets[i] = net + if err := net.SetFallbackNodes([]*Node{bootnode.Self()}); err != nil { + panic(err) + } + time.Sleep(time.Second * 5) + } + + stop := make(chan struct{}) + for i, net := range nets { + //if i < 256 { + for _, topic := range testHierarchicalTopics(i)[:5] { + //fmt.Println("reg", topic) + go net.RegisterTopic(topic, stop) + } + time.Sleep(time.Millisecond * 100) + //} + } + time.Sleep(time.Second * 90000) + close(stop) + }() + + time.Sleep(100000 * time.Second) + sim.shutdown() +} + +func randomResolves(t *testing.T, s *simulation, net *Network) { + randtime := func() time.Duration { + return time.Duration(rand.Intn(50)+20) * time.Second + } + lookup := func(target NodeID) bool { + result := net.Resolve(target) + return result != nil && result.ID == target + } + + timer := time.NewTimer(randtime()) + for { + select { + case <-timer.C: + target := s.randomNode().Self().ID + if !lookup(target) { + t.Errorf("node %x: target %x not found", net.Self().ID[:8], target[:8]) + } + timer.Reset(randtime()) + case <-net.closed: + return + } + } +} + +type simulation struct { + mu sync.RWMutex + nodes map[NodeID]*Network + nodectr uint32 +} + +func newSimulation() *simulation { + return &simulation{nodes: make(map[NodeID]*Network)} +} + +func (s *simulation) shutdown() { + s.mu.RLock() + alive := make([]*Network, 0, len(s.nodes)) + for _, n := range s.nodes { + alive = append(alive, n) + } + defer s.mu.RUnlock() + + for _, n := range alive { + n.Close() + } +} + +func (s *simulation) printStats() { + s.mu.Lock() + defer s.mu.Unlock() + fmt.Println("node counter:", s.nodectr) + fmt.Println("alive nodes:", len(s.nodes)) + + // for _, n := range s.nodes { + // fmt.Printf("%x\n", n.tab.self.ID[:8]) + // transport := n.conn.(*simTransport) + // fmt.Println(" joined:", transport.joinTime) + // fmt.Println(" sends:", transport.hashctr) + // fmt.Println(" table size:", n.tab.count) + // } + + /*for _, n := range s.nodes { + fmt.Println() + fmt.Printf("*** Node %x\n", n.tab.self.ID[:8]) + n.log.printLogs() + }*/ + +} + +func (s *simulation) randomNode() *Network { + s.mu.Lock() + defer s.mu.Unlock() + + n := rand.Intn(len(s.nodes)) + for _, net := range s.nodes { + if n == 0 { + return net + } + n-- + } + return nil +} + +func (s *simulation) launchNode(log bool) *Network { + var ( + num = s.nodectr + key = newkey() + id = PubkeyID(&key.PublicKey) + ip = make(net.IP, 4) + ) + s.nodectr++ + binary.BigEndian.PutUint32(ip, num) + ip[0] = 10 + addr := &net.UDPAddr{IP: ip, Port: 30303} + + transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key} + net, err := newNetwork(transport, key.PublicKey, nil, "<no database>") + if err != nil { + panic("cannot launch new node: " + err.Error()) + } + + s.mu.Lock() + s.nodes[id] = net + s.mu.Unlock() + + return net +} + +func (s *simulation) dropNode(id NodeID) { + s.mu.Lock() + n := s.nodes[id] + delete(s.nodes, id) + s.mu.Unlock() + + n.Close() +} + +type simTransport struct { + joinTime time.Time + sender NodeID + senderAddr *net.UDPAddr + sim *simulation + hashctr uint64 + priv *ecdsa.PrivateKey +} + +func (st *simTransport) localAddr() *net.UDPAddr { + return st.senderAddr +} + +func (st *simTransport) Close() {} + +func (st *simTransport) send(remote *Node, ptype nodeEvent, data interface{}) (hash []byte) { + hash = st.nextHash() + var raw []byte + if ptype == pongPacket { + var err error + raw, _, err = encodePacket(st.priv, byte(ptype), data) + if err != nil { + panic(err) + } + } + + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: hash, + ev: ptype, + data: data, + rawData: raw, + }) + return hash +} + +func (st *simTransport) sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) []byte { + hash := st.nextHash() + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: hash, + ev: pingPacket, + data: &ping{ + Version: 4, + From: rpcEndpoint{IP: st.senderAddr.IP, UDP: uint16(st.senderAddr.Port), TCP: 30303}, + To: rpcEndpoint{IP: remoteAddr.IP, UDP: uint16(remoteAddr.Port), TCP: 30303}, + Expiration: uint64(time.Now().Unix() + int64(expiration)), + Topics: topics, + }, + }) + return hash +} + +func (st *simTransport) sendPong(remote *Node, pingHash []byte) { + raddr := remote.addr() + + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: st.nextHash(), + ev: pongPacket, + data: &pong{ + To: rpcEndpoint{IP: raddr.IP, UDP: uint16(raddr.Port), TCP: 30303}, + ReplyTok: pingHash, + Expiration: uint64(time.Now().Unix() + int64(expiration)), + }, + }) +} + +func (st *simTransport) sendFindnodeHash(remote *Node, target common.Hash) { + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: st.nextHash(), + ev: findnodeHashPacket, + data: &findnodeHash{ + Target: target, + Expiration: uint64(time.Now().Unix() + int64(expiration)), + }, + }) +} + +func (st *simTransport) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []byte) { + //fmt.Println("send", topics, pong) + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: st.nextHash(), + ev: topicRegisterPacket, + data: &topicRegister{ + Topics: topics, + Idx: uint(idx), + Pong: pong, + }, + }) +} + +func (st *simTransport) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) { + rnodes := make([]rpcNode, len(nodes)) + for i := range nodes { + rnodes[i] = nodeToRPC(nodes[i]) + } + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: st.nextHash(), + ev: topicNodesPacket, + data: &topicNodes{Echo: queryHash, Nodes: rnodes}, + }) +} + +func (st *simTransport) sendNeighbours(remote *Node, nodes []*Node) { + // TODO: send multiple packets + rnodes := make([]rpcNode, len(nodes)) + for i := range nodes { + rnodes[i] = nodeToRPC(nodes[i]) + } + st.sendPacket(remote.ID, ingressPacket{ + remoteID: st.sender, + remoteAddr: st.senderAddr, + hash: st.nextHash(), + ev: neighborsPacket, + data: &neighbors{ + Nodes: rnodes, + Expiration: uint64(time.Now().Unix() + int64(expiration)), + }, + }) +} + +func (st *simTransport) nextHash() []byte { + v := atomic.AddUint64(&st.hashctr, 1) + var hash common.Hash + binary.BigEndian.PutUint64(hash[:], v) + return hash[:] +} + +const packetLoss = 0 // 1/1000 + +func (st *simTransport) sendPacket(remote NodeID, p ingressPacket) { + if rand.Int31n(1000) >= packetLoss { + st.sim.mu.RLock() + recipient := st.sim.nodes[remote] + st.sim.mu.RUnlock() + + time.AfterFunc(200*time.Millisecond, func() { + recipient.reqReadPacket(p) + }) + } +} |