Diffstat (limited to 'swarm/network')
-rw-r--r--  swarm/network/discovery.go                              76
-rw-r--r--  swarm/network/discovery_test.go                          2
-rw-r--r--  swarm/network/fetcher.go                                305
-rw-r--r--  swarm/network/fetcher_test.go                           459
-rw-r--r--  swarm/network/hive.go                                    71
-rw-r--r--  swarm/network/hive_test.go                                8
-rw-r--r--  swarm/network/kademlia.go                               154
-rw-r--r--  swarm/network/kademlia_test.go                          183
-rw-r--r--  swarm/network/networkid_test.go                           2
-rw-r--r--  swarm/network/priorityqueue/priorityqueue.go             38
-rw-r--r--  swarm/network/priorityqueue/priorityqueue_test.go         6
-rw-r--r--  swarm/network/protocol.go                                47
-rw-r--r--  swarm/network/simulation/simulation.go                   16
-rw-r--r--  swarm/network/simulations/discovery/discovery_test.go     4
-rw-r--r--  swarm/network/stream/common_test.go                      17
-rw-r--r--  swarm/network/stream/delivery.go                        231
-rw-r--r--  swarm/network/stream/delivery_test.go                    93
-rw-r--r--  swarm/network/stream/intervals_test.go                   78
-rw-r--r--  swarm/network/stream/messages.go                         87
-rw-r--r--  swarm/network/stream/peer.go                             51
-rw-r--r--  swarm/network/stream/snapshot_retrieval_test.go          46
-rw-r--r--  swarm/network/stream/snapshot_sync_test.go              179
-rw-r--r--  swarm/network/stream/stream.go                           46
-rw-r--r--  swarm/network/stream/streamer_test.go                     8
-rw-r--r--  swarm/network/stream/syncer.go                           94
-rw-r--r--  swarm/network/stream/syncer_test.go                      32
26 files changed, 1512 insertions, 821 deletions
diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go
index 55bf7c033..301959480 100644
--- a/swarm/network/discovery.go
+++ b/swarm/network/discovery.go
@@ -26,30 +26,30 @@ import (
// discovery bzz extension for requesting and relaying node address records
-// discPeer wraps BzzPeer and embeds an Overlay connectivity driver
-type discPeer struct {
+// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver
+type Peer struct {
*BzzPeer
- overlay Overlay
- sentPeers bool // whether we already sent peer closer to this address
- mtx sync.RWMutex
+ kad *Kademlia
+ sentPeers bool // whether we already sent peer closer to this address
+ mtx sync.RWMutex //
peers map[string]bool // tracks node records sent to the peer
depth uint8 // the proximity order advertised by remote as depth of saturation
}
-// NewDiscovery constructs a discovery peer
-func newDiscovery(p *BzzPeer, o Overlay) *discPeer {
- d := &discPeer{
- overlay: o,
+// NewPeer constructs a discovery peer
+func NewPeer(p *BzzPeer, kad *Kademlia) *Peer {
+ d := &Peer{
+ kad: kad,
BzzPeer: p,
peers: make(map[string]bool),
}
// record remote as seen so we never send a peer its own record
- d.seen(d)
+ d.seen(p.BzzAddr)
return d
}
// HandleMsg is the message handler that delegates incoming messages
-func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error {
+func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case *peersMsg:
@@ -64,24 +64,18 @@ func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error {
}
// NotifyDepth sends a message to all connections if depth of saturation is changed
-func NotifyDepth(depth uint8, h Overlay) {
- f := func(val OverlayConn, po int, _ bool) bool {
- dp, ok := val.(*discPeer)
- if ok {
- dp.NotifyDepth(depth)
- }
+func NotifyDepth(depth uint8, kad *Kademlia) {
+ f := func(val *Peer, po int, _ bool) bool {
+ val.NotifyDepth(depth)
return true
}
- h.EachConn(nil, 255, f)
+ kad.EachConn(nil, 255, f)
}
// NotifyPeer informs all peers about a newly added node
-func NotifyPeer(p OverlayAddr, k Overlay) {
- f := func(val OverlayConn, po int, _ bool) bool {
- dp, ok := val.(*discPeer)
- if ok {
- dp.NotifyPeer(p, uint8(po))
- }
+func NotifyPeer(p *BzzAddr, k *Kademlia) {
+ f := func(val *Peer, po int, _ bool) bool {
+ val.NotifyPeer(p, uint8(po))
return true
}
k.EachConn(p.Address(), 255, f)
@@ -91,22 +85,20 @@ func NotifyPeer(p OverlayAddr, k Overlay) {
// the peer's PO is within the recipients advertised depth
// OR the peer is closer to the recipient than self
// unless already notified during the connection session
-func (d *discPeer) NotifyPeer(a OverlayAddr, po uint8) {
+func (d *Peer) NotifyPeer(a *BzzAddr, po uint8) {
// immediately return
if (po < d.getDepth() && pot.ProxCmp(d.localAddr, d, a) != 1) || d.seen(a) {
return
}
- // log.Trace(fmt.Sprintf("%08x peer %08x notified of peer %08x", d.localAddr.Over()[:4], d.Address()[:4], a.Address()[:4]))
resp := &peersMsg{
- Peers: []*BzzAddr{ToAddr(a)},
+ Peers: []*BzzAddr{a},
}
go d.Send(context.TODO(), resp)
}
// NotifyDepth sends a subPeers Msg to the receiver notifying them about
// a change in the depth of saturation
-func (d *discPeer) NotifyDepth(po uint8) {
- // log.Trace(fmt.Sprintf("%08x peer %08x notified of new depth %v", d.localAddr.Over()[:4], d.Address()[:4], po))
+func (d *Peer) NotifyDepth(po uint8) {
go d.Send(context.TODO(), &subPeersMsg{Depth: po})
}
@@ -141,7 +133,7 @@ func (msg peersMsg) String() string {
// handlePeersMsg called by the protocol when receiving peerset (for target address)
// list of nodes ([]PeerAddr in peersMsg) is added to the overlay db using the
// Register interface method
-func (d *discPeer) handlePeersMsg(msg *peersMsg) error {
+func (d *Peer) handlePeersMsg(msg *peersMsg) error {
// register all addresses
if len(msg.Peers) == 0 {
return nil
@@ -149,12 +141,12 @@ func (d *discPeer) handlePeersMsg(msg *peersMsg) error {
for _, a := range msg.Peers {
d.seen(a)
- NotifyPeer(a, d.overlay)
+ NotifyPeer(a, d.kad)
}
- return d.overlay.Register(toOverlayAddrs(msg.Peers...))
+ return d.kad.Register(msg.Peers...)
}
-// subPeers msg is communicating the depth/sharpness/focus of the overlay table of a peer
+// subPeers msg is communicating the depth of the overlay table of a peer
type subPeersMsg struct {
Depth uint8
}
@@ -164,21 +156,20 @@ func (msg subPeersMsg) String() string {
return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth)
}
-func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error {
+func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error {
if !d.sentPeers {
d.setDepth(msg.Depth)
var peers []*BzzAddr
- d.overlay.EachConn(d.Over(), 255, func(p OverlayConn, po int, isproxbin bool) bool {
+ d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool {
if pob, _ := pof(d, d.localAddr, 0); pob > po {
return false
}
- if !d.seen(p) {
- peers = append(peers, ToAddr(p.Off()))
+ if !d.seen(p.BzzAddr) {
+ peers = append(peers, p.BzzAddr)
}
return true
})
if len(peers) > 0 {
- // log.Debug(fmt.Sprintf("%08x: %v peers sent to %v", d.overlay.BaseAddr(), len(peers), d))
go d.Send(context.TODO(), &peersMsg{Peers: peers})
}
}
@@ -186,9 +177,9 @@ func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error {
return nil
}
-// seen takes an Overlay peer and checks if it was sent to a peer already
+// seen takes a peer address and checks if it was sent to a peer already
// if not, marks the peer as sent
-func (d *discPeer) seen(p OverlayPeer) bool {
+func (d *Peer) seen(p *BzzAddr) bool {
d.mtx.Lock()
defer d.mtx.Unlock()
k := string(p.Address())
@@ -199,12 +190,13 @@ func (d *discPeer) seen(p OverlayPeer) bool {
return false
}
-func (d *discPeer) getDepth() uint8 {
+func (d *Peer) getDepth() uint8 {
d.mtx.RLock()
defer d.mtx.RUnlock()
return d.depth
}
-func (d *discPeer) setDepth(depth uint8) {
+
+func (d *Peer) setDepth(depth uint8) {
d.mtx.Lock()
defer d.mtx.Unlock()
d.depth = depth
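
The discovery changes above drop the Overlay/OverlayConn indirection: callbacks passed to Kademlia.EachConn now receive a concrete *Peer, so the *discPeer type assertion disappears. A minimal caller sketch under that assumption (notifyAll is illustrative and essentially mirrors what the new NotifyDepth does, it is not part of this commit):

package example

import "github.com/ethereum/go-ethereum/swarm/network"

// notifyAll is a hypothetical helper: EachConn now hands the callback a
// concrete *network.Peer, so no type assertion to a discovery peer is needed.
func notifyAll(kad *network.Kademlia, depth uint8) {
	kad.EachConn(nil, 255, func(p *network.Peer, po int, _ bool) bool {
		p.NotifyDepth(depth) // advertise the new saturation depth
		return true          // continue with the next connection
	})
}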
diff --git a/swarm/network/discovery_test.go b/swarm/network/discovery_test.go
index 0427d81ca..494bc8196 100644
--- a/swarm/network/discovery_test.go
+++ b/swarm/network/discovery_test.go
@@ -33,7 +33,7 @@ func TestDiscovery(t *testing.T) {
id := s.IDs[0]
raddr := NewAddrFromNodeID(id)
- pp.Register([]OverlayAddr{OverlayAddr(raddr)})
+ pp.Register(raddr)
// start the hive and wait for the connection
pp.Start(s.Server)
diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go
new file mode 100644
index 000000000..35e2f0132
--- /dev/null
+++ b/swarm/network/fetcher.go
@@ -0,0 +1,305 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+var searchTimeout = 1 * time.Second
+
+// Time period for which a peer is considered to be skipped.
+// Also used in stream delivery.
+var RequestTimeout = 10 * time.Second
+
+type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error)
+
+// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
+// keeps it alive until all active requests are completed. This can happen:
+// 1. either because the chunk is delivered
+// 2. or because the requestor cancelled/timed out
+// Fetcher self-destructs after it is completed.
+// TODO: cancel all forward requests after termination
+type Fetcher struct {
+ protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
+ addr storage.Address // the address of the chunk to be fetched
+ offerC chan *discover.NodeID // channel of sources (peer node IDs)
+ requestC chan struct{}
+ skipCheck bool
+}
+
+type Request struct {
+ Addr storage.Address // chunk address
+ Source *discover.NodeID // nodeID of peer to request from (can be nil)
+ SkipCheck bool // whether to offer the chunk first or deliver directly
+ peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil)
+}
+
+// NewRequest returns a new instance of Request based on chunk address skip check and
+// a map of peers to skip.
+func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request {
+ return &Request{
+ Addr: addr,
+ SkipCheck: skipCheck,
+ peersToSkip: peersToSkip,
+ }
+}
+
+// SkipPeer returns whether the peer with nodeID should not be requested to deliver a chunk.
+// Peers to skip are kept per Request and for a time period of RequestTimeout.
+// This function is used in stream package in Delivery.RequestFromPeers to optimize
+// requests for chunks.
+func (r *Request) SkipPeer(nodeID string) bool {
+ val, ok := r.peersToSkip.Load(nodeID)
+ if !ok {
+ return false
+ }
+ t, ok := val.(time.Time)
+ if ok && time.Now().After(t.Add(RequestTimeout)) {
+ // deadline expired
+ r.peersToSkip.Delete(nodeID)
+ return false
+ }
+ return true
+}
+
+// FetcherFactory is initialised with a request function and can create fetchers
+type FetcherFactory struct {
+ request RequestFunc
+ skipCheck bool
+}
+
+// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory
+func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
+ return &FetcherFactory{
+ request: request,
+ skipCheck: skipCheck,
+ }
+}
+
+// New constructs a new Fetcher for the given chunk. All peers in peersToSkip are not requested to
+// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
+// this chunk, to make sure we don't request back the chunks from them.
+// The created Fetcher is started and returned.
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
+ fetcher := NewFetcher(source, f.request, f.skipCheck)
+ go fetcher.run(ctx, peersToSkip)
+ return fetcher
+}
+
+// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
+func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+ return &Fetcher{
+ addr: addr,
+ protoRequestFunc: rf,
+ offerC: make(chan *discover.NodeID),
+ requestC: make(chan struct{}),
+ skipCheck: skipCheck,
+ }
+}
+
+// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
+func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) {
+ // First we need to have this select to make sure that we return if context is done
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ // This select alone would not guarantee that we return if the context is done, it could potentially
+ // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+ select {
+ case f.offerC <- source:
+ case <-ctx.Done():
+ }
+}
+
+// Request is called when an upstream peer requests the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
+func (f *Fetcher) Request(ctx context.Context) {
+ // First we need to have this select to make sure that we return if context is done
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ // This select alone would not guarantee that we return if the context is done, it could potentially
+ // push to requestC instead if requestC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+ select {
+ case f.requestC <- struct{}{}:
+ case <-ctx.Done():
+ }
+}
+
+// run starts the Fetcher's request handler loop
+// it keeps the Fetcher alive within the lifecycle of the passed context
+func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+ var (
+ doRequest bool // determines if retrieval is initiated in the current iteration
+ wait *time.Timer // timer for search timeout
+ waitC <-chan time.Time // timer channel
+ sources []*discover.NodeID // known sources, ie. peers that offered the chunk
+ requested bool // true if the chunk was actually requested
+ )
+ gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected
+
+ // loop that keeps the fetching process alive
+ // after every request a timer is set. If this goes off we request again from another peer
+ // note that the previous request is still alive and has the chance to deliver, so
+ // rerequesting extends the search, i.e.,
+ // if a peer we requested from is gone we issue a new request, so the number of active
+ // requests never decreases
+ for {
+ select {
+
+ // incoming offer
+ case source := <-f.offerC:
+ log.Trace("new source", "peer addr", source, "request addr", f.addr)
+ // 1) the chunk is offered by a syncing peer
+ // add to known sources
+ sources = append(sources, source)
+ // launch a request to the source iff the chunk was requested (not just expected because it's offered by a syncing peer)
+ doRequest = requested
+
+ // incoming request
+ case <-f.requestC:
+ log.Trace("new request", "request addr", f.addr)
+ // 2) chunk is requested, set requested flag
+ // launch a request iff none has been launched yet
+ doRequest = !requested
+ requested = true
+
+ // peer we requested from is gone. fall back to another
+ // and remove the peer from the peers map
+ case id := <-gone:
+ log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
+ peers.Delete(id.String())
+ doRequest = requested
+
+ // search timeout: too much time passed since the last request,
+ // extend the search to a new peer if we can find one
+ case <-waitC:
+ log.Trace("search timed out: rerequesting", "request addr", f.addr)
+ doRequest = requested
+
+ // all Fetcher context closed, can quit
+ case <-ctx.Done():
+ log.Trace("terminate fetcher", "request addr", f.addr)
+ // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
+ return
+ }
+
+ // need to issue a new request
+ if doRequest {
+ var err error
+ sources, err = f.doRequest(ctx, gone, peers, sources)
+ if err != nil {
+ log.Warn("unable to request", "request addr", f.addr, "err", err)
+ }
+ }
+
+ // if wait channel is not set, set it to a timer
+ if requested {
+ if wait == nil {
+ wait = time.NewTimer(searchTimeout)
+ defer wait.Stop()
+ waitC = wait.C
+ } else {
+ // stop the timer and drain the channel if it was not drained earlier
+ if !wait.Stop() {
+ select {
+ case <-wait.C:
+ default:
+ }
+ }
+ // reset the timer to go off after searchTimeout
+ wait.Reset(searchTimeout)
+ }
+ }
+ doRequest = false
+ }
+}
+
+// doRequest attempts to find a peer to request the chunk from
+// * first it tries to request explicitly from peers that are known to have offered the chunk
+// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address
+// excluding those in the peersToSkip map
+// * if no such peer is found an error is returned
+//
+// if a request is successful,
+// * the peer's address is added to the set of peers to skip
+// * the peer's address is removed from prospective sources, and
+// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
+func (f *Fetcher) doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) {
+ var i int
+ var sourceID *discover.NodeID
+ var quit chan struct{}
+
+ req := &Request{
+ Addr: f.addr,
+ SkipCheck: f.skipCheck,
+ peersToSkip: peersToSkip,
+ }
+
+ foundSource := false
+ // iterate over known sources
+ for i = 0; i < len(sources); i++ {
+ req.Source = sources[i]
+ var err error
+ sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ if err == nil {
+ // remove the peer from known sources
+ // Note: we can modify the source although we are looping on it, because we break from the loop immediately
+ sources = append(sources[:i], sources[i+1:]...)
+ foundSource = true
+ break
+ }
+ }
+
+ // if there are no known sources, or none available, we try to request from the closest node
+ if !foundSource {
+ req.Source = nil
+ var err error
+ sourceID, quit, err = f.protoRequestFunc(ctx, req)
+ if err != nil {
+ // if no peers found to request from
+ return sources, err
+ }
+ }
+ // add peer to the set of peers to skip from now
+ peersToSkip.Store(sourceID.String(), time.Now())
+
+ // if the quit channel is closed, it indicates that the source peer we requested from
+ // disconnected or terminated its streamer
+ // here we start a goroutine that watches this channel and reports the source peer on the gone channel
+ // this go routine quits if the fetcher global context is done to prevent process leak
+ go func() {
+ select {
+ case <-quit:
+ gone <- sourceID
+ case <-ctx.Done():
+ }
+ }()
+ return sources, nil
+}
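
The new Fetcher above is normally created and started through FetcherFactory.New and then driven by Offer/Request calls until its context is cancelled. A rough usage sketch, assuming exactly the API added in this file; stubRequest and fetchChunk are illustrative stand-ins (in the real node the stream package's delivery logic supplies the RequestFunc):

package example

import (
	"context"
	"sync"

	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// stubRequest is a stand-in RequestFunc: it pretends the retrieve request was
// forwarded to some peer and returns a quit channel that would be closed if
// that peer disconnected.
func stubRequest(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
	id := discover.NodeID{}
	return &id, make(chan struct{}), nil
}

func fetchChunk(addr storage.Address) {
	// the context should span the whole retrieval; the fetcher's run loop
	// terminates when it is cancelled
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// peers that are themselves asking for this chunk go into peersToSkip,
	// so the chunk is never requested back from them
	peersToSkip := new(sync.Map)

	factory := network.NewFetcherFactory(stubRequest, false)
	fetcher := factory.New(ctx, addr, peersToSkip) // starts the run loop

	// signal that the chunk was actually requested; the fetcher issues a
	// retrieve request and re-requests on searchTimeout until delivery
	fetcher.Request(ctx)
}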
diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go
new file mode 100644
index 000000000..21b81d652
--- /dev/null
+++ b/swarm/network/fetcher_test.go
@@ -0,0 +1,459 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+var requestedPeerID = discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+// mockRequester pushes every request to the requestC channel when its doRequest function is called
+type mockRequester struct {
+ // requests []Request
+ requestC chan *Request // when a request is coming it is pushed to requestC
+ waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional)
+ ctr int //counts the number of requests
+ quitC chan struct{}
+}
+
+func newMockRequester(waitTimes ...time.Duration) *mockRequester {
+ return &mockRequester{
+ requestC: make(chan *Request),
+ waitTimes: waitTimes,
+ quitC: make(chan struct{}),
+ }
+}
+
+func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*discover.NodeID, chan struct{}, error) {
+ waitTime := time.Duration(0)
+ if m.ctr < len(m.waitTimes) {
+ waitTime = m.waitTimes[m.ctr]
+ m.ctr++
+ }
+ time.Sleep(waitTime)
+ m.requestC <- request
+
+ // if there is a Source in the request use that, if not use the global requestedPeerId
+ source := request.Source
+ if source == nil {
+ source = &requestedPeerID
+ }
+ return source, m.quitC, nil
+}
+
+// TestFetcherSingleRequest creates a Fetcher using mockRequester, and runs it with a sample set of peers to skip.
+// mockRequester pushes a Request on a channel every time the request function is called. Using
+// this channel we test if calling Fetcher.Request calls the request function, and whether it uses
+// the correct peers to skip which we provided for the fetcher.run function.
+func TestFetcherSingleRequest(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peers := []string{"a", "b", "c", "d"}
+ peersToSkip := &sync.Map{}
+ for _, p := range peers {
+ peersToSkip.Store(p, time.Now())
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ select {
+ case request := <-requester.requestC:
+ // request should contain all peers from peersToSkip provided to the fetcher
+ for _, p := range peers {
+ if _, ok := request.peersToSkip.Load(p); !ok {
+ t.Fatalf("request.peersToSkip misses peer")
+ }
+ }
+
+ // source peer should be also added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok {
+ t.Fatalf("request.peersToSkip does not contain peer returned by the request function")
+ }
+
+ // fetch should trigger a request, if it doesn't happen in time, test should fail
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetch timeout")
+ }
+}
+
+// TestFetcherCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called
+func TestFetcherCancelStopsFetcher(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // we start the fetcher, and then we immediately cancel the context
+ go fetcher.run(ctx, peersToSkip)
+ cancel()
+
+ rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
+ defer rcancel()
+ // we call Request with an active context
+ fetcher.Request(rctx)
+
+ // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+}
+
+// TestFetcherCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
+func TestFetcherCancelStopsRequest(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // we start the fetcher with an active context
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx, rcancel := context.WithCancel(context.Background())
+ rcancel()
+
+ // we call Request with a cancelled context
+ fetcher.Request(rctx)
+
+ // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetch function initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
+ rctx = context.Background()
+ fetcher.Request(rctx)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("expected request")
+ }
+}
+
+// TestFetcherOfferUsesSource tests Fetcher Offer behavior.
+// In this case there should be 1 (and only one) request initiated from the source peer, and the
+// source nodeid should appear in the peersToSkip map.
+func TestFetcherOfferUsesSource(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // start the fetcher
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx := context.Background()
+ // call the Offer function with the source peer
+ fetcher.Offer(rctx, &sourcePeerID)
+
+ // fetcher should not initiate request
+ select {
+ case <-requester.requestC:
+ t.Fatalf("fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // call Request after the Offer
+ rctx = context.Background()
+ fetcher.Request(rctx)
+
+ // there should be exactly 1 request coming from fetcher
+ var request *Request
+ select {
+ case request = <-requester.requestC:
+ if *request.Source != sourcePeerID {
+ t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ select {
+ case <-requester.requestC:
+ t.Fatalf("Fetcher number of requests expected 1 got 2")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // source peer should be added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok {
+ t.Fatalf("SourcePeerId not added to peersToSkip")
+ }
+}
+
+func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // start the fetcher
+ go fetcher.run(ctx, peersToSkip)
+
+ // call Request first
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ // there should be a request coming from fetcher
+ var request *Request
+ select {
+ case request = <-requester.requestC:
+ if request.Source != nil {
+ t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ // after the Request call Offer
+ fetcher.Offer(context.Background(), &sourcePeerID)
+
+ // there should be a request coming from fetcher
+ select {
+ case request = <-requester.requestC:
+ if *request.Source != sourcePeerID {
+ t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ // source peer should be added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok {
+ t.Fatalf("SourcePeerId not added to peersToSkip")
+ }
+}
+
+// TestFetcherRetryOnTimeout tests that fetch retries after searchTimeout has passed
+func TestFetcherRetryOnTimeout(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ // set searchTimeout to a low value so the test is quicker
+ defer func(t time.Duration) {
+ searchTimeout = t
+ }(searchTimeout)
+ searchTimeout = 250 * time.Millisecond
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // start the fetcher
+ go fetcher.run(ctx, peersToSkip)
+
+ // call the fetch function with an active context
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ // after 100ms the first request should be initiated
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ default:
+ t.Fatalf("fetch did not initiate request")
+ }
+
+ // after another 100ms no new request should be initiated, because search timeout is 250ms
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ t.Fatalf("unexpected request from fetcher")
+ default:
+ }
+
+ // after another 300ms search timeout is over, there should be a new request
+ time.Sleep(300 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ default:
+ t.Fatalf("fetch did not retry request")
+ }
+}
+
+// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts
+// a Fetcher when its New function is called. We test the fetching functionality just by checking if
+// a request is initiated when the returned fetcher's Request function is called
+func TestFetcherFactory(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcherFactory := NewFetcherFactory(requester.doRequest, false)
+
+ peersToSkip := &sync.Map{}
+
+ fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
+
+ fetcher.Request(context.Background())
+
+ // check if the created fetcher is really started and initiates a request
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetch timeout")
+ }
+
+}
+
+func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+ fetcher := NewFetcher(addr, requester.doRequest, true)
+
+ // make sure searchTimeout is long enough so that the request is not retried because of a timeout
+ defer func(t time.Duration) {
+ searchTimeout = t
+ }(searchTimeout)
+ searchTimeout = 10 * time.Second
+
+ peersToSkip := &sync.Map{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go fetcher.run(ctx, peersToSkip)
+
+ rctx := context.Background()
+ fetcher.Request(rctx)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("request is not initiated")
+ }
+
+ close(requester.quitC)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("request is not initiated after failed request")
+ }
+}
+
+// TestRequestSkipPeer checks if the SkipPeer function will skip a provided peer
+// and not skip an unknown one.
+func TestRequestSkipPeer(t *testing.T) {
+ addr := make([]byte, 32)
+ peers := []discover.NodeID{
+ discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
+ }
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peers[0].String(), time.Now())
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peers[0].String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ if r.SkipPeer(peers[1].String()) {
+ t.Errorf("peer skipped")
+ }
+}
+
+// TestRequestSkipPeerExpired checks if a peer to skip is not skipped
+// after RequestTimeout has passed.
+func TestRequestSkipPeerExpired(t *testing.T) {
+ addr := make([]byte, 32)
+ peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+ // set RequestTimeout to a low value and reset it after the test
+ defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout)
+ RequestTimeout = 250 * time.Millisecond
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peer.String(), time.Now())
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ time.Sleep(500 * time.Millisecond)
+
+ if r.SkipPeer(peer.String()) {
+ t.Errorf("peer skipped")
+ }
+}
+
+// TestRequestSkipPeerPermanent checks that a peer to skip is still skipped
+// after RequestTimeout has passed if it is set for permanent skipping, i.e.
+// the value stored in the peersToSkip map is not a time.Time.
+func TestRequestSkipPeerPermanent(t *testing.T) {
+ addr := make([]byte, 32)
+ peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+ // set RequestTimeout to a low value and reset it after the test
+ defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout)
+ RequestTimeout = 250 * time.Millisecond
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peer.String(), true)
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ time.Sleep(500 * time.Millisecond)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+}
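
The last three tests above exercise the two kinds of peersToSkip entries: a time.Time value skips a peer only until RequestTimeout elapses, any other value skips it permanently. A small sketch of that distinction, assuming the Request/SkipPeer API added in fetcher.go (the string keys are placeholders; real code uses discover.NodeID.String() values):

package example

import (
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// skipPeerSketch returns whether each of the two peers is currently skipped;
// both return true right after being stored, but only "peer-a" expires.
func skipPeerSketch() (timed, permanent bool) {
	addr := make(storage.Address, 32)

	peersToSkip := new(sync.Map)
	peersToSkip.Store("peer-a", time.Now()) // skipped until RequestTimeout elapses
	peersToSkip.Store("peer-b", true)       // non-time.Time value: skipped permanently

	r := network.NewRequest(addr, false, peersToSkip)
	return r.SkipPeer("peer-a"), r.SkipPeer("peer-b")
}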
diff --git a/swarm/network/hive.go b/swarm/network/hive.go
index 366021088..425c1d5a1 100644
--- a/swarm/network/hive.go
+++ b/swarm/network/hive.go
@@ -32,31 +32,10 @@ import (
Hive is the logistic manager of the swarm
When the hive is started, a forever loop is launched that
-asks the Overlay Topology driver (e.g., generic kademlia nodetable)
+asks the kademlia nodetable
to suggest peers to bootstrap connectivity
*/
-// Overlay is the interface for kademlia (or other topology drivers)
-type Overlay interface {
- // suggest peers to connect to
- SuggestPeer() (OverlayAddr, int, bool)
- // register and deregister peer connections
- On(OverlayConn) (depth uint8, changed bool)
- Off(OverlayConn)
- // register peer addresses
- Register([]OverlayAddr) error
- // iterate over connected peers
- EachConn([]byte, int, func(OverlayConn, int, bool) bool)
- // iterate over known peers (address records)
- EachAddr([]byte, int, func(OverlayAddr, int, bool) bool)
- // pretty print the connectivity
- String() string
- // base Overlay address of the node itself
- BaseAddr() []byte
- // connectivity health check used for testing
- Healthy(*PeerPot) *Health
-}
-
// HiveParams holds the config options to hive
type HiveParams struct {
Discovery bool // if want discovery of not
@@ -78,7 +57,7 @@ func NewHiveParams() *HiveParams {
// Hive manages network connections of the swarm node
type Hive struct {
*HiveParams // settings
- Overlay // the overlay connectiviy driver
+ *Kademlia // the overlay connectivity driver
Store state.Store // storage interface to save peers across sessions
addPeer func(*discover.Node) // server callback to connect to a peer
// bookkeeping
@@ -88,12 +67,12 @@ type Hive struct {
// NewHive constructs a new hive
// HiveParams: config parameters
-// Overlay: connectivity driver using a network topology
+// Kademlia: connectivity driver using a network topology
// StateStore: to save peers across sessions
-func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive {
+func NewHive(params *HiveParams, kad *Kademlia, store state.Store) *Hive {
return &Hive{
HiveParams: params,
- Overlay: overlay,
+ Kademlia: kad,
Store: store,
}
}
@@ -133,7 +112,7 @@ func (h *Hive) Stop() error {
}
}
log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
- h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool {
+ h.EachConn(nil, 255, func(p *Peer, _ int, _ bool) bool {
log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
p.Drop(nil)
return true
@@ -151,14 +130,14 @@ func (h *Hive) connect() {
addr, depth, changed := h.SuggestPeer()
if h.Discovery && changed {
- NotifyDepth(uint8(depth), h)
+ NotifyDepth(uint8(depth), h.Kademlia)
}
if addr == nil {
continue
}
log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4]))
- under, err := discover.ParseNode(string(addr.(Addr).Under()))
+ under, err := discover.ParseNode(string(addr.Under()))
if err != nil {
log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err))
continue
@@ -170,19 +149,19 @@ func (h *Hive) connect() {
// Run protocol run function
func (h *Hive) Run(p *BzzPeer) error {
- dp := newDiscovery(p, h)
+ dp := NewPeer(p, h.Kademlia)
depth, changed := h.On(dp)
// if we want discovery, advertise change of depth
if h.Discovery {
if changed {
// if depth changed, send to all peers
- NotifyDepth(depth, h)
+ NotifyDepth(depth, h.Kademlia)
} else {
// otherwise just send depth to new peer
dp.NotifyDepth(depth)
}
}
- NotifyPeer(p.Off(), h)
+ NotifyPeer(p.BzzAddr, h.Kademlia)
defer h.Off(dp)
return dp.Run(dp.HandleMsg)
}
@@ -206,17 +185,6 @@ func (h *Hive) PeerInfo(id discover.NodeID) interface{} {
}
}
-// ToAddr returns the serialisable version of u
-func ToAddr(pa OverlayPeer) *BzzAddr {
- if addr, ok := pa.(*BzzAddr); ok {
- return addr
- }
- if p, ok := pa.(*discPeer); ok {
- return p.BzzAddr
- }
- return pa.(*BzzPeer).BzzAddr
-}
-
// loadPeers, savePeer implement persistence callback/
func (h *Hive) loadPeers() error {
var as []*BzzAddr
@@ -230,28 +198,19 @@ func (h *Hive) loadPeers() error {
}
log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4]))
- return h.Register(toOverlayAddrs(as...))
-}
-
-// toOverlayAddrs transforms an array of BzzAddr to OverlayAddr
-func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) {
- for _, a := range as {
- oas = append(oas, OverlayAddr(a))
- }
- return
+ return h.Register(as...)
}
// savePeers, savePeer implement persistence callback/
func (h *Hive) savePeers() error {
var peers []*BzzAddr
- h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool {
+ h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int, _ bool) bool {
if pa == nil {
log.Warn(fmt.Sprintf("empty addr: %v", i))
return true
}
- apa := ToAddr(pa)
- log.Trace("saving peer", "peer", apa)
- peers = append(peers, apa)
+ log.Trace("saving peer", "peer", pa)
+ peers = append(peers, pa)
return true
})
if err := h.Store.Put("peers", peers); err != nil {
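
With the Overlay interface gone, the hive is wired directly to a concrete *Kademlia instance. A construction sketch under that assumption; newHiveSketch and its arguments are illustrative only, and the state.Store value would come from the node's persistent store:

package example

import (
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/state"
)

// newHiveSketch shows the post-refactor wiring: NewHive now takes *Kademlia
// instead of the removed Overlay interface. baseAddr is a placeholder for the
// node's overlay address.
func newHiveSketch(baseAddr []byte, store state.Store) *network.Hive {
	kadParams := network.NewKadParams()
	kad := network.NewKademlia(baseAddr, kadParams)

	hiveParams := network.NewHiveParams()
	hiveParams.Discovery = true // advertise depth changes to connected peers

	return network.NewHive(hiveParams, kad, store)
}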
diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go
index c2abfb2aa..7ea000c1a 100644
--- a/swarm/network/hive_test.go
+++ b/swarm/network/hive_test.go
@@ -41,7 +41,7 @@ func TestRegisterAndConnect(t *testing.T) {
id := s.IDs[0]
raddr := NewAddrFromNodeID(id)
- pp.Register([]OverlayAddr{OverlayAddr(raddr)})
+ pp.Register(raddr)
// start the hive and wait for the connection
err := pp.Start(s.Server)
@@ -77,7 +77,7 @@ func TestHiveStatePersistance(t *testing.T) {
peers := make(map[string]bool)
for _, id := range s.IDs {
raddr := NewAddrFromNodeID(id)
- pp.Register([]OverlayAddr{OverlayAddr(raddr)})
+ pp.Register(raddr)
peers[raddr.String()] = true
}
@@ -97,8 +97,8 @@ func TestHiveStatePersistance(t *testing.T) {
pp.Start(s1.Server)
i := 0
- pp.Overlay.EachAddr(nil, 256, func(addr OverlayAddr, po int, nn bool) bool {
- delete(peers, addr.(*BzzAddr).String())
+ pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int, nn bool) bool {
+ delete(peers, addr.String())
i++
return true
})
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index 0177d449c..55a0c6f13 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -62,7 +62,7 @@ type KadParams struct {
RetryExponent int // exponent to multiply retry intervals with
MaxRetries int // maximum number of redial attempts
// function to sanction or prevent suggesting a peer
- Reachable func(OverlayAddr) bool
+ Reachable func(*BzzAddr) bool
}
// NewKadParams returns a params struct with default values
@@ -106,45 +106,22 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia {
}
}
-// OverlayPeer interface captures the common aspect of view of a peer from the Overlay
-// topology driver
-type OverlayPeer interface {
- Address() []byte
-}
-
-// OverlayConn represents a connected peer
-type OverlayConn interface {
- OverlayPeer
- Drop(error) // call to indicate a peer should be expunged
- Off() OverlayAddr // call to return a persitent OverlayAddr
-}
-
-// OverlayAddr represents a kademlia peer record
-type OverlayAddr interface {
- OverlayPeer
- Update(OverlayAddr) OverlayAddr // returns the updated version of the original
-}
-
-// entry represents a Kademlia table entry (an extension of OverlayPeer)
+// entry represents a Kademlia table entry (an extension of BzzAddr)
type entry struct {
- OverlayPeer
+ *BzzAddr
+ conn *Peer
seenAt time.Time
retries int
}
-// newEntry creates a kademlia peer from an OverlayPeer interface
-func newEntry(p OverlayPeer) *entry {
+// newEntry creates a kademlia peer record from a *BzzAddr
+func newEntry(p *BzzAddr) *entry {
return &entry{
- OverlayPeer: p,
- seenAt: time.Now(),
+ BzzAddr: p,
+ seenAt: time.Now(),
}
}
-// Bin is the binary (bitvector) serialisation of the entry address
-func (e *entry) Bin() string {
- return pot.ToBin(e.addr().Address())
-}
-
// Label is a short tag for the entry for debug
func Label(e *entry) string {
return fmt.Sprintf("%s (%d)", e.Hex()[:4], e.retries)
@@ -152,29 +129,12 @@ func Label(e *entry) string {
// Hex is the hexadecimal serialisation of the entry address
func (e *entry) Hex() string {
- return fmt.Sprintf("%x", e.addr().Address())
+ return fmt.Sprintf("%x", e.Address())
}
-// String is the short tag for the entry
-func (e *entry) String() string {
- return fmt.Sprintf("%s (%d)", e.Hex()[:8], e.retries)
-}
-
-// addr returns the kad peer record (OverlayAddr) corresponding to the entry
-func (e *entry) addr() OverlayAddr {
- a, _ := e.OverlayPeer.(OverlayAddr)
- return a
-}
-
-// conn returns the connected peer (OverlayPeer) corresponding to the entry
-func (e *entry) conn() OverlayConn {
- c, _ := e.OverlayPeer.(OverlayConn)
- return c
-}
-
-// Register enters each OverlayAddr as kademlia peer record into the
+// Register enters each address as kademlia peer record into the
// database of known peer addresses
-func (k *Kademlia) Register(peers []OverlayAddr) error {
+func (k *Kademlia) Register(peers ...*BzzAddr) error {
k.lock.Lock()
defer k.lock.Unlock()
var known, size int
@@ -203,7 +163,6 @@ func (k *Kademlia) Register(peers []OverlayAddr) error {
if k.addrCountC != nil && size-known > 0 {
k.addrCountC <- k.addrs.Size()
}
- // log.Trace(fmt.Sprintf("%x registered %v peers, %v known, total: %v", k.BaseAddr()[:4], size, known, k.addrs.Size()))
k.sendNeighbourhoodDepthChange()
return nil
@@ -212,7 +171,7 @@ func (k *Kademlia) Register(peers []OverlayAddr) error {
// SuggestPeer returns a known peer for the lowest proximity bin for the
// lowest bincount below depth
// naturally if there is an empty row it returns a peer for that
-func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
+func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) {
k.lock.Lock()
defer k.lock.Unlock()
minsize := k.MinBinSize
@@ -224,15 +183,18 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
if po < depth {
return false
}
- a = k.callable(val)
+ e := val.(*entry)
+ c := k.callable(e)
+ if c {
+ a = e.BzzAddr
+ }
ppo = po
- return a == nil
+ return !c
})
if a != nil {
log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo))
return a, 0, false
}
- // log.Trace(fmt.Sprintf("%08x no candidate nearest neighbours to connect to (Depth: %v, minProxSize: %v) %#v", k.BaseAddr()[:4], depth, k.MinProxBinSize, a))
var bpo []int
prev := -1
@@ -250,7 +212,6 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
})
// all buckets are full, ie., minsize == k.MinBinSize
if len(bpo) == 0 {
- // log.Debug(fmt.Sprintf("%08x: all bins saturated", k.BaseAddr()[:4]))
return nil, 0, false
}
// as long as we got candidate peers to connect to
@@ -264,8 +225,12 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
return false
}
return f(func(val pot.Val, _ int) bool {
- a = k.callable(val)
- return a == nil
+ e := val.(*entry)
+ c := k.callable(e)
+ if c {
+ a = e.BzzAddr
+ }
+ return !c
})
})
// found a candidate
@@ -282,25 +247,26 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) {
}
// On inserts the peer as a kademlia peer into the live peers
-func (k *Kademlia) On(p OverlayConn) (uint8, bool) {
+func (k *Kademlia) On(p *Peer) (uint8, bool) {
k.lock.Lock()
defer k.lock.Unlock()
- e := newEntry(p)
var ins bool
k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(v pot.Val) pot.Val {
// if not found live
if v == nil {
ins = true
// insert new online peer into conns
- return e
+ return p
}
// found among live peers, do nothing
return v
})
if ins {
+ a := newEntry(p.BzzAddr)
+ a.conn = p
// insert new online peer into addrs
k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val {
- return e
+ return a
})
// send new address count value only if the peer is inserted
if k.addrCountC != nil {
@@ -324,6 +290,8 @@ func (k *Kademlia) On(p OverlayConn) (uint8, bool) {
// Not receiving from the returned channel will block On function
// when the neighbourhood depth is changed.
func (k *Kademlia) NeighbourhoodDepthC() <-chan int {
+ k.lock.Lock()
+ defer k.lock.Unlock()
if k.nDepthC == nil {
k.nDepthC = make(chan int)
}
@@ -357,7 +325,7 @@ func (k *Kademlia) AddrCountC() <-chan int {
}
// Off removes a peer from among live peers
-func (k *Kademlia) Off(p OverlayConn) {
+func (k *Kademlia) Off(p *Peer) {
k.lock.Lock()
defer k.lock.Unlock()
var del bool
@@ -367,7 +335,7 @@ func (k *Kademlia) Off(p OverlayConn) {
panic(fmt.Sprintf("connected peer not found %v", p))
}
del = true
- return newEntry(p.Off())
+ return newEntry(p.BzzAddr)
})
if del {
@@ -383,7 +351,7 @@ func (k *Kademlia) Off(p OverlayConn) {
}
}
-func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn OverlayConn, po int) bool) {
+func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn *Peer, po int) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
@@ -403,7 +371,7 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con
for bin := startPo; bin <= endPo; bin++ {
f(func(val pot.Val, _ int) bool {
- return eachBinFunc(val.(*entry).conn(), bin)
+ return eachBinFunc(val.(*Peer), bin)
})
}
return true
@@ -413,13 +381,13 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con
// EachConn is an iterator with args (base, po, f) applies f to each live peer
// that has proximity order po or less as measured from the base
// if base is nil, kademlia base address is used
-func (k *Kademlia) EachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) {
+func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
k.eachConn(base, o, f)
}
-func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) {
+func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) {
if len(base) == 0 {
base = k.base
}
@@ -428,20 +396,20 @@ func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) b
if po > o {
return true
}
- return f(val.(*entry).conn(), po, po >= depth)
+ return f(val.(*Peer), po, po >= depth)
})
}
// EachAddr called with (base, po, f) is an iterator applying f to each known peer
// that has proximity order po or less as measured from the base
// if base is nil, kademlia base address is used
-func (k *Kademlia) EachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) {
+func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) {
k.lock.RLock()
defer k.lock.RUnlock()
k.eachAddr(base, o, f)
}
-func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) {
+func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) {
if len(base) == 0 {
base = k.base
}
@@ -450,7 +418,7 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) b
if po > o {
return true
}
- return f(val.(*entry).addr(), po, po >= depth)
+ return f(val.(*entry).BzzAddr, po, po >= depth)
})
}
@@ -472,12 +440,11 @@ func (k *Kademlia) neighbourhoodDepth() (depth int) {
return depth
}
-// callable when called with val,
-func (k *Kademlia) callable(val pot.Val) OverlayAddr {
- e := val.(*entry)
+// callable decides if an address entry represents a callable peer
+func (k *Kademlia) callable(e *entry) bool {
// not callable if peer is live or exceeded maxRetries
- if e.conn() != nil || e.retries > k.MaxRetries {
- return nil
+ if e.conn != nil || e.retries > k.MaxRetries {
+ return false
}
// calculate the allowed number of retries based on time lapsed since last seen
timeAgo := int64(time.Since(e.seenAt))
@@ -491,17 +458,17 @@ func (k *Kademlia) callable(val pot.Val) OverlayAddr {
// peer can be retried again
if retries < e.retries {
log.Trace(fmt.Sprintf("%08x: %v long time since last try (at %v) needed before retry %v, wait only warrants %v", k.BaseAddr()[:4], e, timeAgo, e.retries, retries))
- return nil
+ return false
}
// function to sanction or prevent suggesting a peer
- if k.Reachable != nil && !k.Reachable(e.addr()) {
+ if k.Reachable != nil && !k.Reachable(e.BzzAddr) {
log.Trace(fmt.Sprintf("%08x: peer %v is temporarily not callable", k.BaseAddr()[:4], e))
- return nil
+ return false
}
e.retries++
log.Trace(fmt.Sprintf("%08x: peer %v is callable", k.BaseAddr()[:4], e))
- return e.addr()
+ return true
}
// BaseAddr return the kademlia base address
@@ -516,7 +483,8 @@ func (k *Kademlia) String() string {
return k.string()
}
-// String returns kademlia table + kaddb table displayed with ascii
+// string returns kademlia table + kaddb table displayed with ascii
+// caller must hold the lock
func (k *Kademlia) string() string {
wsrow := " "
var rows []string
@@ -538,7 +506,7 @@ func (k *Kademlia) string() string {
row := []string{fmt.Sprintf("%2d", size)}
rest -= size
f(func(val pot.Val, vpo int) bool {
- e := val.(*entry)
+ e := val.(*Peer)
row = append(row, fmt.Sprintf("%x", e.Address()[:2]))
rowlen++
return rowlen < 4
@@ -594,8 +562,9 @@ type PeerPot struct {
EmptyBins []int
}
-// NewPeerPotMap creates a map of pot record of OverlayAddr with keys
+// NewPeerPotMap creates a map of pot records of *BzzAddr with keys
// as hexadecimal representations of the address.
+// used for testing only
func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot {
// create a table of all nodes for health check
np := pot.NewPot(nil, 0)
@@ -640,6 +609,7 @@ func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot {
// saturation returns the lowest proximity order that the bin for that order
// has less than n peers
+// It is used in Healthy function for testing only
func (k *Kademlia) saturation(n int) int {
prev := -1
k.addrs.EachBin(k.base, pof, 0, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool {
@@ -654,7 +624,7 @@ func (k *Kademlia) saturation(n int) int {
}
// full returns true if all required bins have connected peers.
-// It is used in Healthy function.
+// It is used in Healthy function for testing only
func (k *Kademlia) full(emptyBins []int) (full bool) {
prev := 0
e := len(emptyBins)
@@ -688,10 +658,13 @@ func (k *Kademlia) full(emptyBins []int) (full bool) {
return e == 0
}
+// knowNearestNeighbours tests if all known nearest neighbours given as arguments
+// are found in the addressbook
+// It is used in Healthy function for testing only
func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool {
pm := make(map[string]bool)
- k.eachAddr(nil, 255, func(p OverlayAddr, po int, nn bool) bool {
+ k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool {
if !nn {
return false
}
@@ -709,10 +682,13 @@ func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool {
return true
}
+// gotNearestNeighbours tests if all known nearest neighbours given as arguments
+// are connected peers
+// It is used in Healthy function for testing only
func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) {
pm := make(map[string]bool)
- k.eachConn(nil, 255, func(p OverlayConn, po int, nn bool) bool {
+ k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool {
if !nn {
return false
}
@@ -735,6 +711,7 @@ func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missin
}
// Health state of the Kademlia
+// used for testing only
type Health struct {
KnowNN bool // whether node knows all its nearest neighbours
GotNN bool // whether node is connected to all its nearest neighbours
@@ -746,6 +723,7 @@ type Health struct {
// Healthy reports the health state of the kademlia connectivity
// returns a Health struct
+// used for testing only
func (k *Kademlia) Healthy(pp *PeerPot) *Health {
k.lock.RLock()
defer k.lock.RUnlock()
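
The Kademlia API after this commit swaps the Overlay* interfaces for concrete types: Register is variadic over *BzzAddr and the iterators hand back *BzzAddr (known peers) or *Peer (live connections). A short sketch of the resulting call sites, assuming the signatures exactly as changed above; registerAndIterate is illustrative, not part of the commit:

package example

import (
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/network"
)

// registerAndIterate registers known peer records and walks them closest-first
// from the node's own base address, then asks for a dial candidate.
func registerAndIterate(kad *network.Kademlia, addrs []*network.BzzAddr) error {
	if err := kad.Register(addrs...); err != nil {
		return err
	}

	kad.EachAddr(nil, 255, func(a *network.BzzAddr, po int, nn bool) bool {
		fmt.Printf("known peer %x at PO %d (nearest neighbour: %v)\n", a.Address()[:4], po, nn)
		return true // keep iterating
	})

	// SuggestPeer returns a callable address (or nil) plus the bin it would
	// fill; want reports whether more peers are needed at that depth.
	if addr, po, want := kad.SuggestPeer(); addr != nil {
		fmt.Printf("suggested peer %x at PO %d (want more: %v)\n", addr.Address()[:4], po, want)
	}
	return nil
}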
diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go
index b60e1e9a3..903c8dbda 100644
--- a/swarm/network/kademlia_test.go
+++ b/swarm/network/kademlia_test.go
@@ -38,71 +38,42 @@ func testKadPeerAddr(s string) *BzzAddr {
return &BzzAddr{OAddr: a, UAddr: a}
}
-type testDropPeer struct {
- Peer
- dropc chan error
-}
-
-type dropError struct {
- error
- addr string
-}
-
-func (d *testDropPeer) Drop(err error) {
- err2 := &dropError{err, binStr(d)}
- d.dropc <- err2
-}
-
-type testKademlia struct {
- *Kademlia
- Discovery bool
- dropc chan error
-}
-
-func newTestKademlia(b string) *testKademlia {
+func newTestKademlia(b string) *Kademlia {
params := NewKadParams()
params.MinBinSize = 1
params.MinProxBinSize = 2
base := pot.NewAddressFromString(b)
- return &testKademlia{
- NewKademlia(base, params),
- false,
- make(chan error),
- }
+ return NewKademlia(base, params)
}
-func (k *testKademlia) newTestKadPeer(s string) Peer {
- return &testDropPeer{&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k.dropc}
+func newTestKadPeer(k *Kademlia, s string) *Peer {
+ return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k)
}
-func (k *testKademlia) On(ons ...string) *testKademlia {
+func On(k *Kademlia, ons ...string) {
for _, s := range ons {
- k.Kademlia.On(k.newTestKadPeer(s).(OverlayConn))
+ k.On(newTestKadPeer(k, s))
}
- return k
}
-func (k *testKademlia) Off(offs ...string) *testKademlia {
+func Off(k *Kademlia, offs ...string) {
for _, s := range offs {
- k.Kademlia.Off(k.newTestKadPeer(s).(OverlayConn))
+ k.Off(newTestKadPeer(k, s))
}
-
- return k
}
-func (k *testKademlia) Register(regs ...string) *testKademlia {
- var as []OverlayAddr
+func Register(k *Kademlia, regs ...string) {
+ var as []*BzzAddr
for _, s := range regs {
as = append(as, testKadPeerAddr(s))
}
- err := k.Kademlia.Register(as)
+ err := k.Register(as...)
if err != nil {
panic(err.Error())
}
- return k
}
-func testSuggestPeer(t *testing.T, k *testKademlia, expAddr string, expPo int, expWant bool) error {
+func testSuggestPeer(k *Kademlia, expAddr string, expPo int, expWant bool) error {
addr, o, want := k.SuggestPeer()
if binStr(addr) != expAddr {
return fmt.Errorf("incorrect peer address suggested. expected %v, got %v", expAddr, binStr(addr))
@@ -116,7 +87,7 @@ func testSuggestPeer(t *testing.T, k *testKademlia, expAddr string, expPo int, e
return nil
}
-func binStr(a OverlayPeer) string {
+func binStr(a *BzzAddr) string {
if a == nil {
return "<nil>"
}
@@ -125,15 +96,17 @@ func binStr(a OverlayPeer) string {
func TestSuggestPeerBug(t *testing.T) {
// 2 row gap, unsaturated proxbin, no callables -> want PO 0
- k := newTestKademlia("00000000").On(
+ k := newTestKademlia("00000000")
+ On(k,
"10000000", "11000000",
"01000000",
"00010000", "00011000",
- ).Off(
+ )
+ Off(k,
"01000000",
)
- err := testSuggestPeer(t, k, "01000000", 0, false)
+ err := testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
@@ -141,140 +114,140 @@ func TestSuggestPeerBug(t *testing.T) {
func TestSuggestPeerFindPeers(t *testing.T) {
// 2 row gap, unsaturated proxbin, no callables -> want PO 0
- k := newTestKademlia("00000000").On("00100000")
- err := testSuggestPeer(t, k, "<nil>", 0, false)
+ k := newTestKademlia("00000000")
+ On(k, "00100000")
+ err := testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
// 2 row gap, saturated proxbin, no callables -> want PO 0
- k.On("00010000")
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ On(k, "00010000")
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
// 1 row gap (1 less), saturated proxbin, no callables -> want PO 1
- k.On("10000000")
- err = testSuggestPeer(t, k, "<nil>", 1, false)
+ On(k, "10000000")
+ err = testSuggestPeer(k, "<nil>", 1, false)
if err != nil {
t.Fatal(err.Error())
}
// no gap (1 less), saturated proxbin, no callables -> do not want more
- k.On("01000000", "00100001")
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ On(k, "01000000", "00100001")
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
// oversaturated proxbin, > do not want more
- k.On("00100001")
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ On(k, "00100001")
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
// reintroduce gap, disconnected peer callable
- // log.Info(k.String())
- k.Off("01000000")
- err = testSuggestPeer(t, k, "01000000", 0, false)
+ Off(k, "01000000")
+ err = testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
// second time disconnected peer not callable
// with reasonably set Interval
- err = testSuggestPeer(t, k, "<nil>", 1, true)
+ err = testSuggestPeer(k, "<nil>", 1, true)
if err != nil {
t.Fatal(err.Error())
}
// on and off again, peer callable again
- k.On("01000000")
- k.Off("01000000")
- err = testSuggestPeer(t, k, "01000000", 0, false)
+ On(k, "01000000")
+ Off(k, "01000000")
+ err = testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("01000000")
+ On(k, "01000000")
// new closer peer appears, it is immediately wanted
- k.Register("00010001")
- err = testSuggestPeer(t, k, "00010001", 0, false)
+ Register(k, "00010001")
+ err = testSuggestPeer(k, "00010001", 0, false)
if err != nil {
t.Fatal(err.Error())
}
// PO1 disconnects
- k.On("00010001")
+ On(k, "00010001")
log.Info(k.String())
- k.Off("01000000")
+ Off(k, "01000000")
log.Info(k.String())
// second time, gap filling
- err = testSuggestPeer(t, k, "01000000", 0, false)
+ err = testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("01000000")
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ On(k, "01000000")
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
k.MinBinSize = 2
- err = testSuggestPeer(t, k, "<nil>", 0, true)
+ err = testSuggestPeer(k, "<nil>", 0, true)
if err != nil {
t.Fatal(err.Error())
}
- k.Register("01000001")
- err = testSuggestPeer(t, k, "01000001", 0, false)
+ Register(k, "01000001")
+ err = testSuggestPeer(k, "01000001", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("10000001")
+ On(k, "10000001")
log.Trace(fmt.Sprintf("Kad:\n%v", k.String()))
- err = testSuggestPeer(t, k, "<nil>", 1, true)
+ err = testSuggestPeer(k, "<nil>", 1, true)
if err != nil {
t.Fatal(err.Error())
}
- k.On("01000001")
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ On(k, "01000001")
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
k.MinBinSize = 3
- k.Register("10000010")
- err = testSuggestPeer(t, k, "10000010", 0, false)
+ Register(k, "10000010")
+ err = testSuggestPeer(k, "10000010", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("10000010")
- err = testSuggestPeer(t, k, "<nil>", 1, false)
+ On(k, "10000010")
+ err = testSuggestPeer(k, "<nil>", 1, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("01000010")
- err = testSuggestPeer(t, k, "<nil>", 2, false)
+ On(k, "01000010")
+ err = testSuggestPeer(k, "<nil>", 2, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("00100010")
- err = testSuggestPeer(t, k, "<nil>", 3, false)
+ On(k, "00100010")
+ err = testSuggestPeer(k, "<nil>", 3, false)
if err != nil {
t.Fatal(err.Error())
}
- k.On("00010010")
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ On(k, "00010010")
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
@@ -282,10 +255,8 @@ func TestSuggestPeerFindPeers(t *testing.T) {
}
func TestSuggestPeerRetries(t *testing.T) {
- t.Skip("Test is disabled, because it is flaky. It fails with kademlia_test.go:346: incorrect peer address suggested. expected <nil>, got 01000000")
- // 2 row gap, unsaturated proxbin, no callables -> want PO 0
k := newTestKademlia("00000000")
- k.RetryInterval = int64(100 * time.Millisecond) // cycle
+ k.RetryInterval = int64(300 * time.Millisecond) // cycle
k.MaxRetries = 50
k.RetryExponent = 2
sleep := func(n int) {
@@ -296,53 +267,53 @@ func TestSuggestPeerRetries(t *testing.T) {
time.Sleep(time.Duration(ts))
}
- k.Register("01000000")
- k.On("00000001", "00000010")
- err := testSuggestPeer(t, k, "01000000", 0, false)
+ Register(k, "01000000")
+ On(k, "00000001", "00000010")
+ err := testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
sleep(1)
- err = testSuggestPeer(t, k, "01000000", 0, false)
+ err = testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
sleep(1)
- err = testSuggestPeer(t, k, "01000000", 0, false)
+ err = testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
sleep(2)
- err = testSuggestPeer(t, k, "01000000", 0, false)
+ err = testSuggestPeer(k, "01000000", 0, false)
if err != nil {
t.Fatal(err.Error())
}
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
sleep(2)
- err = testSuggestPeer(t, k, "<nil>", 0, false)
+ err = testSuggestPeer(k, "<nil>", 0, false)
if err != nil {
t.Fatal(err.Error())
}
@@ -350,7 +321,9 @@ func TestSuggestPeerRetries(t *testing.T) {
}
func TestKademliaHiveString(t *testing.T) {
- k := newTestKademlia("00000000").On("01000000", "00100000").Register("10000000", "10000001")
+ k := newTestKademlia("00000000")
+ On(k, "01000000", "00100000")
+ Register(k, "10000000", "10000001")
k.MaxProxDisplay = 8
h := k.String()
expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), MinProxBinSize: 2, MinBinSize: 1, MaxBinSize: 4\n000 0 | 2 8100 (0) 8000 (0)\n============ DEPTH: 1 ==========================================\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n========================================================================="
@@ -378,7 +351,7 @@ func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) {
continue
}
p := &BzzAddr{OAddr: a, UAddr: a}
- if err := k.Register([]OverlayAddr{p}); err != nil {
+ if err := k.Register(p); err != nil {
t.Fatal(err)
}
}
@@ -392,12 +365,12 @@ func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) {
if a == nil {
break
}
- k.On(&BzzPeer{BzzAddr: a.(*BzzAddr)})
+ k.On(NewPeer(&BzzPeer{BzzAddr: a}, k))
}
h := k.Healthy(pp)
if !(h.GotNN && h.KnowNN && h.Full) {
- t.Error("not healthy")
+ t.Fatalf("not healthy: %#v\n%v", h, k.String())
}
}
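
The test refactor above drops the testKademlia wrapper in favour of free helper functions that operate on a *Kademlia directly. A minimal sketch of the resulting call pattern (hypothetical test code, not part of the patch; it only uses constructors visible in this diff, and assumes the usual swarm/pot import path):

package network

import (
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/pot"
)

// suggestPeerSketch shows the post-refactor flow: build a Kademlia, register
// a known address, promote it to a live connection, then ask for a suggestion.
func suggestPeerSketch() {
	params := NewKadParams()
	params.MinBinSize = 1
	params.MinProxBinSize = 2
	k := NewKademlia(pot.NewAddressFromString("00000000"), params)

	a := pot.NewAddressFromString("01000000")
	addr := &BzzAddr{OAddr: a, UAddr: a}
	if err := k.Register(addr); err != nil { // Register is now variadic over *BzzAddr
		panic(err)
	}
	k.On(NewPeer(&BzzPeer{BzzAddr: addr}, k)) // On takes the concrete *Peer type

	suggested, po, want := k.SuggestPeer()
	fmt.Println(suggested, po, want)
}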
diff --git a/swarm/network/networkid_test.go b/swarm/network/networkid_test.go
index 05134b083..91a1f6d7b 100644
--- a/swarm/network/networkid_test.go
+++ b/swarm/network/networkid_test.go
@@ -92,7 +92,7 @@ func TestNetworkID(t *testing.T) {
if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
}
- kademlias[node].EachAddr(nil, 0, func(addr OverlayAddr, _ int, _ bool) bool {
+ kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int, _ bool) bool {
found := false
for _, nd := range netIDGroup {
p := ToOverlayAddr(nd.Bytes())
diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go
index fab638c9e..538502605 100644
--- a/swarm/network/priorityqueue/priorityqueue.go
+++ b/swarm/network/priorityqueue/priorityqueue.go
@@ -28,10 +28,13 @@ package priorityqueue
import (
"context"
"errors"
+
+ "github.com/ethereum/go-ethereum/log"
)
var (
- errContention = errors.New("queue contention")
+ ErrContention = errors.New("contention")
+
errBadPriority = errors.New("bad priority")
wakey = struct{}{}
@@ -39,7 +42,7 @@ var (
// PriorityQueue is the basic structure
type PriorityQueue struct {
- queues []chan interface{}
+ Queues []chan interface{}
wakeup chan struct{}
}
@@ -50,27 +53,29 @@ func New(n int, l int) *PriorityQueue {
queues[i] = make(chan interface{}, l)
}
return &PriorityQueue{
- queues: queues,
+ Queues: queues,
wakeup: make(chan struct{}, 1),
}
}
// Run is a forever loop popping items from the queues
func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) {
- top := len(pq.queues) - 1
+ top := len(pq.Queues) - 1
p := top
READ:
for {
- q := pq.queues[p]
+ q := pq.Queues[p]
select {
case <-ctx.Done():
return
case x := <-q:
+ log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
f(x)
p = top
default:
if p > 0 {
p--
+ log.Trace("priority.queue p > 0", "p", p)
continue READ
}
p = top
@@ -78,6 +83,7 @@ READ:
case <-ctx.Done():
return
case <-pq.wakeup:
+ log.Trace("priority.queue wakeup", "p", p)
}
}
}
@@ -85,23 +91,15 @@ READ:
// Push pushes an item to the appropriate queue specified in the priority argument
// if context is given it waits until either the item is pushed or the Context aborts
-// otherwise returns errContention if the queue is full
-func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error {
- if p < 0 || p >= len(pq.queues) {
+func (pq *PriorityQueue) Push(x interface{}, p int) error {
+ if p < 0 || p >= len(pq.Queues) {
return errBadPriority
}
- if ctx == nil {
- select {
- case pq.queues[p] <- x:
- default:
- return errContention
- }
- } else {
- select {
- case pq.queues[p] <- x:
- case <-ctx.Done():
- return ctx.Err()
- }
+ log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
+ select {
+ case pq.Queues[p] <- x:
+ default:
+ return ErrContention
}
select {
case pq.wakeup <- wakey:
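
Push no longer accepts a context and the contention error is now exported, so callers handle a full queue explicitly instead of blocking. A small self-contained sketch of the new contract (hypothetical usage, not part of the patch):

package main

import (
	"context"
	"fmt"
	"time"

	pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
)

func main() {
	q := pq.New(3, 2) // 3 priority levels, each queue buffered to 2 items
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run pops items highest-priority first and hands them to the callback.
	go q.Run(ctx, func(x interface{}) { fmt.Println("popped:", x) })

	// Push is non-blocking: a full queue surfaces as ErrContention and the
	// caller decides how to react (the streamer drops the peer on it).
	if err := q.Push("hello", 2); err == pq.ErrContention {
		fmt.Println("queue full")
	}

	time.Sleep(100 * time.Millisecond) // give Run a moment to drain (sketch only)
}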
diff --git a/swarm/network/priorityqueue/priorityqueue_test.go b/swarm/network/priorityqueue/priorityqueue_test.go
index cd54250f8..ed8b575c2 100644
--- a/swarm/network/priorityqueue/priorityqueue_test.go
+++ b/swarm/network/priorityqueue/priorityqueue_test.go
@@ -30,7 +30,7 @@ func TestPriorityQueue(t *testing.T) {
results = append(results, v.(string))
wg.Done()
})
- pq.Push(context.Background(), "2.0", 2)
+ pq.Push("2.0", 2)
wg.Wait()
if results[0] != "2.0" {
t.Errorf("expected first result %q, got %q", "2.0", results[0])
@@ -66,7 +66,7 @@ Loop:
{
priorities: []int{0, 0, 0},
values: []string{"0.0", "0.0", "0.1"},
- errors: []error{nil, nil, errContention},
+ errors: []error{nil, nil, ErrContention},
},
} {
var results []string
@@ -74,7 +74,7 @@ Loop:
pq := New(3, 2)
wg.Add(len(tc.values))
for j, value := range tc.values {
- err := pq.Push(nil, value, tc.priorities[j])
+ err := pq.Push(value, tc.priorities[j])
if tc.errors != nil && err != tc.errors[j] {
t.Errorf("expected push error %v, got %v", tc.errors[j], err)
continue Loop
diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go
index 7f7ca5eed..ef0956d5f 100644
--- a/swarm/network/protocol.go
+++ b/swarm/network/protocol.go
@@ -62,32 +62,6 @@ var DiscoverySpec = &protocols.Spec{
},
}
-// Addr interface that peerPool needs
-type Addr interface {
- OverlayPeer
- Over() []byte
- Under() []byte
- String() string
- Update(OverlayAddr) OverlayAddr
-}
-
-// Peer interface represents an live peer connection
-type Peer interface {
- Addr // the address of a peer
- Conn // the live connection (protocols.Peer)
- LastActive() time.Time // last time active
-}
-
-// Conn interface represents an live peer connection
-type Conn interface {
- ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool
- Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages
- Send(context.Context, interface{}) error // can send messages
- Drop(error) // disconnect this peer
- Run(func(context.Context, interface{}) error) error // the run function to run a protocol
- Off() OverlayAddr
-}
-
// BzzConfig captures the config params used by the hive
type BzzConfig struct {
OverlayAddr []byte // base address of the overlay network
@@ -114,7 +88,7 @@ type Bzz struct {
// * bzz config
// * overlay driver
// * peer store
-func NewBzz(config *BzzConfig, kad Overlay, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
+func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz {
return &Bzz{
Hive: NewHive(config.HiveParams, kad, store),
NetworkID: config.NetworkID,
@@ -131,7 +105,7 @@ func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr {
b.localAddr = b.localAddr.Update(&BzzAddr{
UAddr: byteaddr,
OAddr: b.localAddr.OAddr,
- }).(*BzzAddr)
+ })
return b.localAddr
}
@@ -274,7 +248,7 @@ type BzzPeer struct {
LightNode bool
}
-func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer {
+func NewBzzPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer {
return &BzzPeer{
Peer: p,
localAddr: addr,
@@ -282,11 +256,6 @@ func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer {
}
}
-// Off returns the overlay peer record for offline persistence
-func (p *BzzPeer) Off() OverlayAddr {
- return p.BzzAddr
-}
-
// LastActive returns the time the peer was last active
func (p *BzzPeer) LastActive() time.Time {
return p.lastActive
@@ -388,8 +357,8 @@ func (a *BzzAddr) ID() discover.NodeID {
}
// Update updates the underlay address of a peer record
-func (a *BzzAddr) Update(na OverlayAddr) OverlayAddr {
- return &BzzAddr{a.OAddr, na.(Addr).Under()}
+func (a *BzzAddr) Update(na *BzzAddr) *BzzAddr {
+ return &BzzAddr{a.OAddr, na.UAddr}
}
// String pretty prints the address
@@ -410,9 +379,9 @@ func RandomAddr() *BzzAddr {
}
// NewNodeIDFromAddr transforms the underlay address to a discover.NodeID
-func NewNodeIDFromAddr(addr Addr) discover.NodeID {
- log.Info(fmt.Sprintf("uaddr=%s", string(addr.Under())))
- node := discover.MustParseNode(string(addr.Under()))
+func NewNodeIDFromAddr(addr *BzzAddr) discover.NodeID {
+ log.Info(fmt.Sprintf("uaddr=%s", string(addr.UAddr)))
+ node := discover.MustParseNode(string(addr.UAddr))
return node.ID
}
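
With the Overlay interfaces removed, Update works on the concrete *BzzAddr and simply keeps the overlay part of the record while adopting the new underlay address. A tiny sketch with placeholder values (not part of the patch):

package network

// updateSketch: the returned record reuses the old OAddr and takes the new UAddr.
func updateSketch() {
	old := &BzzAddr{OAddr: []byte{0x01}, UAddr: []byte("enode://old@127.0.0.1:30303")}
	fresh := old.Update(&BzzAddr{UAddr: []byte("enode://new@10.0.0.1:30303")})
	_ = fresh // fresh.OAddr == old.OAddr, fresh.UAddr is the new underlay address
}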
diff --git a/swarm/network/simulation/simulation.go b/swarm/network/simulation/simulation.go
index 74f9d98ee..096f7322c 100644
--- a/swarm/network/simulation/simulation.go
+++ b/swarm/network/simulation/simulation.go
@@ -94,7 +94,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) {
}
s.Net = simulations.NewNetwork(
- adapters.NewSimAdapter(adapterServices),
+ adapters.NewTCPAdapter(adapterServices),
&simulations.NetworkConfig{ID: "0"},
)
@@ -164,17 +164,6 @@ var maxParallelCleanups = 10
func (s *Simulation) Close() {
close(s.done)
- // Close all connections before calling the Network Shutdown.
- // It is possible that p2p.Server.Stop will block if there are
- // existing connections.
- for _, c := range s.Net.Conns {
- if c.Up {
- s.Net.Disconnect(c.One, c.Other)
- }
- }
- s.shutdownWG.Wait()
- s.Net.Shutdown()
-
sem := make(chan struct{}, maxParallelCleanups)
s.mu.RLock()
cleanupFuncs := make([]func(), len(s.cleanupFuncs))
@@ -206,6 +195,9 @@ func (s *Simulation) Close() {
}
close(s.runC)
}
+
+ s.shutdownWG.Wait()
+ s.Net.Shutdown()
}
// Done returns a channel that is closed when the simulation
diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go
index acf3479e5..913d6d837 100644
--- a/swarm/network/simulations/discovery/discovery_test.go
+++ b/swarm/network/simulations/discovery/discovery_test.go
@@ -556,8 +556,8 @@ func newService(ctx *adapters.ServiceContext) (node.Service, error) {
kp.MinProxBinSize = testMinProxBinSize
if ctx.Config.Reachable != nil {
- kp.Reachable = func(o network.OverlayAddr) bool {
- return ctx.Config.Reachable(o.(*network.BzzAddr).ID())
+ kp.Reachable = func(o *network.BzzAddr) bool {
+ return ctx.Config.Reachable(o.ID())
}
}
kad := network.NewKademlia(addr.Over(), kp)
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 491dc9fd5..e0d776e34 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
return nil, nil, nil, removeDataDir, err
}
- db := storage.NewDBAPI(localStore)
- delivery := NewDelivery(to, db)
- streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, nil, removeDataDir, err
+ }
+
+ delivery := NewDelivery(to, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+ streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
teardown := func() {
streamer.Close()
removeDataDir()
@@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore {
}
}
-func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) {
+func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) {
return nil, errors.New("get not well defined on round robin store")
}
-func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) {
+func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
- rrs.stores[idx].Put(ctx, chunk)
+ return rrs.stores[idx].Put(ctx, chunk)
}
func (rrs *roundRobinStore) Close() {
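
The test helpers now wire the streamer through a NetStore instead of the old DBAPI, and the same pattern recurs in every ServiceFunc below. A condensed sketch of that wiring, assuming only the constructors that appear in this diff (import paths for state and storage are the usual swarm packages):

package stream

import (
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// newStreamerSketch wires a Registry the way the updated tests do: the
// NetStore wraps the LocalStore, and its fetcher factory points back at
// Delivery.RequestFromPeers so missing chunks are requested from peers.
func newStreamerSketch(addr *network.BzzAddr, localStore *storage.LocalStore) (*Registry, error) {
	netStore, err := storage.NewNetStore(localStore, nil)
	if err != nil {
		return nil, err
	}
	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
	delivery := NewDelivery(kad, netStore)
	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
	return NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil), nil
}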
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 36040339d..d0f27eebc 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -19,12 +19,11 @@ package stream
import (
"context"
"errors"
- "time"
- "github.com/ethereum/go-ethereum/common"
+ "fmt"
+
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/discover"
- cp "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
@@ -46,39 +45,34 @@ var (
)
type Delivery struct {
- db *storage.DBAPI
- overlay network.Overlay
- receiveC chan *ChunkDeliveryMsg
- getPeer func(discover.NodeID) *Peer
+ chunkStore storage.SyncChunkStore
+ kad *network.Kademlia
+ getPeer func(discover.NodeID) *Peer
}
-func NewDelivery(overlay network.Overlay, db *storage.DBAPI) *Delivery {
- d := &Delivery{
- db: db,
- overlay: overlay,
- receiveC: make(chan *ChunkDeliveryMsg, deliveryCap),
+func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery {
+ return &Delivery{
+ chunkStore: chunkStore,
+ kad: kad,
}
-
- go d.processReceivedChunks()
- return d
}
// SwarmChunkServer implements Server
type SwarmChunkServer struct {
deliveryC chan []byte
batchC chan []byte
- db *storage.DBAPI
+ chunkStore storage.ChunkStore
currentLen uint64
quit chan struct{}
}
// NewSwarmChunkServer is SwarmChunkServer constructor
-func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer {
+func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer {
s := &SwarmChunkServer{
- deliveryC: make(chan []byte, deliveryCap),
- batchC: make(chan []byte),
- db: db,
- quit: make(chan struct{}),
+ deliveryC: make(chan []byte, deliveryCap),
+ batchC: make(chan []byte),
+ chunkStore: chunkStore,
+ quit: make(chan struct{}),
}
go s.processDeliveries()
return s
@@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() {
// GetData retrieves chunk data from db store
func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.chunkStore.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
@@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
return err
}
streamer := s.Server.(*SwarmChunkServer)
- chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr)
- if chunk.ReqC != nil {
- if created {
- if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil {
- log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err)
- chunk.SetErrored(storage.ErrChunkForward)
- return nil
- }
+
+ var cancel func()
+ // TODO: do something with this hardcoded timeout, maybe use TTL in the future
+ ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
+
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-streamer.quit:
}
- go func() {
- var osp opentracing.Span
- ctx, osp = spancontext.StartSpan(
- ctx,
- "waiting.delivery")
- defer osp.Finish()
-
- t := time.NewTimer(10 * time.Minute)
- defer t.Stop()
-
- log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.overlay.BaseAddr()), "created", created)
- start := time.Now()
- select {
- case <-chunk.ReqC:
- log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start))
- case <-t.C:
- log.Debug("retrieve request timeout", "peer", sp.ID(), "hash", req.Addr)
- chunk.SetErrored(storage.ErrChunkTimeout)
- return
- }
- chunk.SetErrored(nil)
-
- if req.SkipCheck {
- err := sp.Deliver(ctx, chunk, s.priority)
- if err != nil {
- log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err)
- sp.Drop(err)
- }
+ cancel()
+ }()
+
+ go func() {
+ chunk, err := d.chunkStore.Get(ctx, req.Addr)
+ if err != nil {
+ log.Warn("ChunkStore.Get can not retrieve chunk", "err", err)
+ return
+ }
+ if req.SkipCheck {
+ err = sp.Deliver(ctx, chunk, s.priority)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
- streamer.deliveryC <- chunk.Addr[:]
- }()
- return nil
- }
- // TODO: call the retrieve function of the outgoing syncer
- if req.SkipCheck {
- log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr)
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr)
+ return
}
- return sp.Deliver(ctx, chunk, s.priority)
- }
- streamer.deliveryC <- chunk.Addr[:]
+ select {
+ case streamer.deliveryC <- chunk.Address()[:]:
+ case <-streamer.quit:
+ }
+
+ }()
+
return nil
}
@@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct {
peer *Peer // set in handleChunkDeliveryMsg
}
+// TODO: Fix context SNAFU
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
@@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
"chunk.delivery")
defer osp.Finish()
- req.peer = sp
- d.receiveC <- req
- return nil
-}
+ processReceivedChunksCount.Inc(1)
-func (d *Delivery) processReceivedChunks() {
-R:
- for req := range d.receiveC {
- processReceivedChunksCount.Inc(1)
-
- if len(req.SData) > cp.DefaultSize+8 {
- log.Warn("received chunk is bigger than expected", "len", len(req.SData))
- continue R
- }
-
- // this should be has locally
- chunk, err := d.db.Get(context.TODO(), req.Addr)
- if err == nil {
- continue R
- }
- if err != storage.ErrFetching {
- log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk)
- continue R
- }
- select {
- case <-chunk.ReqC:
- log.Error("someone else delivered?", "hash", chunk.Addr.Hex())
- continue R
- default:
- }
-
- chunk.SData = req.SData
- d.db.Put(context.TODO(), chunk)
-
- go func(req *ChunkDeliveryMsg) {
- err := chunk.WaitToStore()
+ go func() {
+ req.peer = sp
+ err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
+ if err != nil {
if err == storage.ErrChunkInvalid {
+ // we removed this log because it spams the logs
+ // TODO: Enable this log line
+ // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, )
req.peer.Drop(err)
}
- }(req)
- }
+ }
+ }()
+ return nil
}
// RequestFromPeers sends a chunk retrieve request to a peer
-func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error {
- var success bool
- var err error
+func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
+ var sp *Peer
+ spID := req.Source
- d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool {
- spId := p.(network.Peer).ID()
- for _, p := range peersToSkip {
- if p == spId {
- log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId)
+ if spID != nil {
+ sp = d.getPeer(*spID)
+ if sp == nil {
+ return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
+ }
+ } else {
+ d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
+ id := p.ID()
+ // TODO: skip light nodes that do not accept retrieve requests
+ if req.SkipPeer(id.String()) {
+ log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
}
- }
- sp := d.getPeer(spId)
+ sp = d.getPeer(id)
+ if sp == nil {
+ log.Warn("Delivery.RequestFromPeers: peer not found", "id", id)
+ return true
+ }
+ spID = &id
+ return false
+ })
if sp == nil {
- log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId)
- return true
+ return nil, nil, errors.New("no peer found")
}
- err = sp.SendPriority(ctx, &RetrieveRequestMsg{
- Addr: hash,
- SkipCheck: skipCheck,
- }, Top)
- if err != nil {
- return true
- }
- requestFromPeersEachCount.Inc(1)
- success = true
- return false
- })
- if success {
- return nil
}
- return errors.New("no peer found")
+
+ err := sp.SendPriority(ctx, &RetrieveRequestMsg{
+ Addr: req.Addr,
+ SkipCheck: req.SkipCheck,
+ }, Top)
+ if err != nil {
+ return nil, nil, err
+ }
+ requestFromPeersEachCount.Inc(1)
+
+ return spID, sp.quit, nil
}
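
RequestFromPeers now takes a network.Request and reports back which peer was asked plus that peer's quit channel, so a fetcher can stop waiting if the peer disconnects. A hedged sketch of the call-site shape (hypothetical helper; the Request constructor arguments mirror the test below):

package stream

import (
	"context"
	"sync"

	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// requestSketch asks connected peers for one chunk and waits until either the
// chosen peer quits or the context expires.
func requestSketch(ctx context.Context, d *Delivery, addr storage.Address) error {
	req := network.NewRequest(addr, true /* skipCheck */, &sync.Map{} /* peers to skip */)
	peerID, quit, err := d.RequestFromPeers(ctx, req)
	if err != nil {
		return err
	}
	_ = peerID // the selected peer, useful for skipping it on a retry
	select {
	case <-quit:
		return nil // the peer went away before delivering; caller would retry elsewhere
	case <-ctx.Done():
		return ctx.Err()
	}
}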
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 972cc859a..ece54d4ee 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) {
peerID := tester.IDs[0]
- streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true)
+ ctx := context.Background()
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ streamer.delivery.RequestFromPeers(ctx, req)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{
Code: 5,
Msg: &RetrieveRequestMsg{
- Addr: chunk.Addr[:],
+ Addr: chunk.Address()[:],
},
Peer: peerID,
},
@@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
})
hash := storage.Address(hash0[:])
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = hash
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk := storage.NewChunk(hash, hash)
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
hash = storage.Address(hash1[:])
- chunk = storage.NewChunk(hash, nil)
- chunk.SData = hash1[:]
- localStore.Put(context.TODO(), chunk)
- chunk.WaitToStore()
+ chunk = storage.NewChunk(hash, hash1[:])
+ err = localStore.Put(context.TODO(), chunk)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
@@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
chunkKey := hash0[:]
chunkData := hash1[:]
- chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey)
-
- if !created {
- t.Fatal("chunk already exists")
- }
- select {
- case <-chunk.ReqC:
- t.Fatal("chunk is already received")
- default:
- }
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
@@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
},
},
p2ptest.Exchange{
- Label: "ChunkDeliveryRequest message",
+ Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
{
Code: 6,
@@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
-
- timeout := time.NewTimer(1 * time.Second)
-
- select {
- case <-timeout.C:
- t.Fatal("timeout receiving chunk")
- case <-chunk.ReqC:
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ // wait for the chunk to get stored
+ storedChunk, err := localStore.Get(ctx, chunkKey)
+ for err != nil {
+ select {
+ case <-ctx.Done():
+ t.Fatalf("Chunk is not in localstore after timeout, err: %v", err)
+ default:
+ }
+ storedChunk, err = localStore.Get(ctx, chunkKey)
+ time.Sleep(50 * time.Millisecond)
}
- storedChunk, err := localStore.Get(context.TODO(), chunkKey)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
- if !bytes.Equal(storedChunk.SData, chunkData) {
+ if !bytes.Equal(storedChunk.Data(), chunkData) {
t.Fatal("Retrieved chunk has different data than original")
}
@@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
@@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-
id := ctx.Config.ID
addr := network.NewAddrFromNodeID(id)
store, datadir, err := createTestLocalStorageForID(id, addr)
@@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
store.Close()
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
DoSync: true,
SyncUpdateDelay: 0,
})
- retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error {
- return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck)
- }
- netStore := storage.NewNetStore(localStore, retrieveFunc)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index f4294134b..452aaca76 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -38,13 +38,18 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)
-func TestIntervals(t *testing.T) {
+func TestIntervalsLive(t *testing.T) {
testIntervals(t, true, nil, false)
- testIntervals(t, false, NewRange(9, 26), false)
- testIntervals(t, true, NewRange(9, 26), false)
-
testIntervals(t, true, nil, true)
+}
+
+func TestIntervalsHistory(t *testing.T) {
+ testIntervals(t, false, NewRange(9, 26), false)
testIntervals(t, false, NewRange(9, 26), true)
+}
+
+func TestIntervalsLiveAndHistory(t *testing.T) {
+ testIntervals(t, true, NewRange(9, 26), false)
testIntervals(t, true, NewRange(9, 26), true)
}
@@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
os.RemoveAll(datadir)
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return newTestExternalClient(db), nil
+ return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
@@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
t.Fatal(err)
}
- ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
storer := nodeIDs[0]
@@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
liveErrC := make(chan error)
historyErrC := make(chan error)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- log.Error("WaitKademlia error: %v", "err", err)
- return err
- }
-
log.Debug("Watching for disconnections")
disconnections := sim.PeerEvents(
context.Background(),
@@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
)
+ err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
+ if err != nil {
+ return err
+ }
+
go func() {
for d := range disconnections {
if d.Error != nil {
@@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var liveHashesChan chan []byte
liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true))
if err != nil {
- log.Error("Subscription error: %v", "err", err)
+ log.Error("get hashes", "err", err)
return
}
i := externalStreamSessionAt
@@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
var historyHashesChan chan []byte
historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false))
if err != nil {
+ log.Error("get hashes", "err", err)
return
}
@@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
}
}()
- err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
- if err != nil {
- return err
- }
if err := <-liveErrC; err != nil {
return err
}
@@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error {
type testExternalClient struct {
hashes chan []byte
- db *storage.DBAPI
+ store storage.SyncChunkStore
enableNotificationsC chan struct{}
}
-func newTestExternalClient(db *storage.DBAPI) *testExternalClient {
+func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient {
return &testExternalClient{
hashes: make(chan []byte),
- db: db,
+ store: store,
enableNotificationsC: make(chan struct{}),
}
}
-func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() {
- chunk, _ := c.db.GetOrCreateRequest(ctx, hash)
- if chunk.ReqC == nil {
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
+ wait := c.store.FetchFunc(ctx, storage.Address(hash))
+ if wait == nil {
return nil
}
- c.hashes <- hash
- //NOTE: This was failing on go1.9.x with a deadlock.
- //Sometimes this function would just block
- //It is commented now, but it may be well worth after the chunk refactor
- //to re-enable this and see if the problem has been addressed
- /*
- return func() {
- return chunk.WaitToStore()
+ select {
+ case c.hashes <- hash:
+ case <-ctx.Done():
+ log.Warn("testExternalClient NeedData context", "err", ctx.Err())
+ return func(_ context.Context) error {
+ return ctx.Err()
}
- */
- return nil
+ }
+ return wait
}
func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
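
NeedData now returns the wait function produced by the store's FetchFunc instead of poking at request channels directly: nil means the chunk is already local, non-nil is a blocking wait bound to a context. A short sketch of the client-side contract (hypothetical caller, not part of the patch):

package stream

import "context"

// needDataSketch: how a stream client consumes the new NeedData return value.
func needDataSketch(ctx context.Context, c Client, hash []byte) error {
	wait := c.NeedData(ctx, hash)
	if wait == nil {
		return nil // chunk already stored locally, nothing to request
	}
	// wait blocks until the chunk is delivered and stored, or ctx expires.
	return wait(ctx)
}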
diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go
index a19f63589..2e1a81e82 100644
--- a/swarm/network/stream/messages.go
+++ b/swarm/network/stream/messages.go
@@ -18,9 +18,7 @@ package stream
import (
"context"
- "errors"
"fmt"
- "sync"
"time"
"github.com/ethereum/go-ethereum/metrics"
@@ -31,6 +29,8 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
+var syncBatchTimeout = 30 * time.Second
+
// Stream defines a unique stream identifier.
type Stream struct {
// Name is used for Client and Server functions identification.
@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
go func() {
if err := p.SendOfferedHashes(os, from, to); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
@@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
}
go func() {
if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
}
}()
}
@@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
if err != nil {
return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err)
}
- wg := sync.WaitGroup{}
+
+ ctr := 0
+ errC := make(chan error)
+ ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout)
+
+ ctx = context.WithValue(ctx, "source", p.ID().String())
for i := 0; i < len(hashes); i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
+ ctr++
want.Set(i/HashSize, true)
- wg.Add(1)
// create request and wait until the chunk data arrives and is stored
- go func(w func()) {
- w()
- wg.Done()
+ go func(w func(context.Context) error) {
+ select {
+ case errC <- w(ctx):
+ case <-ctx.Done():
+ }
}(wait)
}
}
- // done := make(chan bool)
- // go func() {
- // wg.Wait()
- // close(done)
- // }()
- // go func() {
- // select {
- // case <-done:
- // s.next <- s.batchDone(p, req, hashes)
- // case <-time.After(1 * time.Second):
- // p.Drop(errors.New("timeout waiting for batch to be delivered"))
- // }
- // }()
+
go func() {
- wg.Wait()
+ defer cancel()
+ for i := 0; i < ctr; i++ {
+ select {
+ case err := <-errC:
+ if err != nil {
+ log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ return
+ }
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
+ return
+ case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ }
+ }
select {
case c.next <- c.batchDone(p, req, hashes):
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
}
}()
// only send wantedKeysMsg if all missing chunks of the previous batch arrived
@@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
c.sessionAt = req.From
}
from, to := c.nextBatch(req.To + 1)
- log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To)
+ log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID())
if from == to {
return nil
}
@@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
To: to,
}
go func() {
+ log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
select {
- case <-time.After(120 * time.Second):
- log.Warn("handleOfferedHashesMsg timeout, so dropping peer")
- p.Drop(errors.New("handle offered hashes timeout"))
- return
case err := <-c.next:
if err != nil {
- log.Warn("c.next dropping peer", "err", err)
+ log.Warn("c.next error dropping peer", "err", err)
p.Drop(err)
return
}
case <-c.quit:
+ log.Debug("client.handleOfferedHashesMsg() quit")
+ return
+ case <-ctx.Done():
+ log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err())
return
}
log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To)
err := p.SendPriority(ctx, msg, c.priority)
if err != nil {
- log.Warn("SendPriority err, so dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendPriority error", "err", err)
}
}()
return nil
@@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in a goroutine since GetBatch blocks until new hashes arrive
go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
- log.Warn("SendOfferedHashes dropping peer", "err", err)
- p.Drop(err)
+ log.Warn("SendOfferedHashes error", "err", err)
}
}()
// go p.SendOfferedHashes(s, req.From, req.To)
@@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
if err != nil {
return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err)
}
- chunk := storage.NewChunk(hash, nil)
- chunk.SData = data
- if length := len(chunk.SData); length < 9 {
- log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr)
- }
+ chunk := storage.NewChunk(hash, data)
if err := p.Deliver(ctx, chunk, s.priority); err != nil {
return err
}
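
handleOfferedHashesMsg above replaces the open-ended WaitGroup with a counted fan-in over errC, bounded by syncBatchTimeout, so a single stuck chunk can no longer hang a batch forever. The pattern in isolation (a generic sketch, not the actual streamer code):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// collect launches one goroutine per wait function and then drains exactly
// len(waits) results, giving up when the batch timeout fires.
func collect(parent context.Context, waits []func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
	defer cancel()

	errC := make(chan error)
	for _, w := range waits {
		go func(w func(context.Context) error) {
			select {
			case errC <- w(ctx):
			case <-ctx.Done():
			}
		}(w)
	}
	for i := 0; i < len(waits); i++ {
		select {
		case err := <-errC:
			if err != nil {
				return err // in the streamer this is where the peer gets dropped
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	err := collect(context.Background(), []func(context.Context) error{
		func(context.Context) error { return nil },
		func(context.Context) error { return errors.New("chunk missing") },
	})
	fmt.Println(err)
}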
diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go
index 80b9ab711..1466a7a9c 100644
--- a/swarm/network/stream/peer.go
+++ b/swarm/network/stream/peer.go
@@ -33,8 +33,6 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)
-var sendTimeout = 30 * time.Second
-
type notFoundError struct {
t string
s Stream
@@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
- p.Send(wmsg.Context, wmsg.Msg)
+ err := p.Send(wmsg.Context, wmsg.Msg)
+ if err != nil {
+ log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
+ p.Drop(err)
+ }
})
+
+ // basic monitoring for pq contention
+ go func(pq *pq.PriorityQueue) {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ var len_maxi int
+ var cap_maxi int
+ for k := range pq.Queues {
+ if len_maxi < len(pq.Queues[k]) {
+ len_maxi = len(pq.Queues[k])
+ }
+
+ if cap_maxi < cap(pq.Queues[k]) {
+ cap_maxi = cap(pq.Queues[k])
+ }
+ }
+
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi))
+ metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi))
+ case <-p.quit:
+ return
+ }
+ }
+ }(p.pq)
+
go func() {
<-p.quit
cancel()
@@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
}
// Deliver sends a storeRequestMsg protocol message to the peer
-func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error {
+func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error {
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
@@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
defer sp.Finish()
msg := &ChunkDeliveryMsg{
- Addr: chunk.Addr,
- SData: chunk.SData,
+ Addr: chunk.Address(),
+ SData: chunk.Data(),
}
return p.SendPriority(ctx, msg, priority)
}
@@ -111,13 +141,16 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
- cctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
- defer cancel()
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
}
- return p.pq.Push(cctx, wmsg, int(priority))
+ err := p.pq.Push(wmsg, int(priority))
+ if err == pq.ErrContention {
+ log.Warn("dropping peer on priority queue contention", "peer", p.ID())
+ p.Drop(err)
+ }
+ return err
}
// SendOfferedHashes sends OfferedHashesMsg protocol msg
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index 4ff947b21..19eaad34e 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
+
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
@@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
+
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 0,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucketKeyFileStore = simulation.BucketKey("filestore")
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 4e1ab09fc..7cd09099c 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"os"
+ "runtime"
"sync"
"testing"
"time"
@@ -39,15 +40,20 @@ import (
mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db"
)
-const testMinProxBinSize = 2
const MaxTimeout = 600
type synctestConfig struct {
- addrs [][]byte
- hashes []storage.Address
- idToChunksMap map[discover.NodeID][]int
- chunksToNodesMap map[string][]int
- addrToIDMap map[string]discover.NodeID
+ addrs [][]byte
+ hashes []storage.Address
+ idToChunksMap map[discover.NodeID][]int
+ //chunksToNodesMap map[string][]int
+ addrToIDMap map[string]discover.NodeID
+}
+
+// Tests in this file should not request chunks from peers.
+// It panics if a request is made, signalling a problem in the test.
+func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
+ panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
}
//This test is a syncing test for nodes.
@@ -58,6 +64,9 @@ type synctestConfig struct {
//they are expected to store based on the syncing protocol.
//Number of chunks and nodes can be provided via commandline too.
func TestSyncingViaGlobalSync(t *testing.T) {
+ if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
+ t.Skip("Flaky on mac on travis")
+ }
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
@@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) {
}
func TestSyncingViaDirectSubscribe(t *testing.T) {
+ if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
+ t.Skip("Flaky on mac on travis")
+ }
//if nodes/chunks have been provided via commandline,
//run the tests with these values
if *nodes != 0 && *chunks != 0 {
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
- err := testSyncingViaDirectSubscribe(*chunks, *nodes)
+ err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
if err != nil {
t.Fatal(err)
}
@@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) {
for _, chnk := range chnkCnt {
for _, n := range nodeCnt {
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
- err := testSyncingViaDirectSubscribe(chnk, n)
+ err := testSyncingViaDirectSubscribe(t, chnk, n)
if err != nil {
t.Fatal(err)
}
@@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
DoSync: true,
SyncUpdateDelay: 3 * time.Second,
})
bucket.Store(bucketKeyRegistry, r)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
@@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
t.Fatal(err)
}
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun()
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ t.Fatal(err)
+ }
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal("unexpected disconnect")
+ cancelSimRun()
+ }
+ }()
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
conf.hashes = append(conf.hashes, hashes...)
mapKeysToNodes(conf)
- if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
- return err
- }
-
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
// or until the timeout is reached.
allSuccess := false
@@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
}()
}
for !allSuccess {
+ allSuccess = true
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
@@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
- allSuccess = localSuccess
+ if !localSuccess {
+ allSuccess = false
+ break
+ }
}
}
if !allSuccess {
@@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
if result.Error != nil {
t.Fatal(result.Error)
}
+ log.Info("Simulation ended")
}
/*
@@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network,
assuming that the snapshot file identifies a healthy
kademlia network. The snapshot should have 'streamer' in its service list.
*/
-func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
+func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
@@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return nil, nil, err
}
bucket.Store(bucketKeyStore, store)
- cleanup = func() {
- os.RemoveAll(datadir)
- store.Close()
- }
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil)
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
bucket.Store(bucketKeyRegistry, r)
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
+ cleanup = func() {
+ os.RemoveAll(datadir)
+ netStore.Close()
+ r.Close()
+ }
+
return r, cleanup, nil
},
})
defer sim.Close()
- ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
+ ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelSimRun()
conf := &synctestConfig{}
@@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return err
}
+ if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
+ return err
+ }
+
+ disconnections := sim.PeerEvents(
+ context.Background(),
+ sim.NodeIDs(),
+ simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
+ )
+
+ go func() {
+ for d := range disconnections {
+ log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
+ t.Fatal("unexpected disconnect")
+ cancelSimRun()
+ }
+ }()
+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
for _, n := range nodeIDs {
@@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
// or until the timeout is reached.
allSuccess := false
for !allSuccess {
+ allSuccess = true
for _, id := range nodeIDs {
//for each expected chunk, check if it is in the local store
localChunks := conf.idToChunksMap[id]
@@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
}
}
- allSuccess = localSuccess
+ if !localSuccess {
+ allSuccess = false
+ break
+ }
}
}
if !allSuccess {
@@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
return result.Error
}
- log.Info("Simulation terminated")
+ log.Info("Simulation ended")
return nil
}
@@ -457,20 +522,14 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error {
//returns the number of subscriptions requested
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
var err error
-
- kad, ok := r.delivery.overlay.(*network.Kademlia)
- if !ok {
- return 0, fmt.Errorf("Not a Kademlia!")
- }
-
+ kad := r.delivery.kad
subCnt := 0
//iterate over each bin and solicit needed subscription to bins
- kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool {
+ kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool {
//identify the begin and end index of the bin(s) we want to subscribe to
- histRange := &Range{}
subCnt++
- err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top)
+ err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
if err != nil {
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
return false
@@ -483,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
//map chunk keys to addresses which are responsible
func mapKeysToNodes(conf *synctestConfig) {
- kmap := make(map[string][]int)
nodemap := make(map[string][]int)
//build a pot for chunk hashes
np := pot.NewPot(nil, 0)
@@ -492,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) {
indexmap[string(a)] = i
np, _, _ = pot.Add(np, a, pof)
}
+
+ var kadMinProxSize = 2
+
+ ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs)
+
//for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes
log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes))
for i := 0; i < len(conf.hashes); i++ {
- pl := 256 //highest possible proximity
- var nns []int
+ var a []byte
np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool {
- a := val.([]byte)
- if pl < 256 && pl != po {
- return false
- }
- if pl == 256 || pl == po {
- log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)]))
- nns = append(nns, indexmap[string(a)])
- nodemap[string(a)] = append(nodemap[string(a)], i)
- }
- if pl == 256 && len(nns) >= testMinProxBinSize {
- //maxProxBinSize has been reached at this po, so save it
- //we will add all other nodes at the same po
- pl = po
- }
- return true
+ // take the first address
+ a = val.([]byte)
+ return false
})
- kmap[string(conf.hashes[i])] = nns
+
+ nns := ppmap[common.Bytes2Hex(a)].NNSet
+ nns = append(nns, a)
+
+ for _, p := range nns {
+ nodemap[string(p)] = append(nodemap[string(p)], i)
+ }
}
for addr, chunks := range nodemap {
//this selects which chunks are expected to be found with the given node
conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks
}
log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap))
- conf.chunksToNodesMap = kmap
}
//upload a file(chunks) to a single local node store
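For reference, the chunk-to-node mapping above boils down to one nearest-neighbour lookup per hash against a pot of node addresses, followed by an expansion to that node's NNSet from NewPeerPotMap. A minimal helper sketching just the lookup, reusing the pot primitives and the pof function exactly as they appear in this diff (closestTo is a hypothetical name):

    // closestTo returns the address in addrs closest to hash, using the same pot primitives as above.
    func closestTo(hash []byte, addrs [][]byte) []byte {
        np := pot.NewPot(nil, 0)
        for _, a := range addrs {
            np, _, _ = pot.Add(np, a, pof)
        }
        var closest []byte
        np.EachNeighbour(hash, pof, func(val pot.Val, po int) bool {
            closest = val.([]byte) // the first callback delivers the closest entry
            return false           // stop; only the closest address is needed
        })
        return closest
    }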
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index cd0580a0c..1f1f34b7b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,10 +32,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -43,8 +41,8 @@ const (
Mid
High
Top
- PriorityQueue // number of queues
- PriorityQueueCap = 32 // queue capacity
+ PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
+ PriorityQueueCap = 128 // queue capacity
HashSize = 32
)
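For context, these constants live in an iota enumeration; with the explicit values introduced here the block reads roughly as below (Low sits just above this hunk and its exact definition is assumed, not shown in the diff):

    const (
        Low = iota // assumed: first value of the priority enumeration
        Mid
        High
        Top
        PriorityQueue    = 4   // number of priority queues - Low, Mid, High, Top
        PriorityQueueCap = 128 // queue capacity
        HashSize         = 32
    )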
@@ -73,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
- return NewSwarmChunkServer(delivery.db), nil
+ return NewSwarmChunkServer(delivery.chunkStore), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
- RegisterSwarmSyncerServer(streamer, db)
- RegisterSwarmSyncerClient(streamer, db)
+ RegisterSwarmSyncerServer(streamer, syncChunkStore)
+ RegisterSwarmSyncerClient(streamer, syncChunkStore)
if options.DoSync {
// latestIntC function ensures that
@@ -130,7 +128,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
// wait for kademlia table to be healthy
time.Sleep(options.SyncUpdateDelay)
- kad := streamer.delivery.overlay.(*network.Kademlia)
+ kad := streamer.delivery.kad
depthC := latestIntC(kad.NeighbourhoodDepthC())
addressBookSizeC := latestIntC(kad.AddrCountC())
@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
return peer.Send(context.TODO(), msg)
}
-func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "registry.retrieve")
- defer sp.Finish()
-
- return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
-}
-
func (r *Registry) NodeInfo() interface{} {
return nil
}
@@ -398,9 +386,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
// and they are no longer required after iteration, a request to Quit
// them will be sent to the appropriate peers.
func (r *Registry) updateSyncing() {
- // if overlay in not Kademlia, panic
- kad := r.delivery.overlay.(*network.Kademlia)
-
+ kad := r.delivery.kad
// map of all SYNC streams for all peers
// used at the end of the function to remove servers
// that are not needed anymore
@@ -421,8 +407,7 @@ func (r *Registry) updateSyncing() {
r.peersMu.RUnlock()
// request subscriptions for all nodes and bins
- kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool {
- p := conn.(network.Peer)
+ kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))
// bin is always less than 256 and it is safe to convert it to type uint8
@@ -461,10 +446,11 @@ func (r *Registry) updateSyncing() {
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, Spec)
- bzzPeer := network.NewBzzTestPeer(peer, r.addr)
- r.delivery.overlay.On(bzzPeer)
- defer r.delivery.overlay.Off(bzzPeer)
- return r.Run(bzzPeer)
+ bp := network.NewBzzPeer(peer, r.addr)
+ np := network.NewPeer(bp, r.delivery.kad)
+ r.delivery.kad.On(np)
+ defer r.delivery.kad.Off(np)
+ return r.Run(bp)
}
// HandleMsg is the message handler that delegates incoming messages
@@ -559,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData(context.Context, []byte) func()
+ NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}
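The Client interface change above turns NeedData's return value from a bare wait callback into a context-aware one that can report errors. A hypothetical call site for the new signature (the helper name is illustrative, not taken from this diff):

    // fetchIfNeeded shows how a caller might consume the new wait function.
    func fetchIfNeeded(ctx context.Context, c Client, hash []byte) error {
        // NeedData returns nil when the data is already available locally
        if wait := c.NeedData(ctx, hash); wait != nil {
            if err := wait(ctx); err != nil {
                return err // e.g. the context was cancelled or the fetch failed
            }
        }
        return nil
    }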
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 7523860c9..06e96b9a9 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -80,15 +80,17 @@ func newTestClient(t string) *testClient {
}
}
-func (self *testClient) NeedData(ctx context.Context, hash []byte) func() {
+func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
- return func() {
+ return func(context.Context) error {
<-self.wait0
+ return nil
}
} else if bytes.Equal(hash, hash2[:]) {
- return func() {
+ return func(context.Context) error {
<-self.wait2
+ return nil
}
}
return nil
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index d7febe4a3..e9811a678 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -28,7 +28,6 @@ import (
)
const (
- // BatchSize = 2
BatchSize = 128
)
@@ -38,35 +37,37 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
- db *storage.DBAPI
+ store storage.SyncChunkStore
sessionAt uint64
start uint64
+ live bool
quit chan struct{}
}
// NewSwarmSyncerServer is a constructor for SwarmSyncerServer
-func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
- sessionAt := db.CurrentBucketStorageIndex(po)
+func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+ sessionAt := syncChunkStore.BinIndex(po)
var start uint64
if live {
start = sessionAt
}
return &SwarmSyncerServer{
po: po,
- db: db,
+ store: syncChunkStore,
sessionAt: sessionAt,
start: start,
+ live: live,
quit: make(chan struct{}),
}, nil
}
-func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(live, po, db)
+ return NewSwarmSyncerServer(live, po, syncChunkStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
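Sync streams are keyed by proximity bin: FormatSyncBinKey (used in the subscription requests earlier in this diff) encodes the bin into the stream key and ParseSyncBinKey decodes it here. A round-trip sketch, assuming the two helpers are inverses as their use suggests (roundTripSyncBinKey is a hypothetical helper):

    // roundTripSyncBinKey illustrates that encoding and decoding a bin number are inverses.
    func roundTripSyncBinKey(po uint8) bool {
        key := FormatSyncBinKey(po)      // encode the proximity order into the SYNC stream key
        bin, err := ParseSyncBinKey(key) // decode it back when the SYNC server is registered
        return err == nil && bin == po
    }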
@@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() {
close(s.quit)
}
-// GetSection retrieves the actual chunk from localstore
+// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.store.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// SetNextBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
- if from == 0 {
- from = s.start
- }
- if to <= from || from >= s.sessionAt {
- to = math.MaxUint64
+ if s.live {
+ if from == 0 {
+ from = s.start
+ }
+ if to <= from || from >= s.sessionAt {
+ to = math.MaxUint64
+ }
+ } else {
+ if (to < from && to != 0) || from > s.sessionAt {
+ return nil, 0, 0, nil, nil
+ }
+ if to == 0 || to > s.sessionAt {
+ to = s.sessionAt
+ }
}
+
var ticker *time.Ticker
defer func() {
if ticker != nil {
@@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
}
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
- err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
- batch = append(batch, addr[:]...)
+ err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
+ batch = append(batch, key[:]...)
i++
to = idx
return i < BatchSize
@@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
wait = true
}
- log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
+ log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
return batch, from, to, nil, nil
}
@@ -146,28 +155,26 @@ type SwarmSyncerClient struct {
sessionReader storage.LazySectionReader
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
- db *storage.DBAPI
+ store storage.SyncChunkStore
// chunker storage.Chunker
- currentRoot storage.Address
- requestFunc func(chunk *storage.Chunk)
- end, start uint64
- peer *Peer
- ignoreExistingRequest bool
- stream Stream
+ currentRoot storage.Address
+ requestFunc func(chunk *storage.Chunk)
+ end, start uint64
+ peer *Peer
+ stream Stream
}
// NewSwarmSyncerClient is a constructor for the provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
- db: db,
- peer: p,
- ignoreExistingRequest: ignoreExistingRequest,
- stream: stream,
+ store: store,
+ peer: p,
+ stream: stream,
}, nil
}
// // NewIncomingProvableSwarmSyncer is a constructor for the provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
+// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
// retrieveC := make(storage.Chunk, chunksCap)
// RunChunkRequestor(p, retrieveC)
// storeC := make(storage.Chunk, chunksCap)
@@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
+ return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
}
// NeedData returns a wait function for the chunk at key, or nil if it is already available locally
-func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
- chunk, _ := s.db.GetOrCreateRequest(ctx, key)
- // TODO: we may want to request from this peer anyway even if the request exists
-
- // ignoreExistingRequest is temporary commented out until its functionality is verified.
- // For now, this optimization can be disabled.
- if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
- return nil
- }
- // create request and wait until the chunk data arrives and is stored
- return func() {
- chunk.WaitToStore()
- }
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
+ return s.store.FetchFunc(ctx, key)
}
// BatchDone
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f72aa3444..469d520f8 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
}
}
localStore := store.(*storage.LocalStore)
- db := storage.NewDBAPI(localStore)
- bucket.Store(bucketKeyDB, db)
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, nil, err
+ }
+ bucket.Store(bucketKeyDB, netStore)
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
- delivery := NewDelivery(kad, db)
+ delivery := NewDelivery(kad, netStore)
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
bucket.Store(bucketKeyDelivery, delivery)
- r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+ r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
})
- fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
return r, cleanup, nil
@@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
- db := item.(*storage.DBAPI)
- db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
+ netStore := item.(*storage.NetStore)
+ netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
hashes[i] = append(hashes[i], addr)
totalHashes++
hashCounts[i]++
@@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
if !ok {
return fmt.Errorf("No DB")
}
- db := item.(*storage.DBAPI)
- chunk, err := db.Get(ctx, key)
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
- continue
+ db := item.(*storage.NetStore)
+ _, err := db.Get(ctx, key)
+ if err == nil {
+ found++
}
- // needed for leveldb not to be closed?
- // chunk.WaitToStore()
- found++
}
}
log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)