diff options
Diffstat (limited to 'swarm/chunk')
-rw-r--r-- | swarm/chunk/chunk.go | 154 | ||||
-rw-r--r-- | swarm/chunk/tag.go | 218 | ||||
-rw-r--r-- | swarm/chunk/tag_test.go | 273 | ||||
-rw-r--r-- | swarm/chunk/tags.go | 96 | ||||
-rw-r--r-- | swarm/chunk/tags_test.go | 48 |
5 files changed, 788 insertions, 1 deletions
diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 7540af8ce..c44292bb9 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -1,6 +1,23 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + package chunk import ( + "context" "errors" "fmt" @@ -28,7 +45,7 @@ type chunk struct { sdata []byte } -func NewChunk(addr Address, data []byte) *chunk { +func NewChunk(addr Address, data []byte) Chunk { return &chunk{ addr: addr, sdata: data, @@ -107,3 +124,138 @@ func Proximity(one, other []byte) (ret int) { } return MaxPO } + +// ModeGet enumerates different Getter modes. +type ModeGet int + +func (m ModeGet) String() string { + switch m { + case ModeGetRequest: + return "Request" + case ModeGetSync: + return "Sync" + case ModeGetLookup: + return "Lookup" + default: + return "Unknown" + } +} + +// Getter modes. +const ( + // ModeGetRequest: when accessed for retrieval + ModeGetRequest ModeGet = iota + // ModeGetSync: when accessed for syncing or proof of custody request + ModeGetSync + // ModeGetLookup: when accessed to lookup a a chunk in feeds or other places + ModeGetLookup +) + +// ModePut enumerates different Putter modes. +type ModePut int + +func (m ModePut) String() string { + switch m { + case ModePutRequest: + return "Request" + case ModePutSync: + return "Sync" + case ModePutUpload: + return "Upload" + default: + return "Unknown" + } +} + +// Putter modes. +const ( + // ModePutRequest: when a chunk is received as a result of retrieve request and delivery + ModePutRequest ModePut = iota + // ModePutSync: when a chunk is received via syncing + ModePutSync + // ModePutUpload: when a chunk is created by local upload + ModePutUpload +) + +// ModeSet enumerates different Setter modes. +type ModeSet int + +func (m ModeSet) String() string { + switch m { + case ModeSetAccess: + return "Access" + case ModeSetSync: + return "Sync" + case ModeSetRemove: + return "Remove" + default: + return "Unknown" + } +} + +// Setter modes. +const ( + // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery + ModeSetAccess ModeSet = iota + // ModeSetSync: when a chunk is added to a pull sync batch or when a push sync receipt is received + ModeSetSync + // ModeSetRemove: when a chunk is removed + ModeSetRemove +) + +// Descriptor holds information required for Pull syncing. This struct +// is provided by subscribing to pull index. +type Descriptor struct { + Address Address + BinID uint64 +} + +func (d *Descriptor) String() string { + if d == nil { + return "" + } + return fmt.Sprintf("%s bin id %v", d.Address.Hex(), d.BinID) +} + +type Store interface { + Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error) + Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) + Has(ctx context.Context, addr Address) (yes bool, err error) + Set(ctx context.Context, mode ModeSet, addr Address) (err error) + LastPullSubscriptionBinID(bin uint8) (id uint64, err error) + SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func()) + Close() (err error) +} + +// Validator validates a chunk. +type Validator interface { + Validate(ch Chunk) bool +} + +// ValidatorStore encapsulates Store by decorting the Put method +// with validators check. +type ValidatorStore struct { + Store + validators []Validator +} + +// NewValidatorStore returns a new ValidatorStore which uses +// provided validators to validate chunks on Put. +func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore) { + return &ValidatorStore{ + Store: store, + validators: validators, + } +} + +// Put overrides Store put method with validators check. If one of the validators +// return true, the chunk is considered valid and Store Put method is called. +// If all validators return false, ErrChunkInvalid is returned. +func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) { + for _, v := range s.validators { + if v.Validate(ch) { + return s.Store.Put(ctx, mode, ch) + } + } + return false, ErrChunkInvalid +} diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go new file mode 100644 index 000000000..ee700d22b --- /dev/null +++ b/swarm/chunk/tag.go @@ -0,0 +1,218 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package chunk + +import ( + "encoding/binary" + "errors" + "sync/atomic" + "time" +) + +var ( + errExists = errors.New("already exists") + errNA = errors.New("not available yet") + errNoETA = errors.New("unable to calculate ETA") + errTagNotFound = errors.New("tag not found") +) + +// State is the enum type for chunk states +type State = uint32 + +const ( + StateSplit State = iota // chunk has been processed by filehasher/swarm safe call + StateStored // chunk stored locally + StateSeen // chunk previously seen + StateSent // chunk sent to neighbourhood + StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere +) + +// Tag represents info on the status of new chunks +type Tag struct { + Uid uint32 // a unique identifier for this tag + Name string // a name tag for this tag + Address Address // the associated swarm hash for this tag + total int64 // total chunks belonging to a tag + split int64 // number of chunks already processed by splitter for hashing + seen int64 // number of chunks already seen + stored int64 // number of chunks already stored locally + sent int64 // number of chunks sent for push syncing + synced int64 // number of chunks synced with proof + startedAt time.Time // tag started to calculate ETA +} + +// New creates a new tag, stores it by the name and returns it +// it returns an error if the tag with this name already exists +func NewTag(uid uint32, s string, total int64) *Tag { + t := &Tag{ + Uid: uid, + Name: s, + startedAt: time.Now(), + total: total, + } + return t +} + +// Inc increments the count for a state +func (t *Tag) Inc(state State) { + var v *int64 + switch state { + case StateSplit: + v = &t.split + case StateStored: + v = &t.stored + case StateSeen: + v = &t.seen + case StateSent: + v = &t.sent + case StateSynced: + v = &t.synced + } + atomic.AddInt64(v, 1) +} + +// Get returns the count for a state on a tag +func (t *Tag) Get(state State) int64 { + var v *int64 + switch state { + case StateSplit: + v = &t.split + case StateStored: + v = &t.stored + case StateSeen: + v = &t.seen + case StateSent: + v = &t.sent + case StateSynced: + v = &t.synced + } + return atomic.LoadInt64(v) +} + +// GetTotal returns the total count +func (t *Tag) Total() int64 { + return atomic.LoadInt64(&t.total) +} + +// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag +// is meant to be called when splitter finishes for input streams of unknown size +func (t *Tag) DoneSplit(address Address) int64 { + total := atomic.LoadInt64(&t.split) + atomic.StoreInt64(&t.total, total) + t.Address = address + return total +} + +// Status returns the value of state and the total count +func (t *Tag) Status(state State) (int64, int64, error) { + count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total) + if total == 0 { + return count, total, errNA + } + switch state { + case StateSplit, StateStored, StateSeen: + return count, total, nil + case StateSent, StateSynced: + stored := atomic.LoadInt64(&t.stored) + if stored < total { + return count, total - seen, errNA + } + return count, total - seen, nil + } + return count, total, errNA +} + +// ETA returns the time of completion estimated based on time passed and rate of completion +func (t *Tag) ETA(state State) (time.Time, error) { + cnt, total, err := t.Status(state) + if err != nil { + return time.Time{}, err + } + if cnt == 0 || total == 0 { + return time.Time{}, errNoETA + } + diff := time.Since(t.startedAt) + dur := time.Duration(total) * diff / time.Duration(cnt) + return t.startedAt.Add(dur), nil +} + +// MarshalBinary marshals the tag into a byte slice +func (tag *Tag) MarshalBinary() (data []byte, err error) { + buffer := make([]byte, 4) + binary.BigEndian.PutUint32(buffer, tag.Uid) + encodeInt64Append(&buffer, tag.total) + encodeInt64Append(&buffer, tag.split) + encodeInt64Append(&buffer, tag.seen) + encodeInt64Append(&buffer, tag.stored) + encodeInt64Append(&buffer, tag.sent) + encodeInt64Append(&buffer, tag.synced) + + intBuffer := make([]byte, 8) + + n := binary.PutVarint(intBuffer, tag.startedAt.Unix()) + buffer = append(buffer, intBuffer[:n]...) + + n = binary.PutVarint(intBuffer, int64(len(tag.Address))) + buffer = append(buffer, intBuffer[:n]...) + + buffer = append(buffer, tag.Address[:]...) + + buffer = append(buffer, []byte(tag.Name)...) + + return buffer, nil +} + +// UnmarshalBinary unmarshals a byte slice into a tag +func (tag *Tag) UnmarshalBinary(buffer []byte) error { + if len(buffer) < 13 { + return errors.New("buffer too short") + } + tag.Uid = binary.BigEndian.Uint32(buffer) + buffer = buffer[4:] + + tag.total = decodeInt64Splice(&buffer) + tag.split = decodeInt64Splice(&buffer) + tag.seen = decodeInt64Splice(&buffer) + tag.stored = decodeInt64Splice(&buffer) + tag.sent = decodeInt64Splice(&buffer) + tag.synced = decodeInt64Splice(&buffer) + + t, n := binary.Varint(buffer) + tag.startedAt = time.Unix(t, 0) + buffer = buffer[n:] + + t, n = binary.Varint(buffer) + buffer = buffer[n:] + if t > 0 { + tag.Address = buffer[:t] + } + tag.Name = string(buffer[t:]) + + return nil +} + +func encodeInt64Append(buffer *[]byte, val int64) { + intBuffer := make([]byte, 8) + n := binary.PutVarint(intBuffer, val) + *buffer = append(*buffer, intBuffer[:n]...) +} + +func decodeInt64Splice(buffer *[]byte) int64 { + val, n := binary.Varint((*buffer)) + *buffer = (*buffer)[n:] + return val +} diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go new file mode 100644 index 000000000..e6acfb185 --- /dev/null +++ b/swarm/chunk/tag_test.go @@ -0,0 +1,273 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package chunk + +import ( + "bytes" + "sync" + "testing" + "time" +) + +var ( + allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced} +) + +// TestTagSingleIncrements tests if Inc increments the tag state value +func TestTagSingleIncrements(t *testing.T) { + tg := &Tag{total: 10} + + tc := []struct { + state uint32 + inc int + expcount int64 + exptotal int64 + }{ + {state: StateSplit, inc: 10, expcount: 10, exptotal: 10}, + {state: StateStored, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSeen, inc: 1, expcount: 1, exptotal: 10}, + {state: StateSent, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSynced, inc: 9, expcount: 9, exptotal: 9}, + } + + for _, tc := range tc { + for i := 0; i < tc.inc; i++ { + tg.Inc(tc.state) + } + } + + for _, tc := range tc { + if tg.Get(tc.state) != tc.expcount { + t.Fatalf("not incremented") + } + } +} + +// TestTagStatus is a unit test to cover Tag.Status method functionality +func TestTagStatus(t *testing.T) { + tg := &Tag{total: 10} + tg.Inc(StateSeen) + tg.Inc(StateSent) + tg.Inc(StateSynced) + + for i := 0; i < 10; i++ { + tg.Inc(StateSplit) + tg.Inc(StateStored) + } + for _, v := range []struct { + state State + expVal int64 + expTotal int64 + }{ + {state: StateStored, expVal: 10, expTotal: 10}, + {state: StateSplit, expVal: 10, expTotal: 10}, + {state: StateSeen, expVal: 1, expTotal: 10}, + {state: StateSent, expVal: 1, expTotal: 9}, + {state: StateSynced, expVal: 1, expTotal: 9}, + } { + val, total, err := tg.Status(v.state) + if err != nil { + t.Fatal(err) + } + if val != v.expVal { + t.Fatalf("should be %d, got %d", v.expVal, val) + } + if total != v.expTotal { + t.Fatalf("expected total to be %d, got %d", v.expTotal, total) + } + } +} + +// tests ETA is precise +func TestTagETA(t *testing.T) { + now := time.Now() + maxDiff := 100000 // 100 microsecond + tg := &Tag{total: 10, startedAt: now} + time.Sleep(100 * time.Millisecond) + tg.Inc(StateSplit) + eta, err := tg.ETA(StateSplit) + if err != nil { + t.Fatal(err) + } + diff := time.Until(eta) - 9*time.Since(now) + if int(diff) > maxDiff { + t.Fatalf("ETA is not precise, got diff %v > .1ms", diff) + } +} + +// TestTagConcurrentIncrements tests Inc calls concurrently +func TestTagConcurrentIncrements(t *testing.T) { + tg := &Tag{} + n := 1000 + wg := sync.WaitGroup{} + wg.Add(5 * n) + for _, f := range allStates { + go func(f State) { + for j := 0; j < n; j++ { + go func() { + tg.Inc(f) + wg.Done() + }() + } + }(f) + } + wg.Wait() + for _, f := range allStates { + v := tg.Get(f) + if v != int64(n) { + t.Fatalf("expected state %v to be %v, got %v", f, n, v) + } + } +} + +// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently +func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { + ts := NewTags() + n := 100 + wg := sync.WaitGroup{} + wg.Add(10 * 5 * n) + for i := 0; i < 10; i++ { + s := string([]byte{uint8(i)}) + tag, err := ts.New(s, int64(n)) + if err != nil { + t.Fatal(err) + } + for _, f := range allStates { + go func(tag *Tag, f State) { + for j := 0; j < n; j++ { + go func() { + tag.Inc(f) + wg.Done() + }() + } + }(tag, f) + } + } + wg.Wait() + i := 0 + ts.Range(func(k, v interface{}) bool { + i++ + uid := k.(uint32) + for _, f := range allStates { + tag, err := ts.Get(uid) + if err != nil { + t.Fatal(err) + } + stateVal := tag.Get(f) + if stateVal != int64(n) { + t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) + } + } + return true + + }) + if i != 10 { + t.Fatal("not enough tagz") + } +} + +// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the +// tag Address (byte slice) contains some arbitrary value +func TestMarshallingWithAddr(t *testing.T) { + tg := NewTag(111, "test/tag", 10) + tg.Address = []byte{0, 1, 2, 3, 4, 5, 6} + + for _, f := range allStates { + tg.Inc(f) + } + + b, err := tg.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + unmarshalledTag := &Tag{} + err = unmarshalledTag.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if unmarshalledTag.Uid != tg.Uid { + t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) + } + + if unmarshalledTag.Name != tg.Name { + t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) + } + + for _, state := range allStates { + uv, tv := unmarshalledTag.Get(state), tg.Get(state) + if uv != tv { + t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) + } + } + + if unmarshalledTag.Total() != tg.Total() { + t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) + } + + if len(unmarshalledTag.Address) != len(tg.Address) { + t.Fatalf("tag addresses length mismatch, want %d, got %d", len(tg.Address), len(unmarshalledTag.Address)) + } + + if !bytes.Equal(unmarshalledTag.Address, tg.Address) { + t.Fatalf("expected tag address to be %v got %v", unmarshalledTag.Address, tg.Address) + } +} + +// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly +// when the tag Address (byte slice) is empty in this case +func TestMarshallingNoAddr(t *testing.T) { + tg := NewTag(111, "test/tag", 10) + for _, f := range allStates { + tg.Inc(f) + } + + b, err := tg.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + unmarshalledTag := &Tag{} + err = unmarshalledTag.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if unmarshalledTag.Uid != tg.Uid { + t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) + } + + if unmarshalledTag.Name != tg.Name { + t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) + } + + for _, state := range allStates { + uv, tv := unmarshalledTag.Get(state), tg.Get(state) + if uv != tv { + t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) + } + } + + if unmarshalledTag.Total() != tg.Total() { + t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) + } + + if len(unmarshalledTag.Address) != len(tg.Address) { + t.Fatalf("expected tag addresses to be equal length") + } +} diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go new file mode 100644 index 000000000..435f5d706 --- /dev/null +++ b/swarm/chunk/tags.go @@ -0,0 +1,96 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package chunk + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/ethereum/go-ethereum/swarm/sctx" +) + +// Tags hold tag information indexed by a unique random uint32 +type Tags struct { + tags *sync.Map + rng *rand.Rand +} + +// NewTags creates a tags object +func NewTags() *Tags { + return &Tags{ + tags: &sync.Map{}, + rng: rand.New(rand.NewSource(time.Now().Unix())), + } +} + +// New creates a new tag, stores it by the name and returns it +// it returns an error if the tag with this name already exists +func (ts *Tags) New(s string, total int64) (*Tag, error) { + t := &Tag{ + Uid: ts.rng.Uint32(), + Name: s, + startedAt: time.Now(), + total: total, + } + if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { + return nil, errExists + } + return t, nil +} + +// All returns all existing tags in Tags' sync.Map +// Note that tags are returned in no particular order +func (ts *Tags) All() (t []*Tag) { + ts.tags.Range(func(k, v interface{}) bool { + t = append(t, v.(*Tag)) + + return true + }) + + return t +} + +// Get returns the undelying tag for the uid or an error if not found +func (ts *Tags) Get(uid uint32) (*Tag, error) { + t, ok := ts.tags.Load(uid) + if !ok { + return nil, errors.New("tag not found") + } + return t.(*Tag), nil +} + +// GetFromContext gets a tag from the tag uid stored in the context +func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) { + uid := sctx.GetTag(ctx) + t, ok := ts.tags.Load(uid) + if !ok { + return nil, errTagNotFound + } + return t.(*Tag), nil +} + +// Range exposes sync.Map's iterator +func (ts *Tags) Range(fn func(k, v interface{}) bool) { + ts.tags.Range(fn) +} + +func (ts *Tags) Delete(k interface{}) { + ts.tags.Delete(k) +} diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go new file mode 100644 index 000000000..f818c4c5c --- /dev/null +++ b/swarm/chunk/tags_test.go @@ -0,0 +1,48 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package chunk + +import "testing" + +func TestAll(t *testing.T) { + ts := NewTags() + + ts.New("1", 1) + ts.New("2", 1) + + all := ts.All() + + if len(all) != 2 { + t.Fatalf("expected length to be 2 got %d", len(all)) + } + + if n := all[0].Total(); n != 1 { + t.Fatalf("expected tag 0 total to be 1 got %d", n) + } + + if n := all[1].Total(); n != 1 { + t.Fatalf("expected tag 1 total to be 1 got %d", n) + } + + ts.New("3", 1) + all = ts.All() + + if len(all) != 3 { + t.Fatalf("expected length to be 3 got %d", len(all)) + } + +} |