diff options
-rw-r--r-- | swarm/docs/migration-v0.3-to-v0.4.md | 31 | ||||
-rw-r--r-- | swarm/network/protocol.go | 4 | ||||
-rw-r--r-- | swarm/network/protocol_test.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/syncer.go | 13 | ||||
-rw-r--r-- | swarm/storage/feed/cacheentry.go | 2 | ||||
-rw-r--r-- | swarm/storage/feed/handler.go | 21 | ||||
-rw-r--r-- | swarm/storage/feed/handler_test.go | 8 | ||||
-rw-r--r-- | swarm/storage/feed/id_test.go | 4 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/algorithm_fluzcapacitor.go | 63 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/algorithm_longearth.go | 185 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/epoch.go | 2 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/lookup.go | 75 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/lookup_test.go | 646 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/store_test.go | 154 | ||||
-rw-r--r-- | swarm/storage/feed/lookup/timesim_test.go | 128 | ||||
-rw-r--r-- | swarm/storage/feed/query_test.go | 2 | ||||
-rw-r--r-- | swarm/storage/feed/request_test.go | 2 | ||||
-rw-r--r-- | swarm/storage/feed/update_test.go | 2 |
18 files changed, 993 insertions, 351 deletions
diff --git a/swarm/docs/migration-v0.3-to-v0.4.md b/swarm/docs/migration-v0.3-to-v0.4.md new file mode 100644 index 000000000..cebc286c1 --- /dev/null +++ b/swarm/docs/migration-v0.3-to-v0.4.md @@ -0,0 +1,31 @@ +Swarm DB migration notes +========================= +Swarm `v0.4` introduces major changes to the existing codebase. Among other things, the storage layer has been rewritten to be more modular and flexible +in a manner that will accomodate for our future needs. Since Swarm at this point does not provide any storage guarantees, we have made the decision to not impose any migrations on our public cluster nor on our users. What this essentially means is that local storage will be purged on `v0.4`. We have nevertheless, provided a procedure below for those of you running private clusters and would like to migrate the data to the new local storage format. + +You are highly encouraged to report to us any bugs or problems caused by running the migration steps below. + +**Note**: we highly recommend you run the commands below with `--verbosity 5` flag and open an issue with the relevant terminal output in case something goes wrong. + +**Important**: since you would be creating an export of your local store, the potential disk usage might peak at `x2-x3` times the normal Swarm data folder size. Please make sure you have enough disk space, backup mediums or other form of local/network attached storage _before_ executing the following steps! + +**Important**: when trying to run Swarm with an old local store format, the Swarm binary will refuse to start showing an error message. + +You will need the following information for the migration procedure: +1. Your `datadir` path. This is indicated with the `--datadir` flag when running Swarm. If you do not specify this flag, the `datadir` will reside by default on `$HOME/.ethereum`. +2. Your chunk directory location. This would normally be located in your `datadir/swarm/bzz-<your bzz account>/chunks`. We will refer to this as `chunkDir` below. +3. Your `bzzAddr`. This is _not_ your `--bzzaccount`! You can find your `bzzAddr` when starting Swarm by looking for the following line: +``` +INFO [03-21|17:25:04.791] Swarm network started bzzaddr=ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c +``` + +The migration process is done in the following manner: +1. Try to run the updated Swarm binary, it should complain about the local store format and exit. If it does - execute the following steps: +2. `$ swarm --verbosity 5 db export <chunkDir> <exportLocation>/<exportFilename>.tar <bzzAddr>` +3. Move or Remove your existing `chunkDir` +4. Run the new Swarm binary as your would start your Swarm node normally. The binary should now load normally and not complain. This step creates a new empty chunk store. Please shut down the node after it starts correctly. +5. `$ swarm --verbosity 5 db import --legacy <chunkDir> <exportLocation>/<exportFilename>.tar <bzzAddr>` +6. Wait patientally for the `Imported X chunks successfully` message. +7. Start your Swarm node as you normally would +8. Have a beer + diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index ad3f8df8f..dd8a86c0b 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -33,7 +33,7 @@ import ( ) const ( - DefaultNetworkID = 3 + DefaultNetworkID = 4 // timeout for waiting bzzHandshakeTimeout = 3000 * time.Millisecond ) @@ -43,7 +43,7 @@ var DefaultTestNetworkID = rand.Uint64() // BzzSpec is the spec of the generic swarm handshake var BzzSpec = &protocols.Spec{ Name: "bzz", - Version: 8, + Version: 9, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ HandshakeMsg{}, diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go index 737ad0784..616abde9e 100644 --- a/swarm/network/protocol_test.go +++ b/swarm/network/protocol_test.go @@ -36,7 +36,7 @@ import ( ) const ( - TestProtocolVersion = 8 + TestProtocolVersion = 9 ) var TestProtocolNetworkID = DefaultTestNetworkID diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 9bde39550..7a61950ed 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -18,6 +18,7 @@ package stream import ( "context" + "fmt" "strconv" "time" @@ -58,7 +59,7 @@ func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { if err != nil { return nil, err } - return NewSwarmSyncerServer(po, netStore, p.ID().String()+"|"+string(po)) + return NewSwarmSyncerServer(po, netStore, fmt.Sprintf("%s|%d", p.ID(), po)) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -146,16 +147,16 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 if batchSize >= BatchSize { iterate = false metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1) - log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) + log.Trace("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } if timer == nil { timer = time.NewTimer(batchTimeout) } else { - log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId) + log.Trace("syncer pull subscription - stopping timer", "correlateId", s.correlateId) if !timer.Stop() { <-timer.C } - log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId) + log.Trace("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId) timer.Reset(batchTimeout) } timerC = timer.C @@ -164,10 +165,10 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // received after some time iterate = false metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1) - log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) + log.Trace("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) case <-s.quit: iterate = false - log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) + log.Trace("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } } if batchStartID == nil { diff --git a/swarm/storage/feed/cacheentry.go b/swarm/storage/feed/cacheentry.go index be42008e9..1c7e22619 100644 --- a/swarm/storage/feed/cacheentry.go +++ b/swarm/storage/feed/cacheentry.go @@ -27,7 +27,7 @@ import ( const ( hasherCount = 8 feedsHashAlgorithm = storage.SHA3Hash - defaultRetrieveTimeout = 100 * time.Millisecond + defaultRetrieveTimeout = 1000 * time.Millisecond ) // cacheEntry caches the last known update of a specific Swarm feed. diff --git a/swarm/storage/feed/handler.go b/swarm/storage/feed/handler.go index 0f6f2ba34..98ed7fa99 100644 --- a/swarm/storage/feed/handler.go +++ b/swarm/storage/feed/handler.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/swarm/chunk" @@ -178,12 +179,12 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups") } - var readCount int + var readCount int32 // Invoke the lookup engine. // The callback will be called every time the lookup algorithm needs to guess requestPtr, err := lookup.Lookup(ctx, timeLimit, query.Hint, func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - readCount++ + atomic.AddInt32(&readCount, 1) id := ID{ Feed: query.Feed, Epoch: epoch, @@ -228,17 +229,17 @@ func (h *Handler) updateCache(request *Request) (*cacheEntry, error) { updateAddr := request.Addr() log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level) - feedUpdate := h.get(&request.Feed) - if feedUpdate == nil { - feedUpdate = &cacheEntry{} - h.set(&request.Feed, feedUpdate) + entry := h.get(&request.Feed) + if entry == nil { + entry = &cacheEntry{} + h.set(&request.Feed, entry) } // update our rsrcs entry map - feedUpdate.lastKey = updateAddr - feedUpdate.Update = request.Update - feedUpdate.Reader = bytes.NewReader(feedUpdate.data) - return feedUpdate, nil + entry.lastKey = updateAddr + entry.Update = request.Update + entry.Reader = bytes.NewReader(entry.data) + return entry, nil } // Update publishes a feed update diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go index c4f6fe689..3d8213e60 100644 --- a/swarm/storage/feed/handler_test.go +++ b/swarm/storage/feed/handler_test.go @@ -177,8 +177,8 @@ func TestFeedsHandler(t *testing.T) { if err != nil { t.Fatal(err) } - if request.Epoch.Base() != 0 || request.Epoch.Level != 22 { - t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 22, request.Epoch.Level) + if request.Epoch.Base() != 0 || request.Epoch.Level != 28 { + t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 28, request.Epoch.Level) } data = []byte(updates[3]) request.SetData(data) @@ -213,8 +213,8 @@ func TestFeedsHandler(t *testing.T) { if !bytes.Equal(update2.data, []byte(updates[len(updates)-1])) { t.Fatalf("feed update data was %v, expected %v", string(update2.data), updates[len(updates)-1]) } - if update2.Level != 22 { - t.Fatalf("feed update epoch level was %d, expected 22", update2.Level) + if update2.Level != 28 { + t.Fatalf("feed update epoch level was %d, expected 28", update2.Level) } if update2.Base() != 0 { t.Fatalf("feed update epoch base time was %d, expected 0", update2.Base()) diff --git a/swarm/storage/feed/id_test.go b/swarm/storage/feed/id_test.go index e561ff9b4..8a820abfe 100644 --- a/swarm/storage/feed/id_test.go +++ b/swarm/storage/feed/id_test.go @@ -16,11 +16,11 @@ func getTestID() *ID { func TestIDAddr(t *testing.T) { id := getTestID() updateAddr := id.Addr() - compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783") + compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x842d0a81987b9755dfeaa5558f5c134c1c0af48b6545005cac7b533d9411453a") } func TestIDSerializer(t *testing.T) { - testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019") + testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f") } func TestIDLengthCheck(t *testing.T) { diff --git a/swarm/storage/feed/lookup/algorithm_fluzcapacitor.go b/swarm/storage/feed/lookup/algorithm_fluzcapacitor.go new file mode 100644 index 000000000..3840bd0fd --- /dev/null +++ b/swarm/storage/feed/lookup/algorithm_fluzcapacitor.go @@ -0,0 +1,63 @@ +package lookup + +import "context" + +// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found +// going back and forth in time +// First, it will attempt to find an update where it should be now if the hint was +// really the last update. If that lookup fails, then the last update must be either the hint itself +// or the epochs right below. If however, that lookup succeeds, then the update must be +// that one or within the epochs right below. +// see the guide for a more graphical representation +func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) { + var lastFound interface{} + var epoch Epoch + if hint == NoClue { + hint = worstHint + } + + t := now + + for { + epoch = GetNextEpoch(hint, t) + value, err = read(ctx, epoch, now) + if err != nil { + return nil, err + } + if value != nil { + lastFound = value + if epoch.Level == LowestLevel || epoch.Equals(hint) { + return value, nil + } + hint = epoch + continue + } + if epoch.Base() == hint.Base() { + if lastFound != nil { + return lastFound, nil + } + // we have reached the hint itself + if hint == worstHint { + return nil, nil + } + // check it out + value, err = read(ctx, hint, now) + if err != nil { + return nil, err + } + if value != nil { + return value, nil + } + // bad hint. + t = hint.Base() + hint = worstHint + continue + } + base := epoch.Base() + if base == 0 { + return nil, nil + } + t = base - 1 + } + +} diff --git a/swarm/storage/feed/lookup/algorithm_longearth.go b/swarm/storage/feed/lookup/algorithm_longearth.go new file mode 100644 index 000000000..d0342f67c --- /dev/null +++ b/swarm/storage/feed/lookup/algorithm_longearth.go @@ -0,0 +1,185 @@ +package lookup + +import ( + "context" + "sync/atomic" + "time" +) + +type stepFunc func(ctx context.Context, t uint64, hint Epoch) interface{} + +// LongEarthLookaheadDelay is the headstart the lookahead gives R before it launches +var LongEarthLookaheadDelay = 250 * time.Millisecond + +// LongEarthLookbackDelay is the headstart the lookback gives R before it launches +var LongEarthLookbackDelay = 250 * time.Millisecond + +// LongEarthAlgorithm explores possible lookup paths in parallel, pruning paths as soon +// as a more promising lookup path is found. As a result, this lookup algorithm is an order +// of magnitude faster than the FluzCapacitor algorithm, but at the expense of more exploratory reads. +// This algorithm works as follows. On each step, the next epoch is immediately looked up (R) +// and given a head start, while two parallel "steps" are launched a short time after: +// look ahead (A) is the path the algorithm would take if the R lookup returns a value, whereas +// look back (B) is the path the algorithm would take if the R lookup failed. +// as soon as R is actually finished, the A or B paths are pruned depending on the value of R. +// if A returns earlier than R, then R and B read operations can be safely canceled, saving time. +// The maximum number of active read operations is calculated as 2^(timeout/headstart). +// If headstart is infinite, this algorithm behaves as FluzCapacitor. +// timeout is the maximum execution time of the passed `read` function. +// the two head starts can be configured by changing LongEarthLookaheadDelay or LongEarthLookbackDelay +func LongEarthAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (interface{}, error) { + if hint == NoClue { + hint = worstHint + } + + var stepCounter int32 // for debugging, stepCounter allows to give an ID to each step instance + + errc := make(chan struct{}) // errc will help as an error shortcut signal + var gerr error // in case of error, this variable will be set + + var step stepFunc // For efficiency, the algorithm step is defined as a closure + step = func(ctxS context.Context, t uint64, last Epoch) interface{} { + stepID := atomic.AddInt32(&stepCounter, 1) // give an ID to this call instance + trace(stepID, "init: t=%d, last=%s", t, last.String()) + var valueA, valueB, valueR interface{} + + // initialize the three read contexts + ctxR, cancelR := context.WithCancel(ctxS) // will handle the current read operation + ctxA, cancelA := context.WithCancel(ctxS) // will handle the lookahead path + ctxB, cancelB := context.WithCancel(ctxS) // will handle the lookback path + + epoch := GetNextEpoch(last, t) // calculate the epoch to look up in this step instance + + // define the lookAhead function, which will follow the path as if R was successful + lookAhead := func() { + valueA = step(ctxA, t, epoch) // launch the next step, recursively. + if valueA != nil { // if this path is successful, we don't need R or B. + cancelB() + cancelR() + } + } + + // define the lookBack function, which will follow the path as if R was unsuccessful + lookBack := func() { + if epoch.Base() == last.Base() { + return + } + base := epoch.Base() + if base == 0 { + return + } + valueB = step(ctxB, base-1, last) + } + + go func() { //goroutine to read the current epoch (R) + defer cancelR() + var err error + valueR, err = read(ctxR, epoch, now) // read this epoch + if valueR == nil { // if unsuccessful, cancel lookahead, otherwise cancel lookback. + cancelA() + } else { + cancelB() + } + if err != nil && err != context.Canceled { + gerr = err + close(errc) + } + }() + + go func() { // goroutine to give a headstart to R and then launch lookahead. + defer cancelA() + + // if we are at the lowest level or the epoch to look up equals the last one, + // then we cannot lookahead (can't go lower or repeat the same lookup, this would + // cause an infinite loop) + if epoch.Level == LowestLevel || epoch.Equals(last) { + return + } + + // give a head start to R, or launch immediately if R finishes early enough + select { + case <-TimeAfter(LongEarthLookaheadDelay): + lookAhead() + case <-ctxR.Done(): + if valueR != nil { + lookAhead() // only look ahead if R was successful + } + case <-ctxA.Done(): + } + }() + + go func() { // goroutine to give a headstart to R and then launch lookback. + defer cancelB() + + // give a head start to R, or launch immediately if R finishes early enough + select { + case <-TimeAfter(LongEarthLookbackDelay): + lookBack() + case <-ctxR.Done(): + if valueR == nil { + lookBack() // only look back in case R failed + } + case <-ctxB.Done(): + } + }() + + <-ctxA.Done() + if valueA != nil { + trace(stepID, "Returning valueA=%v", valueA) + return valueA + } + + <-ctxR.Done() + if valueR != nil { + trace(stepID, "Returning valueR=%v", valueR) + return valueR + } + <-ctxB.Done() + trace(stepID, "Returning valueB=%v", valueB) + return valueB + } + + var value interface{} + stepCtx, cancel := context.WithCancel(ctx) + + go func() { // launch the root step in its own goroutine to allow cancellation + defer cancel() + value = step(stepCtx, now, hint) + }() + + // wait for the algorithm to finish, but shortcut in case + // of errors + select { + case <-stepCtx.Done(): + case <-errc: + cancel() + return nil, gerr + } + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + if value != nil || hint == worstHint { + return value, nil + } + + // at this point the algorithm did not return a value, + // so we challenge the hint given. + value, err := read(ctx, hint, now) + if err != nil { + return nil, err + } + if value != nil { + return value, nil // hint is valid, return it. + } + + // hint is invalid. Invoke the algorithm + // without hint. + now = hint.Base() + if hint.Level == HighestLevel { + now-- + } + + return LongEarthAlgorithm(ctx, now, NoClue, read) +} diff --git a/swarm/storage/feed/lookup/epoch.go b/swarm/storage/feed/lookup/epoch.go index bafe95477..6d75ba243 100644 --- a/swarm/storage/feed/lookup/epoch.go +++ b/swarm/storage/feed/lookup/epoch.go @@ -87,5 +87,5 @@ func (e *Epoch) Equals(epoch Epoch) bool { // String implements the Stringer interface. func (e *Epoch) String() string { - return fmt.Sprintf("Epoch{Time:%d, Level:%d}", e.Time, e.Level) + return fmt.Sprintf("Epoch{Base: %d, Time:%d, Level:%d}", e.Base(), e.Time, e.Level) } diff --git a/swarm/storage/feed/lookup/lookup.go b/swarm/storage/feed/lookup/lookup.go index 1642c659a..4b233a0e0 100644 --- a/swarm/storage/feed/lookup/lookup.go +++ b/swarm/storage/feed/lookup/lookup.go @@ -20,7 +20,10 @@ so they can be found */ package lookup -import "context" +import ( + "context" + "time" +) const maxuint64 = ^uint64(0) @@ -28,8 +31,8 @@ const maxuint64 = ^uint64(0) const LowestLevel uint8 = 0 // default is 0 (1 second) // HighestLevel sets the lowest frequency the algorithm will operate at, as a power of 2. -// 25 -> 2^25 equals to roughly one year. -const HighestLevel = 25 // default is 25 (~1 year) +// 31 -> 2^31 equals to roughly 38 years. +const HighestLevel = 31 // DefaultLevel sets what level will be chosen to search when there is no hint const DefaultLevel = HighestLevel @@ -43,7 +46,12 @@ type Algorithm func(ctx context.Context, now uint64, hint Epoch, read ReadFunc) // read() will be called on each lookup attempt // Returns an error only if read() returns an error // Returns nil if an update was not found -var Lookup Algorithm = FluzCapacitorAlgorithm +var Lookup Algorithm = LongEarthAlgorithm + +// TimeAfter must point to a function that returns a timer +// This is here so that tests can replace it with +// a mock up timer factory to simulate time deterministically +var TimeAfter = time.After // ReadFunc is a handler called by Lookup each time it attempts to find a value // It should return <nil> if a value is not found @@ -123,61 +131,6 @@ func GetFirstEpoch(now uint64) Epoch { var worstHint = Epoch{Time: 0, Level: 63} -// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found -// going back and forth in time -// First, it will attempt to find an update where it should be now if the hint was -// really the last update. If that lookup fails, then the last update must be either the hint itself -// or the epochs right below. If however, that lookup succeeds, then the update must be -// that one or within the epochs right below. -// see the guide for a more graphical representation -func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) { - var lastFound interface{} - var epoch Epoch - if hint == NoClue { - hint = worstHint - } - - t := now - - for { - epoch = GetNextEpoch(hint, t) - value, err = read(ctx, epoch, now) - if err != nil { - return nil, err - } - if value != nil { - lastFound = value - if epoch.Level == LowestLevel || epoch.Equals(hint) { - return value, nil - } - hint = epoch - continue - } - if epoch.Base() == hint.Base() { - if lastFound != nil { - return lastFound, nil - } - // we have reached the hint itself - if hint == worstHint { - return nil, nil - } - // check it out - value, err = read(ctx, hint, now) - if err != nil { - return nil, err - } - if value != nil { - return value, nil - } - // bad hint. - t = hint.Base() - hint = worstHint - continue - } - base := epoch.Base() - if base == 0 { - return nil, nil - } - t = base - 1 - } +var trace = func(id int32, formatString string, a ...interface{}) { + //fmt.Printf("Step ID #%d "+formatString+"\n", append([]interface{}{id}, a...)...) } diff --git a/swarm/storage/feed/lookup/lookup_test.go b/swarm/storage/feed/lookup/lookup_test.go index 60d77b709..b0d132de6 100644 --- a/swarm/storage/feed/lookup/lookup_test.go +++ b/swarm/storage/feed/lookup/lookup_test.go @@ -21,56 +21,55 @@ import ( "fmt" "math/rand" "testing" + "time" - "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" ) -type Data struct { - Payload uint64 - Time uint64 +type AlgorithmInfo struct { + Lookup lookup.Algorithm + Name string } -type Store map[lookup.EpochID]*Data - -func write(store Store, epoch lookup.Epoch, value *Data) { - log.Debug("Write: %d-%d, value='%d'\n", epoch.Base(), epoch.Level, value.Payload) - store[epoch.ID()] = value +var algorithms = []AlgorithmInfo{ + {lookup.FluzCapacitorAlgorithm, "FluzCapacitor"}, + {lookup.LongEarthAlgorithm, "LongEarth"}, } -func update(store Store, last lookup.Epoch, now uint64, value *Data) lookup.Epoch { - epoch := lookup.GetNextEpoch(last, now) - - write(store, epoch, value) +const enablePrintMetrics = false // set to true to display algorithm benchmarking stats - return epoch +func printMetric(metric string, store *Store, elapsed time.Duration) { + if enablePrintMetrics { + fmt.Printf("metric=%s, readcount=%d (successful=%d, failed=%d), cached=%d, canceled=%d, maxSimult=%d, elapsed=%s\n", metric, + store.reads, store.successful, store.failed, store.cacheHits, store.canceled, store.maxSimultaneous, elapsed) + } } const Day = 60 * 60 * 24 const Year = Day * 365 const Month = Day * 30 -func makeReadFunc(store Store, counter *int) lookup.ReadFunc { - return func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - *counter++ - data := store[epoch.ID()] - var valueStr string - if data != nil { - valueStr = fmt.Sprintf("%d", data.Payload) - } - log.Debug("Read: %d-%d, value='%s'\n", epoch.Base(), epoch.Level, valueStr) - if data != nil && data.Time <= now { - return data, nil - } - return nil, nil - } +// DefaultStoreConfig indicates the time the different read +// operations will take in the simulation +// This allows to measure an algorithm performance relative +// to other +var DefaultStoreConfig = &StoreConfig{ + CacheReadTime: 50 * time.Millisecond, + FailedReadTime: 1000 * time.Millisecond, + SuccessfulReadTime: 500 * time.Millisecond, } +// TestLookup verifies if the last update and intermediates are +// found and if that same last update is found faster if a hint is given func TestLookup(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 - readFunc := makeReadFunc(store, &readCount) + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() // write an update every month for 12 months 3 years ago and then silence for two years now := uint64(1533799046) @@ -83,68 +82,90 @@ func TestLookup(t *testing.T) { Payload: t, //our "payload" will be the timestamp itself. Time: t, } - epoch = update(store, epoch, t, &data) + epoch = store.Update(epoch, t, &data) lastData = &data } - // try to get the last value - - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + + store.Reset() // reset the store read counters + + // ### 3.1.- Test how long it takes to find the last update without a hint: + timeElapsedWithoutHint := stopwatch.Measure(func() { + + // try to get the last value + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + + }) + printMetric("SIMPLE READ", store, timeElapsedWithoutHint) + + store.Reset() // reset the read counters for the next test + + // ### 3.2.- Test how long it takes to find the last update *with* a hint. + // it should take less time! + timeElapsed := stopwatch.Measure(func() { + // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update + value, err := algo.Lookup(context.Background(), now, epoch, readFunc) + if err != nil { + t.Fatal(err) + } + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + printMetric("WITH HINT", store, stopwatch.Elapsed()) + + if timeElapsed > timeElapsedWithoutHint { + t.Fatalf("Expected lookup to complete faster than %s since we provided a hint. Took %s", timeElapsedWithoutHint, timeElapsed) + } + + store.Reset() // reset the read counters for the next test + + // ### 3.3.- try to get an intermediate value + // if we look for a value in, e.g., now - Year*3 + 6*Month, we should get that value + // Since the "payload" is the timestamp itself, we can check this. + expectedTime := now - Year*3 + 6*Month + timeElapsed = stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), expectedTime, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + + data, ok := value.(*Data) + + if !ok { + t.Fatal("Expected value to contain data") + } + + if data.Time != expectedTime { + t.Fatalf("Expected value timestamp to be %d, got %d", data.Time, expectedTime) + } + }) + printMetric("INTERMEDIATE READ", store, timeElapsed) + }) } - - readCountWithoutHint := readCount - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - // reset the read count for the next test - readCount = 0 - // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update - value, err = lookup.Lookup(context.Background(), now, epoch, readFunc) - if err != nil { - t.Fatal(err) - } - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - if readCount > readCountWithoutHint { - t.Fatalf("Expected lookup to complete with fewer or same reads than %d since we provided a hint. Did %d reads.", readCountWithoutHint, readCount) - } - - // try to get an intermediate value - // if we look for a value in now - Year*3 + 6*Month, we should get that value - // Since the "payload" is the timestamp itself, we can check this. - - expectedTime := now - Year*3 + 6*Month - - value, err = lookup.Lookup(context.Background(), expectedTime, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - - data, ok := value.(*Data) - - if !ok { - t.Fatal("Expected value to contain data") - } - - if data.Time != expectedTime { - t.Fatalf("Expected value timestamp to be %d, got %d", data.Time, expectedTime) - } - } +// TestOneUpdateAt0 checks if the lookup algorithm can return an update that +// is precisely set at t=0 func TestOneUpdateAt0(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) var epoch lookup.Epoch @@ -152,24 +173,37 @@ func TestOneUpdateAt0(t *testing.T) { Payload: 79, Time: 0, } - update(store, epoch, 0, &data) - - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - if value != &data { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + store.Update(epoch, 0, &data) //place 1 update in t=0 + + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset the read counters for the next test + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + if value != &data { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + } + }) + printMetric("SIMPLE", store, timeElapsed) + }) } } -// Tests the update is found even when a bad hint is given +// TestBadHint tests if the update is found even when a bad hint is given func TestBadHint(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) var epoch lookup.Epoch @@ -179,7 +213,7 @@ func TestBadHint(t *testing.T) { } // place an update for t=1200 - update(store, epoch, 1200, &data) + store.Update(epoch, 1200, &data) // come up with some evil hint badHint := lookup.Epoch{ @@ -187,21 +221,35 @@ func TestBadHint(t *testing.T) { Time: 1200000000, } - value, err := lookup.Lookup(context.Background(), now, badHint, readFunc) - if err != nil { - t.Fatal(err) - } - if value != &data { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, badHint, readFunc) + if err != nil { + t.Fatal(err) + } + if value != &data { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + } + }) + printMetric("SIMPLE", store, timeElapsed) + }) } } -// Tests whether the update is found when the bad hint is exactly below the last update +// TestBadHintNextToUpdate checks whether the update is found when the bad hint is exactly below the last update func TestBadHintNextToUpdate(t *testing.T) { - store := make(Store) - readCount := 0 + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() + + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) var last *Data @@ -227,7 +275,7 @@ func TestBadHintNextToUpdate(t *testing.T) { Time: 0, } last = &data - epoch = update(store, epoch, 1200000000+i, &data) + epoch = store.Update(epoch, 1200000000+i, &data) } // come up with some evil hint: @@ -237,99 +285,132 @@ func TestBadHintNextToUpdate(t *testing.T) { Time: 1200000005, } - value, err := lookup.Lookup(context.Background(), now, badHint, readFunc) - if err != nil { - t.Fatal(err) - } - if value != last { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", last, value) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset read counters for next test + + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, badHint, readFunc) + if err != nil { + t.Fatal(err) + } + if value != last { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", last, value) + } + }) + printMetric("SIMPLE", store, timeElapsed) + }) } } +// TestContextCancellation checks whether a lookup can be canceled func TestContextCancellation(t *testing.T) { - readFunc := func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - <-ctx.Done() - return nil, ctx.Err() - } - - ctx, cancel := context.WithCancel(context.Background()) - - errc := make(chan error) - - go func() { - _, err := lookup.Lookup(ctx, 1200000000, lookup.NoClue, readFunc) - errc <- err - }() - - cancel() - - if err := <-errc; err != context.Canceled { - t.Fatalf("Expected lookup to return a context Cancelled error, got %v", err) - } - - // text context cancellation during hint lookup: - ctx, cancel = context.WithCancel(context.Background()) - errc = make(chan error) - someHint := lookup.Epoch{ - Level: 25, - Time: 300, - } - - readFunc = func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - if epoch == someHint { - go cancel() - <-ctx.Done() - return nil, ctx.Err() - } - return nil, nil - } - - go func() { - _, err := lookup.Lookup(ctx, 301, someHint, readFunc) - errc <- err - }() - - if err := <-errc; err != context.Canceled { - t.Fatalf("Expected lookup to return a context Cancelled error, got %v", err) + // ### 1.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + + // ### 2.1.- Test a simple cancel of an always blocking read function + readFunc := func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { + <-ctx.Done() + return nil, ctx.Err() + } + + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error) + + go func() { + _, err := algo.Lookup(ctx, 1200000000, lookup.NoClue, readFunc) + errc <- err + }() + + cancel() //actually cancel the lookup + + if err := <-errc; err != context.Canceled { + t.Fatalf("Expected lookup to return a context canceled error, got %v", err) + } + + // ### 2.2.- Test context cancellation during hint lookup: + ctx, cancel = context.WithCancel(context.Background()) + errc = make(chan error) + someHint := lookup.Epoch{ + Level: 25, + Time: 300, + } + // put up a read function that gets canceled only on hint lookup + readFunc = func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { + if epoch == someHint { + go cancel() + <-ctx.Done() + return nil, ctx.Err() + } + return nil, nil + } + + go func() { + _, err := algo.Lookup(ctx, 301, someHint, readFunc) + errc <- err + }() + + if err := <-errc; err != context.Canceled { + t.Fatalf("Expected lookup to return a context canceled error, got %v", err) + } + }) } } +// TestLookupFail makes sure the lookup function fails on a timely manner +// when there are no updates at all func TestLookupFail(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage, without adding updates + // don't write anything and try to look up. + // we're testing we don't get stuck in a loop and that the lookup + // function converges in a timely fashion + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) - // don't write anything and try to look up. - // we're testing we don't get stuck in a loop + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - if value != nil { - t.Fatal("Expected value to be nil, since the update should've failed") - } + stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + if value != nil { + t.Fatal("Expected value to be nil, since the update should've failed") + } + }) - expectedReads := now/(1<<lookup.HighestLevel) + 1 - if uint64(readCount) != expectedReads { - t.Fatalf("Expected lookup to fail after %d reads. Did %d reads.", expectedReads, readCount) + printMetric("SIMPLE", store, stopwatch.Elapsed()) + }) } } func TestHighFreqUpdates(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage and add one update per second + // for the last 1000 seconds: + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) - // write an update every second for the last 1000 seconds var epoch lookup.Epoch var lastData *Data @@ -339,103 +420,147 @@ func TestHighFreqUpdates(t *testing.T) { Payload: T, //our "payload" will be the timestamp itself. Time: T, } - epoch = update(store, epoch, T, &data) + epoch = store.Update(epoch, T, &data) lastData = &data } - value, err := lookup.Lookup(context.Background(), lastData.Time, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - readCountWithoutHint := readCount - // reset the read count for the next test - readCount = 0 - // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update - value, err = lookup.Lookup(context.Background(), now, epoch, readFunc) - if err != nil { - t.Fatal(err) - } - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - if readCount > readCountWithoutHint { - t.Fatalf("Expected lookup to complete with fewer or equal reads than %d since we provided a hint. Did %d reads.", readCountWithoutHint, readCount) - } - - for i := uint64(0); i <= 994; i++ { - T := uint64(now - 1000 + i) // update every second for the last 1000 seconds - value, err := lookup.Lookup(context.Background(), T, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - data, _ := value.(*Data) - if data == nil { - t.Fatalf("Expected lookup to return %d, got nil", T) - } - if data.Payload != T { - t.Fatalf("Expected lookup to return %d, got %d", T, data.Time) - } + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset read counters for next test + + // ### 3.1.- Test how long it takes to find the last update without a hint: + timeElapsedWithoutHint := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), lastData.Time, lookup.NoClue, readFunc) + stopwatch.Stop() + if err != nil { + t.Fatal(err) + } + + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + printMetric("SIMPLE", store, timeElapsedWithoutHint) + + // reset the read count for the next test + store.Reset() + + // ### 3.2.- Now test how long it takes to find the last update *with* a hint, + // it should take less time! + timeElapsed := stopwatch.Measure(func() { + // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update + value, err := algo.Lookup(context.Background(), now, epoch, readFunc) + stopwatch.Stop() + if err != nil { + t.Fatal(err) + } + + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + + }) + if timeElapsed > timeElapsedWithoutHint { + t.Fatalf("Expected lookup to complete faster than %s since we provided a hint. Took %s", timeElapsedWithoutHint, timeElapsed) + } + printMetric("WITH HINT", store, timeElapsed) + + store.Reset() // reset read counters + + // ### 3.3.- Test multiple lookups at different intervals + timeElapsed = stopwatch.Measure(func() { + for i := uint64(0); i <= 10; i++ { + T := uint64(now - 1000 + i) + value, err := algo.Lookup(context.Background(), T, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + data, _ := value.(*Data) + if data == nil { + t.Fatalf("Expected lookup to return %d, got nil", T) + } + if data.Payload != T { + t.Fatalf("Expected lookup to return %d, got %d", T, data.Time) + } + } + }) + printMetric("MULTIPLE", store, timeElapsed) + }) } } +// TestSparseUpdates checks the lookup algorithm when +// updates come sparsely and in bursts func TestSparseUpdates(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 - readFunc := makeReadFunc(store, &readCount) - - // write an update every 5 years 3 times starting in Jan 1st 1970 and then silence + // ### 2.- Setup mock storage and write an updates sparsely in bursts, + // every 5 years 3 times starting in Jan 1st 1970 and then silence + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - now := uint64(1533799046) + now := uint64(633799046) var epoch lookup.Epoch var lastData *Data - for i := uint64(0); i < 5; i++ { - T := uint64(Year * 5 * i) // write an update every 5 years 3 times starting in Jan 1st 1970 and then silence - data := Data{ - Payload: T, //our "payload" will be the timestamp itself. - Time: T, + for i := uint64(0); i < 3; i++ { + for j := uint64(0); j < 10; j++ { + T := uint64(Year*5*i + j) // write a burst of 10 updates every 5 years 3 times starting in Jan 1st 1970 and then silence + data := Data{ + Payload: T, //our "payload" will be the timestamp itself. + Time: T, + } + epoch = store.Update(epoch, T, &data) + lastData = &data } - epoch = update(store, epoch, T, &data) - lastData = &data - } - - // try to get the last value - - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - - readCountWithoutHint := readCount - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) } - // reset the read count for the next test - readCount = 0 - // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update - value, err = lookup.Lookup(context.Background(), now, epoch, readFunc) - if err != nil { - t.Fatal(err) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset read counters for next test + + // ### 3.1.- Test how long it takes to find the last update without a hint: + timeElapsedWithoutHint := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + stopwatch.Stop() + if err != nil { + t.Fatal(err) + } + + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + printMetric("SIMPLE", store, timeElapsedWithoutHint) + + // reset the read count for the next test + store.Reset() + + // ### 3.2.- Now test how long it takes to find the last update *with* a hint, + // it should take less time! + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, epoch, readFunc) + if err != nil { + t.Fatal(err) + } + + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + if timeElapsed > timeElapsedWithoutHint { + t.Fatalf("Expected lookup to complete faster than %s since we provided a hint. Took %s", timeElapsedWithoutHint, timeElapsed) + } + + printMetric("WITH HINT", store, stopwatch.Elapsed()) + + }) } - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - if readCount > readCountWithoutHint { - t.Fatalf("Expected lookup to complete with fewer reads than %d since we provided a hint. Did %d reads.", readCountWithoutHint, readCount) - } - } // testG will hold precooked test results @@ -447,8 +572,9 @@ type testG struct { } // test cases -var testGetNextLevelCases = []testG{{e: lookup.Epoch{Time: 989875233, Level: 12}, n: 989875233, x: 11}, {e: lookup.Epoch{Time: 995807650, Level: 18}, n: 995598156, x: 19}, {e: lookup.Epoch{Time: 969167082, Level: 0}, n: 968990357, x: 18}, {e: lookup.Epoch{Time: 993087628, Level: 14}, n: 992987044, x: 20}, {e: lookup.Epoch{Time: 963364631, Level: 20}, n: 963364630, x: 19}, {e: lookup.Epoch{Time: 963497510, Level: 16}, n: 963370732, x: 18}, {e: lookup.Epoch{Time: 955421349, Level: 22}, n: 955421348, x: 21}, {e: lookup.Epoch{Time: 968220379, Level: 15}, n: 968220378, x: 14}, {e: lookup.Epoch{Time: 939129014, Level: 6}, n: 939128771, x: 11}, {e: lookup.Epoch{Time: 907847903, Level: 6}, n: 907791833, x: 18}, {e: lookup.Epoch{Time: 910835564, Level: 15}, n: 910835564, x: 14}, {e: lookup.Epoch{Time: 913578333, Level: 22}, n: 881808431, x: 25}, {e: lookup.Epoch{Time: 895818460, Level: 3}, n: 895818132, x: 9}, {e: lookup.Epoch{Time: 903843025, Level: 24}, n: 895609561, x: 23}, {e: lookup.Epoch{Time: 877889433, Level: 13}, n: 877877093, x: 15}, {e: lookup.Epoch{Time: 901450396, Level: 10}, n: 901450058, x: 9}, {e: lookup.Epoch{Time: 925179910, Level: 3}, n: 925168393, x: 16}, {e: lookup.Epoch{Time: 913485477, Level: 21}, n: 913485476, x: 20}, {e: lookup.Epoch{Time: 924462991, Level: 18}, n: 924462990, x: 17}, {e: lookup.Epoch{Time: 941175128, Level: 13}, n: 941175127, x: 12}, {e: lookup.Epoch{Time: 920126583, Level: 3}, n: 920100782, x: 19}, {e: lookup.Epoch{Time: 932403200, Level: 9}, n: 932279891, x: 17}, {e: lookup.Epoch{Time: 948284931, Level: 2}, n: 948284921, x: 9}, {e: lookup.Epoch{Time: 953540997, Level: 7}, n: 950547986, x: 22}, {e: lookup.Epoch{Time: 926639837, Level: 18}, n: 918608882, x: 24}, {e: lookup.Epoch{Time: 954637598, Level: 1}, n: 954578761, x: 17}, {e: lookup.Epoch{Time: 943482981, Level: 10}, n: 942924151, x: 19}, {e: lookup.Epoch{Time: 963580771, Level: 7}, n: 963580771, x: 6}, {e: lookup.Epoch{Time: 993744930, Level: 7}, n: 993690858, x: 16}, {e: lookup.Epoch{Time: 1018890213, Level: 12}, n: 1018890212, x: 11}, {e: lookup.Epoch{Time: 1030309411, Level: 2}, n: 1030309227, x: 9}, {e: lookup.Epoch{Time: 1063204997, Level: 20}, n: 1063204996, x: 19}, {e: lookup.Epoch{Time: 1094340832, Level: 6}, n: 1094340633, x: 7}, {e: lookup.Epoch{Time: 1077880597, Level: 10}, n: 1075914292, x: 20}, {e: lookup.Epoch{Time: 1051114957, Level: 18}, n: 1051114957, x: 17}, {e: lookup.Epoch{Time: 1045649701, Level: 22}, n: 1045649700, x: 21}, {e: lookup.Epoch{Time: 1066198885, Level: 14}, n: 1066198884, x: 13}, {e: lookup.Epoch{Time: 1053231952, Level: 1}, n: 1053210845, x: 16}, {e: lookup.Epoch{Time: 1068763404, Level: 14}, n: 1068675428, x: 18}, {e: lookup.Epoch{Time: 1039042173, Level: 15}, n: 1038973110, x: 17}, {e: lookup.Epoch{Time: 1050747636, Level: 6}, n: 1050747364, x: 9}, {e: lookup.Epoch{Time: 1030034434, Level: 23}, n: 1030034433, x: 22}, {e: lookup.Epoch{Time: 1003783425, Level: 18}, n: 1003783424, x: 17}, {e: lookup.Epoch{Time: 988163976, Level: 15}, n: 988084064, x: 17}, {e: lookup.Epoch{Time: 1007222377, Level: 15}, n: 1007222377, x: 14}, {e: lookup.Epoch{Time: 1001211375, Level: 13}, n: 1001208178, x: 14}, {e: lookup.Epoch{Time: 997623199, Level: 8}, n: 997623198, x: 7}, {e: lookup.Epoch{Time: 1026283830, Level: 10}, n: 1006681704, x: 24}, {e: lookup.Epoch{Time: 1019421907, Level: 20}, n: 1019421906, x: 19}, {e: lookup.Epoch{Time: 1043154306, Level: 16}, n: 1043108343, x: 16}, {e: lookup.Epoch{Time: 1075643767, Level: 17}, n: 1075325898, x: 18}, {e: lookup.Epoch{Time: 1043726309, Level: 20}, n: 1043726308, x: 19}, {e: lookup.Epoch{Time: 1056415324, Level: 17}, n: 1056415324, x: 16}, {e: lookup.Epoch{Time: 1088650219, Level: 13}, n: 1088650218, x: 12}, {e: lookup.Epoch{Time: 1088551662, Level: 7}, n: 1088543355, x: 13}, {e: lookup.Epoch{Time: 1069667265, Level: 6}, n: 1069667075, x: 7}, {e: lookup.Epoch{Time: 1079145970, Level: 18}, n: 1079145969, x: 17}, {e: lookup.Epoch{Time: 1083338876, Level: 7}, n: 1083338875, x: 6}, {e: lookup.Epoch{Time: 1051581086, Level: 4}, n: 1051568869, x: 14}, {e: lookup.Epoch{Time: 1028430882, Level: 4}, n: 1028430864, x: 5}, {e: lookup.Epoch{Time: 1057356462, Level: 1}, n: 1057356417, x: 5}, {e: lookup.Epoch{Time: 1033104266, Level: 0}, n: 1033097479, x: 13}, {e: lookup.Epoch{Time: 1031391367, Level: 11}, n: 1031387304, x: 14}, {e: lookup.Epoch{Time: 1049781164, Level: 15}, n: 1049781163, x: 14}, {e: lookup.Epoch{Time: 1027271628, Level: 12}, n: 1027271627, x: 11}, {e: lookup.Epoch{Time: 1057270560, Level: 23}, n: 1057270560, x: 22}, {e: lookup.Epoch{Time: 1047501317, Level: 15}, n: 1047501317, x: 14}, {e: lookup.Epoch{Time: 1058349035, Level: 11}, n: 1045175573, x: 24}, {e: lookup.Epoch{Time: 1057396147, Level: 20}, n: 1057396147, x: 19}, {e: lookup.Epoch{Time: 1048906375, Level: 18}, n: 1039616919, x: 25}, {e: lookup.Epoch{Time: 1074294831, Level: 20}, n: 1074294831, x: 19}, {e: lookup.Epoch{Time: 1088946052, Level: 1}, n: 1088917364, x: 14}, {e: lookup.Epoch{Time: 1112337595, Level: 17}, n: 1111008110, x: 22}, {e: lookup.Epoch{Time: 1099990284, Level: 5}, n: 1099968370, x: 15}, {e: lookup.Epoch{Time: 1087036441, Level: 16}, n: 1053967855, x: 25}, {e: lookup.Epoch{Time: 1069225185, Level: 8}, n: 1069224660, x: 10}, {e: lookup.Epoch{Time: 1057505479, Level: 9}, n: 1057505170, x: 14}, {e: lookup.Epoch{Time: 1072381377, Level: 12}, n: 1065950959, x: 22}, {e: lookup.Epoch{Time: 1093887139, Level: 8}, n: 1093863305, x: 14}, {e: lookup.Epoch{Time: 1082366510, Level: 24}, n: 1082366510, x: 23}, {e: lookup.Epoch{Time: 1103231132, Level: 14}, n: 1102292201, x: 22}, {e: lookup.Epoch{Time: 1094502355, Level: 3}, n: 1094324652, x: 18}, {e: lookup.Epoch{Time: 1068488344, Level: 12}, n: 1067577330, x: 19}, {e: lookup.Epoch{Time: 1050278233, Level: 12}, n: 1050278232, x: 11}, {e: lookup.Epoch{Time: 1047660768, Level: 5}, n: 1047652137, x: 17}, {e: lookup.Epoch{Time: 1060116167, Level: 11}, n: 1060114091, x: 12}, {e: lookup.Epoch{Time: 1068149392, Level: 21}, n: 1052074801, x: 24}, {e: lookup.Epoch{Time: 1081934120, Level: 6}, n: 1081933847, x: 8}, {e: lookup.Epoch{Time: 1107943693, Level: 16}, n: 1107096139, x: 25}, {e: lookup.Epoch{Time: 1131571649, Level: 9}, n: 1131570428, x: 11}, {e: lookup.Epoch{Time: 1123139367, Level: 0}, n: 1122912198, x: 20}, {e: lookup.Epoch{Time: 1121144423, Level: 6}, n: 1120568289, x: 20}, {e: lookup.Epoch{Time: 1089932411, Level: 17}, n: 1089932410, x: 16}, {e: lookup.Epoch{Time: 1104899012, Level: 22}, n: 1098978789, x: 22}, {e: lookup.Epoch{Time: 1094588059, Level: 21}, n: 1094588059, x: 20}, {e: lookup.Epoch{Time: 1114987438, Level: 24}, n: 1114987437, x: 23}, {e: lookup.Epoch{Time: 1084186305, Level: 7}, n: 1084186241, x: 6}, {e: lookup.Epoch{Time: 1058827111, Level: 8}, n: 1058826504, x: 9}, {e: lookup.Epoch{Time: 1090679810, Level: 12}, n: 1090616539, x: 17}, {e: lookup.Epoch{Time: 1084299475, Level: 23}, n: 1084299475, x: 22}} +var testGetNextLevelCases = []testG{{e: lookup.Epoch{Time: 989875233, Level: 12}, n: 989807323, x: 24}, {e: lookup.Epoch{Time: 995807650, Level: 18}, n: 995807649, x: 17}, {e: lookup.Epoch{Time: 969167082, Level: 0}, n: 969111431, x: 18}, {e: lookup.Epoch{Time: 993087628, Level: 14}, n: 993087627, x: 13}, {e: lookup.Epoch{Time: 963364631, Level: 20}, n: 962941578, x: 19}, {e: lookup.Epoch{Time: 963497510, Level: 16}, n: 963497509, x: 15}, {e: lookup.Epoch{Time: 955421349, Level: 22}, n: 929292183, x: 27}, {e: lookup.Epoch{Time: 968220379, Level: 15}, n: 968220378, x: 14}, {e: lookup.Epoch{Time: 939129014, Level: 6}, n: 939126953, x: 11}, {e: lookup.Epoch{Time: 907847903, Level: 6}, n: 907846146, x: 11}, {e: lookup.Epoch{Time: 910835564, Level: 15}, n: 703619757, x: 28}, {e: lookup.Epoch{Time: 913578333, Level: 22}, n: 913578332, x: 21}, {e: lookup.Epoch{Time: 895818460, Level: 3}, n: 895818132, x: 9}, {e: lookup.Epoch{Time: 903843025, Level: 24}, n: 903843025, x: 23}, {e: lookup.Epoch{Time: 877889433, Level: 13}, n: 149120378, x: 29}, {e: lookup.Epoch{Time: 901450396, Level: 10}, n: 858997793, x: 26}, {e: lookup.Epoch{Time: 925179910, Level: 3}, n: 925177237, x: 13}, {e: lookup.Epoch{Time: 913485477, Level: 21}, n: 907146511, x: 22}, {e: lookup.Epoch{Time: 924462991, Level: 18}, n: 924462990, x: 17}, {e: lookup.Epoch{Time: 941175128, Level: 13}, n: 941168924, x: 13}, {e: lookup.Epoch{Time: 920126583, Level: 3}, n: 538054817, x: 28}, {e: lookup.Epoch{Time: 891721312, Level: 18}, n: 890975671, x: 21}, {e: lookup.Epoch{Time: 920397342, Level: 11}, n: 920396960, x: 10}, {e: lookup.Epoch{Time: 953406530, Level: 3}, n: 953406530, x: 2}, {e: lookup.Epoch{Time: 920024527, Level: 23}, n: 920024527, x: 22}, {e: lookup.Epoch{Time: 927050922, Level: 7}, n: 927049632, x: 11}, {e: lookup.Epoch{Time: 894599900, Level: 10}, n: 890021707, x: 22}, {e: lookup.Epoch{Time: 883010150, Level: 3}, n: 882969902, x: 15}, {e: lookup.Epoch{Time: 855561102, Level: 22}, n: 855561102, x: 21}, {e: lookup.Epoch{Time: 828245477, Level: 19}, n: 825245571, x: 22}, {e: lookup.Epoch{Time: 851095026, Level: 4}, n: 851083702, x: 13}, {e: lookup.Epoch{Time: 879209039, Level: 11}, n: 879209039, x: 10}, {e: lookup.Epoch{Time: 859265651, Level: 0}, n: 840582083, x: 24}, {e: lookup.Epoch{Time: 827349870, Level: 24}, n: 827349869, x: 23}, {e: lookup.Epoch{Time: 819602318, Level: 3}, n: 18446744073490860182, x: 31}, {e: lookup.Epoch{Time: 849708538, Level: 7}, n: 849708538, x: 6}, {e: lookup.Epoch{Time: 873885094, Level: 11}, n: 873881798, x: 11}, {e: lookup.Epoch{Time: 852169070, Level: 1}, n: 852049399, x: 17}, {e: lookup.Epoch{Time: 852885343, Level: 8}, n: 852875652, x: 13}, {e: lookup.Epoch{Time: 830957057, Level: 8}, n: 830955867, x: 10}, {e: lookup.Epoch{Time: 807353611, Level: 4}, n: 807325211, x: 16}, {e: lookup.Epoch{Time: 803198793, Level: 8}, n: 696477575, x: 26}, {e: lookup.Epoch{Time: 791356887, Level: 10}, n: 791356003, x: 10}, {e: lookup.Epoch{Time: 817771215, Level: 12}, n: 817708431, x: 17}, {e: lookup.Epoch{Time: 846211146, Level: 14}, n: 846211146, x: 13}, {e: lookup.Epoch{Time: 821849822, Level: 9}, n: 821849229, x: 9}, {e: lookup.Epoch{Time: 789508756, Level: 9}, n: 789508755, x: 8}, {e: lookup.Epoch{Time: 814088521, Level: 12}, n: 814088512, x: 11}, {e: lookup.Epoch{Time: 813665673, Level: 6}, n: 813548257, x: 17}, {e: lookup.Epoch{Time: 791472209, Level: 6}, n: 720857845, x: 26}, {e: lookup.Epoch{Time: 805687744, Level: 2}, n: 805687720, x: 6}, {e: lookup.Epoch{Time: 783153927, Level: 12}, n: 783134053, x: 14}, {e: lookup.Epoch{Time: 815033655, Level: 11}, n: 815033654, x: 10}, {e: lookup.Epoch{Time: 821184581, Level: 6}, n: 821184464, x: 11}, {e: lookup.Epoch{Time: 841908114, Level: 2}, n: 841636025, x: 18}, {e: lookup.Epoch{Time: 862969167, Level: 20}, n: 862919955, x: 19}, {e: lookup.Epoch{Time: 887604565, Level: 21}, n: 887604564, x: 20}, {e: lookup.Epoch{Time: 863723789, Level: 10}, n: 858274530, x: 22}, {e: lookup.Epoch{Time: 851533290, Level: 10}, n: 851531385, x: 11}, {e: lookup.Epoch{Time: 826032484, Level: 14}, n: 826032484, x: 13}, {e: lookup.Epoch{Time: 819401505, Level: 7}, n: 818943526, x: 18}, {e: lookup.Epoch{Time: 800886832, Level: 12}, n: 800563106, x: 19}, {e: lookup.Epoch{Time: 780767476, Level: 10}, n: 694450997, x: 26}, {e: lookup.Epoch{Time: 789209418, Level: 15}, n: 789209417, x: 14}, {e: lookup.Epoch{Time: 816086666, Level: 9}, n: 816034646, x: 18}, {e: lookup.Epoch{Time: 835407077, Level: 21}, n: 835407076, x: 20}, {e: lookup.Epoch{Time: 846527322, Level: 20}, n: 846527321, x: 19}, {e: lookup.Epoch{Time: 850131130, Level: 19}, n: 18446744073670013406, x: 31}, {e: lookup.Epoch{Time: 842248607, Level: 24}, n: 783963834, x: 28}, {e: lookup.Epoch{Time: 816181999, Level: 2}, n: 816124867, x: 15}, {e: lookup.Epoch{Time: 806627026, Level: 17}, n: 756013427, x: 28}, {e: lookup.Epoch{Time: 826223084, Level: 4}, n: 826169865, x: 16}, {e: lookup.Epoch{Time: 835380147, Level: 21}, n: 835380147, x: 20}, {e: lookup.Epoch{Time: 860137874, Level: 3}, n: 860137782, x: 7}, {e: lookup.Epoch{Time: 860623757, Level: 8}, n: 860621582, x: 12}, {e: lookup.Epoch{Time: 875464114, Level: 24}, n: 875464114, x: 23}, {e: lookup.Epoch{Time: 853804052, Level: 6}, n: 853804051, x: 5}, {e: lookup.Epoch{Time: 864150903, Level: 14}, n: 854360673, x: 24}, {e: lookup.Epoch{Time: 850104561, Level: 23}, n: 850104561, x: 22}, {e: lookup.Epoch{Time: 878020186, Level: 24}, n: 878020186, x: 23}, {e: lookup.Epoch{Time: 900150940, Level: 8}, n: 899224760, x: 21}, {e: lookup.Epoch{Time: 869566202, Level: 2}, n: 869566199, x: 3}, {e: lookup.Epoch{Time: 851878045, Level: 5}, n: 851878045, x: 4}, {e: lookup.Epoch{Time: 824469671, Level: 12}, n: 824466504, x: 13}, {e: lookup.Epoch{Time: 819830223, Level: 9}, n: 816550241, x: 22}, {e: lookup.Epoch{Time: 813720249, Level: 20}, n: 801351581, x: 28}, {e: lookup.Epoch{Time: 831200185, Level: 20}, n: 830760165, x: 19}, {e: lookup.Epoch{Time: 838915973, Level: 9}, n: 838915972, x: 8}, {e: lookup.Epoch{Time: 812902644, Level: 5}, n: 812902644, x: 4}, {e: lookup.Epoch{Time: 812755887, Level: 3}, n: 812755887, x: 2}, {e: lookup.Epoch{Time: 822497779, Level: 8}, n: 822486000, x: 14}, {e: lookup.Epoch{Time: 832407585, Level: 9}, n: 579450238, x: 28}, {e: lookup.Epoch{Time: 799645403, Level: 23}, n: 799645403, x: 22}, {e: lookup.Epoch{Time: 827279665, Level: 2}, n: 826723872, x: 19}, {e: lookup.Epoch{Time: 846062554, Level: 6}, n: 765881119, x: 28}, {e: lookup.Epoch{Time: 855122998, Level: 6}, n: 855122978, x: 5}, {e: lookup.Epoch{Time: 841905104, Level: 4}, n: 751401236, x: 28}, {e: lookup.Epoch{Time: 857737438, Level: 12}, n: 325468127, x: 29}, {e: lookup.Epoch{Time: 838103691, Level: 18}, n: 779030823, x: 28}, {e: lookup.Epoch{Time: 841581240, Level: 22}, n: 841581239, x: 21}} +// TestGetNextLevel tests the lookup.GetNextLevel function func TestGetNextLevel(t *testing.T) { // First, test well-known cases @@ -492,7 +618,7 @@ func TestGetNextLevel(t *testing.T) { } -// cookGetNextLevelTests is used to generate a deterministic +// CookGetNextLevelTests is used to generate a deterministic // set of cases for TestGetNextLevel and thus "freeze" its current behavior func CookGetNextLevelTests(t *testing.T) { st := "" diff --git a/swarm/storage/feed/lookup/store_test.go b/swarm/storage/feed/lookup/store_test.go new file mode 100644 index 000000000..ed5209319 --- /dev/null +++ b/swarm/storage/feed/lookup/store_test.go @@ -0,0 +1,154 @@ +package lookup_test + +/* +This file contains components to mock a storage for testing +lookup algorithms and measure the number of reads. +*/ + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" +) + +// Data is a struct to keep a value to store/retrieve during testing +type Data struct { + Payload uint64 + Time uint64 +} + +// String implements fmt.Stringer +func (d *Data) String() string { + return fmt.Sprintf("%d-%d", d.Payload, d.Time) +} + +// Datamap is an internal map to hold the mocked storage +type DataMap map[lookup.EpochID]*Data + +// StoreConfig allows to specify the simulated delays for each type of +// read operation +type StoreConfig struct { + CacheReadTime time.Duration // time it takes to read from the cache + FailedReadTime time.Duration // time it takes to acknowledge a read as failed + SuccessfulReadTime time.Duration // time it takes to fetch data +} + +// StoreCounters will track read count metrics +type StoreCounters struct { + reads int + cacheHits int + failed int + successful int + canceled int + maxSimultaneous int +} + +// Store simulates a store and keeps track of performance counters +type Store struct { + StoreConfig + StoreCounters + data DataMap + cache DataMap + lock sync.RWMutex + activeReads int +} + +// NewStore returns a new mock store ready for use +func NewStore(config *StoreConfig) *Store { + store := &Store{ + StoreConfig: *config, + data: make(DataMap), + } + + store.Reset() + return store +} + +// Reset reset performance counters and clears the cache +func (s *Store) Reset() { + s.cache = make(DataMap) + s.StoreCounters = StoreCounters{} +} + +// Put stores a value in the mock store at the given epoch +func (s *Store) Put(epoch lookup.Epoch, value *Data) { + log.Debug("Write: %d-%d, value='%d'\n", epoch.Base(), epoch.Level, value.Payload) + s.data[epoch.ID()] = value +} + +// Update runs the seed algorithm to place the update in the appropriate epoch +func (s *Store) Update(last lookup.Epoch, now uint64, value *Data) lookup.Epoch { + epoch := lookup.GetNextEpoch(last, now) + s.Put(epoch, value) + return epoch +} + +// Get retrieves data at the specified epoch, simulating a delay +func (s *Store) Get(ctx context.Context, epoch lookup.Epoch, now uint64) (value interface{}, err error) { + epochID := epoch.ID() + var operationTime time.Duration + + defer func() { // simulate a delay according to what has actually happened + select { + case <-lookup.TimeAfter(operationTime): + case <-ctx.Done(): + s.lock.Lock() + s.canceled++ + s.lock.Unlock() + value = nil + err = ctx.Err() + } + s.lock.Lock() + s.activeReads-- + s.lock.Unlock() + }() + + s.lock.Lock() + defer s.lock.Unlock() + s.reads++ + s.activeReads++ + if s.activeReads > s.maxSimultaneous { + s.maxSimultaneous = s.activeReads + } + + // 1.- Simulate a cache read + item := s.cache[epochID] + operationTime += s.CacheReadTime + + if item != nil { + s.cacheHits++ + if item.Time <= now { + s.successful++ + return item, nil + } + return nil, nil + } + + // 2.- simulate a full read + + item = s.data[epochID] + if item != nil { + operationTime += s.SuccessfulReadTime + s.successful++ + s.cache[epochID] = item + if item.Time <= now { + return item, nil + } + } else { + operationTime += s.FailedReadTime + s.failed++ + } + return nil, nil +} + +// MakeReadFunc returns a read function suitable for the lookup algorithm, mapped +// to this mock storage +func (s *Store) MakeReadFunc() lookup.ReadFunc { + return func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { + return s.Get(ctx, epoch, now) + } +} diff --git a/swarm/storage/feed/lookup/timesim_test.go b/swarm/storage/feed/lookup/timesim_test.go new file mode 100644 index 000000000..2a254188c --- /dev/null +++ b/swarm/storage/feed/lookup/timesim_test.go @@ -0,0 +1,128 @@ +package lookup_test + +// This file contains simple time simulation tools for testing +// and measuring time-aware algorithms + +import ( + "sync" + "time" +) + +// Timer tracks information about a simulated timer +type Timer struct { + deadline time.Time + signal chan time.Time + id int +} + +// Stopwatch measures simulated execution time and manages simulated timers +type Stopwatch struct { + t time.Time + resolution time.Duration + timers map[int]*Timer + timerCounter int + stopSignal chan struct{} + lock sync.RWMutex +} + +// NewStopwatch returns a simulated clock that ticks on `resolution` intervals +func NewStopwatch(resolution time.Duration) *Stopwatch { + s := &Stopwatch{ + resolution: resolution, + } + s.Reset() + return s +} + +// Reset clears all timers and sents the stopwatch to zero +func (s *Stopwatch) Reset() { + s.t = time.Time{} + s.timers = make(map[int]*Timer) + s.Stop() +} + +// Tick advances simulated time by the stopwatch's resolution and triggers +// all due timers +func (s *Stopwatch) Tick() { + s.t = s.t.Add(s.resolution) + + s.lock.Lock() + defer s.lock.Unlock() + + for id, timer := range s.timers { + if s.t.After(timer.deadline) || s.t.Equal(timer.deadline) { + timer.signal <- s.t + close(timer.signal) + delete(s.timers, id) + } + } +} + +// NewTimer returns a new timer that will trigger after `duration` elapses in the +// simulation +func (s *Stopwatch) NewTimer(duration time.Duration) <-chan time.Time { + s.lock.Lock() + defer s.lock.Unlock() + + s.timerCounter++ + timer := &Timer{ + deadline: s.t.Add(duration), + signal: make(chan time.Time, 1), + id: s.timerCounter, + } + + s.timers[timer.id] = timer + return timer.signal +} + +// TimeAfter returns a simulated timer factory that can replace `time.After` +func (s *Stopwatch) TimeAfter() func(d time.Duration) <-chan time.Time { + return func(d time.Duration) <-chan time.Time { + return s.NewTimer(d) + } +} + +// Elapsed returns the time that has passed in the simulation +func (s *Stopwatch) Elapsed() time.Duration { + return s.t.Sub(time.Time{}) +} + +// Run starts the time simulation +func (s *Stopwatch) Run() { + go func() { + stopSignal := make(chan struct{}) + s.lock.Lock() + if s.stopSignal != nil { + close(s.stopSignal) + } + s.stopSignal = stopSignal + s.lock.Unlock() + for { + select { + case <-time.After(1 * time.Millisecond): + s.Tick() + case <-stopSignal: + return + } + } + }() +} + +// Stop stops the time simulation +func (s *Stopwatch) Stop() { + s.lock.Lock() + defer s.lock.Unlock() + + if s.stopSignal != nil { + close(s.stopSignal) + s.stopSignal = nil + } +} + +func (s *Stopwatch) Measure(measuredFunc func()) time.Duration { + s.Reset() + s.Run() + defer s.Stop() + measuredFunc() + return s.Elapsed() +} diff --git a/swarm/storage/feed/query_test.go b/swarm/storage/feed/query_test.go index 9fa5e2980..1ec45762e 100644 --- a/swarm/storage/feed/query_test.go +++ b/swarm/storage/feed/query_test.go @@ -30,7 +30,7 @@ func getTestQuery() *Query { } func TestQueryValues(t *testing.T) { - var expected = KV{"hint.level": "25", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"} + var expected = KV{"hint.level": "31", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"} query := getTestQuery() testValueSerializer(t, query, expected) diff --git a/swarm/storage/feed/request_test.go b/swarm/storage/feed/request_test.go index c30158fdd..b9c1381c6 100644 --- a/swarm/storage/feed/request_test.go +++ b/swarm/storage/feed/request_test.go @@ -223,7 +223,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { t.Fatalf("error creating update chunk:%s", err) } - compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f5a0ffe0bc27f207cd5b00944c8b9cee93e08b89b5ada777f123ac535189333f174a6a4ca2f43a92c4a477a49d774813c36ce8288552c58e6205b0ac35d0507eb00") + compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f9896df5937e64e51a7994479ff3fe0ed790d539b9b3e85e93c0014a8a64374f23603c79d16e99b50a757896d3816d7022ac594ad1415679a9b164afb2e5926d801") var recovered Request recovered.fromChunk(chunk) diff --git a/swarm/storage/feed/update_test.go b/swarm/storage/feed/update_test.go index 24c09b361..e4e0963e9 100644 --- a/swarm/storage/feed/update_test.go +++ b/swarm/storage/feed/update_test.go @@ -28,7 +28,7 @@ func getTestFeedUpdate() *Update { } func TestUpdateSerializer(t *testing.T) { - testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f") + testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f") } func TestUpdateLengthCheck(t *testing.T) { |