Diffstat (limited to 'swarm/network')
-rw-r--r-- | swarm/network/hive.go | 2
-rw-r--r-- | swarm/network/hive_test.go | 27
-rw-r--r-- | swarm/network/kademlia.go | 157
-rw-r--r-- | swarm/network/kademlia_test.go | 112
-rw-r--r-- | swarm/network/protocol_test.go | 4
-rw-r--r-- | swarm/network/simulation/kademlia_test.go | 1
-rw-r--r-- | swarm/network/stream/common_test.go | 106
-rw-r--r-- | swarm/network/stream/delivery.go | 179
-rw-r--r-- | swarm/network/stream/delivery_test.go | 177
-rw-r--r-- | swarm/network/stream/intervals_test.go | 9
-rw-r--r-- | swarm/network/stream/lightnode_test.go | 89
-rw-r--r-- | swarm/network/stream/messages.go | 72
-rw-r--r-- | swarm/network/stream/peer.go | 184
-rw-r--r-- | swarm/network/stream/peer_test.go | 309
-rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 6
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 30
-rw-r--r-- | swarm/network/stream/stream.go | 390
-rw-r--r-- | swarm/network/stream/streamer_test.go | 191
-rw-r--r-- | swarm/network/stream/syncer.go | 209
-rw-r--r-- | swarm/network/stream/syncer_test.go | 68
20 files changed, 1145 insertions, 1177 deletions
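The central API change in swarm/network/kademlia.go below replaces the NeighbourhoodDepthC and AddrCountC channels with a buffered, multi-subscriber depth signal. A minimal consumer-side sketch of that new API, assuming a hypothetical watchDepth helper and quit channel (the pattern mirrors runUpdateSyncing in swarm/network/stream/peer.go further down in this diff); it is an illustration only, not part of the changeset:

// Illustrative sketch: consuming Kademlia.SubscribeToNeighbourhoodDepthChange.
// watchDepth and quit are assumptions made for this example.
package main

import "github.com/ethereum/go-ethereum/swarm/network"

func watchDepth(kad *network.Kademlia, quit chan struct{}) {
	c, unsubscribe := kad.SubscribeToNeighbourhoodDepthChange()
	defer unsubscribe()
	for {
		select {
		case _, ok := <-c:
			if !ok {
				return // signal channel closed by unsubscribe
			}
			// The channel only signals that a change happened;
			// the current value is read separately.
			depth := kad.NeighbourhoodDepth()
			_ = depth // react to the new depth here
		case <-quit:
			return
		}
	}
}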
diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 2eb521f1d..ad51b29c2 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -116,7 +116,7 @@ func (h *Hive) Stop() error { log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) h.EachConn(nil, 255, func(p *Peer, _ int) bool { log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) - p.Drop(nil) + p.Drop() return true }) diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go index d03db42bc..3e9732216 100644 --- a/swarm/network/hive_test.go +++ b/swarm/network/hive_test.go @@ -117,7 +117,7 @@ func TestHiveStatePersistance(t *testing.T) { const peersCount = 5 - startHive := func(t *testing.T, dir string) (h *Hive) { + startHive := func(t *testing.T, dir string) (h *Hive, cleanupFunc func()) { store, err := state.NewDBStore(dir) if err != nil { t.Fatal(err) @@ -137,27 +137,30 @@ func TestHiveStatePersistance(t *testing.T) { if err := h.Start(s.Server); err != nil { t.Fatal(err) } - return h + + cleanupFunc = func() { + err := h.Stop() + if err != nil { + t.Fatal(err) + } + + s.Stop() + } + return h, cleanupFunc } - h1 := startHive(t, dir) + h1, cleanup1 := startHive(t, dir) peers := make(map[string]bool) for i := 0; i < peersCount; i++ { raddr := RandomAddr() h1.Register(raddr) peers[raddr.String()] = true } - if err = h1.Stop(); err != nil { - t.Fatal(err) - } + cleanup1() // start the hive and check that we know of all expected peers - h2 := startHive(t, dir) - defer func() { - if err = h2.Stop(); err != nil { - t.Fatal(err) - } - }() + h2, cleanup2 := startHive(t, dir) + cleanup2() i := 0 h2.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int) bool { diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index dd6de44fd..90491ab31 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/pot" sv "github.com/ethereum/go-ethereum/swarm/version" @@ -82,14 +83,14 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthC chan int // returned by DepthC function to signal neighbourhood depth change - addrCountC chan int // returned by AddrCountC function to signal peer count change + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthMu sync.RWMutex // protects neighbourhood depth nDepth + nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed } // NewKademlia creates a Kademlia table for base address addr @@ -138,6 +139,9 @@ func (e *entry) Hex() string { func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() + + 
metrics.GetOrRegisterCounter("kad.register", nil).Inc(1) + var known, size int for _, p := range peers { log.Trace("kademlia trying to register", "addr", p) @@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { return newEntry(p) } - log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr) - return v }) if found { @@ -173,12 +175,8 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { } size++ } - // send new address count value only if there are new addresses - if k.addrCountC != nil && size-known > 0 { - k.addrCountC <- k.addrs.Size() - } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return nil } @@ -186,6 +184,9 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1) + radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) // collect undersaturated bins in ascending order of number of connected peers // and from shallow to deep (ascending order of PO) @@ -297,6 +298,9 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.on", nil).Inc(1) + var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val { // if not found live @@ -315,12 +319,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val { return a }) - // send new address count value only if the peer is inserted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } } - log.Trace(k.string()) // calculate if depth of saturation changed depth := uint8(k.saturation()) var changed bool @@ -328,75 +327,72 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { changed = true k.depth = depth } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return k.depth, changed } -// NeighbourhoodDepthC returns the channel that sends a new kademlia -// neighbourhood depth on each change. -// Not receiving from the returned channel will block On function -// when the neighbourhood depth is changed. -// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one? -func (k *Kademlia) NeighbourhoodDepthC() <-chan int { - k.lock.Lock() - defer k.lock.Unlock() - if k.nDepthC == nil { - k.nDepthC = make(chan int) +// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot, +// sets it to the nDepth and sends a signal to every nDepthSig channel. +func (k *Kademlia) setNeighbourhoodDepth() { + nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) + var changed bool + k.nDepthMu.Lock() + if nDepth != k.nDepth { + k.nDepth = nDepth + changed = true } - return k.nDepthC -} + k.nDepthMu.Unlock() -// CloseNeighbourhoodDepthC closes the channel returned by -// NeighbourhoodDepthC and stops sending neighbourhood change. -func (k *Kademlia) CloseNeighbourhoodDepthC() { - k.lock.Lock() - defer k.lock.Unlock() - - if k.nDepthC != nil { - close(k.nDepthC) - k.nDepthC = nil + if len(k.nDepthSig) > 0 && changed { + for _, c := range k.nDepthSig { + // Every nDepthSig channel has a buffer capacity of 1, + // so every receiver will get the signal even if the + // select statement has the default case to avoid blocking. 
+ select { + case c <- struct{}{}: + default: + } + } } } -// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel -// if it is initialized. -func (k *Kademlia) sendNeighbourhoodDepthChange() { - // nDepthC is initialized when NeighbourhoodDepthC is called and returned by it. - // It provides signaling of neighbourhood depth change. - // This part of the code is sending new neighbourhood depth to nDepthC if that condition is met. - if k.nDepthC != nil { - nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) - if nDepth != k.nDepth { - k.nDepth = nDepth - k.nDepthC <- nDepth - } - } +// NeighbourhoodDepth returns the value calculated by depthForPot function +// in setNeighbourhoodDepth method. +func (k *Kademlia) NeighbourhoodDepth() int { + k.nDepthMu.RLock() + defer k.nDepthMu.RUnlock() + return k.nDepth } -// AddrCountC returns the channel that sends a new -// address count value on each change. -// Not receiving from the returned channel will block Register function -// when address count value changes. -func (k *Kademlia) AddrCountC() <-chan int { +// SubscribeToNeighbourhoodDepthChange returns the channel that signals +// when neighbourhood depth value is changed. The current neighbourhood depth +// is returned by NeighbourhoodDepth method. Returned function unsubscribes +// the channel from signaling and releases the resources. Returned function is safe +// to be called multiple times. +func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, unsubscribe func()) { + channel := make(chan struct{}, 1) + var closeOnce sync.Once + k.lock.Lock() defer k.lock.Unlock() - if k.addrCountC == nil { - k.addrCountC = make(chan int) - } - return k.addrCountC -} + k.nDepthSig = append(k.nDepthSig, channel) -// CloseAddrCountC closes the channel returned by -// AddrCountC and stops sending address count change. -func (k *Kademlia) CloseAddrCountC() { - k.lock.Lock() - defer k.lock.Unlock() + unsubscribe = func() { + k.lock.Lock() + defer k.lock.Unlock() - if k.addrCountC != nil { - close(k.addrCountC) - k.addrCountC = nil + for i, c := range k.nDepthSig { + if c == channel { + k.nDepthSig = append(k.nDepthSig[:i], k.nDepthSig[i+1:]...) 
+ break + } + } + + closeOnce.Do(func() { close(channel) }) } + + return channel, unsubscribe } // Off removes a peer from among live peers @@ -422,11 +418,7 @@ func (k *Kademlia) Off(p *Peer) { // v cannot be nil, but no need to check return nil }) - // send new address count value only if the peer is deleted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() } } @@ -484,13 +476,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { }) } -// NeighbourhoodDepth returns the depth for the pot, see depthForPot -func (k *Kademlia) NeighbourhoodDepth() (depth int) { - k.lock.RLock() - defer k.lock.RUnlock() - return depthForPot(k.conns, k.NeighbourhoodSize, k.base) -} - // neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia // neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize // i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether @@ -608,7 +593,7 @@ func (k *Kademlia) string() string { if len(sv.GitCommit) > 0 { rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit)) } - rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3])) + rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr())) rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize)) liverows := make([]string, k.MaxProxDisplay) diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index b4663eee5..035879cd3 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -541,7 +541,7 @@ func TestKademliaHiveString(t *testing.T) { tk.Register("10000000", "10000001") tk.MaxProxDisplay = 8 h := tk.String() - expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" + expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 0000000000000000000000000000000000000000000000000000000000000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" if expH[104:] != h[104:] { t.Fatalf("incorrect hive output. expected %v, got %v", expH, h) } @@ -560,3 +560,113 @@ func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer { } return NewPeer(bp, kad) } + +// TestKademlia_SubscribeToNeighbourhoodDepthChange checks if correct +// signaling over SubscribeToNeighbourhoodDepthChange channels are made +// when neighbourhood depth is changed. 
+func TestKademlia_SubscribeToNeighbourhoodDepthChange(t *testing.T) { + + testSignal := func(t *testing.T, k *testKademlia, prevDepth int, c <-chan struct{}) (newDepth int) { + t.Helper() + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + newDepth = k.NeighbourhoodDepth() + if prevDepth == newDepth { + t.Error("depth not changed") + } + return newDepth + case <-time.After(2 * time.Second): + t.Error("timeout") + } + return newDepth + } + + t.Run("single subscription", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + testSignal(t, k, depth, c) + }) + + t.Run("multiple subscriptions", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c1, u1 := k.SubscribeToNeighbourhoodDepthChange() + defer u1() + + c2, u2 := k.SubscribeToNeighbourhoodDepthChange() + defer u2() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + testSignal(t, k, depth, c1) + + testSignal(t, k, depth, c2) + }) + + t.Run("multiple changes", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + depth = testSignal(t, k, depth, c) + + k.On("11111101", "01000010", "10000010", "00000110") + + testSignal(t, k, depth, c) + }) + + t.Run("no depth change", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + // does not trigger the depth change + k.On("11111101") + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + t.Error("signal received") + case <-time.After(1 * time.Second): + // all fine + } + }) + + t.Run("no new peers", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + changeC, unsubscribe := k.SubscribeToNeighbourhoodDepthChange() + defer unsubscribe() + + select { + case _, ok := <-changeC: + if !ok { + t.Error("closed signal channel") + } + t.Error("signal received") + case <-time.After(1 * time.Second): + // all fine + } + }) +} diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go index 2207ba308..737ad0784 100644 --- a/swarm/network/protocol_test.go +++ b/swarm/network/protocol_test.go @@ -235,6 +235,7 @@ func TestBzzHandshakeNetworkIDMismatch(t *testing.T) { if err != nil { t.Fatal(err) } + defer s.Stop() node := s.Nodes[0] err = s.testHandshake( @@ -258,6 +259,7 @@ func TestBzzHandshakeVersionMismatch(t *testing.T) { if err != nil { t.Fatal(err) } + defer s.Stop() node := s.Nodes[0] err = s.testHandshake( @@ -281,6 +283,7 @@ func TestBzzHandshakeSuccess(t *testing.T) { if err != nil { t.Fatal(err) } + defer s.Stop() node := s.Nodes[0] err = s.testHandshake( @@ -312,6 +315,7 @@ func TestBzzHandshakeLightNode(t *testing.T) { if err != nil { t.Fatal(err) } + defer pt.Stop() node := pt.Nodes[0] addr := NewAddr(node) diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go index 0ac1e7803..4d7dc6240 100644 --- a/swarm/network/simulation/kademlia_test.go +++ b/swarm/network/simulation/kademlia_test.go @@ -156,6 +156,7 @@ func createSimServiceMap(discovery bool) map[string]ServiceFunc { // Call WaitTillSnapshotRecreated() function and wait until it returns // Iterate the nodes and check if all the connections are 
successfully recreated func TestWaitTillSnapshotRecreated(t *testing.T) { + t.Skip("test is flaky. disabling until underlying problem is addressed") var err error sim := New(createSimServiceMap(true)) _, err = sim.AddNodesAndConnectRing(16) diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 917c440d2..615b3b68f 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -30,16 +30,19 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - mockmem "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" + "github.com/ethereum/go-ethereum/swarm/storage/localstore" + "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/ethereum/go-ethereum/swarm/testutil" colorable "github.com/mattn/go-colorable" ) @@ -51,7 +54,6 @@ var ( useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)") longrunning = flag.Bool("longrunning", false, "do run long-running tests") - bucketKeyDB = simulation.BucketKey("db") bucketKeyStore = simulation.BucketKey("store") bucketKeyFileStore = simulation.BucketKey("filestore") bucketKeyNetStore = simulation.BucketKey("netstore") @@ -113,26 +115,24 @@ func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, addr *network.BzzAddr) (*storage.NetStore, *Delivery, func(), error) { n := ctx.Config.Node() - store, datadir, err := createTestLocalStorageForID(n.ID(), addr) - if *useMockStore { - store, datadir, err = createMockStore(mockmem.NewGlobalStore(), n.ID(), addr) - } + localStore, localStoreCleanup, err := newTestLocalStore(n.ID(), addr, nil) if err != nil { return nil, nil, nil, err } - localStore := store.(*storage.LocalStore) + netStore, err := storage.NewNetStore(localStore, nil) if err != nil { + localStore.Close() + localStoreCleanup() return nil, nil, nil, err } - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags()) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) delivery := NewDelivery(kad, netStore) - bucket.Store(bucketKeyStore, store) - bucket.Store(bucketKeyDB, netStore) + bucket.Store(bucketKeyStore, localStore) bucket.Store(bucketKeyDelivery, delivery) bucket.Store(bucketKeyFileStore, fileStore) // for the kademlia object, we use the global key from the simulation package, @@ -141,13 +141,13 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, cleanup := func() { netStore.Close() - os.RemoveAll(datadir) + localStoreCleanup() } return netStore, delivery, cleanup, nil } -func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { +func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *localstore.DB, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address 
to := network.NewKademlia(addr.OAddr, network.NewKadParams()) @@ -161,11 +161,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste os.RemoveAll(datadir) } - params := storage.NewDefaultLocalStoreParams() - params.Init(datadir) - params.BaseKey = addr.Over() - - localStore, err := storage.NewTestLocalStoreForAddr(params) + localStore, err := localstore.New(datadir, addr.Over(), nil) if err != nil { removeDataDir() return nil, nil, nil, nil, err @@ -173,17 +169,16 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste netStore, err := storage.NewNetStore(localStore, nil) if err != nil { + localStore.Close() removeDataDir() return nil, nil, nil, nil, err } delivery := NewDelivery(to, netStore) netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions, nil) - teardown := func() { - streamer.Close() - removeDataDir() - } + intervalsStore := state.NewInmemoryStore() + streamer := NewRegistry(addr.ID(), delivery, netStore, intervalsStore, registryOptions, nil) + prvkey, err := crypto.GenerateKey() if err != nil { removeDataDir() @@ -191,7 +186,13 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste } protocolTester := p2ptest.NewProtocolTester(prvkey, 1, streamer.runProtocol) - + teardown := func() { + protocolTester.Stop() + streamer.Close() + intervalsStore.Close() + netStore.Close() + removeDataDir() + } err = waitForPeers(streamer, 10*time.Second, 1) if err != nil { teardown() @@ -228,24 +229,37 @@ func newRoundRobinStore(stores ...storage.ChunkStore) *roundRobinStore { } // not used in this context, only to fulfill ChunkStore interface -func (rrs *roundRobinStore) Has(ctx context.Context, addr storage.Address) bool { - panic("RoundRobinStor doesn't support HasChunk") +func (rrs *roundRobinStore) Has(_ context.Context, _ storage.Address) (bool, error) { + return false, errors.New("roundRobinStore doesn't support Has") } -func (rrs *roundRobinStore) Get(ctx context.Context, addr storage.Address) (storage.Chunk, error) { - return nil, errors.New("get not well defined on round robin store") +func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Address) (storage.Chunk, error) { + return nil, errors.New("roundRobinStore doesn't support Get") } -func (rrs *roundRobinStore) Put(ctx context.Context, chunk storage.Chunk) error { +func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) - return rrs.stores[idx].Put(ctx, chunk) + return rrs.stores[idx].Put(ctx, mode, ch) +} + +func (rrs *roundRobinStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) { + return errors.New("roundRobinStore doesn't support Set") +} + +func (rrs *roundRobinStore) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) { + return 0, errors.New("roundRobinStore doesn't support LastPullSubscriptionBinID") +} + +func (rrs *roundRobinStore) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) { + return nil, nil } -func (rrs *roundRobinStore) Close() { +func (rrs *roundRobinStore) Close() error { for _, store := range rrs.stores { store.Close() } + return nil } func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { @@ -311,24 +325,28 @@ func 
generateRandomFile() (string, error) { return string(b), nil } -//create a local store for the given node -func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.ChunkStore, string, error) { - var datadir string - var err error - datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString())) +func newTestLocalStore(id enode.ID, addr *network.BzzAddr, globalStore mock.GlobalStorer) (localStore *localstore.DB, cleanup func(), err error) { + dir, err := ioutil.TempDir("", "swarm-stream-") if err != nil { - return nil, "", err + return nil, nil, err + } + cleanup = func() { + os.RemoveAll(dir) + } + + var mockStore *mock.NodeStore + if globalStore != nil { + mockStore = globalStore.NewNodeStore(common.BytesToAddress(id.Bytes())) } - var store storage.ChunkStore - params := storage.NewDefaultLocalStoreParams() - params.ChunkDbPath = datadir - params.BaseKey = addr.Over() - store, err = storage.NewTestLocalStoreForAddr(params) + + localStore, err = localstore.New(dir, addr.Over(), &localstore.Options{ + MockStore: mockStore, + }) if err != nil { - os.RemoveAll(datadir) - return nil, "", err + cleanup() + return nil, nil, err } - return store, datadir, nil + return localStore, cleanup, nil } // watchDisconnections receives simulation peer events in a new goroutine and sets atomic value diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index bc4f1f665..1b4a14ea2 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -20,9 +20,11 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -32,11 +34,6 @@ import ( olog "github.com/opentracing/opentracing-go/log" ) -const ( - swarmChunkServerStreamName = "RETRIEVE_REQUEST" - deliveryCap = 32 -) - var ( processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil) handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil) @@ -44,93 +41,25 @@ var ( requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil) requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil) + + lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) ) type Delivery struct { - chunkStore storage.SyncChunkStore - kad *network.Kademlia - getPeer func(enode.ID) *Peer + netStore *storage.NetStore + kad *network.Kademlia + getPeer func(enode.ID) *Peer + quit chan struct{} } -func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery { +func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery { return &Delivery{ - chunkStore: chunkStore, - kad: kad, + netStore: netStore, + kad: kad, + quit: make(chan struct{}), } } -// SwarmChunkServer implements Server -type SwarmChunkServer struct { - deliveryC chan []byte - batchC chan []byte - chunkStore storage.ChunkStore - currentLen uint64 - quit chan struct{} -} - -// NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { - s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - chunkStore: 
chunkStore, - quit: make(chan struct{}), - } - go s.processDeliveries() - return s -} - -// processDeliveries handles delivered chunk hashes -func (s *SwarmChunkServer) processDeliveries() { - var hashes []byte - var batchC chan []byte - for { - select { - case <-s.quit: - return - case hash := <-s.deliveryC: - hashes = append(hashes, hash...) - batchC = s.batchC - case batchC <- hashes: - hashes = nil - batchC = nil - } - } -} - -// SessionIndex returns zero in all cases for SwarmChunkServer. -func (s *SwarmChunkServer) SessionIndex() (uint64, error) { - return 0, nil -} - -// SetNextBatch -func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) { - select { - case hashes = <-s.batchC: - case <-s.quit: - return - } - - from = s.currentLen - s.currentLen += uint64(len(hashes)) - to = s.currentLen - return -} - -// Close needs to be called on a stream server -func (s *SwarmChunkServer) Close() { - close(s.quit) -} - -// GetData retrives chunk data from db store -func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.chunkStore.Get(ctx, storage.Address(key)) - if err != nil { - return nil, err - } - return chunk.Data(), nil -} - // RetrieveRequestMsg is the protocol msg for chunk retrieve requests type RetrieveRequestMsg struct { Addr storage.Address @@ -149,12 +78,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * osp.LogFields(olog.String("ref", req.Addr.String())) - s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) - if err != nil { - return err - } - streamer := s.Server.(*SwarmChunkServer) - var cancel func() // TODO: do something with this hardcoded timeout, maybe use TTL in the future ctx = context.WithValue(ctx, "peer", sp.ID().String()) @@ -164,36 +87,26 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * go func() { select { case <-ctx.Done(): - case <-streamer.quit: + case <-d.quit: } cancel() }() go func() { defer osp.Finish() - chunk, err := d.chunkStore.Get(ctx, req.Addr) + ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr) if err != nil { retrieveChunkFail.Inc(1) log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) return } - if req.SkipCheck { - syncing := false - osp.LogFields(olog.Bool("skipCheck", true)) + syncing := false - err = sp.Deliver(ctx, chunk, s.priority, syncing) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) - } - osp.LogFields(olog.Bool("delivered", true)) - return - } - osp.LogFields(olog.Bool("skipCheck", false)) - select { - case streamer.deliveryC <- chunk.Address()[:]: - case <-streamer.quit: + err = sp.Deliver(ctx, ch, Top, syncing) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - + osp.LogFields(olog.Bool("delivered", true)) }() return nil @@ -216,7 +129,7 @@ type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg type ChunkDeliveryMsgSyncing ChunkDeliveryMsg // chunk delivery msg is response to retrieverequest msg -func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error { +func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req interface{}) error { var osp opentracing.Span ctx, osp = spancontext.StartSpan( ctx, @@ -224,36 +137,58 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch 
processReceivedChunksCount.Inc(1) - // retrieve the span for the originating retrieverequest - spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr) - span := tracing.ShiftSpanByKey(spanId) + // record the last time we received a chunk delivery message + lastReceivedChunksMsg.Update(time.Now().UnixNano()) + + var msg *ChunkDeliveryMsg + var mode chunk.ModePut + switch r := req.(type) { + case *ChunkDeliveryMsgRetrieval: + msg = (*ChunkDeliveryMsg)(r) + peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr) + po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr) + depth := d.kad.NeighbourhoodDepth() + // chunks within the area of responsibility should always sync + // https://github.com/ethersphere/go-ethereum/pull/1282#discussion_r269406125 + if po >= depth || peerPO < po { + mode = chunk.ModePutSync + } else { + // do not sync if peer that is sending us a chunk is closer to the chunk then we are + mode = chunk.ModePutRequest + } + case *ChunkDeliveryMsgSyncing: + msg = (*ChunkDeliveryMsg)(r) + mode = chunk.ModePutSync + case *ChunkDeliveryMsg: + msg = r + mode = chunk.ModePutSync + } - log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID()) + log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID()) go func() { defer osp.Finish() - if span != nil { - span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg")) - defer span.Finish() - } - - req.peer = sp - log.Trace("handle.chunk.delivery", "put", req.Addr) - err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData)) + msg.peer = sp + log.Trace("handle.chunk.delivery", "put", msg.Addr) + _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) if err != nil { if err == storage.ErrChunkInvalid { // we removed this log because it spams the logs // TODO: Enable this log line - // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", req.Addr, ) - req.peer.Drop(err) + // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, ) + msg.peer.Drop() } } - log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err) + log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err) }() return nil } +func (d *Delivery) Close() { + close(d.quit) +} + // RequestFromPeers sends a chunk retrieve request to a peer // The most eligible peer that hasn't already been sent to is chosen // TODO: define "eligible" diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 50b788150..fc0f9d5df 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" @@ -40,64 +41,11 @@ import ( "github.com/ethereum/go-ethereum/swarm/testutil" ) -//Tests initializing a retrieve request -func TestStreamerRetrieveRequest(t *testing.T) { - regOpts := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, streamer, _, teardown, err := newStreamerTester(regOpts) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - ctx := context.Background() - req := network.NewRequest( - storage.Address(hash0[:]), - true, - &sync.Map{}, - ) - 
streamer.delivery.RequestFromPeers(ctx, req) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Expects: []p2ptest.Expect{ - { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - { //expect a retrieve request message for the given hash - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash0[:], - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } -} - //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) //Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, //do no syncing + tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, //do no syncing }) if err != nil { t.Fatal(err) @@ -108,30 +56,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { chunk := storage.NewChunk(storage.Address(hash0[:]), nil) - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - //simulate pre-subscription to RETRIEVE_REQUEST stream on peer - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { //first expect a subscription to the RETRIEVE_REQUEST stream - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { //then the actual RETRIEVE_REQUEST.... 
@@ -158,7 +84,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { //should fail with a timeout as the peer we are requesting //the chunk from does not have the chunk - expectedError := `exchange #1 "RetrieveRequestMsg": timed out` + expectedError := `exchange #0 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -167,9 +93,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, + tester, _, localStore, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -178,36 +103,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { node := tester.Nodes[0] - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - - hash := storage.Address(hash0[:]) - chunk := storage.NewChunk(hash, hash) - err = localStore.Put(context.TODO(), chunk) + hash := storage.Address(hash1[:]) + ch := storage.NewChunk(hash, hash1[:]) + _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -220,51 +123,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { }, Expects: []p2ptest.Expect{ { - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: hash, - From: 0, - // TODO: why is this 32??? 
- To: 32, - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - hash = storage.Address(hash1[:]) - chunk = storage.NewChunk(hash, hash1[:]) - err = localStore.Put(context.TODO(), chunk) - if err != nil { - t.Fatalf("Expected no err got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { Code: 6, Msg: &ChunkDeliveryMsg{ - Addr: hash, - SData: hash, + Addr: ch.Address(), + SData: ch.Data(), }, Peer: node.ID(), }, @@ -294,7 +156,7 @@ func TestRequestFromPeers(t *testing.T) { // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } @@ -334,7 +196,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } @@ -358,8 +220,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -420,14 +281,14 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { defer cancel() // wait for the chunk to get stored - storedChunk, err := localStore.Get(ctx, chunkKey) + storedChunk, err := localStore.Get(ctx, chunk.ModeGetRequest, chunkKey) for err != nil { select { case <-ctx.Done(): t.Fatalf("Chunk is not in localstore after timeout, err: %v", err) default: } - storedChunk, err = localStore.Get(ctx, chunkKey) + storedChunk, err = localStore.Get(ctx, chunk.ModeGetRequest, chunkKey) time.Sleep(50 * time.Millisecond) } @@ -471,7 +332,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, - Retrieval: RetrievalEnabled, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -520,7 +380,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) i++ } //...which then gets passed to the round-robin file store - roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags()) //now we can actually upload a (random) file to the round-robin store size := chunkCount * chunkSize log.Debug("Storing data to file store") @@ -622,7 +482,6 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, - Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -700,7 +559,7 @@ func 
benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b errs := make(chan error) for _, hash := range hashes { go func(h storage.Address) { - _, err := netStore.Get(ctx, h) + _, err := netStore.Get(ctx, chunk.ModeGetRequest, h) log.Warn("test check netstore get", "hash", h, "err", err) errs <- err }(hash) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 009a941ef..660954857 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -66,7 +66,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, }, nil) @@ -287,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error { type testExternalClient struct { hashes chan []byte - store storage.SyncChunkStore + netStore *storage.NetStore enableNotificationsC chan struct{} } -func newTestExternalClient(store storage.SyncChunkStore) *testExternalClient { +func newTestExternalClient(netStore *storage.NetStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - store: store, + netStore: netStore, enableNotificationsC: make(chan struct{}), } } func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { - wait := c.store.FetchFunc(ctx, storage.Address(hash)) + wait := c.netStore.FetchFunc(ctx, storage.Address(hash)) if wait == nil { return nil } diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 501660fab..eb4e73d47 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -22,94 +22,10 @@ import ( ) // This test checks the default behavior of the server, that is -// when it is serving Retrieve requests. 
-func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { - registryOptions := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, _, _, teardown, err := newStreamerTester(registryOptions) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - stream := NewStream(swarmChunkServerStreamName, "", false) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "SubscribeMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatalf("Got %v", err) - } - - err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID()}) - if err == nil || err.Error() != "timed out waiting for peers to disconnect" { - t.Fatalf("Expected no disconnect, got %v", err) - } -} - -// This test checks the Lightnode behavior of server, when serving Retrieve -// requests are disabled -func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { - registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, - } - tester, _, _, teardown, err := newStreamerTester(registryOptions) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - stream := NewStream(swarmChunkServerStreamName, "", false) - - err = tester.TestExchanges( - p2ptest.Exchange{ - Label: "SubscribeMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 7, - Msg: &SubscribeErrorMsg{ - Error: "stream RETRIEVE_REQUEST not registered", - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatalf("Got %v", err) - } -} - -// This test checks the default behavior of the server, that is // when syncing is enabled. func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingRegisterOnly, + Syncing: SyncingRegisterOnly, } tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { @@ -153,8 +69,7 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { // when syncing is disabled. 
func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc..339101b88 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -24,9 +24,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" bv "github.com/ethereum/go-ethereum/swarm/network/bitvector" - "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/opentracing/opentracing-go" ) var syncBatchTimeout = 30 * time.Second @@ -175,7 +173,11 @@ type QuitMsg struct { } func (p *Peer) handleQuitMsg(req *QuitMsg) error { - return p.removeClient(req.Stream) + err := p.removeClient(req.Stream) + if _, ok := err.(*notFoundError); ok { + return nil + } + return err } // OfferedHashesMsg is the protocol msg for offering to hand over a @@ -197,12 +199,6 @@ func (m OfferedHashesMsg) String() string { func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg) error { metrics.GetOrRegisterCounter("peer.handleofferedhashes", nil).Inc(1) - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - "handle.offered.hashes") - defer sp.Finish() - c, _, err := p.getOrSetClient(req.Stream, req.From, req.To) if err != nil { return err @@ -219,6 +215,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg return fmt.Errorf("error initiaising bitvector of length %v: %v", lenHashes/HashSize, err) } + var wantDelaySet bool + var wantDelay time.Time + ctr := 0 errC := make(chan error) ctx, cancel := context.WithTimeout(ctx, syncBatchTimeout) @@ -230,6 +229,13 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg if wait := c.NeedData(ctx, hash); wait != nil { ctr++ want.Set(i/HashSize, true) + + // measure how long it takes before we mark chunks for retrieval, and actually send the request + if !wantDelaySet { + wantDelaySet = true + wantDelay = time.Now() + } + // create request and wait until the chunk data arrives and is stored go func(w func(context.Context) error) { select { @@ -247,7 +253,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-errC: if err != nil { log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() return } case <-ctx.Done(): @@ -283,28 +289,34 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg From: from, To: to, } - go func() { - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - select { - case err := <-c.next: - if err != nil { - log.Warn("c.next error dropping peer", "err", err) - p.Drop(err) - return - } - case <-c.quit: - log.Debug("client.handleOfferedHashesMsg() quit") - return - case <-ctx.Done(): - log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) - return - } - log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) - err := p.SendPriority(ctx, msg, c.priority) + + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + select { + case err := <-c.next: if err != 
nil { - log.Warn("SendPriority error", "err", err) + log.Warn("c.next error dropping peer", "err", err) + p.Drop() + return err } - }() + case <-c.quit: + log.Debug("client.handleOfferedHashesMsg() quit") + return nil + case <-ctx.Done(): + log.Debug("client.handleOfferedHashesMsg() context done", "ctx.Err()", ctx.Err()) + return nil + } + log.Trace("sending want batch", "peer", p.ID(), "stream", msg.Stream, "from", msg.From, "to", msg.To) + + // record want delay + if wantDelaySet { + metrics.GetOrRegisterResettingTimer("handleoffered.wantdelay", nil).UpdateSince(wantDelay) + } + + err = p.SendPriority(ctx, msg, c.priority) + if err != nil { + log.Warn("SendPriority error", "err", err) + } + return nil } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 152814bd4..28fd06e4d 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -24,8 +24,10 @@ import ( "time" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/network" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -54,7 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers") // Peer is the Peer extension for the streaming protocol type Peer struct { - *protocols.Peer + *network.BzzPeer streamer *Registry pq *pq.PriorityQueue serverMu sync.RWMutex @@ -74,9 +76,9 @@ type WrappedPriorityMsg struct { } // NewPeer is the constructor for Peer -func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { +func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer { p := &Peer{ - Peer: peer, + BzzPeer: peer, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: streamer, servers: make(map[Stream]*server), @@ -90,7 +92,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() } }) @@ -134,7 +136,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { var msg interface{} - spanName := "send.chunk.delivery" + metrics.GetOrRegisterCounter("peer.deliver", nil).Inc(1) //we send different types of messages if delivery is for syncing or retrievals, //even if handling and content of the message are the same, @@ -144,16 +146,13 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, Addr: chunk.Address(), SData: chunk.Data(), } - spanName += ".syncing" } else { msg = &ChunkDeliveryMsgRetrieval{ Addr: chunk.Address(), SData: chunk.Data(), } - spanName += ".retrieval" } - ctx = context.WithValue(ctx, "stream_send_tag", nil) return p.SendPriority(ctx, msg, priority) } @@ -416,7 +415,174 @@ func (p *Peer) removeClientParams(s Stream) error { } func (p *Peer) close() { + p.serverMu.Lock() + defer p.serverMu.Unlock() + for _, s := range p.servers { s.Close() } + + p.servers = nil +} + +// runUpdateSyncing is a long running function that creates the initial +// syncing subscriptions to the peer and waits for neighbourhood depth change +// to create new ones or quit existing ones based on the new neighbourhood depth +// and if peer enters or 
leaves nearest neighbourhood by using +// syncSubscriptionsDiff and updateSyncSubscriptions functions. +func (p *Peer) runUpdateSyncing() { + timer := time.NewTimer(p.streamer.syncUpdateDelay) + defer timer.Stop() + + select { + case <-timer.C: + case <-p.streamer.quit: + return + } + + kad := p.streamer.delivery.kad + po := chunk.Proximity(p.BzzAddr.Over(), kad.BaseAddr()) + + depth := kad.NeighbourhoodDepth() + + log.Debug("update syncing subscriptions: initial", "peer", p.ID(), "po", po, "depth", depth) + + // initial subscriptions + p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay)) + + depthChangeSignal, unsubscribeDepthChangeSignal := kad.SubscribeToNeighbourhoodDepthChange() + defer unsubscribeDepthChangeSignal() + + prevDepth := depth + for { + select { + case _, ok := <-depthChangeSignal: + if !ok { + return + } + // update subscriptions for this peer when depth changes + depth := kad.NeighbourhoodDepth() + log.Debug("update syncing subscriptions", "peer", p.ID(), "po", po, "depth", depth) + p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay)) + prevDepth = depth + case <-p.streamer.quit: + return + } + } + log.Debug("update syncing subscriptions: exiting", "peer", p.ID()) +} + +// updateSyncSubscriptions accepts two slices of integers, the first one +// representing proximity order bins for required syncing subscriptions +// and the second one representing bins for syncing subscriptions that +// need to be removed. This function sends request for subscription +// messages and quit messages for provided bins. +func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { + if p.streamer.getPeer(p.ID()) == nil { + log.Debug("update syncing subscriptions", "peer not found", p.ID()) + return + } + log.Debug("update syncing subscriptions", "peer", p.ID(), "subscribe", subBins, "quit", quitBins) + for _, po := range subBins { + p.subscribeSync(po) + } + for _, po := range quitBins { + p.quitSync(po) + } +} + +// subscribeSync send the request for syncing subscriptions to the peer +// using subscriptionFunc. This function is used to request syncing subscriptions +// when new peer is added to the registry and on neighbourhood depth change. +func (p *Peer) subscribeSync(po int) { + err := subscriptionFunc(p.streamer, p.ID(), uint8(po)) + if err != nil { + log.Error("subscription", "err", err) + } +} + +// quitSync sends the quit message for live and history syncing streams to the peer. +// This function is used in runUpdateSyncing indirectly over updateSyncSubscriptions +// to remove unneeded syncing subscriptions on neighbourhood depth change. 
+func (p *Peer) quitSync(po int) { + live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true) + history := getHistoryStream(live) + err := p.streamer.Quit(p.ID(), live) + if err != nil && err != p2p.ErrShuttingDown { + log.Error("quit", "err", err, "peer", p.ID(), "stream", live) + } + err = p.streamer.Quit(p.ID(), history) + if err != nil && err != p2p.ErrShuttingDown { + log.Error("quit", "err", err, "peer", p.ID(), "stream", history) + } + + err = p.removeServer(live) + if err != nil { + log.Error("remove server", "err", err, "peer", p.ID(), "stream", live) + } + err = p.removeServer(history) + if err != nil { + log.Error("remove server", "err", err, "peer", p.ID(), "stream", live) + } +} + +// syncSubscriptionsDiff calculates to which proximity order bins a peer +// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth +// change from prevDepth to newDepth. Max argument limits the number of +// proximity order bins. Returned values are slices of integers which represent +// proximity order bins, the first one to which additional subscriptions need to +// be requested and the second one which subscriptions need to be quit. Argument +// prevDepth with value less then 0 represents no previous depth, used for +// initial syncing subscriptions. +func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) { + newStart, newEnd := syncBins(peerPO, newDepth, max) + if prevDepth < 0 { + // no previous depth, return the complete range + // for subscriptions requests and nothing for quitting + return intRange(newStart, newEnd), nil + } + + prevStart, prevEnd := syncBins(peerPO, prevDepth, max) + + if newStart < prevStart { + subBins = append(subBins, intRange(newStart, prevStart)...) + } + + if prevStart < newStart { + quitBins = append(quitBins, intRange(prevStart, newStart)...) + } + + if newEnd < prevEnd { + quitBins = append(quitBins, intRange(newEnd, prevEnd)...) + } + + if prevEnd < newEnd { + subBins = append(subBins, intRange(prevEnd, newEnd)...) + } + + return subBins, quitBins +} + +// syncBins returns the range to which proximity order bins syncing +// subscriptions need to be requested, based on peer proximity and +// kademlia neighbourhood depth. Returned range is [start,end), inclusive for +// start and exclusive for end. +func syncBins(peerPO, depth, max int) (start, end int) { + if peerPO < depth { + // subscribe only to peerPO bin if it is not + // in the nearest neighbourhood + return peerPO, peerPO + 1 + } + // subscribe from depth to max bin if the peer + // is in the nearest neighbourhood + return depth, max + 1 +} + +// intRange returns the slice of integers [start,end). The start +// is inclusive and the end is not. +func intRange(start, end int) (r []int) { + for i := start; i < end; i++ { + r = append(r, i) + } + return r } diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go new file mode 100644 index 000000000..98c5cc010 --- /dev/null +++ b/swarm/network/stream/peer_test.go @@ -0,0 +1,309 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package stream + +import ( + "context" + "fmt" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" +) + +// TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff +// function for various arguments. +func TestSyncSubscriptionsDiff(t *testing.T) { + max := network.NewKadParams().MaxProxDisplay + for _, tc := range []struct { + po, prevDepth, newDepth int + subBins, quitBins []int + }{ + { + po: 0, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: -1, newDepth: 1, + subBins: []int{0}, + }, + { + po: 1, prevDepth: -1, newDepth: 1, + subBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 3, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 2, + subBins: []int{1}, + }, + { + po: 0, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 1, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 0, prevDepth: 0, newDepth: 1, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: 0, newDepth: 2, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 1, prevDepth: 1, newDepth: 0, // 1-16 -> 0-16 + subBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16 + quitBins: []int{0, 1, 2, 3}, + }, + { + po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4 + quitBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 0, // 4 -> 0-16 + subBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 6, // 4 -> 4 + }, + } { + subBins, quitBins := syncSubscriptionsDiff(tc.po, tc.prevDepth, tc.newDepth, max) + if fmt.Sprint(subBins) != fmt.Sprint(tc.subBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got subBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, subBins, tc.subBins) + } + if fmt.Sprint(quitBins) != fmt.Sprint(tc.quitBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got quitBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, quitBins, 
tc.quitBins) + } + } +} + +// TestUpdateSyncingSubscriptions validates that syncing subscriptions are correctly +// made on initial node connections and that subscriptions are correctly changed +// when kademlia neighbourhood depth is changed by connecting more nodes. +func TestUpdateSyncingSubscriptions(t *testing.T) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SyncUpdateDelay: 100 * time.Millisecond, + Syncing: SyncingAutoSubscribe, + }, nil) + cleanup = func() { + r.Close() + clean() + } + bucket.Store("bzz-address", addr) + return r, cleanup, nil + }, + }) + defer sim.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + // initial nodes, first one as pivot center of the start + ids, err := sim.AddNodesAndConnectStar(10) + if err != nil { + return err + } + + // pivot values + pivotRegistryID := ids[0] + pivotRegistry := sim.Service("streamer", pivotRegistryID).(*Registry) + pivotKademlia := pivotRegistry.delivery.kad + // nodes proximities from the pivot node + nodeProximities := make(map[string]int) + for _, id := range ids[1:] { + bzzAddr, ok := sim.NodeItem(id, "bzz-address") + if !ok { + t.Fatal("no bzz address for node") + } + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) + } + // wait until sync subscriptions are done for all nodes + waitForSubscriptions(t, pivotRegistry, ids[1:]...) + + // check initial sync streams + err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + if err != nil { + return err + } + + // add more nodes until the depth is changed + prevDepth := pivotKademlia.NeighbourhoodDepth() + var noDepthChangeChecked bool // true it there was a check when no depth is changed + for { + ids, err := sim.AddNodes(5) + if err != nil { + return err + } + // add new nodes to sync subscriptions check + for _, id := range ids { + bzzAddr, ok := sim.NodeItem(id, "bzz-address") + if !ok { + t.Fatal("no bzz address for node") + } + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) + } + err = sim.Net.ConnectNodesStar(ids, pivotRegistryID) + if err != nil { + return err + } + waitForSubscriptions(t, pivotRegistry, ids...) + + newDepth := pivotKademlia.NeighbourhoodDepth() + // depth is not changed, check if streams are still correct + if newDepth == prevDepth { + err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + if err != nil { + return err + } + noDepthChangeChecked = true + } + // do the final check when depth is changed and + // there has been at least one check + // for the case when depth is not changed + if newDepth != prevDepth && noDepthChangeChecked { + // check sync streams for changed depth + return checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + } + prevDepth = newDepth + } + }) + if result.Error != nil { + t.Fatal(result.Error) + } +} + +// waitForSubscriptions is a test helper function that blocks until +// stream server subscriptions are established on the provided registry +// to the nodes with provided IDs. 
+func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) { + t.Helper() + + for retries := 0; retries < 100; retries++ { + subs := r.api.GetPeerServerSubscriptions() + if allSubscribed(subs, ids) { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("missing subscriptions") +} + +// allSubscribed returns true if nodes with ids have subscriptions +// in provided subs map. +func allSubscribed(subs map[string][]string, ids []enode.ID) bool { + for _, id := range ids { + if s, ok := subs[id.String()]; !ok || len(s) == 0 { + return false + } + } + return true +} + +// checkSyncStreamsWithRetry is calling checkSyncStreams with retries. +func checkSyncStreamsWithRetry(r *Registry, nodeProximities map[string]int) (err error) { + for retries := 0; retries < 5; retries++ { + err = checkSyncStreams(r, nodeProximities) + if err == nil { + return nil + } + time.Sleep(500 * time.Millisecond) + } + return err +} + +// checkSyncStreams validates that registry contains expected sync +// subscriptions to nodes with proximities in a map nodeProximities. +func checkSyncStreams(r *Registry, nodeProximities map[string]int) error { + depth := r.delivery.kad.NeighbourhoodDepth() + maxPO := r.delivery.kad.MaxProxDisplay + for id, po := range nodeProximities { + wantStreams := syncStreams(po, depth, maxPO) + gotStreams := nodeStreams(r, id) + + if r.getPeer(enode.HexID(id)) == nil { + // ignore removed peer + continue + } + + if !reflect.DeepEqual(gotStreams, wantStreams) { + return fmt.Errorf("node %s got streams %v, want %v", id, gotStreams, wantStreams) + } + } + return nil +} + +// syncStreams returns expected sync streams that need to be +// established between a node with kademlia neighbourhood depth +// and a node with proximity order po. +func syncStreams(po, depth, maxPO int) (streams []string) { + start, end := syncBins(po, depth, maxPO) + for bin := start; bin < end; bin++ { + streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), false).String()) + streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true).String()) + } + return streams +} + +// nodeStreams returns stream server subscriptions on a registry +// to the peer with provided id. 
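A small sketch of what checkSyncStreams ends up expecting (illustrative only; it assumes it sits in this test file next to syncStreams above, which already imports fmt and network, and exampleExpectedStreams is a hypothetical name): a node at proximity order 3 seen from a pivot whose depth is 5 should hold just the history and live SYNC streams for bin 3, while a node at po 7 should hold both streams for every bin from 5 up to MaxProxDisplay.

func exampleExpectedStreams() {
	maxPO := network.NewKadParams().MaxProxDisplay // 16 with the default KadParams used by these tests

	// po < depth: only the peer's own bin is synced, one history and one live stream
	fmt.Println(len(syncStreams(3, 5, maxPO))) // 2

	// po >= depth: every bin from depth up to maxPO is synced, 12 bins, 24 streams
	fmt.Println(len(syncStreams(7, 5, maxPO))) // 24
}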
+func nodeStreams(r *Registry, id string) []string { + streams := r.api.GetPeerServerSubscriptions()[id] + sort.Strings(streams) + return streams +} diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 2957999f8..e34f87951 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" @@ -118,7 +119,6 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: syncUpdateDelay, }, nil) @@ -278,8 +278,8 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { if !ok { return fmt.Errorf("No localstore") } - lstore := item.(*storage.LocalStore) - conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore) + store := item.(chunk.Store) + conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, store) if err != nil { return err } diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index ce1e69db2..da4ff673b 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/pot" @@ -117,7 +118,6 @@ var simServiceMap = map[string]simulation.ServiceFunc{ store := state.NewInmemoryStore() r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, }, nil) @@ -190,10 +190,10 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio node := sim.Net.GetRandomUpNode() item, ok := sim.NodeItem(node.ID(), bucketKeyStore) if !ok { - return fmt.Errorf("No localstore") + return errors.New("no store in simulation bucket") } - lstore := item.(*storage.LocalStore) - hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore) + store := item.(chunk.Store) + hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, store) if err != nil { return err } @@ -221,25 +221,25 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio localChunks := conf.idToChunksMap[id] for _, ch := range localChunks { //get the real chunk by the index in the index array - chunk := conf.hashes[ch] - log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) + ch := conf.hashes[ch] + log.Trace("node has chunk", "address", ch) //check if the expected chunk is indeed in the localstore var err error if *useMockStore { //use the globalStore if the mockStore should be used; in that case, //the complete localStore stack is bypassed for getting the chunk - _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), 
chunk) + _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), ch) } else { //use the actual localstore item, ok := sim.NodeItem(id, bucketKeyStore) if !ok { - return fmt.Errorf("Error accessing localstore") + return errors.New("no store in simulation bucket") } - lstore := item.(*storage.LocalStore) - _, err = lstore.Get(ctx, chunk) + store := item.(chunk.Store) + _, err = store.Get(ctx, chunk.ModeGetLookup, ch) } if err != nil { - log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) + log.Debug("chunk not found", "address", ch.Hex(), "node", id) // Do not get crazy with logging the warn message time.Sleep(500 * time.Millisecond) continue REPEAT @@ -247,10 +247,10 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio evt := &simulations.Event{ Type: EventTypeChunkArrived, Node: sim.Net.GetNode(id), - Data: chunk.String(), + Data: ch.String(), } sim.Net.Events().Send(evt) - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + log.Trace("chunk found", "address", ch.Hex(), "node", id) } } return nil @@ -296,9 +296,9 @@ func mapKeysToNodes(conf *synctestConfig) { } //upload a file(chunks) to a single local node store -func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) { +func uploadFileToSingleNodeStore(id enode.ID, chunkCount int, store chunk.Store) ([]storage.Address, error) { log.Debug(fmt.Sprintf("Uploading to node id: %s", id)) - fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams()) + fileStore := storage.NewFileStore(store, storage.NewFileStoreParams(), chunk.NewTags()) size := chunkSize var rootAddrs []storage.Address for i := 0; i < chunkCount; i++ { diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 1038e52d0..9cdf5c04b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -18,7 +18,6 @@ package stream import ( "context" - "errors" "fmt" "math" "reflect" @@ -49,7 +48,6 @@ const ( // Enumerate options for syncing and retrieval type SyncingOption int -type RetrievalOption int // Syncing options const ( @@ -61,17 +59,6 @@ const ( SyncingAutoSubscribe ) -const ( - // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) - RetrievalDisabled RetrievalOption = iota - // Only the client side of the retrieve request is registered. 
- // (light nodes do not serve retrieve requests) - // once the client is registered, subscription to retrieve request stream is always sent - RetrievalClientOnly - // Both client and server funcs are registered, subscribe sent automatically - RetrievalEnabled -) - // subscriptionFunc is used to determine what to do in order to perform subscriptions // usually we would start to really subscribe to nodes, but for tests other functionality may be needed // (see TestRequestPeerSubscriptions in streamer_test.go) @@ -79,59 +66,58 @@ var subscriptionFunc = doRequestSubscription // Registry registry for outgoing and incoming streamer constructors type Registry struct { - addr enode.ID - api *API - skipCheck bool - clientMu sync.RWMutex - serverMu sync.RWMutex - peersMu sync.RWMutex - serverFuncs map[string]func(*Peer, string, bool) (Server, error) - clientFuncs map[string]func(*Peer, string, bool) (Client, error) - peers map[enode.ID]*Peer - delivery *Delivery - intervalsStore state.Store - autoRetrieval bool // automatically subscribe to retrieve request stream - maxPeerServers int - spec *protocols.Spec //this protocol's spec - balance protocols.Balance //implements protocols.Balance, for accounting - prices protocols.Prices //implements protocols.Prices, provides prices to accounting - quit chan struct{} // terminates registry goroutines + addr enode.ID + api *API + skipCheck bool + clientMu sync.RWMutex + serverMu sync.RWMutex + peersMu sync.RWMutex + serverFuncs map[string]func(*Peer, string, bool) (Server, error) + clientFuncs map[string]func(*Peer, string, bool) (Client, error) + peers map[enode.ID]*Peer + delivery *Delivery + intervalsStore state.Store + maxPeerServers int + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting + quit chan struct{} // terminates registry goroutines + syncMode SyncingOption + syncUpdateDelay time.Duration } // RegistryOptions holds optional values for NewRegistry constructor. 
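With the Retrieval option removed, wiring a registry after this change reduces to the syncing knobs shown below. The following sketch is hypothetical (newSyncingRegistry is not part of the diff; addr, delivery and netStore stand for the values the test helpers in this diff produce):

package stream

import (
	"github.com/ethereum/go-ethereum/swarm/network"
	"github.com/ethereum/go-ethereum/swarm/state"
	"github.com/ethereum/go-ethereum/swarm/storage"
)

// newSyncingRegistry is an illustrative helper showing the reduced options surface.
func newSyncingRegistry(addr *network.BzzAddr, delivery *Delivery, netStore *storage.NetStore) *Registry {
	return NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
		Syncing:         SyncingAutoSubscribe, // register syncer server and client, subscribe automatically
		SyncUpdateDelay: 0,                    // zero falls back to the 15s default applied in NewRegistry
	}, nil)
}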
type RegistryOptions struct { SkipCheck bool - Syncing SyncingOption // Defines syncing behavior - Retrieval RetrievalOption // Defines retrieval behavior + Syncing SyncingOption // Defines syncing behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor -func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { if options == nil { options = &RegistryOptions{} } if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } - // check if retrieval has been disabled - retrieval := options.Retrieval != RetrievalDisabled quit := make(chan struct{}) streamer := &Registry{ - addr: localID, - skipCheck: options.SkipCheck, - serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), - clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), - peers: make(map[enode.ID]*Peer), - delivery: delivery, - intervalsStore: intervalsStore, - autoRetrieval: retrieval, - maxPeerServers: options.MaxPeerServers, - balance: balance, - quit: quit, + addr: localID, + skipCheck: options.SkipCheck, + serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), + clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), + peers: make(map[enode.ID]*Peer), + delivery: delivery, + intervalsStore: intervalsStore, + maxPeerServers: options.MaxPeerServers, + balance: balance, + quit: quit, + syncUpdateDelay: options.SyncUpdateDelay, + syncMode: options.Syncing, } streamer.setupSpec() @@ -139,124 +125,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) - if options.Retrieval == RetrievalEnabled { - streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { - if !live { - return nil, errors.New("only live retrieval requests supported") - } - return NewSwarmChunkServer(delivery.chunkStore), nil - }) - } - - // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) - if options.Retrieval != RetrievalDisabled { - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) - } - // If syncing is not disabled, the syncing functions are registered (both client and server) if options.Syncing != SyncingDisabled { - RegisterSwarmSyncerServer(streamer, syncChunkStore) - RegisterSwarmSyncerClient(streamer, syncChunkStore) - } - - // if syncing is set to automatically subscribe to the syncing stream, start the subscription process - if options.Syncing == SyncingAutoSubscribe { - // latestIntC function ensures that - // - receiving from the in chan is not blocked by processing inside the for loop - // - the latest int value is delivered to the loop after the processing is done - // In context of NeighbourhoodDepthC: - // after the syncing is done updating inside the loop, we do not need to update 
on the intermediate - // depth changes, only to the latest one - latestIntC := func(in <-chan int) <-chan int { - out := make(chan int, 1) - - go func() { - defer close(out) - - for { - select { - case i, ok := <-in: - if !ok { - return - } - select { - case <-out: - default: - } - out <- i - case <-quit: - return - } - } - }() - - return out - } - - kad := streamer.delivery.kad - // get notification channels from Kademlia before returning - // from this function to avoid race with Close method and - // the goroutine created below - depthC := latestIntC(kad.NeighbourhoodDepthC()) - addressBookSizeC := latestIntC(kad.AddrCountC()) - - go func() { - // wait for kademlia table to be healthy - // but return if Registry is closed before - select { - case <-time.After(options.SyncUpdateDelay): - case <-quit: - return - } - - // initial requests for syncing subscription to peers - streamer.updateSyncing() - - for depth := range depthC { - log.Debug("Kademlia neighbourhood depth change", "depth", depth) - - // Prevent too early sync subscriptions by waiting until there are no - // new peers connecting. Sync streams updating will be done after no - // peers are connected for at least SyncUpdateDelay period. - timer := time.NewTimer(options.SyncUpdateDelay) - // Hard limit to sync update delay, preventing long delays - // on a very dynamic network - maxTimer := time.NewTimer(3 * time.Minute) - loop: - for { - select { - case <-maxTimer.C: - // force syncing update when a hard timeout is reached - log.Trace("Sync subscriptions update on hard timeout") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case <-timer.C: - // start syncing as no new peers has been added to kademlia - // for some time - log.Trace("Sync subscriptions update") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case size := <-addressBookSizeC: - log.Trace("Kademlia address book size changed on depth change", "size", size) - // new peers has been added to kademlia, - // reset the timer to prevent early sync subscriptions - if !timer.Stop() { - <-timer.C - } - timer.Reset(options.SyncUpdateDelay) - case <-quit: - break loop - } - } - timer.Stop() - maxTimer.Stop() - } - }() + RegisterSwarmSyncerServer(streamer, netStore) + RegisterSwarmSyncerClient(streamer, netStore) } return streamer @@ -381,7 +253,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.Send(context.TODO(), msg) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -422,8 +294,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { func (r *Registry) Close() error { // Stop sending neighborhood depth change and address count // change from Kademlia that were initiated in NewRegistry constructor. 
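The goroutine removed above debounced depth changes through a small "latest value" channel adapter (latestIntC). A self-contained sketch of that pattern, reconstructed here for clarity rather than copied from the removed code, with hypothetical names latest and main:

package main

import "fmt"

// latest forwards only the most recent value received on in, never blocking
// the sender on a slow consumer: the one-slot output buffer is drained before
// each send so the newest value always wins.
func latest(in <-chan int, quit <-chan struct{}) <-chan int {
	out := make(chan int, 1)
	go func() {
		defer close(out)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case <-out: // drop the stale value, if any
				default:
				}
				out <- v
			case <-quit:
				return
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int, 3)
	quit := make(chan struct{})
	out := latest(in, quit)
	in <- 1
	in <- 2
	in <- 3
	// Depending on scheduling the consumer may observe an intermediate value,
	// but it is never forced to work through the full history.
	fmt.Println(<-out)
	close(quit)
}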
- r.delivery.kad.CloseNeighbourhoodDepthC() - r.delivery.kad.CloseAddrCountC() + r.delivery.Close() close(r.quit) return r.intervalsStore.Close() } @@ -438,6 +309,7 @@ func (r *Registry) getPeer(peerId enode.ID) *Peer { func (r *Registry) setPeer(peer *Peer) { r.peersMu.Lock() r.peers[peer.ID()] = peer + metrics.GetOrRegisterCounter("registry.setpeer", nil).Inc(1) metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } @@ -445,6 +317,7 @@ func (r *Registry) setPeer(peer *Peer) { func (r *Registry) deletePeer(peer *Peer) { r.peersMu.Lock() delete(r.peers, peer.ID()) + metrics.GetOrRegisterCounter("registry.deletepeer", nil).Inc(1) metrics.GetOrRegisterGauge("registry.peers", nil).Update(int64(len(r.peers))) r.peersMu.Unlock() } @@ -458,132 +331,31 @@ func (r *Registry) peersCount() (c int) { // Run protocol run function func (r *Registry) Run(p *network.BzzPeer) error { - sp := NewPeer(p.Peer, r) + sp := NewPeer(p, r) r.setPeer(sp) + + if r.syncMode == SyncingAutoSubscribe { + go sp.runUpdateSyncing() + } + defer r.deletePeer(sp) defer close(sp.quit) defer sp.close() - if r.autoRetrieval && !p.LightNode { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - return sp.Run(sp.HandleMsg) } -// updateSyncing subscribes to SYNC streams by iterating over the -// kademlia connections and bins. If there are existing SYNC streams -// and they are no longer required after iteration, request to Quit -// them will be send to appropriate peers. -func (r *Registry) updateSyncing() { - kad := r.delivery.kad - // map of all SYNC streams for all peers - // used at the and of the function to remove servers - // that are not needed anymore - subs := make(map[enode.ID]map[Stream]struct{}) - r.peersMu.RLock() - for id, peer := range r.peers { - peer.serverMu.RLock() - for stream := range peer.servers { - if stream.Name == "SYNC" { - if _, ok := subs[id]; !ok { - subs[id] = make(map[Stream]struct{}) - } - subs[id][stream] = struct{}{} - } - } - peer.serverMu.RUnlock() - } - r.peersMu.RUnlock() - - // start requesting subscriptions from peers - r.requestPeerSubscriptions(kad, subs) - - // remove SYNC servers that do not need to be subscribed - for id, streams := range subs { - if len(streams) == 0 { - continue - } - peer := r.getPeer(id) - if peer == nil { - continue - } - for stream := range streams { - log.Debug("Remove sync server", "peer", id, "stream", stream) - err := r.Quit(peer.ID(), stream) - if err != nil && err != p2p.ErrShuttingDown { - log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream) - } - } - } -} - -// requestPeerSubscriptions calls on each live peer in the kademlia table -// and sends a `RequestSubscription` to peers according to their bin -// and their relationship with kademlia's depth. -// Also check `TestRequestPeerSubscriptions` in order to understand the -// expected behavior. 
-// The function expects: -// * the kademlia -// * a map of subscriptions -// * the actual function to subscribe -// (in case of the test, it doesn't do real subscriptions) -func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) { - - var startPo int - var endPo int - var ok bool - - // kademlia's depth - kadDepth := kad.NeighbourhoodDepth() - // request subscriptions for all nodes and bins - // nil as base takes the node's base; we need to pass 255 as `EachConn` runs - // from deepest bins backwards - kad.EachConn(nil, 255, func(p *network.Peer, po int) bool { - // nodes that do not provide stream protocol - // should not be subscribed, e.g. bootnodes - if !p.HasCap("stream") { - return true - } - //if the peer's bin is shallower than the kademlia depth, - //only the peer's bin should be subscribed - if po < kadDepth { - startPo = po - endPo = po - } else { - //if the peer's bin is equal or deeper than the kademlia depth, - //each bin from the depth up to k.MaxProxDisplay should be subscribed - startPo = kadDepth - endPo = kad.MaxProxDisplay - } - - for bin := startPo; bin <= endPo; bin++ { - //do the actual subscription - ok = subscriptionFunc(r, p, uint8(bin), subs) - } - return ok - }) -} - // doRequestSubscription sends the actual RequestSubscription to the peer -func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { - log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin) +func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error { + log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin) // bin is always less then 256 and it is safe to convert it to type uint8 stream := NewStream("SYNC", FormatSyncBinKey(bin), true) - if streams, ok := subs[p.ID()]; ok { - // delete live and history streams from the map, so that it won't be removed with a Quit request - delete(streams, stream) - delete(streams, getHistoryStream(stream)) - } - err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High) + err := r.RequestSubscription(id, stream, NewRange(0, 0), High) if err != nil { - log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream) - return false + log.Debug("Request subscription", "err", err, "peer", id, "stream", stream) + return err } - return true + return nil } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { @@ -619,24 +391,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(ctx, msg) + go func() { + err := p.handleOfferedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(ctx, msg) + go func() { + err := p.handleTakeoverProofMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *WantedHashesMsg: - return p.handleWantedHashesMsg(ctx, msg) + go func() { + err := p.handleWantedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *ChunkDeliveryMsgRetrieval: // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) 
+ if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *ChunkDeliveryMsgSyncing: // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + go func() { + err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RequestSubscriptionMsg: return p.handleRequestSubscription(ctx, msg) @@ -767,7 +581,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + if err := p.Send(context.TODO(), tp); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { @@ -969,15 +783,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { } /* -GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. +GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects. */ -func (api *API) GetPeerSubscriptions() map[string][]string { - //create the empty map +func (api *API) GetPeerServerSubscriptions() map[string][]string { pstreams := make(map[string][]string) - //iterate all streamer peers api.streamer.peersMu.RLock() defer api.streamer.peersMu.RUnlock() diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index bdd3087bb..767112b2b 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -28,9 +28,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/testutil" - - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" @@ -39,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" + "github.com/ethereum/go-ethereum/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -539,7 +537,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { t.Fatal(err) } - expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") + expectedError := errors.New("subprotocol error") if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil { t.Fatal(err) } @@ -779,7 +777,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) @@ -940,8 +937,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { //`Price` interface implementation func TestHasPriceImplementation(t *testing.T) { _, r, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: 
SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -967,164 +963,8 @@ func TestHasPriceImplementation(t *testing.T) { } } -/* -TestRequestPeerSubscriptions is a unit test for stream's pull sync subscriptions. - -The test does: - * assign each connected peer to a bin map - * build up a known kademlia in advance - * run the EachConn function, which returns supposed subscription bins - * store all supposed bins per peer in a map - * check that all peers have the expected subscriptions - -This kad table and its peers are copied from network.TestKademliaCase1, -it represents an edge case but for the purpose of testing the -syncing subscriptions it is just fine. - -Addresses used in this test are discovered as part of the simulation network -in higher level tests for streaming. They were generated randomly. - -The resulting kademlia looks like this: -========================================================================= -Fri Dec 21 20:02:39 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 7efef1 -population: 12 (12), MinProxBinSize: 2, MinBinSize: 2, MaxBinSize: 4 -000 2 8196 835f | 2 8196 (0) 835f (0) -001 2 2690 28f0 | 2 2690 (0) 28f0 (0) -002 2 4d72 4a45 | 2 4d72 (0) 4a45 (0) -003 1 646e | 1 646e (0) -004 3 769c 76d1 7656 | 3 769c (0) 76d1 (0) 7656 (0) -============ DEPTH: 5 ========================================== -005 1 7a48 | 1 7a48 (0) -006 1 7cbd | 1 7cbd (0) -007 0 | 0 -008 0 | 0 -009 0 | 0 -010 0 | 0 -011 0 | 0 -012 0 | 0 -013 0 | 0 -014 0 | 0 -015 0 | 0 -========================================================================= -*/ -func TestRequestPeerSubscriptions(t *testing.T) { - // the pivot address; this is the actual kademlia node - pivotAddr := "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e" - - // a map of bin number to addresses from the given kademlia - binMap := make(map[int][]string) - binMap[0] = []string{ - "835fbbf1d16ba7347b6e2fc552d6e982148d29c624ea20383850df3c810fa8fc", - "81968a2d8fb39114342ee1da85254ec51e0608d7f0f6997c2a8354c260a71009", - } - binMap[1] = []string{ - "28f0bc1b44658548d6e05dd16d4c2fe77f1da5d48b6774bc4263b045725d0c19", - "2690a910c33ee37b91eb6c4e0731d1d345e2dc3b46d308503a6e85bbc242c69e", - } - binMap[2] = []string{ - "4a45f1fc63e1a9cb9dfa44c98da2f3d20c2923e5d75ff60b2db9d1bdb0c54d51", - "4d72a04ddeb851a68cd197ef9a92a3e2ff01fbbff638e64929dd1a9c2e150112", - } - binMap[3] = []string{ - "646e9540c84f6a2f9cf6585d45a4c219573b4fd1b64a3c9a1386fc5cf98c0d4d", - } - binMap[4] = []string{ - "7656caccdc79cd8d7ce66d415cc96a718e8271c62fb35746bfc2b49faf3eebf3", - "76d1e83c71ca246d042e37ff1db181f2776265fbcfdc890ce230bfa617c9c2f0", - "769ce86aa90b518b7ed382f9fdacfbed93574e18dc98fe6c342e4f9f409c2d5a", - } - binMap[5] = []string{ - "7a48f75f8ca60487ae42d6f92b785581b40b91f2da551ae73d5eae46640e02e8", - } - binMap[6] = []string{ - "7cbd42350bde8e18ae5b955b5450f8e2cef3419f92fbf5598160c60fd78619f0", - } - - // create the pivot's kademlia - addr := common.FromHex(pivotAddr) - k := network.NewKademlia(addr, network.NewKadParams()) - - // construct the peers and the kademlia - for _, binaddrs := range binMap { - for _, a := range binaddrs { - addr := common.FromHex(a) - k.On(network.NewPeer(&network.BzzPeer{BzzAddr: &network.BzzAddr{OAddr: addr}}, k)) - } - } - - // TODO: check kad table is same - // currently k.String() prints date so it will never be the same :) - // --> implement JSON representation of kad table - log.Debug(k.String()) - - // simulate that we would do subscriptions: just store the bin numbers - fakeSubscriptions := make(map[string][]int) - 
//after the test, we need to reset the subscriptionFunc to the default - defer func() { subscriptionFunc = doRequestSubscription }() - // define the function which should run for each connection - // instead of doing real subscriptions, we just store the bin numbers - subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { - // get the peer ID - peerstr := fmt.Sprintf("%x", p.Over()) - // create the array of bins per peer - if _, ok := fakeSubscriptions[peerstr]; !ok { - fakeSubscriptions[peerstr] = make([]int, 0) - } - // store the (fake) bin subscription - log.Debug(fmt.Sprintf("Adding fake subscription for peer %s with bin %d", peerstr, bin)) - fakeSubscriptions[peerstr] = append(fakeSubscriptions[peerstr], int(bin)) - return true - } - // create just a simple Registry object in order to be able to call... - r := &Registry{} - r.requestPeerSubscriptions(k, nil) - // calculate the kademlia depth - kdepth := k.NeighbourhoodDepth() - - // now, check that all peers have the expected (fake) subscriptions - // iterate the bin map - for bin, peers := range binMap { - // for every peer... - for _, peer := range peers { - // ...get its (fake) subscriptions - fakeSubsForPeer := fakeSubscriptions[peer] - // if the peer's bin is shallower than the kademlia depth... - if bin < kdepth { - // (iterate all (fake) subscriptions) - for _, subbin := range fakeSubsForPeer { - // ...only the peer's bin should be "subscribed" - // (and thus have only one subscription) - if subbin != bin || len(fakeSubsForPeer) != 1 { - t.Fatalf("Did not get expected subscription for bin < depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin) - } - } - } else { //if the peer's bin is equal or higher than the kademlia depth... 
- // (iterate all (fake) subscriptions) - for i, subbin := range fakeSubsForPeer { - // ...each bin from the peer's bin number up to k.MaxProxDisplay should be "subscribed" - // as we start from depth we can use the iteration index to check - if subbin != i+kdepth { - t.Fatalf("Did not get expected subscription for bin > depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin) - } - // the last "subscription" should be k.MaxProxDisplay - if i == len(fakeSubsForPeer)-1 && subbin != k.MaxProxDisplay { - t.Fatalf("Expected last subscription to be: %d, but is: %d", k.MaxProxDisplay, subbin) - } - } - } - } - } - // print some output - for p, subs := range fakeSubscriptions { - log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p)) - for _, bin := range subs { - log.Debug(fmt.Sprintf("%d,", bin)) - } - } -} - -// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function -func TestGetSubscriptions(t *testing.T) { +// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function +func TestGetServerSubscriptions(t *testing.T) { // create an amount of dummy peers testPeerCount := 8 // every peer will have this amount of dummy servers @@ -1135,7 +975,7 @@ func TestGetSubscriptions(t *testing.T) { r := &Registry{} api := NewAPI(r) // call once, at this point should be empty - regs := api.GetPeerSubscriptions() + regs := api.GetPeerServerSubscriptions() if len(regs) != 0 { t.Fatal("Expected subscription count to be 0, but it is not") } @@ -1159,7 +999,7 @@ func TestGetSubscriptions(t *testing.T) { r.peers = peerMap // call the subscriptions again - regs = api.GetPeerSubscriptions() + regs = api.GetPeerServerSubscriptions() // count how many (fake) subscriptions there are cnt := 0 for _, reg := range regs { @@ -1175,11 +1015,11 @@ func TestGetSubscriptions(t *testing.T) { } /* -TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, +TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, starts the simulation, waits for SyncUpdateDelay in order to kick off stream registration, then tests that there are subscriptions. 
*/ -func TestGetSubscriptionsRPC(t *testing.T) { +func TestGetServerSubscriptionsRPC(t *testing.T) { if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { t.Skip("flaky with -race on Travis") @@ -1206,15 +1046,13 @@ func TestGetSubscriptionsRPC(t *testing.T) { defer func() { subscriptionFunc = doRequestSubscription }() // we use this subscriptionFunc for this test: just increases count and calls the actual subscription - subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { + subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error { // syncing starts after syncUpdateDelay and loops after that Duration; we only want to count at the first iteration // in the first iteration, subs will be empty (no existing subscriptions), thus we can use this check // this avoids flakyness - if len(subs) == 0 { - expectedMsgCount.inc() - } - doRequestSubscription(r, p, bin, subs) - return true + expectedMsgCount.inc() + doRequestSubscription(r, id, bin) + return nil } // create a standard sim sim := simulation.New(map[string]simulation.ServiceFunc{ @@ -1226,7 +1064,6 @@ func TestGetSubscriptionsRPC(t *testing.T) { // configure so that sync registrations actually happen r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, //enable sync registrations SyncUpdateDelay: syncUpdateDelay, }, nil) @@ -1321,7 +1158,7 @@ func TestGetSubscriptionsRPC(t *testing.T) { //ask it for subscriptions pstreams := make(map[string][]string) - err = client.Call(&pstreams, "stream_getPeerSubscriptions") + err = client.Call(&pstreams, "stream_getPeerServerSubscriptions") if err != nil { return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) } diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 5f03dcff7..9bde39550 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -35,27 +36,29 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - store storage.SyncChunkStore - quit chan struct{} + correlateId string //used for logging + po uint8 + netStore *storage.NetStore + quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - store: syncChunkStore, - quit: make(chan struct{}), + correlateId: correlateId, + po: po, + netStore: netStore, + quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) { - streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { +func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { + streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(po, syncChunkStore) + return NewSwarmSyncerServer(po, netStore, 
p.ID().String()+"|"+string(po)) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -69,130 +72,138 @@ func (s *SwarmSyncerServer) Close() { // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - chunk, err := s.store.Get(ctx, storage.Address(key)) + ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key)) if err != nil { return nil, err } - return chunk.Data(), nil + return ch.Data(), nil } // SessionIndex returns current storage bin (po) index. func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { - return s.store.BinIndex(s.po), nil + return s.netStore.LastPullSubscriptionBinID(s.po) } -// GetBatch retrieves the next batch of hashes from the dbstore +// SetNextBatch retrieves the next batch of hashes from the localstore. +// It expects a range of bin IDs, both ends inclusive in syncing, and returns +// concatenated byte slice of chunk addresses and bin IDs of the first and +// the last one in that slice. The batch may have up to BatchSize number of +// chunk addresses. If at least one chunk is added to the batch and no new chunks +// are added in batchTimeout period, the batch will be returned. This function +// will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - var batch []byte - i := 0 - - var ticker *time.Ticker - defer func() { - if ticker != nil { - ticker.Stop() + //TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer? + if from > 0 { + from-- + } + batchStart := time.Now() + descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) + defer stop() + + const batchTimeout = 2 * time.Second + + var ( + batch []byte + batchSize int + batchStartID *uint64 + batchEndID uint64 + timer *time.Timer + timerC <-chan time.Time + ) + + defer func(start time.Time) { + metrics.GetOrRegisterResettingTimer("syncer.set-next-batch.total-time", nil).UpdateSince(start) + metrics.GetOrRegisterCounter("syncer.set-next-batch.batch-size", nil).Inc(int64(batchSize)) + if timer != nil { + timer.Stop() } - }() - var wait bool - for { - if wait { - if ticker == nil { - ticker = time.NewTicker(1000 * time.Millisecond) + }(batchStart) + + for iterate := true; iterate; { + select { + case d, ok := <-descriptors: + if !ok { + iterate = false + break } - select { - case <-ticker.C: - case <-s.quit: - return nil, 0, 0, nil, nil + batch = append(batch, d.Address[:]...) + // This is the most naive approach to label the chunk as synced + // allowing it to be garbage collected. A proper way requires + // validating that the chunk is successfully stored by the peer. 
+ err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address) + if err != nil { + metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1) + log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err) + return nil, 0, 0, nil, err } - } - - metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) - err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool { - select { - case <-s.quit: - return false - default: + batchSize++ + if batchStartID == nil { + // set batch start id only if + // this is the first iteration + batchStartID = &d.BinID } - batch = append(batch, key[:]...) - i++ - to = idx - return i < BatchSize - }) - if err != nil { - return nil, 0, 0, nil, err - } - if len(batch) > 0 { - break + batchEndID = d.BinID + if batchSize >= BatchSize { + iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1) + log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) + } + if timer == nil { + timer = time.NewTimer(batchTimeout) + } else { + log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId) + if !timer.Stop() { + <-timer.C + } + log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId) + timer.Reset(batchTimeout) + } + timerC = timer.C + case <-timerC: + // return batch if new chunks are not + // received after some time + iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1) + log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) + case <-s.quit: + iterate = false + log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } - wait = true } - - log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po)) - return batch, from, to, nil, nil + if batchStartID == nil { + // if batch start id is not set, return 0 + batchStartID = new(uint64) + } + return batch, *batchStartID, batchEndID, nil, nil } // SwarmSyncerClient type SwarmSyncerClient struct { - store storage.SyncChunkStore - peer *Peer - stream Stream + netStore *storage.NetStore + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - store: store, - peer: p, - stream: stream, + netStore: netStore, + peer: p, + stream: stream, }, nil } -// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { -// retrieveC := make(storage.Chunk, chunksCap) -// RunChunkRequestor(p, retrieveC) -// storeC := make(storage.Chunk, chunksCap) -// RunChunkStorer(store, storeC) -// s := &SwarmSyncerClient{ -// po: po, -// priority: priority, 
-// sessionAt: sessionAt, -// start: index, -// end: index, -// nextC: make(chan struct{}, 1), -// intervals: intervals, -// sessionRoot: sessionRoot, -// sessionReader: chunker.Join(sessionRoot, retrieveC), -// retrieveC: retrieveC, -// storeC: storeC, -// } -// return s -// } - -// // StartSyncing is called on the Peer to start the syncing process -// // the idea is that it is called only after kademlia is close to healthy -// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) { -// lastPO := po -// if nn { -// lastPO = maxPO -// } -// -// for i := po; i <= lastPO; i++ { -// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true) -// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false) -// } -// } - // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) { +func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live)) }) } // NeedData func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { - return s.store.FetchFunc(ctx, key) + return s.netStore.FetchFunc(ctx, key) } // BatchDone diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index 07586714e..b787c7bb8 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -21,22 +21,20 @@ import ( "errors" "fmt" "io/ioutil" - "math" "os" "sync" "testing" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/ethereum/go-ethereum/swarm/testutil" ) @@ -55,24 +53,6 @@ func TestSyncerSimulation(t *testing.T) { } } -func createMockStore(globalStore mock.GlobalStorer, id enode.ID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) { - address := common.BytesToAddress(id.Bytes()) - mockStore := globalStore.NewNodeStore(address) - params := storage.NewDefaultLocalStoreParams() - - datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString()) - if err != nil { - return nil, "", err - } - params.Init(datadir) - params.BaseKey = addr.Over() - lstore, err = storage.NewLocalStore(params, mockStore) - if err != nil { - return nil, "", err - } - return lstore, datadir, nil -} - func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, po uint8) { sim := simulation.New(map[string]simulation.ServiceFunc{ @@ -103,7 +83,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p } r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }, nil) @@ -181,17 +160,32 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p 
if i < nodes-1 { hashCounts[i] = hashCounts[i+1] } - item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB) + item, ok := sim.NodeItem(nodeIDs[i], bucketKeyStore) if !ok { return fmt.Errorf("No DB") } - netStore := item.(*storage.NetStore) - netStore.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { - hashes[i] = append(hashes[i], addr) - totalHashes++ - hashCounts[i]++ - return true - }) + store := item.(chunk.Store) + until, err := store.LastPullSubscriptionBinID(po) + if err != nil { + return err + } + if until > 0 { + c, _ := store.SubscribePull(ctx, po, 0, until) + for iterate := true; iterate; { + select { + case cd, ok := <-c: + if !ok { + iterate = false + break + } + hashes[i] = append(hashes[i], cd.Address) + totalHashes++ + hashCounts[i]++ + case <-ctx.Done(): + return ctx.Err() + } + } + } } var total, found int for _, node := range nodeIDs { @@ -200,12 +194,12 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p for j := i; j < nodes; j++ { total += len(hashes[j]) for _, key := range hashes[j] { - item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB) + item, ok := sim.NodeItem(nodeIDs[j], bucketKeyStore) if !ok { return fmt.Errorf("No DB") } - db := item.(*storage.NetStore) - _, err := db.Get(ctx, key) + db := item.(chunk.Store) + _, err := db.Get(ctx, chunk.ModeGetRequest, key) if err == nil { found++ } @@ -216,7 +210,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p if total == found && total > 0 { return nil } - return fmt.Errorf("Total not equallying found: total is %d", total) + return fmt.Errorf("Total not equallying found %v: total is %d", found, total) }) if result.Error != nil { @@ -237,8 +231,7 @@ func TestSameVersionID(t *testing.T) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, + Syncing: SyncingAutoSubscribe, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -301,8 +294,7 @@ func TestDifferentVersionID(t *testing.T) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, + Syncing: SyncingAutoSubscribe, }, nil) bucket.Store(bucketKeyRegistry, r) |
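Both the reworked SwarmSyncerServer.SetNextBatch and the hash counting in testSyncBetweenNodes above drain a localstore pull subscription until a size limit or an inactivity timeout is reached. A condensed sketch of that consumption pattern (illustrative only; drainPull is a hypothetical helper, while the chunk.Store, SubscribePull and Descriptor usage mirrors what this diff itself relies on):

package stream

import (
	"context"
	"time"

	"github.com/ethereum/go-ethereum/swarm/chunk"
)

// drainPull collects chunk addresses for bin po from a pull subscription,
// returning when limit addresses were read, when no new descriptor arrived
// within idle, or when the context is cancelled.
func drainPull(ctx context.Context, store chunk.Store, po uint8, from, to uint64, limit int, idle time.Duration) (addrs []chunk.Address, last uint64, err error) {
	descriptors, stop := store.SubscribePull(ctx, po, from, to)
	defer stop()

	timer := time.NewTimer(idle)
	defer timer.Stop()

	for {
		select {
		case d, ok := <-descriptors:
			if !ok {
				return addrs, last, nil
			}
			addrs = append(addrs, d.Address)
			last = d.BinID
			if len(addrs) >= limit {
				return addrs, last, nil
			}
			// restart the inactivity window after every received descriptor
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(idle)
		case <-timer.C:
			return addrs, last, nil
		case <-ctx.Done():
			return addrs, last, ctx.Err()
		}
	}
}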