diff options
Diffstat (limited to 'swarm/storage/mru/handler.go')
-rw-r--r-- | swarm/storage/mru/handler.go | 418 |
1 files changed, 100 insertions, 318 deletions
diff --git a/swarm/storage/mru/handler.go b/swarm/storage/mru/handler.go index 18c667f14..3e7654795 100644 --- a/swarm/storage/mru/handler.go +++ b/swarm/storage/mru/handler.go @@ -21,11 +21,12 @@ package mru import ( "bytes" "context" + "fmt" "sync" "time" - "unsafe" - "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/storage/mru/lookup" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -33,7 +34,7 @@ import ( type Handler struct { chunkStore *storage.NetStore HashSize int - resources map[uint64]*resource + resources map[uint64]*cacheEntry resourceLock sync.RWMutex storeTimeout time.Duration queryMaxPeriods uint32 @@ -42,12 +43,10 @@ type Handler struct { // HandlerParams pass parameters to the Handler constructor NewHandler // Signer and TimestampProvider are mandatory parameters type HandlerParams struct { - QueryMaxPeriods uint32 } // hashPool contains a pool of ready hashers var hashPool sync.Pool -var minimumChunkLength int // init initializes the package and hashPool func init() { @@ -56,19 +55,12 @@ func init() { return storage.MakeHashFunc(resourceHashAlgorithm)() }, } - if minimumMetadataLength < minimumUpdateDataLength { - minimumChunkLength = minimumMetadataLength - } else { - minimumChunkLength = minimumUpdateDataLength - } } // NewHandler creates a new Mutable Resource API func NewHandler(params *HandlerParams) *Handler { rh := &Handler{ - resources: make(map[uint64]*resource), - storeTimeout: defaultStoreTimeout, - queryMaxPeriods: params.QueryMaxPeriods, + resources: make(map[uint64]*cacheEntry), } for i := 0; i < hasherCount; i++ { @@ -88,44 +80,25 @@ func (h *Handler) SetStore(store *storage.NetStore) { } // Validate is a chunk validation method -// If it looks like a resource update, the chunk address is checked against the ownerAddr of the update's signature +// If it looks like a resource update, the chunk address is checked against the userAddr of the update's signature // It implements the storage.ChunkValidator interface func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool { dataLength := len(data) - if dataLength < minimumChunkLength || dataLength > chunk.DefaultSize+8 { + if dataLength < minimumSignedUpdateLength { return false } - //metadata chunks have the first two bytes set to zero - if data[0] == 0 && data[1] == 0 && dataLength >= minimumMetadataLength { - //metadata chunk - rootAddr, _ := metadataHash(data) - valid := bytes.Equal(chunkAddr, rootAddr) - if !valid { - log.Debug("Invalid root metadata chunk with address", "addr", chunkAddr.Hex()) - } - return valid - } - - // if it is not a metadata chunk, check if it is a properly formatted update chunk with + // check if it is a properly formatted update chunk with // valid signature and proof of ownership of the resource it is trying // to update // First, deserialize the chunk - var r SignedResourceUpdate + var r Request if err := r.fromChunk(chunkAddr, data); err != nil { log.Debug("Invalid resource chunk", "addr", chunkAddr.Hex(), "err", err.Error()) return false } - // check that the lookup information contained in the chunk matches the updateAddr (chunk search key) - // that was used to retrieve this chunk - // if this validation fails, someone forged a chunk. - if !bytes.Equal(chunkAddr, r.updateHeader.UpdateAddr()) { - log.Debug("period,version,rootAddr contained in update chunk do not match updateAddr", "addr", chunkAddr.Hex()) - return false - } - // Verify signatures and that the signer actually owns the resource // If it fails, it means either the signature is not valid, data is corrupted // or someone is trying to update someone else's resource. @@ -138,301 +111,134 @@ func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool { } // GetContent retrieves the data payload of the last synced update of the Mutable Resource -func (h *Handler) GetContent(rootAddr storage.Address) (storage.Address, []byte, error) { - rsrc := h.get(rootAddr) - if rsrc == nil || !rsrc.isSynced() { - return nil, nil, NewError(ErrNotFound, " does not exist or is not synced") - } - return rsrc.lastKey, rsrc.data, nil -} - -// GetLastPeriod retrieves the period of the last synced update of the Mutable Resource -func (h *Handler) GetLastPeriod(rootAddr storage.Address) (uint32, error) { - rsrc := h.get(rootAddr) - if rsrc == nil { - return 0, NewError(ErrNotFound, " does not exist") - } else if !rsrc.isSynced() { - return 0, NewError(ErrNotSynced, " is not synced") +func (h *Handler) GetContent(view *View) (storage.Address, []byte, error) { + if view == nil { + return nil, nil, NewError(ErrInvalidValue, "view is nil") } - return rsrc.period, nil -} - -// GetVersion retrieves the period of the last synced update of the Mutable Resource -func (h *Handler) GetVersion(rootAddr storage.Address) (uint32, error) { - rsrc := h.get(rootAddr) + rsrc := h.get(view) if rsrc == nil { - return 0, NewError(ErrNotFound, " does not exist") - } else if !rsrc.isSynced() { - return 0, NewError(ErrNotSynced, " is not synced") - } - return rsrc.version, nil -} - -// New creates a new metadata chunk out of the request passed in. -func (h *Handler) New(ctx context.Context, request *Request) error { - - // frequency 0 is invalid - if request.metadata.Frequency == 0 { - return NewError(ErrInvalidValue, "frequency cannot be 0 when creating a resource") + return nil, nil, NewError(ErrNotFound, "resource does not exist") } - - // make sure owner is set to something - if request.metadata.Owner == zeroAddr { - return NewError(ErrInvalidValue, "ownerAddr must be set to create a new metadata chunk") - } - - // create the meta chunk and store it in swarm - chunk, metaHash, err := request.metadata.newChunk() - if err != nil { - return err - } - if request.metaHash != nil && !bytes.Equal(request.metaHash, metaHash) || - request.rootAddr != nil && !bytes.Equal(request.rootAddr, chunk.Address()) { - return NewError(ErrInvalidValue, "metaHash in UpdateRequest does not match actual metadata") - } - - request.metaHash = metaHash - request.rootAddr = chunk.Address() - - h.chunkStore.Put(ctx, chunk) - log.Debug("new resource", "name", request.metadata.Name, "startTime", request.metadata.StartTime, "frequency", request.metadata.Frequency, "owner", request.metadata.Owner) - - // create the internal index for the resource and populate it with its metadata - rsrc := &resource{ - resourceUpdate: resourceUpdate{ - updateHeader: updateHeader{ - UpdateLookup: UpdateLookup{ - rootAddr: chunk.Address(), - }, - }, - }, - ResourceMetadata: request.metadata, - updated: time.Now(), - } - h.set(chunk.Address(), rsrc) - - return nil + return rsrc.lastKey, rsrc.data, nil } -// NewUpdateRequest prepares an UpdateRequest structure with all the necessary information to +// NewRequest prepares a Request structure with all the necessary information to // just add the desired data and sign it. // The resulting structure can then be signed and passed to Handler.Update to be verified and sent -func (h *Handler) NewUpdateRequest(ctx context.Context, rootAddr storage.Address) (updateRequest *Request, err error) { - - if rootAddr == nil { - return nil, NewError(ErrInvalidValue, "rootAddr cannot be nil") +func (h *Handler) NewRequest(ctx context.Context, view *View) (request *Request, err error) { + if view == nil { + return nil, NewError(ErrInvalidValue, "view cannot be nil") } - // Make sure we have a cache of the metadata chunk - rsrc, err := h.Load(ctx, rootAddr) - if err != nil { - return nil, err - } + now := TimestampProvider.Now().Time + request = new(Request) + request.Header.Version = ProtocolVersion - now := TimestampProvider.Now() + query := NewQueryLatest(view, lookup.NoClue) - updateRequest = new(Request) - updateRequest.period, err = getNextPeriod(rsrc.StartTime.Time, now.Time, rsrc.Frequency) + rsrc, err := h.Lookup(ctx, query) if err != nil { - return nil, err - } - - if _, err = h.lookup(rsrc, LookupLatestVersionInPeriod(rsrc.rootAddr, updateRequest.period)); err != nil { if err.(*Error).code != ErrNotFound { return nil, err } // not finding updates means that there is a network error - // or that the resource really does not have updates in this period. + // or that the resource really does not have updates } - updateRequest.multihash = rsrc.multihash - updateRequest.rootAddr = rsrc.rootAddr - updateRequest.metaHash = rsrc.metaHash - updateRequest.metadata = rsrc.ResourceMetadata + request.View = *view - // if we already have an update for this period then increment version - // resource object MUST be in sync for version to be correct, but we checked this earlier in the method already - if h.hasUpdate(rootAddr, updateRequest.period) { - updateRequest.version = rsrc.version + 1 + // if we already have an update, then find next epoch + if rsrc != nil { + request.Epoch = lookup.GetNextEpoch(rsrc.Epoch, now) } else { - updateRequest.version = 1 + request.Epoch = lookup.GetFirstEpoch(now) } - return updateRequest, nil + return request, nil } -// Lookup retrieves a specific or latest version of the resource update with metadata chunk at params.Root -// Lookup works differently depending on the configuration of `LookupParams` -// See the `LookupParams` documentation and helper functions: -// `LookupLatest`, `LookupLatestVersionInPeriod` and `LookupVersion` +// Lookup retrieves a specific or latest version of the resource +// Lookup works differently depending on the configuration of `ID` +// See the `ID` documentation and helper functions: +// `LookupLatest` and `LookupBefore` // When looking for the latest update, it starts at the next period after the current time. // upon failure tries the corresponding keys of each previous period until one is found // (or startTime is reached, in which case there are no updates). -func (h *Handler) Lookup(ctx context.Context, params *LookupParams) (*resource, error) { +func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) { - rsrc := h.get(params.rootAddr) - if rsrc == nil { - return nil, NewError(ErrNothingToReturn, "resource not loaded") + timeLimit := query.TimeLimit + if timeLimit == 0 { // if time limit is set to zero, the user wants to get the latest update + timeLimit = TimestampProvider.Now().Time } - return h.lookup(rsrc, params) -} -// LookupPrevious returns the resource before the one currently loaded in the resource cache -// This is useful where resource updates are used incrementally in contrast to -// merely replacing content. -// Requires a cached resource object to determine the current state of the resource. -func (h *Handler) LookupPrevious(ctx context.Context, params *LookupParams) (*resource, error) { - rsrc := h.get(params.rootAddr) - if rsrc == nil { - return nil, NewError(ErrNothingToReturn, "resource not loaded") - } - if !rsrc.isSynced() { - return nil, NewError(ErrNotSynced, "LookupPrevious requires synced resource.") - } else if rsrc.period == 0 { - return nil, NewError(ErrNothingToReturn, " not found") - } - var version, period uint32 - if rsrc.version > 1 { - version = rsrc.version - 1 - period = rsrc.period - } else if rsrc.period == 1 { - return nil, NewError(ErrNothingToReturn, "Current update is the oldest") - } else { - version = 0 - period = rsrc.period - 1 + if query.Hint == lookup.NoClue { // try to use our cache + entry := h.get(&query.View) + if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints + query.Hint = entry.Epoch + } } - return h.lookup(rsrc, NewLookupParams(rsrc.rootAddr, period, version, params.Limit)) -} -// base code for public lookup methods -func (h *Handler) lookup(rsrc *resource, params *LookupParams) (*resource, error) { - - lp := *params // we can't look for anything without a store if h.chunkStore == nil { return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups") } - var specificperiod bool - if lp.period > 0 { - specificperiod = true - } else { - // get the current time and the next period - now := TimestampProvider.Now() - - var period uint32 - period, err := getNextPeriod(rsrc.StartTime.Time, now.Time, rsrc.Frequency) - if err != nil { - return nil, err - } - lp.period = period - } + var ul ID + ul.View = query.View + var readCount int - // start from the last possible period, and iterate previous ones - // (unless we want a specific period only) until we find a match. - // If we hit startTime we're out of options - var specificversion bool - if lp.version > 0 { - specificversion = true - } else { - lp.version = 1 - } + // Invoke the lookup engine. + // The callback will be called every time the lookup algorithm needs to guess + requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) { + readCount++ + ul.Epoch = epoch + ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) + defer cancel() - var hops uint32 - if lp.Limit == 0 { - lp.Limit = h.queryMaxPeriods - } - log.Trace("resource lookup", "period", lp.period, "version", lp.version, "limit", lp.Limit) - for lp.period > 0 { - if lp.Limit != 0 && hops > lp.Limit { - return nil, NewErrorf(ErrPeriodDepth, "Lookup exceeded max period hops (%d)", lp.Limit) + chunk, err := h.chunkStore.Get(ctx, ul.Addr()) + if err != nil { // TODO: check for catastrophic errors other than chunk not found + return nil, nil } - updateAddr := lp.UpdateAddr() - - ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) - defer cancel() - chunk, err := h.chunkStore.Get(ctx, updateAddr) - if err == nil { - if specificversion { - return h.updateIndex(rsrc, chunk) - } - // check if we have versions > 1. If a version fails, the previous version is used and returned. - log.Trace("rsrc update version 1 found, checking for version updates", "period", lp.period, "updateAddr", updateAddr) - for { - newversion := lp.version + 1 - updateAddr := lp.UpdateAddr() - - ctx, cancel := context.WithTimeout(context.Background(), defaultRetrieveTimeout) - defer cancel() - - newchunk, err := h.chunkStore.Get(ctx, updateAddr) - if err != nil { - return h.updateIndex(rsrc, chunk) - } - chunk = newchunk - lp.version = newversion - log.Trace("version update found, checking next", "version", lp.version, "period", lp.period, "updateAddr", updateAddr) - } + var request Request + if err := request.fromChunk(chunk.Address(), chunk.Data()); err != nil { + return nil, nil } - if specificperiod { - break + if request.Time <= timeLimit { + return &request, nil } - log.Trace("rsrc update not found, checking previous period", "period", lp.period, "updateAddr", updateAddr) - lp.period-- - hops++ - } - return nil, NewError(ErrNotFound, "no updates found") -} - -// Load retrieves the Mutable Resource metadata chunk stored at rootAddr -// Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents -func (h *Handler) Load(ctx context.Context, rootAddr storage.Address) (*resource, error) { - //TODO: Maybe add timeout to context, defaultRetrieveTimeout? - ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) - defer cancel() - chunk, err := h.chunkStore.Get(ctx, rootAddr) + return nil, nil + }) if err != nil { - return nil, NewError(ErrNotFound, err.Error()) + return nil, err } - // create the index entry - rsrc := &resource{} + log.Info(fmt.Sprintf("Resource lookup finished in %d lookups", readCount)) - if err := rsrc.ResourceMetadata.binaryGet(chunk.Data()); err != nil { // Will fail if this is not really a metadata chunk - return nil, err + request, _ := requestPtr.(*Request) + if request == nil { + return nil, NewError(ErrNotFound, "no updates found") } + return h.updateCache(request) - rsrc.rootAddr, rsrc.metaHash = metadataHash(chunk.Data()) - if !bytes.Equal(rsrc.rootAddr, rootAddr) { - return nil, NewError(ErrCorruptData, "Corrupt metadata chunk") - } - h.set(rootAddr, rsrc) - log.Trace("resource index load", "rootkey", rootAddr, "name", rsrc.ResourceMetadata.Name, "starttime", rsrc.ResourceMetadata.StartTime, "frequency", rsrc.ResourceMetadata.Frequency) - return rsrc, nil } -// update mutable resource index map with specified content -func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, error) { +// update mutable resource cache map with specified content +func (h *Handler) updateCache(request *Request) (*cacheEntry, error) { - // retrieve metadata from chunk data and check that it matches this mutable resource - var r SignedResourceUpdate - if err := r.fromChunk(chunk.Address(), chunk.Data()); err != nil { - return nil, err + updateAddr := request.Addr() + log.Trace("resource cache update", "topic", request.Topic.Hex(), "updatekey", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level) + + rsrc := h.get(&request.View) + if rsrc == nil { + rsrc = &cacheEntry{} + h.set(&request.View, rsrc) } - log.Trace("resource index update", "name", rsrc.ResourceMetadata.Name, "updatekey", chunk.Address(), "period", r.period, "version", r.version) // update our rsrcs entry map - rsrc.lastKey = chunk.Address() - rsrc.period = r.period - rsrc.version = r.version - rsrc.updated = time.Now() - rsrc.data = make([]byte, len(r.data)) - rsrc.multihash = r.multihash - copy(rsrc.data, r.data) + rsrc.lastKey = updateAddr + rsrc.ResourceUpdate = request.ResourceUpdate rsrc.Reader = bytes.NewReader(rsrc.data) - log.Debug("resource synced", "name", rsrc.ResourceMetadata.Name, "updateAddr", chunk.Address(), "period", rsrc.period, "version", rsrc.version) - h.set(chunk.Address(), rsrc) return rsrc, nil } @@ -442,23 +248,16 @@ func (h *Handler) updateIndex(rsrc *resource, chunk storage.Chunk) (*resource, e // Note that a Mutable Resource update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature. An error will be returned if the total length of the chunk payload will exceed this limit. // Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update // on the network. -func (h *Handler) Update(ctx context.Context, r *SignedResourceUpdate) (storage.Address, error) { - return h.update(ctx, r) -} - -// create and commit an update -func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAddr storage.Address, err error) { +func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Address, err error) { // we can't update anything without a store if h.chunkStore == nil { return nil, NewError(ErrInit, "Call Handler.SetStore() before updating") } - rsrc := h.get(r.rootAddr) - if rsrc != nil && rsrc.period != 0 && rsrc.version != 0 && // This is the only cheap check we can do for sure - rsrc.period == r.period && rsrc.version >= r.version { // without having to lookup update chunks - - return nil, NewError(ErrInvalidValue, "A former update in this period is already known to exist") + rsrc := h.get(&r.View) + if rsrc != nil && rsrc.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure + return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist") } chunk, err := r.toChunk() // Serialize the update into a chunk. Fails if data is too big @@ -468,49 +267,32 @@ func (h *Handler) update(ctx context.Context, r *SignedResourceUpdate) (updateAd // send the chunk h.chunkStore.Put(ctx, chunk) - log.Trace("resource update", "updateAddr", r.updateAddr, "lastperiod", r.period, "version", r.version, "data", chunk.Data(), "multihash", r.multihash) - - // update our resources map entry if the new update is older than the one we have, if we have it. - if rsrc != nil && (r.period > rsrc.period || (rsrc.period == r.period && r.version > rsrc.version)) { - rsrc.period = r.period - rsrc.version = r.version + log.Trace("resource update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data()) + // update our resources map cache entry if the new update is older than the one we have, if we have it. + if rsrc != nil && r.Epoch.After(rsrc.Epoch) { + rsrc.Epoch = r.Epoch rsrc.data = make([]byte, len(r.data)) - rsrc.updated = time.Now() - rsrc.lastKey = r.updateAddr - rsrc.multihash = r.multihash + rsrc.lastKey = r.idAddr copy(rsrc.data, r.data) rsrc.Reader = bytes.NewReader(rsrc.data) } - return r.updateAddr, nil + + return r.idAddr, nil } -// Retrieves the resource index value for the given nameHash -func (h *Handler) get(rootAddr storage.Address) *resource { - if len(rootAddr) < storage.AddressLength { - log.Warn("Handler.get with invalid rootAddr") - return nil - } - hashKey := *(*uint64)(unsafe.Pointer(&rootAddr[0])) +// Retrieves the resource cache value for the given nameHash +func (h *Handler) get(view *View) *cacheEntry { + mapKey := view.mapKey() h.resourceLock.RLock() defer h.resourceLock.RUnlock() - rsrc := h.resources[hashKey] + rsrc := h.resources[mapKey] return rsrc } -// Sets the resource index value for the given nameHash -func (h *Handler) set(rootAddr storage.Address, rsrc *resource) { - if len(rootAddr) < storage.AddressLength { - log.Warn("Handler.set with invalid rootAddr") - return - } - hashKey := *(*uint64)(unsafe.Pointer(&rootAddr[0])) +// Sets the resource cache value for the given View +func (h *Handler) set(view *View, rsrc *cacheEntry) { + mapKey := view.mapKey() h.resourceLock.Lock() defer h.resourceLock.Unlock() - h.resources[hashKey] = rsrc -} - -// Checks if we already have an update on this resource, according to the value in the current state of the resource index -func (h *Handler) hasUpdate(rootAddr storage.Address, period uint32) bool { - rsrc := h.get(rootAddr) - return rsrc != nil && rsrc.period == period + h.resources[mapKey] = rsrc } |