Diffstat (limited to 'swarm/network')
26 files changed, 1512 insertions, 821 deletions
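The centrepiece of this changeset is the new swarm/network/fetcher.go: a Fetcher is created per missing chunk by the FetcherFactory and driven through Offer/Request calls until the chunk is delivered or the context is cancelled. The sketch below, modelled on the accompanying fetcher_test.go, shows how the pieces are wired together; stubRequest, the log output and the final sleep are illustrative stand-ins for what the stream package's delivery code does, and are not part of this diff.

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"

        "github.com/ethereum/go-ethereum/p2p/discover"
        "github.com/ethereum/go-ethereum/swarm/network"
    )

    // stubRequest satisfies network.RequestFunc: it pretends to forward the
    // retrieve request and reports which peer was asked, plus a quit channel
    // that would be closed if that peer disconnected.
    func stubRequest(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) {
        id := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
        fmt.Printf("requesting chunk %x (skip check: %v)\n", req.Addr, req.SkipCheck)
        return &id, make(chan struct{}), nil
    }

    func main() {
        factory := network.NewFetcherFactory(stubRequest, false)

        addr := make([]byte, 32)     // address of the chunk to fetch
        peersToSkip := new(sync.Map) // peers that must not be asked for this chunk

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // New starts the fetcher's run loop; it lives until ctx is cancelled.
        fetcher := factory.New(ctx, addr, peersToSkip)

        // Request marks the chunk as actively requested; the run loop then
        // calls stubRequest and records the chosen peer in peersToSkip.
        fetcher.Request(context.Background())

        time.Sleep(100 * time.Millisecond) // give the run loop time to issue the request
    }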
diff --git a/swarm/network/discovery.go b/swarm/network/discovery.go index 55bf7c033..301959480 100644 --- a/swarm/network/discovery.go +++ b/swarm/network/discovery.go @@ -26,30 +26,30 @@ import ( // discovery bzz extension for requesting and relaying node address records -// discPeer wraps BzzPeer and embeds an Overlay connectivity driver -type discPeer struct { +// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver +type Peer struct { *BzzPeer - overlay Overlay - sentPeers bool // whether we already sent peer closer to this address - mtx sync.RWMutex + kad *Kademlia + sentPeers bool // whether we already sent peer closer to this address + mtx sync.RWMutex // peers map[string]bool // tracks node records sent to the peer depth uint8 // the proximity order advertised by remote as depth of saturation } -// NewDiscovery constructs a discovery peer -func newDiscovery(p *BzzPeer, o Overlay) *discPeer { - d := &discPeer{ - overlay: o, +// NewPeer constructs a discovery peer +func NewPeer(p *BzzPeer, kad *Kademlia) *Peer { + d := &Peer{ + kad: kad, BzzPeer: p, peers: make(map[string]bool), } // record remote as seen so we never send a peer its own record - d.seen(d) + d.seen(p.BzzAddr) return d } // HandleMsg is the message handler that delegates incoming messages -func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { +func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error { switch msg := msg.(type) { case *peersMsg: @@ -64,24 +64,18 @@ func (d *discPeer) HandleMsg(ctx context.Context, msg interface{}) error { } // NotifyDepth sends a message to all connections if depth of saturation is changed -func NotifyDepth(depth uint8, h Overlay) { - f := func(val OverlayConn, po int, _ bool) bool { - dp, ok := val.(*discPeer) - if ok { - dp.NotifyDepth(depth) - } +func NotifyDepth(depth uint8, kad *Kademlia) { + f := func(val *Peer, po int, _ bool) bool { + val.NotifyDepth(depth) return true } - h.EachConn(nil, 255, f) + kad.EachConn(nil, 255, f) } // NotifyPeer informs all peers about a newly added node -func NotifyPeer(p OverlayAddr, k Overlay) { - f := func(val OverlayConn, po int, _ bool) bool { - dp, ok := val.(*discPeer) - if ok { - dp.NotifyPeer(p, uint8(po)) - } +func NotifyPeer(p *BzzAddr, k *Kademlia) { + f := func(val *Peer, po int, _ bool) bool { + val.NotifyPeer(p, uint8(po)) return true } k.EachConn(p.Address(), 255, f) @@ -91,22 +85,20 @@ func NotifyPeer(p OverlayAddr, k Overlay) { // the peer's PO is within the recipients advertised depth // OR the peer is closer to the recipient than self // unless already notified during the connection session -func (d *discPeer) NotifyPeer(a OverlayAddr, po uint8) { +func (d *Peer) NotifyPeer(a *BzzAddr, po uint8) { // immediately return if (po < d.getDepth() && pot.ProxCmp(d.localAddr, d, a) != 1) || d.seen(a) { return } - // log.Trace(fmt.Sprintf("%08x peer %08x notified of peer %08x", d.localAddr.Over()[:4], d.Address()[:4], a.Address()[:4])) resp := &peersMsg{ - Peers: []*BzzAddr{ToAddr(a)}, + Peers: []*BzzAddr{a}, } go d.Send(context.TODO(), resp) } // NotifyDepth sends a subPeers Msg to the receiver notifying them about // a change in the depth of saturation -func (d *discPeer) NotifyDepth(po uint8) { - // log.Trace(fmt.Sprintf("%08x peer %08x notified of new depth %v", d.localAddr.Over()[:4], d.Address()[:4], po)) +func (d *Peer) NotifyDepth(po uint8) { go d.Send(context.TODO(), &subPeersMsg{Depth: po}) } @@ -141,7 +133,7 @@ func (msg peersMsg) String() string { // handlePeersMsg called by the 
protocol when receiving peerset (for target address) // list of nodes ([]PeerAddr in peersMsg) is added to the overlay db using the // Register interface method -func (d *discPeer) handlePeersMsg(msg *peersMsg) error { +func (d *Peer) handlePeersMsg(msg *peersMsg) error { // register all addresses if len(msg.Peers) == 0 { return nil @@ -149,12 +141,12 @@ func (d *discPeer) handlePeersMsg(msg *peersMsg) error { for _, a := range msg.Peers { d.seen(a) - NotifyPeer(a, d.overlay) + NotifyPeer(a, d.kad) } - return d.overlay.Register(toOverlayAddrs(msg.Peers...)) + return d.kad.Register(msg.Peers...) } -// subPeers msg is communicating the depth/sharpness/focus of the overlay table of a peer +// subPeers msg is communicating the depth of the overlay table of a peer type subPeersMsg struct { Depth uint8 } @@ -164,21 +156,20 @@ func (msg subPeersMsg) String() string { return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth) } -func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { +func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error { if !d.sentPeers { d.setDepth(msg.Depth) var peers []*BzzAddr - d.overlay.EachConn(d.Over(), 255, func(p OverlayConn, po int, isproxbin bool) bool { + d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool { if pob, _ := pof(d, d.localAddr, 0); pob > po { return false } - if !d.seen(p) { - peers = append(peers, ToAddr(p.Off())) + if !d.seen(p.BzzAddr) { + peers = append(peers, p.BzzAddr) } return true }) if len(peers) > 0 { - // log.Debug(fmt.Sprintf("%08x: %v peers sent to %v", d.overlay.BaseAddr(), len(peers), d)) go d.Send(context.TODO(), &peersMsg{Peers: peers}) } } @@ -186,9 +177,9 @@ func (d *discPeer) handleSubPeersMsg(msg *subPeersMsg) error { return nil } -// seen takes an Overlay peer and checks if it was sent to a peer already +// seen takes an peer address and checks if it was sent to a peer already // if not, marks the peer as sent -func (d *discPeer) seen(p OverlayPeer) bool { +func (d *Peer) seen(p *BzzAddr) bool { d.mtx.Lock() defer d.mtx.Unlock() k := string(p.Address()) @@ -199,12 +190,13 @@ func (d *discPeer) seen(p OverlayPeer) bool { return false } -func (d *discPeer) getDepth() uint8 { +func (d *Peer) getDepth() uint8 { d.mtx.RLock() defer d.mtx.RUnlock() return d.depth } -func (d *discPeer) setDepth(depth uint8) { + +func (d *Peer) setDepth(depth uint8) { d.mtx.Lock() defer d.mtx.Unlock() d.depth = depth diff --git a/swarm/network/discovery_test.go b/swarm/network/discovery_test.go index 0427d81ca..494bc8196 100644 --- a/swarm/network/discovery_test.go +++ b/swarm/network/discovery_test.go @@ -33,7 +33,7 @@ func TestDiscovery(t *testing.T) { id := s.IDs[0] raddr := NewAddrFromNodeID(id) - pp.Register([]OverlayAddr{OverlayAddr(raddr)}) + pp.Register(raddr) // start the hive and wait for the connection pp.Start(s.Server) diff --git a/swarm/network/fetcher.go b/swarm/network/fetcher.go new file mode 100644 index 000000000..35e2f0132 --- /dev/null +++ b/swarm/network/fetcher.go @@ -0,0 +1,305 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/p2p/discover"
+	"github.com/ethereum/go-ethereum/swarm/storage"
+)
+
+var searchTimeout = 1 * time.Second
+
+// Time period for which a peer is considered skipped.
+// Also used in stream delivery.
+var RequestTimeout = 10 * time.Second
+
+type RequestFunc func(context.Context, *Request) (*discover.NodeID, chan struct{}, error)
+
+// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
+// keeps it alive until all active requests are completed. This can happen:
+// 1. either because the chunk is delivered
+// 2. or because the requestor cancelled/timed out
+// The Fetcher self-destructs after it is completed.
+// TODO: cancel all forward requests after termination
+type Fetcher struct {
+	protoRequestFunc RequestFunc           // request function fetcher calls to issue retrieve request for a chunk
+	addr             storage.Address       // the address of the chunk to be fetched
+	offerC           chan *discover.NodeID // channel of sources (peer node ids)
+	requestC         chan struct{}
+	skipCheck        bool
+}
+
+type Request struct {
+	Addr        storage.Address  // chunk address
+	Source      *discover.NodeID // nodeID of peer to request from (can be nil)
+	SkipCheck   bool             // whether to offer the chunk first or deliver directly
+	peersToSkip *sync.Map        // peers not to request chunk from (only makes sense if source is nil)
+}
+
+// NewRequest returns a new instance of Request based on chunk address, skip check flag and
+// a map of peers to skip.
+func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request {
+	return &Request{
+		Addr:        addr,
+		SkipCheck:   skipCheck,
+		peersToSkip: peersToSkip,
+	}
+}
+
+// SkipPeer returns whether the peer with nodeID should not be requested to deliver a chunk.
+// Peers to skip are kept per Request and for a time period of RequestTimeout.
+// This function is used in the stream package, in Delivery.RequestFromPeers, to optimize
+// requests for chunks.
+func (r *Request) SkipPeer(nodeID string) bool {
+	val, ok := r.peersToSkip.Load(nodeID)
+	if !ok {
+		return false
+	}
+	t, ok := val.(time.Time)
+	if ok && time.Now().After(t.Add(RequestTimeout)) {
+		// deadline expired
+		r.peersToSkip.Delete(nodeID)
+		return false
+	}
+	return true
+}
+
+// FetcherFactory is initialised with a request function and can create fetchers
+type FetcherFactory struct {
+	request   RequestFunc
+	skipCheck bool
+}
+
+// NewFetcherFactory takes a request function and a skip check parameter and creates a FetcherFactory
+func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
+	return &FetcherFactory{
+		request:   request,
+		skipCheck: skipCheck,
+	}
+}
+
+// New constructs a new Fetcher for the given chunk. Peers in peersToSkip are not requested to
+// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
+// this chunk, to make sure we don't request the chunk back from them.
+// The created Fetcher is started and returned.
+func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
+	fetcher := NewFetcher(source, f.request, f.skipCheck)
+	go fetcher.run(ctx, peersToSkip)
+	return fetcher
+}
+
+// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
+func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+	return &Fetcher{
+		addr:             addr,
+		protoRequestFunc: rf,
+		offerC:           make(chan *discover.NodeID),
+		requestC:         make(chan struct{}),
+		skipCheck:        skipCheck,
+	}
+}
+
+// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
+func (f *Fetcher) Offer(ctx context.Context, source *discover.NodeID) {
+	// First we need to have this select to make sure that we return if context is done
+	select {
+	case <-ctx.Done():
+		return
+	default:
+	}
+
+	// This select alone would not guarantee that we return if context is done, it could potentially
+	// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+	select {
+	case f.offerC <- source:
+	case <-ctx.Done():
+	}
+}
+
+// Request is called when an upstream peer requests the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
+func (f *Fetcher) Request(ctx context.Context) {
+	// First we need to have this select to make sure that we return if context is done
+	select {
+	case <-ctx.Done():
+		return
+	default:
+	}
+
+	// This select alone would not guarantee that we return if context is done, it could potentially
+	// push to requestC instead if requestC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+	select {
+	case f.requestC <- struct{}{}:
+	case <-ctx.Done():
+	}
+}
+
+// run prepares the Fetcher
+// it keeps the Fetcher alive within the lifecycle of the passed context
+func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
+	var (
+		doRequest bool               // determines if retrieval is initiated in the current iteration
+		wait      *time.Timer        // timer for search timeout
+		waitC     <-chan time.Time   // timer channel
+		sources   []*discover.NodeID // known sources, i.e. peers that offered the chunk
+		requested bool               // true if the chunk was actually requested
+	)
+	gone := make(chan *discover.NodeID) // channel to signal that a peer we requested from disconnected
+
+	// loop that keeps the fetching process alive
+	// after every request a timer is set. If this goes off we request again from another peer
+	// note that the previous request is still alive and has the chance to deliver, so
+	// rerequesting extends the search. i.e.,
+	// if a peer we requested from is gone we issue a new request, so the number of active
+	// requests never decreases
+	for {
+		select {
+
+		// incoming offer
+		case source := <-f.offerC:
+			log.Trace("new source", "peer addr", source, "request addr", f.addr)
+			// 1) the chunk is offered by a syncing peer
+			// add to known sources
+			sources = append(sources, source)
+			// launch a request to the source iff the chunk was requested (not just expected because it's offered by a syncing peer)
+			doRequest = requested
+
+		// incoming request
+		case <-f.requestC:
+			log.Trace("new request", "request addr", f.addr)
+			// 2) chunk is requested, set requested flag
+			// launch a request iff none has been launched yet
+			doRequest = !requested
+			requested = true
+
+		// peer we requested from is gone.
fall back to another + // and remove the peer from the peers map + case id := <-gone: + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr) + peers.Delete(id.String()) + doRequest = requested + + // search timeout: too much time passed since the last request, + // extend the search to a new peer if we can find one + case <-waitC: + log.Trace("search timed out: rerequesting", "request addr", f.addr) + doRequest = requested + + // all Fetcher context closed, can quit + case <-ctx.Done(): + log.Trace("terminate fetcher", "request addr", f.addr) + // TODO: send cancelations to all peers left over in peers map (i.e., those we requested from) + return + } + + // need to issue a new request + if doRequest { + var err error + sources, err = f.doRequest(ctx, gone, peers, sources) + if err != nil { + log.Warn("unable to request", "request addr", f.addr, "err", err) + } + } + + // if wait channel is not set, set it to a timer + if requested { + if wait == nil { + wait = time.NewTimer(searchTimeout) + defer wait.Stop() + waitC = wait.C + } else { + // stop the timer and drain the channel if it was not drained earlier + if !wait.Stop() { + select { + case <-wait.C: + default: + } + } + // reset the timer to go off after searchTimeout + wait.Reset(searchTimeout) + } + } + doRequest = false + } +} + +// doRequest attempts at finding a peer to request the chunk from +// * first it tries to request explicitly from peers that are known to have offered the chunk +// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address +// excluding those in the peersToSkip map +// * if no such peer is found an error is returned +// +// if a request is successful, +// * the peer's address is added to the set of peers to skip +// * the peer's address is removed from prospective sources, and +// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer) +func (f *Fetcher) doRequest(ctx context.Context, gone chan *discover.NodeID, peersToSkip *sync.Map, sources []*discover.NodeID) ([]*discover.NodeID, error) { + var i int + var sourceID *discover.NodeID + var quit chan struct{} + + req := &Request{ + Addr: f.addr, + SkipCheck: f.skipCheck, + peersToSkip: peersToSkip, + } + + foundSource := false + // iterate over known sources + for i = 0; i < len(sources); i++ { + req.Source = sources[i] + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err == nil { + // remove the peer from known sources + // Note: we can modify the source although we are looping on it, because we break from the loop immediately + sources = append(sources[:i], sources[i+1:]...) 
+ foundSource = true + break + } + } + + // if there are no known sources, or none available, we try request from a closest node + if !foundSource { + req.Source = nil + var err error + sourceID, quit, err = f.protoRequestFunc(ctx, req) + if err != nil { + // if no peers found to request from + return sources, err + } + } + // add peer to the set of peers to skip from now + peersToSkip.Store(sourceID.String(), time.Now()) + + // if the quit channel is closed, it indicates that the source peer we requested from + // disconnected or terminated its streamer + // here start a go routine that watches this channel and reports the source peer on the gone channel + // this go routine quits if the fetcher global context is done to prevent process leak + go func() { + select { + case <-quit: + gone <- sourceID + case <-ctx.Done(): + } + }() + return sources, nil +} diff --git a/swarm/network/fetcher_test.go b/swarm/network/fetcher_test.go new file mode 100644 index 000000000..21b81d652 --- /dev/null +++ b/swarm/network/fetcher_test.go @@ -0,0 +1,459 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package network + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +var requestedPeerID = discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") +var sourcePeerID = discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + +// mockRequester pushes every request to the requestC channel when its doRequest function is called +type mockRequester struct { + // requests []Request + requestC chan *Request // when a request is coming it is pushed to requestC + waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional) + ctr int //counts the number of requests + quitC chan struct{} +} + +func newMockRequester(waitTimes ...time.Duration) *mockRequester { + return &mockRequester{ + requestC: make(chan *Request), + waitTimes: waitTimes, + quitC: make(chan struct{}), + } +} + +func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*discover.NodeID, chan struct{}, error) { + waitTime := time.Duration(0) + if m.ctr < len(m.waitTimes) { + waitTime = m.waitTimes[m.ctr] + m.ctr++ + } + time.Sleep(waitTime) + m.requestC <- request + + // if there is a Source in the request use that, if not use the global requestedPeerId + source := request.Source + if source == nil { + source = &requestedPeerID + } + return source, m.quitC, nil +} + +// TestFetcherSingleRequest creates a Fetcher using mockRequester, and run it with a sample set of peers to skip. 
+// mockRequester pushes a Request on a channel every time the request function is called. Using +// this channel we test if calling Fetcher.Request calls the request function, and whether it uses +// the correct peers to skip which we provided for the fetcher.run function. +func TestFetcherSingleRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peers := []string{"a", "b", "c", "d"} + peersToSkip := &sync.Map{} + for _, p := range peers { + peersToSkip.Store(p, time.Now()) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + fetcher.Request(rctx) + + select { + case request := <-requester.requestC: + // request should contain all peers from peersToSkip provided to the fetcher + for _, p := range peers { + if _, ok := request.peersToSkip.Load(p); !ok { + t.Fatalf("request.peersToSkip misses peer") + } + } + + // source peer should be also added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok { + t.Fatalf("request.peersToSkip does not contain peer returned by the request function") + } + + // fetch should trigger a request, if it doesn't happen in time, test should fail + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } +} + +// TestCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called +func TestFetcherCancelStopsFetcher(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + + // we start the fetcher, and then we immediately cancel the context + go fetcher.run(ctx, peersToSkip) + cancel() + + rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer rcancel() + // we call Request with an active context + fetcher.Request(rctx) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } +} + +// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request +func TestFetcherCancelStopsRequest(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // we start the fetcher with an active context + go fetcher.run(ctx, peersToSkip) + + rctx, rcancel := context.WithCancel(context.Background()) + rcancel() + + // we call Request with a cancelled context + fetcher.Request(rctx) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetch function initiated request") + case <-time.After(200 * time.Millisecond): + } + + // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled + rctx = context.Background() + fetcher.Request(rctx) + + select { + case <-requester.requestC: + case <-time.After(200 * 
time.Millisecond): + t.Fatalf("expected request") + } +} + +// TestOfferUsesSource tests Fetcher Offer behavior. +// In this case there should be 1 (and only one) request initiated from the source peer, and the +// source nodeid should appear in the peersToSkip map. +func TestFetcherOfferUsesSource(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + // call the Offer function with the source peer + fetcher.Offer(rctx, &sourcePeerID) + + // fetcher should not initiate request + select { + case <-requester.requestC: + t.Fatalf("fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } + + // call Request after the Offer + rctx = context.Background() + fetcher.Request(rctx) + + // there should be exactly 1 request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + select { + case <-requester.requestC: + t.Fatalf("Fetcher number of requests expected 1 got 2") + case <-time.After(200 * time.Millisecond): + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + // call Request first + rctx := context.Background() + fetcher.Request(rctx) + + // there should be a request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if request.Source != nil { + t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // after the Request call Offer + fetcher.Offer(context.Background(), &sourcePeerID) + + // there should be a request coming from fetcher + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +// TestFetcherRetryOnTimeout tests that fetch retries after searchTimeOut has passed +func TestFetcherRetryOnTimeout(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + // set searchTimeOut to low value so the test is quicker + defer func(t time.Duration) { + 
searchTimeout = t + }(searchTimeout) + searchTimeout = 250 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start the fetcher + go fetcher.run(ctx, peersToSkip) + + // call the fetch function with an active context + rctx := context.Background() + fetcher.Request(rctx) + + // after 100ms the first request should be initiated + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not initiate request") + } + + // after another 100ms no new request should be initiated, because search timeout is 250ms + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + t.Fatalf("unexpected request from fetcher") + default: + } + + // after another 300ms search timeout is over, there should be a new request + time.Sleep(300 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not retry request") + } +} + +// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts +// a Fetcher when it return a fetch function. We test the fetching functionality just by checking if +// a request is initiated when the fetch function is called +func TestFetcherFactory(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcherFactory := NewFetcherFactory(requester.doRequest, false) + + peersToSkip := &sync.Map{} + + fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip) + + fetcher.Request(context.Background()) + + // check if the created fetchFunction really starts a fetcher and initiates a request + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } + +} + +func TestFetcherRequestQuitRetriesRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + fetcher := NewFetcher(addr, requester.doRequest, true) + + // make sure searchTimeout is long so it is sure the request is not retried because of timeout + defer func(t time.Duration) { + searchTimeout = t + }(searchTimeout) + searchTimeout = 10 * time.Second + + peersToSkip := &sync.Map{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go fetcher.run(ctx, peersToSkip) + + rctx := context.Background() + fetcher.Request(rctx) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated") + } + + close(requester.quitC) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated after failed request") + } +} + +// TestRequestSkipPeer checks if PeerSkip function will skip provided peer +// and not skip unknown one. 
+func TestRequestSkipPeer(t *testing.T) { + addr := make([]byte, 32) + peers := []discover.NodeID{ + discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + discover.MustHexID("2dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + } + + peersToSkip := new(sync.Map) + peersToSkip.Store(peers[0].String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peers[0].String()) { + t.Errorf("peer not skipped") + } + + if r.SkipPeer(peers[1].String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerExpired checks if a peer to skip is not skipped +// after RequestTimeout has passed. +func TestRequestSkipPeerExpired(t *testing.T) { + addr := make([]byte, 32) + peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if r.SkipPeer(peer.String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerPermanent checks if a peer to skip is not skipped +// after RequestTimeout is not skipped if it is set for a permanent skipping +// by value to peersToSkip map is not time.Duration. +func TestRequestSkipPeerPermanent(t *testing.T) { + addr := make([]byte, 32) + peer := discover.MustHexID("1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), true) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } +} diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 366021088..425c1d5a1 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -32,31 +32,10 @@ import ( Hive is the logistic manager of the swarm When the hive is started, a forever loop is launched that -asks the Overlay Topology driver (e.g., generic kademlia nodetable) +asks the kademlia nodetable to suggest peers to bootstrap connectivity */ -// Overlay is the interface for kademlia (or other topology drivers) -type Overlay interface { - // suggest peers to connect to - SuggestPeer() (OverlayAddr, int, bool) - // register and deregister peer connections - On(OverlayConn) (depth uint8, changed bool) - Off(OverlayConn) - // register peer addresses - Register([]OverlayAddr) error - // iterate over connected peers - EachConn([]byte, int, func(OverlayConn, int, bool) bool) - // iterate over known peers (address records) - EachAddr([]byte, int, func(OverlayAddr, int, bool) bool) - // pretty print the connectivity - String() string - // base Overlay address of the node itself - BaseAddr() []byte - // connectivity health 
check used for testing - Healthy(*PeerPot) *Health -} - // HiveParams holds the config options to hive type HiveParams struct { Discovery bool // if want discovery of not @@ -78,7 +57,7 @@ func NewHiveParams() *HiveParams { // Hive manages network connections of the swarm node type Hive struct { *HiveParams // settings - Overlay // the overlay connectiviy driver + *Kademlia // the overlay connectiviy driver Store state.Store // storage interface to save peers across sessions addPeer func(*discover.Node) // server callback to connect to a peer // bookkeeping @@ -88,12 +67,12 @@ type Hive struct { // NewHive constructs a new hive // HiveParams: config parameters -// Overlay: connectivity driver using a network topology +// Kademlia: connectivity driver using a network topology // StateStore: to save peers across sessions -func NewHive(params *HiveParams, overlay Overlay, store state.Store) *Hive { +func NewHive(params *HiveParams, kad *Kademlia, store state.Store) *Hive { return &Hive{ HiveParams: params, - Overlay: overlay, + Kademlia: kad, Store: store, } } @@ -133,7 +112,7 @@ func (h *Hive) Stop() error { } } log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) - h.EachConn(nil, 255, func(p OverlayConn, _ int, _ bool) bool { + h.EachConn(nil, 255, func(p *Peer, _ int, _ bool) bool { log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) p.Drop(nil) return true @@ -151,14 +130,14 @@ func (h *Hive) connect() { addr, depth, changed := h.SuggestPeer() if h.Discovery && changed { - NotifyDepth(uint8(depth), h) + NotifyDepth(uint8(depth), h.Kademlia) } if addr == nil { continue } log.Trace(fmt.Sprintf("%08x hive connect() suggested %08x", h.BaseAddr()[:4], addr.Address()[:4])) - under, err := discover.ParseNode(string(addr.(Addr).Under())) + under, err := discover.ParseNode(string(addr.Under())) if err != nil { log.Warn(fmt.Sprintf("%08x unable to connect to bee %08x: invalid node URL: %v", h.BaseAddr()[:4], addr.Address()[:4], err)) continue @@ -170,19 +149,19 @@ func (h *Hive) connect() { // Run protocol run function func (h *Hive) Run(p *BzzPeer) error { - dp := newDiscovery(p, h) + dp := NewPeer(p, h.Kademlia) depth, changed := h.On(dp) // if we want discovery, advertise change of depth if h.Discovery { if changed { // if depth changed, send to all peers - NotifyDepth(depth, h) + NotifyDepth(depth, h.Kademlia) } else { // otherwise just send depth to new peer dp.NotifyDepth(depth) } } - NotifyPeer(p.Off(), h) + NotifyPeer(p.BzzAddr, h.Kademlia) defer h.Off(dp) return dp.Run(dp.HandleMsg) } @@ -206,17 +185,6 @@ func (h *Hive) PeerInfo(id discover.NodeID) interface{} { } } -// ToAddr returns the serialisable version of u -func ToAddr(pa OverlayPeer) *BzzAddr { - if addr, ok := pa.(*BzzAddr); ok { - return addr - } - if p, ok := pa.(*discPeer); ok { - return p.BzzAddr - } - return pa.(*BzzPeer).BzzAddr -} - // loadPeers, savePeer implement persistence callback/ func (h *Hive) loadPeers() error { var as []*BzzAddr @@ -230,28 +198,19 @@ func (h *Hive) loadPeers() error { } log.Info(fmt.Sprintf("hive %08x: peers loaded", h.BaseAddr()[:4])) - return h.Register(toOverlayAddrs(as...)) -} - -// toOverlayAddrs transforms an array of BzzAddr to OverlayAddr -func toOverlayAddrs(as ...*BzzAddr) (oas []OverlayAddr) { - for _, a := range as { - oas = append(oas, OverlayAddr(a)) - } - return + return h.Register(as...) 
} // savePeers, savePeer implement persistence callback/ func (h *Hive) savePeers() error { var peers []*BzzAddr - h.Overlay.EachAddr(nil, 256, func(pa OverlayAddr, i int, _ bool) bool { + h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int, _ bool) bool { if pa == nil { log.Warn(fmt.Sprintf("empty addr: %v", i)) return true } - apa := ToAddr(pa) - log.Trace("saving peer", "peer", apa) - peers = append(peers, apa) + log.Trace("saving peer", "peer", pa) + peers = append(peers, pa) return true }) if err := h.Store.Put("peers", peers); err != nil { diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go index c2abfb2aa..7ea000c1a 100644 --- a/swarm/network/hive_test.go +++ b/swarm/network/hive_test.go @@ -41,7 +41,7 @@ func TestRegisterAndConnect(t *testing.T) { id := s.IDs[0] raddr := NewAddrFromNodeID(id) - pp.Register([]OverlayAddr{OverlayAddr(raddr)}) + pp.Register(raddr) // start the hive and wait for the connection err := pp.Start(s.Server) @@ -77,7 +77,7 @@ func TestHiveStatePersistance(t *testing.T) { peers := make(map[string]bool) for _, id := range s.IDs { raddr := NewAddrFromNodeID(id) - pp.Register([]OverlayAddr{OverlayAddr(raddr)}) + pp.Register(raddr) peers[raddr.String()] = true } @@ -97,8 +97,8 @@ func TestHiveStatePersistance(t *testing.T) { pp.Start(s1.Server) i := 0 - pp.Overlay.EachAddr(nil, 256, func(addr OverlayAddr, po int, nn bool) bool { - delete(peers, addr.(*BzzAddr).String()) + pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int, nn bool) bool { + delete(peers, addr.String()) i++ return true }) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 0177d449c..55a0c6f13 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -62,7 +62,7 @@ type KadParams struct { RetryExponent int // exponent to multiply retry intervals with MaxRetries int // maximum number of redial attempts // function to sanction or prevent suggesting a peer - Reachable func(OverlayAddr) bool + Reachable func(*BzzAddr) bool } // NewKadParams returns a params struct with default values @@ -106,45 +106,22 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia { } } -// OverlayPeer interface captures the common aspect of view of a peer from the Overlay -// topology driver -type OverlayPeer interface { - Address() []byte -} - -// OverlayConn represents a connected peer -type OverlayConn interface { - OverlayPeer - Drop(error) // call to indicate a peer should be expunged - Off() OverlayAddr // call to return a persitent OverlayAddr -} - -// OverlayAddr represents a kademlia peer record -type OverlayAddr interface { - OverlayPeer - Update(OverlayAddr) OverlayAddr // returns the updated version of the original -} - -// entry represents a Kademlia table entry (an extension of OverlayPeer) +// entry represents a Kademlia table entry (an extension of BzzAddr) type entry struct { - OverlayPeer + *BzzAddr + conn *Peer seenAt time.Time retries int } -// newEntry creates a kademlia peer from an OverlayPeer interface -func newEntry(p OverlayPeer) *entry { +// newEntry creates a kademlia peer from a *Peer +func newEntry(p *BzzAddr) *entry { return &entry{ - OverlayPeer: p, - seenAt: time.Now(), + BzzAddr: p, + seenAt: time.Now(), } } -// Bin is the binary (bitvector) serialisation of the entry address -func (e *entry) Bin() string { - return pot.ToBin(e.addr().Address()) -} - // Label is a short tag for the entry for debug func Label(e *entry) string { return fmt.Sprintf("%s (%d)", e.Hex()[:4], e.retries) @@ -152,29 +129,12 @@ func Label(e *entry) 
string { // Hex is the hexadecimal serialisation of the entry address func (e *entry) Hex() string { - return fmt.Sprintf("%x", e.addr().Address()) + return fmt.Sprintf("%x", e.Address()) } -// String is the short tag for the entry -func (e *entry) String() string { - return fmt.Sprintf("%s (%d)", e.Hex()[:8], e.retries) -} - -// addr returns the kad peer record (OverlayAddr) corresponding to the entry -func (e *entry) addr() OverlayAddr { - a, _ := e.OverlayPeer.(OverlayAddr) - return a -} - -// conn returns the connected peer (OverlayPeer) corresponding to the entry -func (e *entry) conn() OverlayConn { - c, _ := e.OverlayPeer.(OverlayConn) - return c -} - -// Register enters each OverlayAddr as kademlia peer record into the +// Register enters each address as kademlia peer record into the // database of known peer addresses -func (k *Kademlia) Register(peers []OverlayAddr) error { +func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() var known, size int @@ -203,7 +163,6 @@ func (k *Kademlia) Register(peers []OverlayAddr) error { if k.addrCountC != nil && size-known > 0 { k.addrCountC <- k.addrs.Size() } - // log.Trace(fmt.Sprintf("%x registered %v peers, %v known, total: %v", k.BaseAddr()[:4], size, known, k.addrs.Size())) k.sendNeighbourhoodDepthChange() return nil @@ -212,7 +171,7 @@ func (k *Kademlia) Register(peers []OverlayAddr) error { // SuggestPeer returns a known peer for the lowest proximity bin for the // lowest bincount below depth // naturally if there is an empty row it returns a peer for that -func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { +func (k *Kademlia) SuggestPeer() (a *BzzAddr, o int, want bool) { k.lock.Lock() defer k.lock.Unlock() minsize := k.MinBinSize @@ -224,15 +183,18 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { if po < depth { return false } - a = k.callable(val) + e := val.(*entry) + c := k.callable(e) + if c { + a = e.BzzAddr + } ppo = po - return a == nil + return !c }) if a != nil { log.Trace(fmt.Sprintf("%08x candidate nearest neighbour found: %v (%v)", k.BaseAddr()[:4], a, ppo)) return a, 0, false } - // log.Trace(fmt.Sprintf("%08x no candidate nearest neighbours to connect to (Depth: %v, minProxSize: %v) %#v", k.BaseAddr()[:4], depth, k.MinProxBinSize, a)) var bpo []int prev := -1 @@ -250,7 +212,6 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { }) // all buckets are full, ie., minsize == k.MinBinSize if len(bpo) == 0 { - // log.Debug(fmt.Sprintf("%08x: all bins saturated", k.BaseAddr()[:4])) return nil, 0, false } // as long as we got candidate peers to connect to @@ -264,8 +225,12 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { return false } return f(func(val pot.Val, _ int) bool { - a = k.callable(val) - return a == nil + e := val.(*entry) + c := k.callable(e) + if c { + a = e.BzzAddr + } + return !c }) }) // found a candidate @@ -282,25 +247,26 @@ func (k *Kademlia) SuggestPeer() (a OverlayAddr, o int, want bool) { } // On inserts the peer as a kademlia peer into the live peers -func (k *Kademlia) On(p OverlayConn) (uint8, bool) { +func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() - e := newEntry(p) var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(v pot.Val) pot.Val { // if not found live if v == nil { ins = true // insert new online peer into conns - return e + return p } // found among live peers, do nothing return v }) if ins { + a := newEntry(p.BzzAddr) + a.conn 
= p // insert new online peer into addrs k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { - return e + return a }) // send new address count value only if the peer is inserted if k.addrCountC != nil { @@ -324,6 +290,8 @@ func (k *Kademlia) On(p OverlayConn) (uint8, bool) { // Not receiving from the returned channel will block On function // when the neighbourhood depth is changed. func (k *Kademlia) NeighbourhoodDepthC() <-chan int { + k.lock.Lock() + defer k.lock.Unlock() if k.nDepthC == nil { k.nDepthC = make(chan int) } @@ -357,7 +325,7 @@ func (k *Kademlia) AddrCountC() <-chan int { } // Off removes a peer from among live peers -func (k *Kademlia) Off(p OverlayConn) { +func (k *Kademlia) Off(p *Peer) { k.lock.Lock() defer k.lock.Unlock() var del bool @@ -367,7 +335,7 @@ func (k *Kademlia) Off(p OverlayConn) { panic(fmt.Sprintf("connected peer not found %v", p)) } del = true - return newEntry(p.Off()) + return newEntry(p.BzzAddr) }) if del { @@ -383,7 +351,7 @@ func (k *Kademlia) Off(p OverlayConn) { } } -func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn OverlayConn, po int) bool) { +func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn *Peer, po int) bool) { k.lock.RLock() defer k.lock.RUnlock() @@ -403,7 +371,7 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con for bin := startPo; bin <= endPo; bin++ { f(func(val pot.Val, _ int) bool { - return eachBinFunc(val.(*entry).conn(), bin) + return eachBinFunc(val.(*Peer), bin) }) } return true @@ -413,13 +381,13 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con // EachConn is an iterator with args (base, po, f) applies f to each live peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -func (k *Kademlia) EachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) { +func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachConn(base, o, f) } -func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) bool) { +func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) { if len(base) == 0 { base = k.base } @@ -428,20 +396,20 @@ func (k *Kademlia) eachConn(base []byte, o int, f func(OverlayConn, int, bool) b if po > o { return true } - return f(val.(*entry).conn(), po, po >= depth) + return f(val.(*Peer), po, po >= depth) }) } // EachAddr called with (base, po, f) is an iterator applying f to each known peer // that has proximity order po or less as measured from the base // if base is nil, kademlia base address is used -func (k *Kademlia) EachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) { +func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { k.lock.RLock() defer k.lock.RUnlock() k.eachAddr(base, o, f) } -func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) bool) { +func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { if len(base) == 0 { base = k.base } @@ -450,7 +418,7 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(OverlayAddr, int, bool) b if po > o { return true } - return f(val.(*entry).addr(), po, po >= depth) + return f(val.(*entry).BzzAddr, po, po >= depth) }) } @@ -472,12 +440,11 @@ func (k *Kademlia) neighbourhoodDepth() (depth int) { return depth } -// callable when called with val, -func (k 
*Kademlia) callable(val pot.Val) OverlayAddr { - e := val.(*entry) +// callable decides if an address entry represents a callable peer +func (k *Kademlia) callable(e *entry) bool { // not callable if peer is live or exceeded maxRetries - if e.conn() != nil || e.retries > k.MaxRetries { - return nil + if e.conn != nil || e.retries > k.MaxRetries { + return false } // calculate the allowed number of retries based on time lapsed since last seen timeAgo := int64(time.Since(e.seenAt)) @@ -491,17 +458,17 @@ func (k *Kademlia) callable(val pot.Val) OverlayAddr { // peer can be retried again if retries < e.retries { log.Trace(fmt.Sprintf("%08x: %v long time since last try (at %v) needed before retry %v, wait only warrants %v", k.BaseAddr()[:4], e, timeAgo, e.retries, retries)) - return nil + return false } // function to sanction or prevent suggesting a peer - if k.Reachable != nil && !k.Reachable(e.addr()) { + if k.Reachable != nil && !k.Reachable(e.BzzAddr) { log.Trace(fmt.Sprintf("%08x: peer %v is temporarily not callable", k.BaseAddr()[:4], e)) - return nil + return false } e.retries++ log.Trace(fmt.Sprintf("%08x: peer %v is callable", k.BaseAddr()[:4], e)) - return e.addr() + return true } // BaseAddr return the kademlia base address @@ -516,7 +483,8 @@ func (k *Kademlia) String() string { return k.string() } -// String returns kademlia table + kaddb table displayed with ascii +// string returns kademlia table + kaddb table displayed with ascii +// caller must hold the lock func (k *Kademlia) string() string { wsrow := " " var rows []string @@ -538,7 +506,7 @@ func (k *Kademlia) string() string { row := []string{fmt.Sprintf("%2d", size)} rest -= size f(func(val pot.Val, vpo int) bool { - e := val.(*entry) + e := val.(*Peer) row = append(row, fmt.Sprintf("%x", e.Address()[:2])) rowlen++ return rowlen < 4 @@ -594,8 +562,9 @@ type PeerPot struct { EmptyBins []int } -// NewPeerPotMap creates a map of pot record of OverlayAddr with keys +// NewPeerPotMap creates a map of pot record of *BzzAddr with keys // as hexadecimal representations of the address. +// used for testing only func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { // create a table of all nodes for health check np := pot.NewPot(nil, 0) @@ -640,6 +609,7 @@ func NewPeerPotMap(kadMinProxSize int, addrs [][]byte) map[string]*PeerPot { // saturation returns the lowest proximity order that the bin for that order // has less than n peers +// It is used in Healthy function for testing only func (k *Kademlia) saturation(n int) int { prev := -1 k.addrs.EachBin(k.base, pof, 0, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool { @@ -654,7 +624,7 @@ func (k *Kademlia) saturation(n int) int { } // full returns true if all required bins have connected peers. -// It is used in Healthy function. 
+// It is used in Healthy function for testing only func (k *Kademlia) full(emptyBins []int) (full bool) { prev := 0 e := len(emptyBins) @@ -688,10 +658,13 @@ func (k *Kademlia) full(emptyBins []int) (full bool) { return e == 0 } +// knowNearestNeighbours tests if all known nearest neighbours given as arguments +// are found in the addressbook +// It is used in Healthy function for testing only func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool { pm := make(map[string]bool) - k.eachAddr(nil, 255, func(p OverlayAddr, po int, nn bool) bool { + k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool { if !nn { return false } @@ -709,10 +682,13 @@ func (k *Kademlia) knowNearestNeighbours(peers [][]byte) bool { return true } +// gotNearestNeighbours tests if all known nearest neighbours given as arguments +// are connected peers +// It is used in Healthy function for testing only func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) { pm := make(map[string]bool) - k.eachConn(nil, 255, func(p OverlayConn, po int, nn bool) bool { + k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool { if !nn { return false } @@ -735,6 +711,7 @@ func (k *Kademlia) gotNearestNeighbours(peers [][]byte) (got bool, n int, missin } // Health state of the Kademlia +// used for testing only type Health struct { KnowNN bool // whether node knows all its nearest neighbours GotNN bool // whether node is connected to all its nearest neighbours @@ -746,6 +723,7 @@ type Health struct { // Healthy reports the health state of the kademlia connectivity // returns a Health struct +// used for testing only func (k *Kademlia) Healthy(pp *PeerPot) *Health { k.lock.RLock() defer k.lock.RUnlock() diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index b60e1e9a3..903c8dbda 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -38,71 +38,42 @@ func testKadPeerAddr(s string) *BzzAddr { return &BzzAddr{OAddr: a, UAddr: a} } -type testDropPeer struct { - Peer - dropc chan error -} - -type dropError struct { - error - addr string -} - -func (d *testDropPeer) Drop(err error) { - err2 := &dropError{err, binStr(d)} - d.dropc <- err2 -} - -type testKademlia struct { - *Kademlia - Discovery bool - dropc chan error -} - -func newTestKademlia(b string) *testKademlia { +func newTestKademlia(b string) *Kademlia { params := NewKadParams() params.MinBinSize = 1 params.MinProxBinSize = 2 base := pot.NewAddressFromString(b) - return &testKademlia{ - NewKademlia(base, params), - false, - make(chan error), - } + return NewKademlia(base, params) } -func (k *testKademlia) newTestKadPeer(s string) Peer { - return &testDropPeer{&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k.dropc} +func newTestKadPeer(k *Kademlia, s string) *Peer { + return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k) } -func (k *testKademlia) On(ons ...string) *testKademlia { +func On(k *Kademlia, ons ...string) { for _, s := range ons { - k.Kademlia.On(k.newTestKadPeer(s).(OverlayConn)) + k.On(newTestKadPeer(k, s)) } - return k } -func (k *testKademlia) Off(offs ...string) *testKademlia { +func Off(k *Kademlia, offs ...string) { for _, s := range offs { - k.Kademlia.Off(k.newTestKadPeer(s).(OverlayConn)) + k.Off(newTestKadPeer(k, s)) } - - return k } -func (k *testKademlia) Register(regs ...string) *testKademlia { - var as []OverlayAddr +func Register(k *Kademlia, regs ...string) { + var as []*BzzAddr for _, s := range regs { as = append(as, testKadPeerAddr(s)) } - err := 
k.Kademlia.Register(as) + err := k.Register(as...) if err != nil { panic(err.Error()) } - return k } -func testSuggestPeer(t *testing.T, k *testKademlia, expAddr string, expPo int, expWant bool) error { +func testSuggestPeer(k *Kademlia, expAddr string, expPo int, expWant bool) error { addr, o, want := k.SuggestPeer() if binStr(addr) != expAddr { return fmt.Errorf("incorrect peer address suggested. expected %v, got %v", expAddr, binStr(addr)) @@ -116,7 +87,7 @@ func testSuggestPeer(t *testing.T, k *testKademlia, expAddr string, expPo int, e return nil } -func binStr(a OverlayPeer) string { +func binStr(a *BzzAddr) string { if a == nil { return "<nil>" } @@ -125,15 +96,17 @@ func binStr(a OverlayPeer) string { func TestSuggestPeerBug(t *testing.T) { // 2 row gap, unsaturated proxbin, no callables -> want PO 0 - k := newTestKademlia("00000000").On( + k := newTestKademlia("00000000") + On(k, "10000000", "11000000", "01000000", "00010000", "00011000", - ).Off( + ) + Off(k, "01000000", ) - err := testSuggestPeer(t, k, "01000000", 0, false) + err := testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } @@ -141,140 +114,140 @@ func TestSuggestPeerBug(t *testing.T) { func TestSuggestPeerFindPeers(t *testing.T) { // 2 row gap, unsaturated proxbin, no callables -> want PO 0 - k := newTestKademlia("00000000").On("00100000") - err := testSuggestPeer(t, k, "<nil>", 0, false) + k := newTestKademlia("00000000") + On(k, "00100000") + err := testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // 2 row gap, saturated proxbin, no callables -> want PO 0 - k.On("00010000") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "00010000") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // 1 row gap (1 less), saturated proxbin, no callables -> want PO 1 - k.On("10000000") - err = testSuggestPeer(t, k, "<nil>", 1, false) + On(k, "10000000") + err = testSuggestPeer(k, "<nil>", 1, false) if err != nil { t.Fatal(err.Error()) } // no gap (1 less), saturated proxbin, no callables -> do not want more - k.On("01000000", "00100001") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "01000000", "00100001") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // oversaturated proxbin, > do not want more - k.On("00100001") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "00100001") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } // reintroduce gap, disconnected peer callable - // log.Info(k.String()) - k.Off("01000000") - err = testSuggestPeer(t, k, "01000000", 0, false) + Off(k, "01000000") + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } // second time disconnected peer not callable // with reasonably set Interval - err = testSuggestPeer(t, k, "<nil>", 1, true) + err = testSuggestPeer(k, "<nil>", 1, true) if err != nil { t.Fatal(err.Error()) } // on and off again, peer callable again - k.On("01000000") - k.Off("01000000") - err = testSuggestPeer(t, k, "01000000", 0, false) + On(k, "01000000") + Off(k, "01000000") + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("01000000") + On(k, "01000000") // new closer peer appears, it is immediately wanted - k.Register("00010001") - err = testSuggestPeer(t, k, "00010001", 0, false) + Register(k, "00010001") + err = testSuggestPeer(k, "00010001", 0, false) if err != nil { t.Fatal(err.Error()) } // PO1 disconnects - 
k.On("00010001") + On(k, "00010001") log.Info(k.String()) - k.Off("01000000") + Off(k, "01000000") log.Info(k.String()) // second time, gap filling - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("01000000") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "01000000") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } k.MinBinSize = 2 - err = testSuggestPeer(t, k, "<nil>", 0, true) + err = testSuggestPeer(k, "<nil>", 0, true) if err != nil { t.Fatal(err.Error()) } - k.Register("01000001") - err = testSuggestPeer(t, k, "01000001", 0, false) + Register(k, "01000001") + err = testSuggestPeer(k, "01000001", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("10000001") + On(k, "10000001") log.Trace(fmt.Sprintf("Kad:\n%v", k.String())) - err = testSuggestPeer(t, k, "<nil>", 1, true) + err = testSuggestPeer(k, "<nil>", 1, true) if err != nil { t.Fatal(err.Error()) } - k.On("01000001") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "01000001") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } k.MinBinSize = 3 - k.Register("10000010") - err = testSuggestPeer(t, k, "10000010", 0, false) + Register(k, "10000010") + err = testSuggestPeer(k, "10000010", 0, false) if err != nil { t.Fatal(err.Error()) } - k.On("10000010") - err = testSuggestPeer(t, k, "<nil>", 1, false) + On(k, "10000010") + err = testSuggestPeer(k, "<nil>", 1, false) if err != nil { t.Fatal(err.Error()) } - k.On("01000010") - err = testSuggestPeer(t, k, "<nil>", 2, false) + On(k, "01000010") + err = testSuggestPeer(k, "<nil>", 2, false) if err != nil { t.Fatal(err.Error()) } - k.On("00100010") - err = testSuggestPeer(t, k, "<nil>", 3, false) + On(k, "00100010") + err = testSuggestPeer(k, "<nil>", 3, false) if err != nil { t.Fatal(err.Error()) } - k.On("00010010") - err = testSuggestPeer(t, k, "<nil>", 0, false) + On(k, "00010010") + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } @@ -282,10 +255,8 @@ func TestSuggestPeerFindPeers(t *testing.T) { } func TestSuggestPeerRetries(t *testing.T) { - t.Skip("Test is disabled, because it is flaky. It fails with kademlia_test.go:346: incorrect peer address suggested. 
expected <nil>, got 01000000") - // 2 row gap, unsaturated proxbin, no callables -> want PO 0 k := newTestKademlia("00000000") - k.RetryInterval = int64(100 * time.Millisecond) // cycle + k.RetryInterval = int64(300 * time.Millisecond) // cycle k.MaxRetries = 50 k.RetryExponent = 2 sleep := func(n int) { @@ -296,53 +267,53 @@ func TestSuggestPeerRetries(t *testing.T) { time.Sleep(time.Duration(ts)) } - k.Register("01000000") - k.On("00000001", "00000010") - err := testSuggestPeer(t, k, "01000000", 0, false) + Register(k, "01000000") + On(k, "00000001", "00000010") + err := testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(1) - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(1) - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(2) - err = testSuggestPeer(t, k, "01000000", 0, false) + err = testSuggestPeer(k, "01000000", 0, false) if err != nil { t.Fatal(err.Error()) } - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } sleep(2) - err = testSuggestPeer(t, k, "<nil>", 0, false) + err = testSuggestPeer(k, "<nil>", 0, false) if err != nil { t.Fatal(err.Error()) } @@ -350,7 +321,9 @@ func TestSuggestPeerRetries(t *testing.T) { } func TestKademliaHiveString(t *testing.T) { - k := newTestKademlia("00000000").On("01000000", "00100000").Register("10000000", "10000001") + k := newTestKademlia("00000000") + On(k, "01000000", "00100000") + Register(k, "10000000", "10000001") k.MaxProxDisplay = 8 h := k.String() expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), MinProxBinSize: 2, MinBinSize: 1, MaxBinSize: 4\n000 0 | 2 8100 (0) 8000 (0)\n============ DEPTH: 1 ==========================================\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" @@ -378,7 +351,7 @@ func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) { continue } p := &BzzAddr{OAddr: a, UAddr: a} - if err := k.Register([]OverlayAddr{p}); err != nil { + if err := k.Register(p); err != nil { t.Fatal(err) } } @@ -392,12 +365,12 @@ func testKademliaCase(t *testing.T, pivotAddr string, addrs ...string) { if a == nil { break } - k.On(&BzzPeer{BzzAddr: a.(*BzzAddr)}) + k.On(NewPeer(&BzzPeer{BzzAddr: a}, k)) } h := k.Healthy(pp) if !(h.GotNN && h.KnowNN && h.Full) { - t.Error("not healthy") + t.Fatalf("not healthy: %#v\n%v", h, k.String()) } } diff --git a/swarm/network/networkid_test.go b/swarm/network/networkid_test.go index 05134b083..91a1f6d7b 100644 --- a/swarm/network/networkid_test.go +++ b/swarm/network/networkid_test.go @@ -92,7 +92,7 @@ func TestNetworkID(t *testing.T) { if kademlias[node].addrs.Size() != len(netIDGroup)-1 { t.Fatalf("Kademlia size has not expected 
peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1) } - kademlias[node].EachAddr(nil, 0, func(addr OverlayAddr, _ int, _ bool) bool { + kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int, _ bool) bool { found := false for _, nd := range netIDGroup { p := ToOverlayAddr(nd.Bytes()) diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go index fab638c9e..538502605 100644 --- a/swarm/network/priorityqueue/priorityqueue.go +++ b/swarm/network/priorityqueue/priorityqueue.go @@ -28,10 +28,13 @@ package priorityqueue import ( "context" "errors" + + "github.com/ethereum/go-ethereum/log" ) var ( - errContention = errors.New("queue contention") + ErrContention = errors.New("contention") + errBadPriority = errors.New("bad priority") wakey = struct{}{} @@ -39,7 +42,7 @@ var ( // PriorityQueue is the basic structure type PriorityQueue struct { - queues []chan interface{} + Queues []chan interface{} wakeup chan struct{} } @@ -50,27 +53,29 @@ func New(n int, l int) *PriorityQueue { queues[i] = make(chan interface{}, l) } return &PriorityQueue{ - queues: queues, + Queues: queues, wakeup: make(chan struct{}, 1), } } // Run is a forever loop popping items from the queues func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) { - top := len(pq.queues) - 1 + top := len(pq.Queues) - 1 p := top READ: for { - q := pq.queues[p] + q := pq.Queues[p] select { case <-ctx.Done(): return case x := <-q: + log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p])) f(x) p = top default: if p > 0 { p-- + log.Trace("priority.queue p > 0", "p", p) continue READ } p = top @@ -78,6 +83,7 @@ READ: case <-ctx.Done(): return case <-pq.wakeup: + log.Trace("priority.queue wakeup", "p", p) } } } @@ -85,23 +91,15 @@ READ: // Push pushes an item to the appropriate queue specified in the priority argument // if context is given it waits until either the item is pushed or the Context aborts -// otherwise returns errContention if the queue is full -func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error { - if p < 0 || p >= len(pq.queues) { +func (pq *PriorityQueue) Push(x interface{}, p int) error { + if p < 0 || p >= len(pq.Queues) { return errBadPriority } - if ctx == nil { - select { - case pq.queues[p] <- x: - default: - return errContention - } - } else { - select { - case pq.queues[p] <- x: - case <-ctx.Done(): - return ctx.Err() - } + log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p])) + select { + case pq.Queues[p] <- x: + default: + return ErrContention } select { case pq.wakeup <- wakey: diff --git a/swarm/network/priorityqueue/priorityqueue_test.go b/swarm/network/priorityqueue/priorityqueue_test.go index cd54250f8..ed8b575c2 100644 --- a/swarm/network/priorityqueue/priorityqueue_test.go +++ b/swarm/network/priorityqueue/priorityqueue_test.go @@ -30,7 +30,7 @@ func TestPriorityQueue(t *testing.T) { results = append(results, v.(string)) wg.Done() }) - pq.Push(context.Background(), "2.0", 2) + pq.Push("2.0", 2) wg.Wait() if results[0] != "2.0" { t.Errorf("expected first result %q, got %q", "2.0", results[0]) @@ -66,7 +66,7 @@ Loop: { priorities: []int{0, 0, 0}, values: []string{"0.0", "0.0", "0.1"}, - errors: []error{nil, nil, errContention}, + errors: []error{nil, nil, ErrContention}, }, } { var results []string @@ -74,7 +74,7 @@ Loop: pq := New(3, 2) wg.Add(len(tc.values)) for j, value := range tc.values { - err := pq.Push(nil, value, 
tc.priorities[j]) + err := pq.Push(value, tc.priorities[j]) if tc.errors != nil && err != tc.errors[j] { t.Errorf("expected push error %v, got %v", tc.errors[j], err) continue Loop diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index 7f7ca5eed..ef0956d5f 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -62,32 +62,6 @@ var DiscoverySpec = &protocols.Spec{ }, } -// Addr interface that peerPool needs -type Addr interface { - OverlayPeer - Over() []byte - Under() []byte - String() string - Update(OverlayAddr) OverlayAddr -} - -// Peer interface represents an live peer connection -type Peer interface { - Addr // the address of a peer - Conn // the live connection (protocols.Peer) - LastActive() time.Time // last time active -} - -// Conn interface represents an live peer connection -type Conn interface { - ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool - Handshake(context.Context, interface{}, func(interface{}) error) (interface{}, error) // can send messages - Send(context.Context, interface{}) error // can send messages - Drop(error) // disconnect this peer - Run(func(context.Context, interface{}) error) error // the run function to run a protocol - Off() OverlayAddr -} - // BzzConfig captures the config params used by the hive type BzzConfig struct { OverlayAddr []byte // base address of the overlay network @@ -114,7 +88,7 @@ type Bzz struct { // * bzz config // * overlay driver // * peer store -func NewBzz(config *BzzConfig, kad Overlay, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz { +func NewBzz(config *BzzConfig, kad *Kademlia, store state.Store, streamerSpec *protocols.Spec, streamerRun func(*BzzPeer) error) *Bzz { return &Bzz{ Hive: NewHive(config.HiveParams, kad, store), NetworkID: config.NetworkID, @@ -131,7 +105,7 @@ func (b *Bzz) UpdateLocalAddr(byteaddr []byte) *BzzAddr { b.localAddr = b.localAddr.Update(&BzzAddr{ UAddr: byteaddr, OAddr: b.localAddr.OAddr, - }).(*BzzAddr) + }) return b.localAddr } @@ -274,7 +248,7 @@ type BzzPeer struct { LightNode bool } -func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { +func NewBzzPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { return &BzzPeer{ Peer: p, localAddr: addr, @@ -282,11 +256,6 @@ func NewBzzTestPeer(p *protocols.Peer, addr *BzzAddr) *BzzPeer { } } -// Off returns the overlay peer record for offline persistence -func (p *BzzPeer) Off() OverlayAddr { - return p.BzzAddr -} - // LastActive returns the time the peer was last active func (p *BzzPeer) LastActive() time.Time { return p.lastActive @@ -388,8 +357,8 @@ func (a *BzzAddr) ID() discover.NodeID { } // Update updates the underlay address of a peer record -func (a *BzzAddr) Update(na OverlayAddr) OverlayAddr { - return &BzzAddr{a.OAddr, na.(Addr).Under()} +func (a *BzzAddr) Update(na *BzzAddr) *BzzAddr { + return &BzzAddr{a.OAddr, na.UAddr} } // String pretty prints the address @@ -410,9 +379,9 @@ func RandomAddr() *BzzAddr { } // NewNodeIDFromAddr transforms the underlay address to an adapters.NodeID -func NewNodeIDFromAddr(addr Addr) discover.NodeID { - log.Info(fmt.Sprintf("uaddr=%s", string(addr.Under()))) - node := discover.MustParseNode(string(addr.Under())) +func NewNodeIDFromAddr(addr *BzzAddr) discover.NodeID { + log.Info(fmt.Sprintf("uaddr=%s", string(addr.UAddr))) + node := discover.MustParseNode(string(addr.UAddr)) return node.ID } diff --git a/swarm/network/simulation/simulation.go b/swarm/network/simulation/simulation.go index 
74f9d98ee..096f7322c 100644 --- a/swarm/network/simulation/simulation.go +++ b/swarm/network/simulation/simulation.go @@ -94,7 +94,7 @@ func New(services map[string]ServiceFunc) (s *Simulation) { } s.Net = simulations.NewNetwork( - adapters.NewSimAdapter(adapterServices), + adapters.NewTCPAdapter(adapterServices), &simulations.NetworkConfig{ID: "0"}, ) @@ -164,17 +164,6 @@ var maxParallelCleanups = 10 func (s *Simulation) Close() { close(s.done) - // Close all connections before calling the Network Shutdown. - // It is possible that p2p.Server.Stop will block if there are - // existing connections. - for _, c := range s.Net.Conns { - if c.Up { - s.Net.Disconnect(c.One, c.Other) - } - } - s.shutdownWG.Wait() - s.Net.Shutdown() - sem := make(chan struct{}, maxParallelCleanups) s.mu.RLock() cleanupFuncs := make([]func(), len(s.cleanupFuncs)) @@ -206,6 +195,9 @@ func (s *Simulation) Close() { } close(s.runC) } + + s.shutdownWG.Wait() + s.Net.Shutdown() } // Done returns a channel that is closed when the simulation diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go index acf3479e5..913d6d837 100644 --- a/swarm/network/simulations/discovery/discovery_test.go +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -556,8 +556,8 @@ func newService(ctx *adapters.ServiceContext) (node.Service, error) { kp.MinProxBinSize = testMinProxBinSize if ctx.Config.Reachable != nil { - kp.Reachable = func(o network.OverlayAddr) bool { - return ctx.Config.Reachable(o.(*network.BzzAddr).ID()) + kp.Reachable = func(o *network.BzzAddr) bool { + return ctx.Config.Reachable(o.ID()) } } kad := network.NewKademlia(addr.Over(), kp) diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 491dc9fd5..e0d776e34 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -107,9 +107,14 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora return nil, nil, nil, removeDataDir, err } - db := storage.NewDBAPI(localStore) - delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, nil, removeDataDir, err + } + + delivery := NewDelivery(to, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New + streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) teardown := func() { streamer.Close() removeDataDir() @@ -150,14 +155,14 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } } -func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (*storage.Chunk, error) { +func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { return nil, errors.New("get not well defined on round robin store") } -func (rrs *roundRobinStore) Put(ctx context.Context, chunk *storage.Chunk) { +func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - rrs.stores[idx].Put(ctx, chunk) + return rrs.stores[idx].Put(ctx, chunk) } func (rrs *roundRobinStore) Close() { diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 36040339d..d0f27eebc 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -19,12 +19,11 @@ package stream import ( 
"context" "errors" - "time" - "github.com/ethereum/go-ethereum/common" + "fmt" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/discover" - cp "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -46,39 +45,34 @@ var ( ) type Delivery struct { - db *storage.DBAPI - overlay network.Overlay - receiveC chan *ChunkDeliveryMsg - getPeer func(discover.NodeID) *Peer + chunkStore storage.SyncChunkStore + kad *network.Kademlia + getPeer func(discover.NodeID) *Peer } -func NewDelivery(overlay network.Overlay, db *storage.DBAPI) *Delivery { - d := &Delivery{ - db: db, - overlay: overlay, - receiveC: make(chan *ChunkDeliveryMsg, deliveryCap), +func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { + return &Delivery{ + chunkStore: chunkStore, + kad: kad, } - - go d.processReceivedChunks() - return d } // SwarmChunkServer implements Server type SwarmChunkServer struct { deliveryC chan []byte batchC chan []byte - db *storage.DBAPI + chunkStore storage.ChunkStore currentLen uint64 quit chan struct{} } // NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer { +func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - db: db, - quit: make(chan struct{}), + deliveryC: make(chan []byte, deliveryCap), + batchC: make(chan []byte), + chunkStore: chunkStore, + quit: make(chan struct{}), } go s.processDeliveries() return s @@ -123,13 +117,11 @@ func (s *SwarmChunkServer) Close() { // GetData retrives chunk data from db store func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.db.Get(ctx, storage.Address(key)) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { + chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) + if err != nil { return nil, err } - return chunk.SData, nil + return chunk.Data(), nil } // RetrieveRequestMsg is the protocol msg for chunk retrieve requests @@ -153,57 +145,39 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * return err } streamer := s.Server.(*SwarmChunkServer) - chunk, created := d.db.GetOrCreateRequest(ctx, req.Addr) - if chunk.ReqC != nil { - if created { - if err := d.RequestFromPeers(ctx, chunk.Addr[:], true, sp.ID()); err != nil { - log.Warn("unable to forward chunk request", "peer", sp.ID(), "key", chunk.Addr, "err", err) - chunk.SetErrored(storage.ErrChunkForward) - return nil - } + + var cancel func() + // TODO: do something with this hardcoded timeout, maybe use TTL in the future + ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout) + + go func() { + select { + case <-ctx.Done(): + case <-streamer.quit: } - go func() { - var osp opentracing.Span - ctx, osp = spancontext.StartSpan( - ctx, - "waiting.delivery") - defer osp.Finish() - - t := time.NewTimer(10 * time.Minute) - defer t.Stop() - - log.Debug("waiting delivery", "peer", sp.ID(), "hash", req.Addr, "node", common.Bytes2Hex(d.overlay.BaseAddr()), "created", created) - start := time.Now() - select { - case <-chunk.ReqC: - log.Debug("retrieve request ReqC closed", "peer", sp.ID(), "hash", req.Addr, "time", time.Since(start)) - case <-t.C: - log.Debug("retrieve request 
timeout", "peer", sp.ID(), "hash", req.Addr) - chunk.SetErrored(storage.ErrChunkTimeout) - return - } - chunk.SetErrored(nil) - - if req.SkipCheck { - err := sp.Deliver(ctx, chunk, s.priority) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg, DROPPING peer!", "err", err) - sp.Drop(err) - } + cancel() + }() + + go func() { + chunk, err := d.chunkStore.Get(ctx, req.Addr) + if err != nil { + log.Warn("ChunkStore.Get can not retrieve chunk", "err", err) + return + } + if req.SkipCheck { + err = sp.Deliver(ctx, chunk, s.priority) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - streamer.deliveryC <- chunk.Addr[:] - }() - return nil - } - // TODO: call the retrieve function of the outgoing syncer - if req.SkipCheck { - log.Trace("deliver", "peer", sp.ID(), "hash", chunk.Addr) - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to deliver is too short", "len(chunk.SData)", length, "address", chunk.Addr) + return } - return sp.Deliver(ctx, chunk, s.priority) - } - streamer.deliveryC <- chunk.Addr[:] + select { + case streamer.deliveryC <- chunk.Address()[:]: + case <-streamer.quit: + } + + }() + return nil } @@ -213,6 +187,7 @@ type ChunkDeliveryMsg struct { peer *Peer // set in handleChunkDeliveryMsg } +// TODO: Fix context SNAFU func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { var osp opentracing.Span ctx, osp = spancontext.StartSpan( @@ -220,81 +195,63 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch "chunk.delivery") defer osp.Finish() - req.peer = sp - d.receiveC <- req - return nil -} + processReceivedChunksCount.Inc(1) -func (d *Delivery) processReceivedChunks() { -R: - for req := range d.receiveC { - processReceivedChunksCount.Inc(1) - - if len(req.SData) > cp.DefaultSize+8 { - log.Warn("received chunk is bigger than expected", "len", len(req.SData)) - continue R - } - - // this should be has locally - chunk, err := d.db.Get(context.TODO(), req.Addr) - if err == nil { - continue R - } - if err != storage.ErrFetching { - log.Error("processReceivedChunks db error", "addr", req.Addr, "err", err, "chunk", chunk) - continue R - } - select { - case <-chunk.ReqC: - log.Error("someone else delivered?", "hash", chunk.Addr.Hex()) - continue R - default: - } - - chunk.SData = req.SData - d.db.Put(context.TODO(), chunk) - - go func(req *ChunkDeliveryMsg) { - err := chunk.WaitToStore() + go func() { + req.peer = sp + err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) + if err != nil { if err == storage.ErrChunkInvalid { + // we removed this log because it spams the logs + // TODO: Enable this log line + // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) req.peer.Drop(err) } - }(req) - } + } + }() + return nil } // RequestFromPeers sends a chunk retrieve request to -func (d *Delivery) RequestFromPeers(ctx context.Context, hash []byte, skipCheck bool, peersToSkip ...discover.NodeID) error { - var success bool - var err error +func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { requestFromPeersCount.Inc(1) + var sp *Peer + spID := req.Source - d.overlay.EachConn(hash, 255, func(p network.OverlayConn, po int, nn bool) bool { - spId := p.(network.Peer).ID() - for _, p := range peersToSkip { - if p == spId { - log.Trace("Delivery.RequestFromPeers: skip peer", "peer", spId) + if spID != nil { + sp = d.getPeer(*spID) + if sp == nil { + return 
nil, nil, fmt.Errorf("source peer %v not found", spID.String()) + } + } else { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { + id := p.ID() + // TODO: skip light nodes that do not accept retrieve requests + if req.SkipPeer(id.String()) { + log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true } - } - sp := d.getPeer(spId) + sp = d.getPeer(id) + if sp == nil { + log.Warn("Delivery.RequestFromPeers: peer not found", "id", id) + return true + } + spID = &id + return false + }) if sp == nil { - log.Warn("Delivery.RequestFromPeers: peer not found", "id", spId) - return true + return nil, nil, errors.New("no peer found") } - err = sp.SendPriority(ctx, &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: skipCheck, - }, Top) - if err != nil { - return true - } - requestFromPeersEachCount.Inc(1) - success = true - return false - }) - if success { - return nil } - return errors.New("no peer found") + + err := sp.SendPriority(ctx, &RetrieveRequestMsg{ + Addr: req.Addr, + SkipCheck: req.SkipCheck, + }, Top) + if err != nil { + return nil, nil, err + } + requestFromPeersEachCount.Inc(1) + + return spID, sp.quit, nil } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 972cc859a..ece54d4ee 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -47,7 +47,13 @@ func TestStreamerRetrieveRequest(t *testing.T) { peerID := tester.IDs[0] - streamer.delivery.RequestFromPeers(context.TODO(), hash0[:], true) + ctx := context.Background() + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + streamer.delivery.RequestFromPeers(ctx, req) err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -93,7 +99,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { Code: 5, Msg: &RetrieveRequestMsg{ - Addr: chunk.Addr[:], + Addr: chunk.Address()[:], }, Peer: peerID, }, @@ -139,10 +145,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { }) hash := storage.Address(hash0[:]) - chunk := storage.NewChunk(hash, nil) - chunk.SData = hash - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk := storage.NewChunk(hash, hash) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -178,10 +185,11 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } hash = storage.Address(hash1[:]) - chunk = storage.NewChunk(hash, nil) - chunk.SData = hash1[:] - localStore.Put(context.TODO(), chunk) - chunk.WaitToStore() + chunk = storage.NewChunk(hash, hash1[:]) + err = localStore.Put(context.TODO(), chunk) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", @@ -235,16 +243,6 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { chunkKey := hash0[:] chunkData := hash1[:] - chunk, created := localStore.GetOrCreateRequest(context.TODO(), chunkKey) - - if !created { - t.Fatal("chunk already exists") - } - select { - case <-chunk.ReqC: - t.Fatal("chunk is already received") - default: - } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", @@ -261,7 +259,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { }, }, p2ptest.Exchange{ - Label: "ChunkDeliveryRequest message", + Label: "ChunkDelivery message", 
Triggers: []p2ptest.Trigger{ { Code: 6, @@ -277,21 +275,26 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { if err != nil { t.Fatalf("Expected no error, got %v", err) } - - timeout := time.NewTimer(1 * time.Second) - - select { - case <-timeout.C: - t.Fatal("timeout receiving chunk") - case <-chunk.ReqC: + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + // wait for the chunk to get stored + storedChunk, err := localStore.Get(ctx, chunkKey) + for err != nil { + select { + case <-ctx.Done(): + t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) + default: + } + storedChunk, err = localStore.Get(ctx, chunkKey) + time.Sleep(50 * time.Millisecond) } - storedChunk, err := localStore.Get(context.TODO(), chunkKey) if err != nil { t.Fatalf("Expected no error, got %v", err) } - if !bytes.Equal(storedChunk.SData, chunkData) { + if !bytes.Equal(storedChunk.Data(), chunkData) { t.Fatal("Retrieved chunk has different data than original") } @@ -324,19 +327,20 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) @@ -498,7 +502,6 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - id := ctx.Config.ID addr := network.NewAddrFromNodeID(id) store, datadir, err := createTestLocalStorageForID(id, addr) @@ -511,20 +514,20 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip store.Close() } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, DoSync: true, SyncUpdateDelay: 0, }) - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(localStore, retrieveFunc) fileStore := 
storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index f4294134b..452aaca76 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -38,13 +38,18 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) -func TestIntervals(t *testing.T) { +func TestIntervalsLive(t *testing.T) { testIntervals(t, true, nil, false) - testIntervals(t, false, NewRange(9, 26), false) - testIntervals(t, true, NewRange(9, 26), false) - testIntervals(t, true, nil, true) +} + +func TestIntervalsHistory(t *testing.T) { + testIntervals(t, false, NewRange(9, 26), false) testIntervals(t, false, NewRange(9, 26), true) +} + +func TestIntervalsLiveAndHistory(t *testing.T) { + testIntervals(t, true, NewRange(9, 26), false) testIntervals(t, true, NewRange(9, 26), true) } @@ -70,17 +75,21 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { os.RemoveAll(datadir) } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { - return newTestExternalClient(db), nil + return newTestExternalClient(netStore), nil }) r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil @@ -101,9 +110,13 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() storer := nodeIDs[0] @@ -136,11 +149,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { liveErrC := make(chan error) historyErrC := make(chan error) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - log.Error("WaitKademlia error: %v", "err", err) - return err - } - log.Debug("Watching for disconnections") disconnections := sim.PeerEvents( context.Background(), @@ -148,6 +156,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), ) + err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) + if err != nil { + return err + } + go func() { for d := range disconnections { if d.Error != nil { @@ -172,7 +185,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var liveHashesChan chan []byte liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true)) if err != nil { - 
log.Error("Subscription error: %v", "err", err) + log.Error("get hashes", "err", err) return } i := externalStreamSessionAt @@ -216,6 +229,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { var historyHashesChan chan []byte historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false)) if err != nil { + log.Error("get hashes", "err", err) return } @@ -252,10 +266,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } }() - err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) - if err != nil { - return err - } if err := <-liveErrC; err != nil { return err } @@ -302,34 +312,32 @@ func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error { type testExternalClient struct { hashes chan []byte - db *storage.DBAPI + store storage.SyncChunkStore enableNotificationsC chan struct{} } -func newTestExternalClient(db *storage.DBAPI) *testExternalClient { +func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - db: db, + store: store, enableNotificationsC: make(chan struct{}), } } -func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(ctx, hash) - if chunk.ReqC == nil { +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { + wait := c.store.FetchFunc(ctx, storage.Address(hash)) + if wait == nil { return nil } - c.hashes <- hash - //NOTE: This was failing on go1.9.x with a deadlock. - //Sometimes this function would just block - //It is commented now, but it may be well worth after the chunk refactor - //to re-enable this and see if the problem has been addressed - /* - return func() { - return chunk.WaitToStore() + select { + case c.hashes <- hash: + case <-ctx.Done(): + log.Warn("testExternalClient NeedData context", "err", ctx.Err()) + return func(_ context.Context) error { + return ctx.Err() } - */ - return nil + } + return wait } func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index a19f63589..2e1a81e82 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -18,9 +18,7 @@ package stream import ( "context" - "errors" "fmt" - "sync" "time" "github.com/ethereum/go-ethereum/metrics" @@ -31,6 +29,8 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) +var syncBatchTimeout = 30 * time.Second + // Stream defines a unique stream identifier. type Stream struct { // Name is used for Client and Server functions identification. 
@@ -117,8 +117,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e go func() { if err := p.SendOfferedHashes(os, from, to); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() @@ -135,8 +134,7 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e } go func() { if err := p.SendOfferedHashes(os, req.History.From, req.History.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err) } }() } @@ -202,38 +200,52 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if err != nil { return fmt.Errorf("error initiaising bitvector of length %v: %v", len(hashes)/HashSize, err) } - wg := sync.WaitGroup{} + + ctr := 0 + errC := make(chan error) + ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) + + ctx = context.WithValue(ctx, "source", p.ID().String()) for i := 0; i < len(hashes); i += HashSize { hash := hashes[i : i+HashSize] if wait := c.NeedData(ctx, hash); wait != nil { + ctr++ want.Set(i/HashSize, true) - wg.Add(1) // create request and wait until the chunk data arrives and is stored - go func(w func()) { - w() - wg.Done() + go func(w func(context.Context) error) { + select { + case errC <- w(ctx): + case <-ctx.Done(): + } }(wait) } } - // done := make(chan bool) - // go func() { - // wg.Wait() - // close(done) - // }() - // go func() { - // select { - // case <-done: - // s.next <- s.batchDone(p, req, hashes) - // case <-time.After(1 * time.Second): - // p.Drop(errors.New("timeout waiting for batch to be delivered")) - // } - // }() + go func() { - wg.Wait() + defer cancel() + for i := 0; i < ctr; i++ { + select { + case err := <-errC: + if err != nil { + log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + return + } + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + } + } select { case c.next <- c.batchDone(p, req, hashes): case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) } }() // only send wantedKeysMsg if all missing chunks of the previous batch arrived @@ -242,7 +254,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg c.sessionAt = req.From } from, to := c.nextBatch(req.To + 1) - log.Trace("received offered batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To) + log.Trace("set next batch", "peer", p.ID(), "stream", req.Stream, "from", req.From, "to", req.To, "addr", p.streamer.addr.ID()) if from == to { return nil } @@ -254,25 +266,25 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg To: to, } go func() { + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) select { - case <-time.After(120 * time.Second): - log.Warn("handleOfferedHashesMsg timeout, so dropping peer") - p.Drop(errors.New("handle offered hashes timeout")) - return case err := <-c.next: if err != nil { - log.Warn("c.next dropping peer", "err", err) + log.Warn("c.next error dropping peer", "err", err) p.Drop(err) return 
} case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) return } log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) err := p.SendPriority(ctx, msg, c.priority) if err != nil { - log.Warn("SendPriority err, so dropping peer", "err", err) - p.Drop(err) + log.Warn("SendPriority error", "err", err) } }() return nil @@ -306,8 +318,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) // launch in go routine since GetBatch blocks until new hashes arrive go func() { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { - log.Warn("SendOfferedHashes dropping peer", "err", err) - p.Drop(err) + log.Warn("SendOfferedHashes error", "err", err) } }() // go p.SendOfferedHashes(s, req.From, req.To) @@ -327,11 +338,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg) if err != nil { return fmt.Errorf("handleWantedHashesMsg get data %x: %v", hash, err) } - chunk := storage.NewChunk(hash, nil) - chunk.SData = data - if length := len(chunk.SData); length < 9 { - log.Error("Chunk.SData to sync is too short", "len(chunk.SData)", length, "address", chunk.Addr) - } + chunk := storage.NewChunk(hash, data) if err := p.Deliver(ctx, chunk, s.priority); err != nil { return err } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 80b9ab711..1466a7a9c 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -33,8 +33,6 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) -var sendTimeout = 30 * time.Second - type notFoundError struct { t string s Stream @@ -83,8 +81,40 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - p.Send(wmsg.Context, wmsg.Msg) + err := p.Send(wmsg.Context, wmsg.Msg) + if err != nil { + log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) + p.Drop(err) + } }) + + // basic monitoring for pq contention + go func(pq *pq.PriorityQueue) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + var len_maxi int + var cap_maxi int + for k := range pq.Queues { + if len_maxi < len(pq.Queues[k]) { + len_maxi = len(pq.Queues[k]) + } + + if cap_maxi < cap(pq.Queues[k]) { + cap_maxi = cap(pq.Queues[k]) + } + } + + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_len_%s", p.ID().TerminalString()), nil).Update(int64(len_maxi)) + metrics.GetOrRegisterGauge(fmt.Sprintf("pq_cap_%s", p.ID().TerminalString()), nil).Update(int64(cap_maxi)) + case <-p.quit: + return + } + } + }(p.pq) + go func() { <-p.quit cancel() @@ -93,7 +123,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { } // Deliver sends a storeRequestMsg protocol message to the peer -func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error { +func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8) error { var sp opentracing.Span ctx, sp = spancontext.StartSpan( ctx, @@ -101,8 +131,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 defer sp.Finish() msg := &ChunkDeliveryMsg{ - Addr: chunk.Addr, - SData: chunk.SData, + Addr: chunk.Address(), + SData: chunk.Data(), } return p.SendPriority(ctx, msg, priority) } @@ -111,13 +141,16 @@ func (p 
*Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8 func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - cctx, cancel := context.WithTimeout(context.Background(), sendTimeout) - defer cancel() wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, } - return p.pq.Push(cctx, wmsg, int(priority)) + err := p.pq.Push(wmsg, int(priority)) + if err == pq.ErrContention { + log.Warn("dropping peer on priority queue contention", "peer", p.ID()) + p.Drop(err) + } + return err } // SendOfferedHashes sends OfferedHashesMsg protocol msg diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 4ff947b21..19eaad34e 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -124,23 +124,30 @@ func runFileRetrievalTest(nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -267,24 +274,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } + localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 0, }) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucketKeyFileStore = simulation.BucketKey("filestore") bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 
4e1ab09fc..7cd09099c 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "runtime" "sync" "testing" "time" @@ -39,15 +40,20 @@ import ( mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) -const testMinProxBinSize = 2 const MaxTimeout = 600 type synctestConfig struct { - addrs [][]byte - hashes []storage.Address - idToChunksMap map[discover.NodeID][]int - chunksToNodesMap map[string][]int - addrToIDMap map[string]discover.NodeID + addrs [][]byte + hashes []storage.Address + idToChunksMap map[discover.NodeID][]int + //chunksToNodesMap map[string][]int + addrToIDMap map[string]discover.NodeID +} + +// Tests in this file should not request chunks from peers. +// This function will panic indicating that there is a problem if request has been made. +func dummyRequestFromPeers(_ context.Context, req *network.Request) (*discover.NodeID, chan struct{}, error) { + panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String())) } //This test is a syncing test for nodes. @@ -58,6 +64,9 @@ type synctestConfig struct { //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. func TestSyncingViaGlobalSync(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { @@ -86,11 +95,14 @@ func TestSyncingViaGlobalSync(t *testing.T) { } func TestSyncingViaDirectSubscribe(t *testing.T) { + if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" { + t.Skip("Flaky on mac on travis") + } //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) - err := testSyncingViaDirectSubscribe(*chunks, *nodes) + err := testSyncingViaDirectSubscribe(t, *chunks, *nodes) if err != nil { t.Fatal(err) } @@ -110,7 +122,7 @@ func TestSyncingViaDirectSubscribe(t *testing.T) { for _, chnk := range chnkCnt { for _, n := range nodeCnt { log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) - err := testSyncingViaDirectSubscribe(chnk, n) + err := testSyncingViaDirectSubscribe(t, chnk, n) if err != nil { t.Fatal(err) } @@ -130,21 +142,27 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ DoSync: true, SyncUpdateDelay: 3 * time.Second, }) bucket.Store(bucketKeyRegistry, r) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, @@ -166,9 +184,27 @@ func testSyncingViaGlobalSync(t *testing.T, 
chunkCount int, nodeCount int) { t.Fatal(err) } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + t.Fatal(err) + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -197,10 +233,6 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { conf.hashes = append(conf.hashes, hashes...) mapKeysToNodes(conf) - if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { - return err - } - // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. allSuccess := false @@ -220,6 +252,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { }() } for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -252,7 +285,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -264,6 +300,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { if result.Error != nil { t.Fatal(result.Error) } + log.Info("Simulation ended") } /* @@ -277,7 +314,7 @@ The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. 
*/ -func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { +func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error { sim := simulation.New(map[string]simulation.ServiceFunc{ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { @@ -288,28 +325,34 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return nil, nil, err } bucket.Store(bucketKeyStore, store) - cleanup = func() { - os.RemoveAll(datadir) - store.Close() - } localStore := store.(*storage.LocalStore) - db := storage.NewDBAPI(localStore) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, nil, err + } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - delivery := NewDelivery(kad, db) + delivery := NewDelivery(kad, netStore) + netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil) bucket.Store(bucketKeyRegistry, r) - fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) + cleanup = func() { + os.RemoveAll(datadir) + netStore.Close() + r.Close() + } + return r, cleanup, nil }, }) defer sim.Close() - ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) defer cancelSimRun() conf := &synctestConfig{} @@ -325,6 +368,24 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return err } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal("unexpected disconnect") + cancelSimRun() + } + }() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -402,6 +463,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { // or until the timeout is reached. 
allSuccess := false for !allSuccess { + allSuccess = true for _, id := range nodeIDs { //for each expected chunk, check if it is in the local store localChunks := conf.idToChunksMap[id] @@ -434,7 +496,10 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) } } - allSuccess = localSuccess + if !localSuccess { + allSuccess = false + break + } } } if !allSuccess { @@ -447,7 +512,7 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { return result.Error } - log.Info("Simulation terminated") + log.Info("Simulation ended") return nil } @@ -457,20 +522,14 @@ func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { //returns the number of subscriptions requested func startSyncing(r *Registry, conf *synctestConfig) (int, error) { var err error - - kad, ok := r.delivery.overlay.(*network.Kademlia) - if !ok { - return 0, fmt.Errorf("Not a Kademlia!") - } - + kad := r.delivery.kad subCnt := 0 //iterate over each bin and solicit needed subscription to bins - kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool { + kad.EachBin(r.addr.Over(), pof, 0, func(conn *network.Peer, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to - histRange := &Range{} subCnt++ - err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top) + err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High) if err != nil { log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err)) return false @@ -483,7 +542,6 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) { //map chunk keys to addresses which are responsible func mapKeysToNodes(conf *synctestConfig) { - kmap := make(map[string][]int) nodemap := make(map[string][]int) //build a pot for chunk hashes np := pot.NewPot(nil, 0) @@ -492,36 +550,33 @@ func mapKeysToNodes(conf *synctestConfig) { indexmap[string(a)] = i np, _, _ = pot.Add(np, a, pof) } + + var kadMinProxSize = 2 + + ppmap := network.NewPeerPotMap(kadMinProxSize, conf.addrs) + //for each address, run EachNeighbour on the chunk hashes pot to identify closest nodes log.Trace(fmt.Sprintf("Generated hash chunk(s): %v", conf.hashes)) for i := 0; i < len(conf.hashes); i++ { - pl := 256 //highest possible proximity - var nns []int + var a []byte np.EachNeighbour([]byte(conf.hashes[i]), pof, func(val pot.Val, po int) bool { - a := val.([]byte) - if pl < 256 && pl != po { - return false - } - if pl == 256 || pl == po { - log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)])) - nns = append(nns, indexmap[string(a)]) - nodemap[string(a)] = append(nodemap[string(a)], i) - } - if pl == 256 && len(nns) >= testMinProxBinSize { - //maxProxBinSize has been reached at this po, so save it - //we will add all other nodes at the same po - pl = po - } - return true + // take the first address + a = val.([]byte) + return false }) - kmap[string(conf.hashes[i])] = nns + + nns := ppmap[common.Bytes2Hex(a)].NNSet + nns = append(nns, a) + + for _, p := range nns { + nodemap[string(p)] = append(nodemap[string(p)], i) + } } for addr, chunks := range nodemap { //this selects which chunks are expected to be found with the given node conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks } log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap)) - 
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index cd0580a0c..1f1f34b7b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,10 +32,8 @@ import (
    "github.com/ethereum/go-ethereum/swarm/network"
    "github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
    "github.com/ethereum/go-ethereum/swarm/pot"
-    "github.com/ethereum/go-ethereum/swarm/spancontext"
    "github.com/ethereum/go-ethereum/swarm/state"
    "github.com/ethereum/go-ethereum/swarm/storage"
-    opentracing "github.com/opentracing/opentracing-go"
)

const (
@@ -43,8 +41,8 @@ const (
    Mid
    High
    Top
-    PriorityQueue          // number of queues
-    PriorityQueueCap = 32  // queue capacity
+    PriorityQueue    = 4   // number of priority queues - Low, Mid, High, Top
+    PriorityQueueCap = 128 // queue capacity
    HashSize = 32
)

@@ -73,7 +71,7 @@ type RegistryOptions struct {
}

// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
    if options == nil {
        options = &RegistryOptions{}
    }
@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
    streamer.api = NewAPI(streamer)
    delivery.getPeer = streamer.getPeer
    streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
-        return NewSwarmChunkServer(delivery.db), nil
+        return NewSwarmChunkServer(delivery.chunkStore), nil
    })
    streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
-        return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+        return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
    })
-    RegisterSwarmSyncerServer(streamer, db)
-    RegisterSwarmSyncerClient(streamer, db)
+    RegisterSwarmSyncerServer(streamer, syncChunkStore)
+    RegisterSwarmSyncerClient(streamer, syncChunkStore)

    if options.DoSync {
        // latestIntC function ensures that
@@ -130,7 +128,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
            // wait for kademlia table to be healthy
            time.Sleep(options.SyncUpdateDelay)

-            kad := streamer.delivery.overlay.(*network.Kademlia)
+            kad := streamer.delivery.kad
            depthC := latestIntC(kad.NeighbourhoodDepthC())
            addressBookSizeC := latestIntC(kad.AddrCountC())

@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
    return peer.Send(context.TODO(), msg)
}

-func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
-    var sp opentracing.Span
-    ctx, sp = spancontext.StartSpan(
-        ctx,
-        "registry.retrieve")
-    defer sp.Finish()
-
-    return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
-}
-
func (r *Registry) NodeInfo() interface{} {
    return nil
}
@@ -398,9 +386,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
// and they are no longer required after iteration, request to Quit
// them will be send to appropriate peers.
func (r *Registry) updateSyncing() {
-    // if overlay in not Kademlia, panic
-    kad := r.delivery.overlay.(*network.Kademlia)
-
+    kad := r.delivery.kad
    // map of all SYNC streams for all peers
    // used at the and of the function to remove servers
    // that are not needed anymore
@@ -421,8 +407,7 @@ func (r *Registry) updateSyncing() {
    r.peersMu.RUnlock()

    // request subscriptions for all nodes and bins
-    kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(conn network.OverlayConn, bin int) bool {
-        p := conn.(network.Peer)
+    kad.EachBin(r.addr.Over(), pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
        log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), p.ID(), bin))

        // bin is always less then 256 and it is safe to convert it to type uint8
@@ -461,10 +446,11 @@ func (r *Registry) updateSyncing() {
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
    peer := protocols.NewPeer(p, rw, Spec)
-    bzzPeer := network.NewBzzTestPeer(peer, r.addr)
-    r.delivery.overlay.On(bzzPeer)
-    defer r.delivery.overlay.Off(bzzPeer)
-    return r.Run(bzzPeer)
+    bp := network.NewBzzPeer(peer, r.addr)
+    np := network.NewPeer(bp, r.delivery.kad)
+    r.delivery.kad.On(np)
+    defer r.delivery.kad.Off(np)
+    return r.Run(bp)
}

// HandleMsg is the message handler that delegates incoming messages
@@ -559,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
-    NeedData(context.Context, []byte) func()
+    NeedData(context.Context, []byte) func(context.Context) error
    BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
    Close()
}
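The stream.go hunks above also change the Client interface: NeedData now returns a wait function that takes a context and returns an error, instead of a bare func(). A hypothetical in-package implementation, only to illustrate the intended contract (the type name and the delivered channel are assumptions, not part of the diff):

// minimalClient sketches the updated Client contract: NeedData returns nil when
// nothing has to be fetched, otherwise a wait function that blocks until the
// data has arrived or the supplied context is cancelled.
type minimalClient struct {
    delivered chan struct{} // closed once the requested data has been stored
}

func (c *minimalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
    return func(ctx context.Context) error {
        select {
        case <-c.delivered:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func (c *minimalClient) BatchDone(s Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
    return nil // this sketch offers no takeover proof
}

func (c *minimalClient) Close() {}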
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 7523860c9..06e96b9a9 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -80,15 +80,17 @@ func newTestClient(t string) *testClient {
    }
}

-func (self *testClient) NeedData(ctx context.Context, hash []byte) func() {
+func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
    self.receivedHashes[string(hash)] = hash
    if bytes.Equal(hash, hash0[:]) {
-        return func() {
+        return func(context.Context) error {
            <-self.wait0
+            return nil
        }
    } else if bytes.Equal(hash, hash2[:]) {
-        return func() {
+        return func(context.Context) error {
            <-self.wait2
+            return nil
        }
    }
    return nil
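For reference, the wiring that the test ServiceFuncs in this page perform with the new APIs can be condensed into one hypothetical helper (the function name is assumed; every call below appears in the diffs above): a LocalStore is wrapped by a NetStore, which acts as the storage.SyncChunkStore handed to both the Delivery and the Registry.

func newTestRegistry(addr *network.BzzAddr, localStore *storage.LocalStore) (*Registry, error) {
    // wrap the local store so that missing chunks can be fetched from the network
    netStore, err := storage.NewNetStore(localStore, nil)
    if err != nil {
        return nil, err
    }
    kad := network.NewKademlia(addr.Over(), network.NewKadParams())
    delivery := NewDelivery(kad, netStore)
    // the fetcher factory defines how a missing chunk is requested from peers
    netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
    // the NetStore satisfies the storage.SyncChunkStore expected by NewRegistry
    return NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil), nil
}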
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index d7febe4a3..e9811a678 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -28,7 +28,6 @@ import (
)

const (
-    // BatchSize = 2
    BatchSize = 128
)

@@ -38,35 +37,37 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
    po        uint8
-    db        *storage.DBAPI
+    store     storage.SyncChunkStore
    sessionAt uint64
    start     uint64
+    live      bool
    quit      chan struct{}
}

// NewSwarmSyncerServer is contructor for SwarmSyncerServer
-func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
-    sessionAt := db.CurrentBucketStorageIndex(po)
+func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+    sessionAt := syncChunkStore.BinIndex(po)
    var start uint64
    if live {
        start = sessionAt
    }
    return &SwarmSyncerServer{
        po:        po,
-        db:        db,
+        store:     syncChunkStore,
        sessionAt: sessionAt,
        start:     start,
+        live:      live,
        quit:      make(chan struct{}),
    }, nil
}

-func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
    streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
        po, err := ParseSyncBinKey(t)
        if err != nil {
            return nil, err
        }
-        return NewSwarmSyncerServer(live, po, db)
+        return NewSwarmSyncerServer(live, po, syncChunkStore)
    })
    // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
    //     return NewOutgoingProvableSwarmSyncer(po, db)
@@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() {
    close(s.quit)
}

-// GetSection retrieves the actual chunk from localstore
+// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
-    chunk, err := s.db.Get(ctx, storage.Address(key))
-    if err == storage.ErrFetching {
-        <-chunk.ReqC
-    } else if err != nil {
+    chunk, err := s.store.Get(ctx, storage.Address(key))
+    if err != nil {
        return nil, err
    }
-    return chunk.SData, nil
+    return chunk.Data(), nil
}

// GetBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
    var batch []byte
    i := 0
-    if from == 0 {
-        from = s.start
-    }
-    if to <= from || from >= s.sessionAt {
-        to = math.MaxUint64
+    if s.live {
+        if from == 0 {
+            from = s.start
+        }
+        if to <= from || from >= s.sessionAt {
+            to = math.MaxUint64
+        }
+    } else {
+        if (to < from && to != 0) || from > s.sessionAt {
+            return nil, 0, 0, nil, nil
+        }
+        if to == 0 || to > s.sessionAt {
+            to = s.sessionAt
+        }
    }
+
    var ticker *time.Ticker
    defer func() {
        if ticker != nil {
@@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
        }
        metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
-        err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
-            batch = append(batch, addr[:]...)
+        err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
+            batch = append(batch, key[:]...)
            i++
            to = idx
            return i < BatchSize
@@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
            wait = true
        }

-        log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
+        log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
        return batch, from, to, nil, nil
    }
@@ -146,28 +155,26 @@ type SwarmSyncerClient struct {
    sessionReader storage.LazySectionReader
    retrieveC     chan *storage.Chunk
    storeC        chan *storage.Chunk
-    db            *storage.DBAPI
+    store         storage.SyncChunkStore
    // chunker               storage.Chunker
-    currentRoot           storage.Address
-    requestFunc           func(chunk *storage.Chunk)
-    end, start            uint64
-    peer                  *Peer
-    ignoreExistingRequest bool
-    stream                Stream
+    currentRoot storage.Address
+    requestFunc func(chunk *storage.Chunk)
+    end, start  uint64
+    peer        *Peer
+    stream      Stream
}

// NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
    return &SwarmSyncerClient{
-        db:                    db,
-        peer:                  p,
-        ignoreExistingRequest: ignoreExistingRequest,
-        stream:                stream,
+        store:  store,
+        peer:   p,
+        stream: stream,
    }, nil
}

// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
+// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
//     retrieveC := make(storage.Chunk, chunksCap)
//     RunChunkRequestor(p, retrieveC)
//     storeC := make(storage.Chunk, chunksCap)
@@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
    streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
-        return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
+        return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
    })
}

// NeedData
-func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
-    chunk, _ := s.db.GetOrCreateRequest(ctx, key)
-    // TODO: we may want to request from this peer anyway even if the request exists
-
-    // ignoreExistingRequest is temporary commented out until its functionality is verified.
-    // For now, this optimization can be disabled.
-    if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
-        return nil
-    }
-    // create request and wait until the chunk data arrives and is stored
-    return func() {
-        chunk.WaitToStore()
-    }
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
+    return s.store.FetchFunc(ctx, key)
}

// BatchDone
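The syncer.go hunk above makes SetNextBatch treat live and history streams differently. A sketch that extracts the added range handling into a standalone function (hypothetical helper mirroring the added lines; the boolean result means "there is something to serve"):

// normalizeRange reproduces the from/to handling of SetNextBatch:
// live servers get open-ended ranges starting at the session bin index,
// history servers get ranges clamped to chunks stored before the session began.
func normalizeRange(live bool, from, to, start, sessionAt uint64) (uint64, uint64, bool) {
    if live {
        if from == 0 {
            from = start
        }
        if to <= from || from >= sessionAt {
            to = math.MaxUint64
        }
        return from, to, true
    }
    if (to < from && to != 0) || from > sessionAt {
        return 0, 0, false // nothing to serve
    }
    if to == 0 || to > sessionAt {
        to = sessionAt
    }
    return from, to, true
}

For example, with sessionAt == 1000 a history request of (0, 0) is clamped to (0, 1000), while a live server turns the same request into (start, math.MaxUint64).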
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f72aa3444..469d520f8 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -102,17 +102,22 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
            }
        }
        localStore := store.(*storage.LocalStore)
-        db := storage.NewDBAPI(localStore)
-        bucket.Store(bucketKeyDB, db)
+        netStore, err := storage.NewNetStore(localStore, nil)
+        if err != nil {
+            return nil, nil, err
+        }
+        bucket.Store(bucketKeyDB, netStore)
        kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-        delivery := NewDelivery(kad, db)
+        delivery := NewDelivery(kad, netStore)
+        netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
        bucket.Store(bucketKeyDelivery, delivery)

-        r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{
+        r := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
            SkipCheck: skipCheck,
        })

-        fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams())
+        fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
        bucket.Store(bucketKeyFileStore, fileStore)

        return r, cleanup, nil
@@ -197,8 +202,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
            if !ok {
                return fmt.Errorf("No DB")
            }
-            db := item.(*storage.DBAPI)
-            db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
+            netStore := item.(*storage.NetStore)
+            netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool {
                hashes[i] = append(hashes[i], addr)
                totalHashes++
                hashCounts[i]++
@@ -216,16 +221,11 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
                if !ok {
                    return fmt.Errorf("No DB")
                }
-                db := item.(*storage.DBAPI)
-                chunk, err := db.Get(ctx, key)
-                if err == storage.ErrFetching {
-                    <-chunk.ReqC
-                } else if err != nil {
-                    continue
+                db := item.(*storage.NetStore)
+                _, err := db.Get(ctx, key)
+                if err == nil {
+                    found++
                }
-                // needed for leveldb not to be closed?
-                // chunk.WaitToStore()
-                found++
            }
        }
        log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
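With NetStore in place, a plain Get either returns the stored chunk or an error, so the presence check in testSyncBetweenNodes above no longer waits on ErrFetching / ReqC. A small illustrative helper (assumed name) capturing the same pattern:

// countFound reports how many of the given keys the node's NetStore can return;
// an error from Get is treated as the chunk not (yet) being present, matching
// the check in the hunk above.
func countFound(ctx context.Context, store *storage.NetStore, keys []storage.Address) int {
    found := 0
    for _, key := range keys {
        if _, err := store.Get(ctx, key); err == nil {
            found++
        }
    }
    return found
}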