aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/discover/database.go96
-rw-r--r--p2p/discover/database_test.go103
-rw-r--r--p2p/discover/table.go168
-rw-r--r--p2p/discover/table_test.go3
-rw-r--r--p2p/discover/udp.go11
-rw-r--r--p2p/discover/udp_test.go1
-rw-r--r--xeth/xeth.go46
7 files changed, 233 insertions, 195 deletions
diff --git a/p2p/discover/database.go b/p2p/discover/database.go
index d5c594364..e8e3371ff 100644
--- a/p2p/discover/database.go
+++ b/p2p/discover/database.go
@@ -21,6 +21,7 @@ package discover
import (
"bytes"
+ "crypto/rand"
"encoding/binary"
"os"
"sync"
@@ -46,11 +47,8 @@ var (
// nodeDB stores all nodes we know about.
type nodeDB struct {
- lvl *leveldb.DB // Interface to the database itself
- seeder iterator.Iterator // Iterator for fetching possible seed nodes
-
- self NodeID // Own node id to prevent adding it into the database
-
+ lvl *leveldb.DB // Interface to the database itself
+ self NodeID // Own node id to prevent adding it into the database
runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop
}
@@ -302,52 +300,70 @@ func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
}
-// querySeeds retrieves a batch of nodes to be used as potential seed servers
-// during bootstrapping the node into the network.
-//
-// Ideal seeds are the most recently seen nodes (highest probability to be still
-// alive), but yet untried. However, since leveldb only supports dumb iteration
-// we will instead start pulling in potential seeds that haven't been yet pinged
-// since the start of the boot procedure.
-//
-// If the database runs out of potential seeds, we restart the startup counter
-// and start iterating over the peers again.
-func (db *nodeDB) querySeeds(n int) []*Node {
- // Create a new seed iterator if none exists
- if db.seeder == nil {
- db.seeder = db.lvl.NewIterator(nil, nil)
+// querySeeds retrieves random nodes to be used as potential seed nodes
+// for bootstrapping.
+func (db *nodeDB) querySeeds(n int, maxAge time.Duration) []*Node {
+ var (
+ now = time.Now()
+ nodes = make([]*Node, 0, n)
+ it = db.lvl.NewIterator(nil, nil)
+ id NodeID
+ )
+ defer it.Release()
+
+seek:
+ for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
+ // Seek to a random entry. The first byte is incremented by a
+ // random amount each time in order to increase the likelihood
+ // of hitting all existing nodes in very small databases.
+ ctr := id[0]
+ rand.Read(id[:])
+ id[0] = ctr + id[0]%16
+ it.Seek(makeKey(id, nodeDBDiscoverRoot))
+
+ n := nextNode(it)
+ if n == nil {
+ id[0] = 0
+ continue seek // iterator exhausted
+ }
+ if n.ID == db.self {
+ continue seek
+ }
+ if now.Sub(db.lastPong(n.ID)) > maxAge {
+ continue seek
+ }
+ for i := range nodes {
+ if nodes[i].ID == n.ID {
+ continue seek // duplicate
+ }
+ }
+ nodes = append(nodes, n)
}
- // Iterate over the nodes and find suitable seeds
- nodes := make([]*Node, 0, n)
- for len(nodes) < n && db.seeder.Next() {
- // Iterate until a discovery node is found
- id, field := splitKey(db.seeder.Key())
+ return nodes
+}
+
+// reads the next node record from the iterator, skipping over other
+// database entries.
+func nextNode(it iterator.Iterator) *Node {
+ for end := false; !end; end = !it.Next() {
+ id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
- // Dump it if its a self reference
- if bytes.Compare(id[:], db.self[:]) == 0 {
- db.deleteNode(id)
+ var n Node
+ if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
+ if glog.V(logger.Warn) {
+ glog.Errorf("invalid node %x: %v", id, err)
+ }
continue
}
- // Load it as a potential seed
- if node := db.node(id); node != nil {
- nodes = append(nodes, node)
- }
- }
- // Release the iterator if we reached the end
- if len(nodes) == 0 {
- db.seeder.Release()
- db.seeder = nil
+ return &n
}
- return nodes
+ return nil
}
// close flushes and closes the database files.
func (db *nodeDB) close() {
- if db.seeder != nil {
- db.seeder.Release()
- }
close(db.quit)
db.lvl.Close()
}
diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go
index 569585903..80c1a6ff2 100644
--- a/p2p/discover/database_test.go
+++ b/p2p/discover/database_test.go
@@ -162,9 +162,33 @@ var nodeDBSeedQueryNodes = []struct {
node *Node
pong time.Time
}{
+ // This one should not be in the result set because its last
+ // pong time is too far in the past.
{
node: newNode(
- MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ MustHexID("0x84d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ net.IP{127, 0, 0, 3},
+ 30303,
+ 30303,
+ ),
+ pong: time.Now().Add(-3 * time.Hour),
+ },
+ // This one shouldn't be in in the result set because its
+ // nodeID is the local node's ID.
+ {
+ node: newNode(
+ MustHexID("0x57d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ net.IP{127, 0, 0, 3},
+ 30303,
+ 30303,
+ ),
+ pong: time.Now().Add(-4 * time.Second),
+ },
+
+ // These should be in the result set.
+ {
+ node: newNode(
+ MustHexID("0x22d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 1},
30303,
30303,
@@ -173,7 +197,7 @@ var nodeDBSeedQueryNodes = []struct {
},
{
node: newNode(
- MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ MustHexID("0x44d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 2},
30303,
30303,
@@ -182,7 +206,7 @@ var nodeDBSeedQueryNodes = []struct {
},
{
node: newNode(
- MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ MustHexID("0xe2d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
net.IP{127, 0, 0, 3},
30303,
30303,
@@ -192,7 +216,7 @@ var nodeDBSeedQueryNodes = []struct {
}
func TestNodeDBSeedQuery(t *testing.T) {
- db, _ := newNodeDB("", Version, NodeID{})
+ db, _ := newNodeDB("", Version, nodeDBSeedQueryNodes[1].node.ID)
defer db.close()
// Insert a batch of nodes for querying
@@ -200,20 +224,24 @@ func TestNodeDBSeedQuery(t *testing.T) {
if err := db.updateNode(seed.node); err != nil {
t.Fatalf("node %d: failed to insert: %v", i, err)
}
+ if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil {
+ t.Fatalf("node %d: failed to insert lastPong: %v", i, err)
+ }
}
+
// Retrieve the entire batch and check for duplicates
- seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes))
- if len(seeds) != len(nodeDBSeedQueryNodes) {
- t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes))
- }
+ seeds := db.querySeeds(len(nodeDBSeedQueryNodes)*2, time.Hour)
have := make(map[NodeID]struct{})
for _, seed := range seeds {
have[seed.ID] = struct{}{}
}
want := make(map[NodeID]struct{})
- for _, seed := range nodeDBSeedQueryNodes {
+ for _, seed := range nodeDBSeedQueryNodes[2:] {
want[seed.node.ID] = struct{}{}
}
+ if len(seeds) != len(want) {
+ t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(want))
+ }
for id, _ := range have {
if _, ok := want[id]; !ok {
t.Errorf("extra seed: %v", id)
@@ -224,63 +252,6 @@ func TestNodeDBSeedQuery(t *testing.T) {
t.Errorf("missing seed: %v", id)
}
}
- // Make sure the next batch is empty (seed EOF)
- seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes))
- if len(seeds) != 0 {
- t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0)
- }
-}
-
-func TestNodeDBSeedQueryContinuation(t *testing.T) {
- db, _ := newNodeDB("", Version, NodeID{})
- defer db.close()
-
- // Insert a batch of nodes for querying
- for i, seed := range nodeDBSeedQueryNodes {
- if err := db.updateNode(seed.node); err != nil {
- t.Fatalf("node %d: failed to insert: %v", i, err)
- }
- }
- // Iteratively retrieve the batch, checking for an empty batch on reset
- for i := 0; i < len(nodeDBSeedQueryNodes); i++ {
- if seeds := db.querySeeds(1); len(seeds) != 1 {
- t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1)
- }
- }
- if seeds := db.querySeeds(1); len(seeds) != 0 {
- t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0)
- }
- for i := 0; i < len(nodeDBSeedQueryNodes); i++ {
- if seeds := db.querySeeds(1); len(seeds) != 1 {
- t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1)
- }
- }
-}
-
-func TestNodeDBSelfSeedQuery(t *testing.T) {
- // Assign a node as self to verify evacuation
- self := nodeDBSeedQueryNodes[0].node.ID
- db, _ := newNodeDB("", Version, self)
- defer db.close()
-
- // Insert a batch of nodes for querying
- for i, seed := range nodeDBSeedQueryNodes {
- if err := db.updateNode(seed.node); err != nil {
- t.Fatalf("node %d: failed to insert: %v", i, err)
- }
- }
- // Retrieve the entire batch and check that self was evacuated
- seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes))
- if len(seeds) != len(nodeDBSeedQueryNodes)-1 {
- t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)-1)
- }
- have := make(map[NodeID]struct{})
- for _, seed := range seeds {
- have[seed.ID] = struct{}{}
- }
- if _, ok := have[self]; ok {
- t.Errorf("self not evacuated")
- }
}
func TestNodeDBPersistency(t *testing.T) {
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 972bc1077..c128c2ed1 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -44,6 +44,10 @@ const (
maxBondingPingPongs = 16
maxFindnodeFailures = 5
+
+ autoRefreshInterval = 1 * time.Hour
+ seedCount = 30
+ seedMaxAge = 5 * 24 * time.Hour
)
type Table struct {
@@ -52,6 +56,10 @@ type Table struct {
nursery []*Node // bootstrap nodes
db *nodeDB // database of known nodes
+ refreshReq chan struct{}
+ closeReq chan struct{}
+ closed chan struct{}
+
bondmu sync.Mutex
bonding map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes
@@ -80,10 +88,7 @@ type transport interface {
// bucket contains nodes, ordered by their last activity. the entry
// that was most recently active is the first element in entries.
-type bucket struct {
- lastLookup time.Time
- entries []*Node
-}
+type bucket struct{ entries []*Node }
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
// If no node database was given, use an in-memory one
@@ -93,11 +98,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
db, _ = newNodeDB("", Version, ourID)
}
tab := &Table{
- net: t,
- db: db,
- self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
- bonding: make(map[NodeID]*bondproc),
- bondslots: make(chan struct{}, maxBondingPingPongs),
+ net: t,
+ db: db,
+ self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
+ bonding: make(map[NodeID]*bondproc),
+ bondslots: make(chan struct{}, maxBondingPingPongs),
+ refreshReq: make(chan struct{}),
+ closeReq: make(chan struct{}),
+ closed: make(chan struct{}),
}
for i := 0; i < cap(tab.bondslots); i++ {
tab.bondslots <- struct{}{}
@@ -105,6 +113,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
for i := range tab.buckets {
tab.buckets[i] = new(bucket)
}
+ go tab.refreshLoop()
return tab
}
@@ -163,10 +172,12 @@ func randUint(max uint32) uint32 {
// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
- if tab.net != nil {
- tab.net.close()
+ select {
+ case <-tab.closed:
+ // already closed.
+ case tab.closeReq <- struct{}{}:
+ <-tab.closed // wait for refreshLoop to end.
}
- tab.db.close()
}
// Bootstrap sets the bootstrap nodes. These nodes are used to connect
@@ -183,7 +194,7 @@ func (tab *Table) Bootstrap(nodes []*Node) {
tab.nursery = append(tab.nursery, &cpy)
}
tab.mutex.Unlock()
- tab.refresh()
+ tab.requestRefresh()
}
// Lookup performs a network search for nodes close
@@ -204,15 +215,13 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
asked[tab.self.ID] = true
tab.mutex.Lock()
- // update last lookup stamp (for refresh logic)
- tab.buckets[logdist(tab.self.sha, target)].lastLookup = time.Now()
// generate initial result set
result := tab.closest(target, bucketSize)
tab.mutex.Unlock()
- // If the result set is empty, all nodes were dropped, refresh
+ // If the result set is empty, all nodes were dropped, refresh.
if len(result.entries) == 0 {
- tab.refresh()
+ tab.requestRefresh()
return nil
}
@@ -257,56 +266,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
return result.entries
}
-// refresh performs a lookup for a random target to keep buckets full, or seeds
-// the table if it is empty (initial bootstrap or discarded faulty peers).
-func (tab *Table) refresh() {
- seed := true
+func (tab *Table) requestRefresh() {
+ select {
+ case tab.refreshReq <- struct{}{}:
+ case <-tab.closed:
+ }
+}
- // If the discovery table is empty, seed with previously known nodes
- tab.mutex.Lock()
- for _, bucket := range tab.buckets {
- if len(bucket.entries) > 0 {
- seed = false
- break
+func (tab *Table) refreshLoop() {
+ defer func() {
+ tab.db.close()
+ if tab.net != nil {
+ tab.net.close()
}
- }
- tab.mutex.Unlock()
+ close(tab.closed)
+ }()
- // If the table is not empty, try to refresh using the live entries
- if !seed {
- // The Kademlia paper specifies that the bucket refresh should
- // perform a refresh in the least recently used bucket. We cannot
- // adhere to this because the findnode target is a 512bit value
- // (not hash-sized) and it is not easily possible to generate a
- // sha3 preimage that falls into a chosen bucket.
- //
- // We perform a lookup with a random target instead.
- var target NodeID
- rand.Read(target[:])
-
- result := tab.Lookup(target)
- if len(result) == 0 {
- // Lookup failed, seed after all
- seed = true
+ timer := time.NewTicker(autoRefreshInterval)
+ var done chan struct{}
+ for {
+ select {
+ case <-timer.C:
+ if done == nil {
+ done = make(chan struct{})
+ go tab.doRefresh(done)
+ }
+ case <-tab.refreshReq:
+ if done == nil {
+ done = make(chan struct{})
+ go tab.doRefresh(done)
+ }
+ case <-done:
+ done = nil
+ case <-tab.closeReq:
+ if done != nil {
+ <-done
+ }
+ return
}
}
+}
- if seed {
- // Pick a batch of previously know seeds to lookup with
- seeds := tab.db.querySeeds(10)
- for _, seed := range seeds {
- glog.V(logger.Debug).Infoln("Seeding network with", seed)
- }
- nodes := append(tab.nursery, seeds...)
+// doRefresh performs a lookup for a random target to keep buckets
+// full. seed nodes are inserted if the table is empty (initial
+// bootstrap or discarded faulty peers).
+func (tab *Table) doRefresh(done chan struct{}) {
+ defer close(done)
+
+ // The Kademlia paper specifies that the bucket refresh should
+ // perform a lookup in the least recently used bucket. We cannot
+ // adhere to this because the findnode target is a 512bit value
+ // (not hash-sized) and it is not easily possible to generate a
+ // sha3 preimage that falls into a chosen bucket.
+ // We perform a lookup with a random target instead.
+ var target NodeID
+ rand.Read(target[:])
+ result := tab.Lookup(target)
+ if len(result) > 0 {
+ return
+ }
- // Bond with all the seed nodes (will pingpong only if failed recently)
- bonded := tab.bondall(nodes)
- if len(bonded) > 0 {
- tab.Lookup(tab.self.ID)
+ // The table is empty. Load nodes from the database and insert
+ // them. This should yield a few previously seen nodes that are
+ // (hopefully) still alive.
+ seeds := tab.db.querySeeds(seedCount, seedMaxAge)
+ seeds = tab.bondall(append(seeds, tab.nursery...))
+ if glog.V(logger.Debug) {
+ if len(seeds) == 0 {
+ glog.Infof("no seed nodes found")
+ }
+ for _, n := range seeds {
+ age := time.Since(tab.db.lastPong(n.ID))
+ glog.Infof("seed node (age %v): %v", age, n)
}
- // TODO: the Kademlia paper says that we're supposed to perform
- // random lookups in all buckets further away than our closest neighbor.
}
+ tab.mutex.Lock()
+ tab.stuff(seeds)
+ tab.mutex.Unlock()
+
+ // Finally, do a self lookup to fill up the buckets.
+ tab.Lookup(tab.self.ID)
}
// closest returns the n nodes in the table that are closest to the
@@ -373,8 +412,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
}
// If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch
var result error
- if node == nil || fails > 0 {
- glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails)
+ age := time.Since(tab.db.lastPong(id))
+ if node == nil || fails > 0 || age > nodeDBNodeExpiration {
+ glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age)
tab.bondmu.Lock()
w := tab.bonding[id]
@@ -435,13 +475,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// ping a remote endpoint and wait for a reply, also updating the node
// database accordingly.
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
- // Update the last ping and send the message
tab.db.updateLastPing(id, time.Now())
if err := tab.net.ping(id, addr); err != nil {
return err
}
- // Pong received, update the database and return
tab.db.updateLastPong(id, time.Now())
+
+ // Start the background expiration goroutine after the first
+ // successful communication. Subsequent calls have no effect if it
+ // is already running. We do this here instead of somewhere else
+ // so that the search for seed nodes also considers older nodes
+ // that would otherwise be removed by the expiration.
tab.db.ensureExpirer()
return nil
}
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 426f4e9cc..84962a1a5 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -514,9 +514,6 @@ func (tn *preminedTestnet) findnode(toid NodeID, toaddr *net.UDPAddr, target Nod
if toaddr.Port == 0 {
panic("query to node at distance 0")
}
- if target != tn.target {
- panic("findnode with wrong target")
- }
next := uint16(toaddr.Port) - 1
var result []*Node
for i, id := range tn.dists[toaddr.Port] {
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index afb31ee69..20f69cf08 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -39,7 +39,6 @@ var (
errPacketTooSmall = errors.New("too small")
errBadHash = errors.New("bad hash")
errExpired = errors.New("expired")
- errBadVersion = errors.New("version mismatch")
errUnsolicitedReply = errors.New("unsolicited reply")
errUnknownNode = errors.New("unknown node")
errTimeout = errors.New("RPC timeout")
@@ -52,8 +51,6 @@ const (
respTimeout = 500 * time.Millisecond
sendTimeout = 500 * time.Millisecond
expiration = 20 * time.Second
-
- refreshInterval = 1 * time.Hour
)
// RPC packet types
@@ -312,10 +309,8 @@ func (t *udp) loop() {
plist = list.New()
timeout = time.NewTimer(0)
nextTimeout *pending // head of plist when timeout was last reset
- refresh = time.NewTicker(refreshInterval)
)
<-timeout.C // ignore first timeout
- defer refresh.Stop()
defer timeout.Stop()
resetTimeout := func() {
@@ -344,9 +339,6 @@ func (t *udp) loop() {
resetTimeout()
select {
- case <-refresh.C:
- go t.refresh()
-
case <-t.closing:
for el := plist.Front(); el != nil; el = el.Next() {
el.Value.(*pending).errc <- errClosed
@@ -529,9 +521,6 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
if expired(req.Expiration) {
return errExpired
}
- if req.Version != Version {
- return errBadVersion
- }
t.send(from, pongPacket, pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index a86d3737b..913199c26 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -122,7 +122,6 @@ func TestUDP_packetErrors(t *testing.T) {
defer test.table.Close()
test.packetIn(errExpired, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: Version})
- test.packetIn(errBadVersion, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 99, Expiration: futureExp})
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
test.packetIn(errUnknownNode, findnodePacket, &findnode{Expiration: futureExp})
test.packetIn(errUnsolicitedReply, neighborsPacket, &neighbors{Expiration: futureExp})
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 00b70da6c..623b3a963 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -532,8 +532,10 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
self.logMu.Lock()
defer self.logMu.Unlock()
- var id int
filter := core.NewFilter(self.backend)
+ id := self.filterManager.InstallFilter(filter)
+ self.logQueue[id] = &logQueue{timeout: time.Now()}
+
filter.SetEarliestBlock(earliest)
filter.SetLatestBlock(latest)
filter.SetSkip(skip)
@@ -544,10 +546,10 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
self.logMu.Lock()
defer self.logMu.Unlock()
- self.logQueue[id].add(logs...)
+ if queue := self.logQueue[id]; queue != nil {
+ queue.add(logs...)
+ }
}
- id = self.filterManager.InstallFilter(filter)
- self.logQueue[id] = &logQueue{timeout: time.Now()}
return id
}
@@ -556,16 +558,18 @@ func (self *XEth) NewTransactionFilter() int {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
- var id int
filter := core.NewFilter(self.backend)
+ id := self.filterManager.InstallFilter(filter)
+ self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
+
filter.TransactionCallback = func(tx *types.Transaction) {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
- self.transactionQueue[id].add(tx.Hash())
+ if queue := self.transactionQueue[id]; queue != nil {
+ queue.add(tx.Hash())
+ }
}
- id = self.filterManager.InstallFilter(filter)
- self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
return id
}
@@ -573,16 +577,18 @@ func (self *XEth) NewBlockFilter() int {
self.blockMu.Lock()
defer self.blockMu.Unlock()
- var id int
filter := core.NewFilter(self.backend)
+ id := self.filterManager.InstallFilter(filter)
+ self.blockQueue[id] = &hashQueue{timeout: time.Now()}
+
filter.BlockCallback = func(block *types.Block, logs state.Logs) {
self.blockMu.Lock()
defer self.blockMu.Unlock()
- self.blockQueue[id].add(block.Hash())
+ if queue := self.blockQueue[id]; queue != nil {
+ queue.add(block.Hash())
+ }
}
- id = self.filterManager.InstallFilter(filter)
- self.blockQueue[id] = &hashQueue{timeout: time.Now()}
return id
}
@@ -1022,16 +1028,24 @@ func (m callmsg) Value() *big.Int { return m.value }
func (m callmsg) Data() []byte { return m.data }
type logQueue struct {
+ mu sync.Mutex
+
logs state.Logs
timeout time.Time
id int
}
func (l *logQueue) add(logs ...*state.Log) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.logs = append(l.logs, logs...)
}
func (l *logQueue) get() state.Logs {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
@@ -1039,16 +1053,24 @@ func (l *logQueue) get() state.Logs {
}
type hashQueue struct {
+ mu sync.Mutex
+
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil