From 996755c4a832afce8629a771cab8879c88c98355 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Wed, 10 Apr 2019 16:50:58 +0200 Subject: cmd/swarm, swarm: LocalStore storage integration --- swarm/chunk/chunk.go | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) (limited to 'swarm/chunk') diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 7540af8ce..c8551814c 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -1,6 +1,7 @@ package chunk import ( + "context" "errors" "fmt" @@ -28,7 +29,7 @@ type chunk struct { sdata []byte } -func NewChunk(addr Address, data []byte) *chunk { +func NewChunk(addr Address, data []byte) Chunk { return &chunk{ addr: addr, sdata: data, @@ -107,3 +108,105 @@ func Proximity(one, other []byte) (ret int) { } return MaxPO } + +// ModeGet enumerates different Getter modes. +type ModeGet int + +// Getter modes. +const ( + // ModeGetRequest: when accessed for retrieval + ModeGetRequest ModeGet = iota + // ModeGetSync: when accessed for syncing or proof of custody request + ModeGetSync + // ModeGetLookup: when accessed to lookup a a chunk in feeds or other places + ModeGetLookup +) + +// ModePut enumerates different Putter modes. +type ModePut int + +// Putter modes. +const ( + // ModePutRequest: when a chunk is received as a result of retrieve request and delivery + ModePutRequest ModePut = iota + // ModePutSync: when a chunk is received via syncing + ModePutSync + // ModePutUpload: when a chunk is created by local upload + ModePutUpload +) + +// ModeSet enumerates different Setter modes. +type ModeSet int + +// Setter modes. +const ( + // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery + ModeSetAccess ModeSet = iota + // ModeSetSync: when push sync receipt is received + ModeSetSync + // ModeSetRemove: when a chunk is removed + ModeSetRemove +) + +// Descriptor holds information required for Pull syncing. This struct +// is provided by subscribing to pull index. +type Descriptor struct { + Address Address + BinID uint64 +} + +func (d *Descriptor) String() string { + if d == nil { + return "" + } + return fmt.Sprintf("%s bin id %v", d.Address.Hex(), d.BinID) +} + +type Store interface { + Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error) + Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) + Has(ctx context.Context, addr Address) (yes bool, err error) + Set(ctx context.Context, mode ModeSet, addr Address) (err error) + LastPullSubscriptionBinID(bin uint8) (id uint64, err error) + SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func()) + Close() (err error) +} + +// FetchStore is a Store which supports syncing +type FetchStore interface { + Store + FetchFunc(ctx context.Context, addr Address) func(context.Context) error +} + +// Validator validates a chunk. +type Validator interface { + Validate(ch Chunk) bool +} + +// ValidatorStore encapsulates Store by decorting the Put method +// with validators check. +type ValidatorStore struct { + Store + validators []Validator +} + +// NewValidatorStore returns a new ValidatorStore which uses +// provided validators to validate chunks on Put. +func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore) { + return &ValidatorStore{ + Store: store, + validators: validators, + } +} + +// Put overrides Store put method with validators check. If one of the validators +// return true, the chunk is considered valid and Store Put method is called. +// If all validators return false, ErrChunkInvalid is returned. +func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) { + for _, v := range s.validators { + if v.Validate(ch) { + return s.Store.Put(ctx, mode, ch) + } + } + return false, ErrChunkInvalid +} -- cgit v1.2.3 From 993b145f25845e50e8af41ffb1116eaee381d693 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 11 Apr 2019 10:26:52 +0200 Subject: swarm/storage/localstore: fix export db.Put signature cmd/swarm/swarm-smoke: improve smoke tests (#1337) swarm/network: remove dead code (#1339) swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342) --- swarm/chunk/chunk.go | 6 ------ 1 file changed, 6 deletions(-) (limited to 'swarm/chunk') diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index c8551814c..2455904f3 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -172,12 +172,6 @@ type Store interface { Close() (err error) } -// FetchStore is a Store which supports syncing -type FetchStore interface { - Store - FetchFunc(ctx context.Context, addr Address) func(context.Context) error -} - // Validator validates a chunk. type Validator interface { Validate(ch Chunk) bool -- cgit v1.2.3 From c1213bd00c2a84a9dfc218e44cc2f85902f91128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Thu, 25 Apr 2019 10:22:57 +0200 Subject: swarm: LocalStore metrics * swarm/shed: remove metrics fields from DB struct * swarm/schunk: add String methods to modes * swarm/storage/localstore: add metrics and traces * swarm/chunk: unknown modes without spaces in String methods * swarm/storage/localstore: remove bin number from pull subscription metrics * swarm/storage/localstore: add resetting time metrics and code improvements --- swarm/chunk/chunk.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) (limited to 'swarm/chunk') diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 2455904f3..9ae59c95f 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) { // ModeGet enumerates different Getter modes. type ModeGet int +func (m ModeGet) String() string { + switch m { + case ModeGetRequest: + return "Request" + case ModeGetSync: + return "Sync" + case ModeGetLookup: + return "Lookup" + default: + return "Unknown" + } +} + // Getter modes. const ( // ModeGetRequest: when accessed for retrieval @@ -125,6 +138,19 @@ const ( // ModePut enumerates different Putter modes. type ModePut int +func (m ModePut) String() string { + switch m { + case ModePutRequest: + return "Request" + case ModePutSync: + return "Sync" + case ModePutUpload: + return "Upload" + default: + return "Unknown" + } +} + // Putter modes. const ( // ModePutRequest: when a chunk is received as a result of retrieve request and delivery @@ -138,6 +164,19 @@ const ( // ModeSet enumerates different Setter modes. type ModeSet int +func (m ModeSet) String() string { + switch m { + case ModeSetAccess: + return "Access" + case ModeSetSync: + return "Sync" + case ModeSetRemove: + return "Remove" + default: + return "Unknown" + } +} + // Setter modes. const ( // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery -- cgit v1.2.3 From 12240baf61e251a80e3759dd22ca5847dcb8d807 Mon Sep 17 00:00:00 2001 From: Elad Date: Tue, 30 Apr 2019 13:10:57 +0900 Subject: swarm/chunk: add tags data type * swarm/chunk: add tags backend to chunk package --- swarm/chunk/chunk.go | 16 +++ swarm/chunk/tag.go | 218 ++++++++++++++++++++++++++++++++++++++ swarm/chunk/tag_test.go | 273 ++++++++++++++++++++++++++++++++++++++++++++++++ swarm/chunk/tags.go | 80 ++++++++++++++ 4 files changed, 587 insertions(+) create mode 100644 swarm/chunk/tag.go create mode 100644 swarm/chunk/tag_test.go create mode 100644 swarm/chunk/tags.go (limited to 'swarm/chunk') diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 9ae59c95f..17f49348b 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -1,3 +1,19 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + package chunk import ( diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go new file mode 100644 index 000000000..359ac11ac --- /dev/null +++ b/swarm/chunk/tag.go @@ -0,0 +1,218 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import ( + "encoding/binary" + "errors" + "sync/atomic" + "time" +) + +var ( + errExists = errors.New("already exists") + errNA = errors.New("not available yet") + errNoETA = errors.New("unable to calculate ETA") + errTagNotFound = errors.New("tag not found") +) + +// State is the enum type for chunk states +type State = uint32 + +const ( + SPLIT State = iota // chunk has been processed by filehasher/swarm safe call + STORED // chunk stored locally + SEEN // chunk previously seen + SENT // chunk sent to neighbourhood + SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere +) + +// Tag represents info on the status of new chunks +type Tag struct { + Uid uint32 // a unique identifier for this tag + Name string // a name tag for this tag + Address Address // the associated swarm hash for this tag + total uint32 // total chunks belonging to a tag + split uint32 // number of chunks already processed by splitter for hashing + seen uint32 // number of chunks already seen + stored uint32 // number of chunks already stored locally + sent uint32 // number of chunks sent for push syncing + synced uint32 // number of chunks synced with proof + startedAt time.Time // tag started to calculate ETA +} + +// New creates a new tag, stores it by the name and returns it +// it returns an error if the tag with this name already exists +func NewTag(uid uint32, s string, total uint32) *Tag { + t := &Tag{ + Uid: uid, + Name: s, + startedAt: time.Now(), + total: total, + } + return t +} + +// Inc increments the count for a state +func (t *Tag) Inc(state State) { + var v *uint32 + switch state { + case SPLIT: + v = &t.split + case STORED: + v = &t.stored + case SEEN: + v = &t.seen + case SENT: + v = &t.sent + case SYNCED: + v = &t.synced + } + atomic.AddUint32(v, 1) +} + +// Get returns the count for a state on a tag +func (t *Tag) Get(state State) int { + var v *uint32 + switch state { + case SPLIT: + v = &t.split + case STORED: + v = &t.stored + case SEEN: + v = &t.seen + case SENT: + v = &t.sent + case SYNCED: + v = &t.synced + } + return int(atomic.LoadUint32(v)) +} + +// GetTotal returns the total count +func (t *Tag) Total() int { + return int(atomic.LoadUint32(&t.total)) +} + +// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag +// is meant to be called when splitter finishes for input streams of unknown size +func (t *Tag) DoneSplit(address Address) int { + total := atomic.LoadUint32(&t.split) + atomic.StoreUint32(&t.total, total) + t.Address = address + return int(total) +} + +// Status returns the value of state and the total count +func (t *Tag) Status(state State) (int, int, error) { + count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total)) + if total == 0 { + return count, total, errNA + } + switch state { + case SPLIT, STORED, SEEN: + return count, total, nil + case SENT, SYNCED: + stored := int(atomic.LoadUint32(&t.stored)) + if stored < total { + return count, total - seen, errNA + } + return count, total - seen, nil + } + return count, total, errNA +} + +// ETA returns the time of completion estimated based on time passed and rate of completion +func (t *Tag) ETA(state State) (time.Time, error) { + cnt, total, err := t.Status(state) + if err != nil { + return time.Time{}, err + } + if cnt == 0 || total == 0 { + return time.Time{}, errNoETA + } + diff := time.Since(t.startedAt) + dur := time.Duration(total) * diff / time.Duration(cnt) + return t.startedAt.Add(dur), nil +} + +// MarshalBinary marshals the tag into a byte slice +func (tag *Tag) MarshalBinary() (data []byte, err error) { + buffer := make([]byte, 0) + encodeUint32Append(&buffer, tag.Uid) + encodeUint32Append(&buffer, tag.total) + encodeUint32Append(&buffer, tag.split) + encodeUint32Append(&buffer, tag.seen) + encodeUint32Append(&buffer, tag.stored) + encodeUint32Append(&buffer, tag.sent) + encodeUint32Append(&buffer, tag.synced) + + intBuffer := make([]byte, 8) + + n := binary.PutVarint(intBuffer, tag.startedAt.Unix()) + buffer = append(buffer, intBuffer[:n]...) + + n = binary.PutVarint(intBuffer, int64(len(tag.Address))) + buffer = append(buffer, intBuffer[:n]...) + + buffer = append(buffer, tag.Address[:]...) + + buffer = append(buffer, []byte(tag.Name)...) + + return buffer, nil +} + +// UnmarshalBinary unmarshals a byte slice into a tag +func (tag *Tag) UnmarshalBinary(buffer []byte) error { + if len(buffer) < 13 { + return errors.New("buffer too short") + } + + tag.Uid = decodeUint32Splice(&buffer) + tag.total = decodeUint32Splice(&buffer) + tag.split = decodeUint32Splice(&buffer) + tag.seen = decodeUint32Splice(&buffer) + tag.stored = decodeUint32Splice(&buffer) + tag.sent = decodeUint32Splice(&buffer) + tag.synced = decodeUint32Splice(&buffer) + + t, n := binary.Varint(buffer) + tag.startedAt = time.Unix(t, 0) + buffer = buffer[n:] + + t, n = binary.Varint(buffer) + buffer = buffer[n:] + if t > 0 { + tag.Address = buffer[:t] + } + tag.Name = string(buffer[t:]) + + return nil + +} + +func encodeUint32Append(buffer *[]byte, val uint32) { + intBuffer := make([]byte, 4) + binary.BigEndian.PutUint32(intBuffer, val) + *buffer = append(*buffer, intBuffer...) +} + +func decodeUint32Splice(buffer *[]byte) uint32 { + val := binary.BigEndian.Uint32((*buffer)[:4]) + *buffer = (*buffer)[4:] + return val +} diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go new file mode 100644 index 000000000..b3f3be2ca --- /dev/null +++ b/swarm/chunk/tag_test.go @@ -0,0 +1,273 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import ( + "bytes" + "sync" + "testing" + "time" +) + +var ( + allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED} +) + +// TestTagSingleIncrements tests if Inc increments the tag state value +func TestTagSingleIncrements(t *testing.T) { + tg := &Tag{total: 10} + + tc := []struct { + state uint32 + inc int + expcount int + exptotal int + }{ + {state: SPLIT, inc: 10, expcount: 10, exptotal: 10}, + {state: STORED, inc: 9, expcount: 9, exptotal: 9}, + {state: SEEN, inc: 1, expcount: 1, exptotal: 10}, + {state: SENT, inc: 9, expcount: 9, exptotal: 9}, + {state: SYNCED, inc: 9, expcount: 9, exptotal: 9}, + } + + for _, tc := range tc { + for i := 0; i < tc.inc; i++ { + tg.Inc(tc.state) + } + } + + for _, tc := range tc { + if tg.Get(tc.state) != tc.expcount { + t.Fatalf("not incremented") + } + } +} + +// TestTagStatus is a unit test to cover Tag.Status method functionality +func TestTagStatus(t *testing.T) { + tg := &Tag{total: 10} + tg.Inc(SEEN) + tg.Inc(SENT) + tg.Inc(SYNCED) + + for i := 0; i < 10; i++ { + tg.Inc(SPLIT) + tg.Inc(STORED) + } + for _, v := range []struct { + state State + expVal int + expTotal int + }{ + {state: STORED, expVal: 10, expTotal: 10}, + {state: SPLIT, expVal: 10, expTotal: 10}, + {state: SEEN, expVal: 1, expTotal: 10}, + {state: SENT, expVal: 1, expTotal: 9}, + {state: SYNCED, expVal: 1, expTotal: 9}, + } { + val, total, err := tg.Status(v.state) + if err != nil { + t.Fatal(err) + } + if val != v.expVal { + t.Fatalf("should be %d, got %d", v.expVal, val) + } + if total != v.expTotal { + t.Fatalf("expected total to be %d, got %d", v.expTotal, total) + } + } +} + +// tests ETA is precise +func TestTagETA(t *testing.T) { + now := time.Now() + maxDiff := 100000 // 100 microsecond + tg := &Tag{total: 10, startedAt: now} + time.Sleep(100 * time.Millisecond) + tg.Inc(SPLIT) + eta, err := tg.ETA(SPLIT) + if err != nil { + t.Fatal(err) + } + diff := time.Until(eta) - 9*time.Since(now) + if int(diff) > maxDiff { + t.Fatalf("ETA is not precise, got diff %v > .1ms", diff) + } +} + +// TestTagConcurrentIncrements tests Inc calls concurrently +func TestTagConcurrentIncrements(t *testing.T) { + tg := &Tag{} + n := 1000 + wg := sync.WaitGroup{} + wg.Add(5 * n) + for _, f := range allStates { + go func(f State) { + for j := 0; j < n; j++ { + go func() { + tg.Inc(f) + wg.Done() + }() + } + }(f) + } + wg.Wait() + for _, f := range allStates { + v := tg.Get(f) + if v != n { + t.Fatalf("expected state %v to be %v, got %v", f, n, v) + } + } +} + +// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently +func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { + ts := NewTags() + n := 100 + wg := sync.WaitGroup{} + wg.Add(10 * 5 * n) + for i := 0; i < 10; i++ { + s := string([]byte{uint8(i)}) + tag, err := ts.New(s, n) + if err != nil { + t.Fatal(err) + } + for _, f := range allStates { + go func(tag *Tag, f State) { + for j := 0; j < n; j++ { + go func() { + tag.Inc(f) + wg.Done() + }() + } + }(tag, f) + } + } + wg.Wait() + i := 0 + ts.Range(func(k, v interface{}) bool { + i++ + uid := k.(uint32) + for _, f := range allStates { + tag, err := ts.Get(uid) + if err != nil { + t.Fatal(err) + } + stateVal := tag.Get(f) + if stateVal != n { + t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) + } + } + return true + + }) + if i != 10 { + t.Fatal("not enough tagz") + } +} + +// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the +// tag Address (byte slice) contains some arbitrary value +func TestMarshallingWithAddr(t *testing.T) { + tg := NewTag(111, "test/tag", 10) + tg.Address = []byte{0, 1, 2, 3, 4, 5, 6} + + for _, f := range allStates { + tg.Inc(f) + } + + b, err := tg.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + unmarshalledTag := &Tag{} + err = unmarshalledTag.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if unmarshalledTag.Uid != tg.Uid { + t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) + } + + if unmarshalledTag.Name != tg.Name { + t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) + } + + for _, state := range allStates { + uv, tv := unmarshalledTag.Get(state), tg.Get(state) + if uv != tv { + t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) + } + } + + if unmarshalledTag.Total() != tg.Total() { + t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) + } + + if len(unmarshalledTag.Address) != len(tg.Address) { + t.Fatalf("tag addresses length mismatch, want %d, got %d", len(tg.Address), len(unmarshalledTag.Address)) + } + + if !bytes.Equal(unmarshalledTag.Address, tg.Address) { + t.Fatalf("expected tag address to be %v got %v", unmarshalledTag.Address, tg.Address) + } +} + +// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly +// when the tag Address (byte slice) is empty in this case +func TestMarshallingNoAddr(t *testing.T) { + tg := NewTag(111, "test/tag", 10) + for _, f := range allStates { + tg.Inc(f) + } + + b, err := tg.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + unmarshalledTag := &Tag{} + err = unmarshalledTag.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if unmarshalledTag.Uid != tg.Uid { + t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid) + } + + if unmarshalledTag.Name != tg.Name { + t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name) + } + + for _, state := range allStates { + uv, tv := unmarshalledTag.Get(state), tg.Get(state) + if uv != tv { + t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv) + } + } + + if unmarshalledTag.Total() != tg.Total() { + t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total()) + } + + if len(unmarshalledTag.Address) != len(tg.Address) { + t.Fatalf("expected tag addresses to be equal length") + } +} diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go new file mode 100644 index 000000000..07f9b8cd7 --- /dev/null +++ b/swarm/chunk/tags.go @@ -0,0 +1,80 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/ethereum/go-ethereum/swarm/sctx" +) + +// Tags hold tag information indexed by a unique random uint32 +type Tags struct { + tags *sync.Map + rng *rand.Rand +} + +// NewTags creates a tags object +func NewTags() *Tags { + return &Tags{ + tags: &sync.Map{}, + rng: rand.New(rand.NewSource(time.Now().Unix())), + } +} + +// New creates a new tag, stores it by the name and returns it +// it returns an error if the tag with this name already exists +func (ts *Tags) New(s string, total int) (*Tag, error) { + t := &Tag{ + Uid: ts.rng.Uint32(), + Name: s, + startedAt: time.Now(), + total: uint32(total), + } + if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { + return nil, errExists + } + return t, nil +} + +// Get returns the undelying tag for the uid or an error if not found +func (ts *Tags) Get(uid uint32) (*Tag, error) { + t, ok := ts.tags.Load(uid) + if !ok { + return nil, errors.New("tag not found") + } + return t.(*Tag), nil +} + +// GetContext gets a tag from the tag uid stored in the context +func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { + uid := sctx.GetTag(ctx) + t, ok := ts.tags.Load(uid) + if !ok { + return nil, errTagNotFound + } + return t.(*Tag), nil +} + +// Range exposes sync.Map's iterator +func (ts *Tags) Range(fn func(k, v interface{}) bool) { + ts.tags.Range(fn) +} -- cgit v1.2.3 From ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1 Mon Sep 17 00:00:00 2001 From: Elad Date: Sun, 5 May 2019 22:34:22 +0400 Subject: swarm: push tags integration - request flow swarm/api: integrate tags to count chunks being split and stored swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package --- swarm/chunk/tag.go | 122 +++++++++++++++++++++++------------------------ swarm/chunk/tag_test.go | 50 +++++++++---------- swarm/chunk/tags.go | 24 ++++++++-- swarm/chunk/tags_test.go | 48 +++++++++++++++++++ 4 files changed, 154 insertions(+), 90 deletions(-) create mode 100644 swarm/chunk/tags_test.go (limited to 'swarm/chunk') diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go index 359ac11ac..ee700d22b 100644 --- a/swarm/chunk/tag.go +++ b/swarm/chunk/tag.go @@ -34,11 +34,11 @@ var ( type State = uint32 const ( - SPLIT State = iota // chunk has been processed by filehasher/swarm safe call - STORED // chunk stored locally - SEEN // chunk previously seen - SENT // chunk sent to neighbourhood - SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere + StateSplit State = iota // chunk has been processed by filehasher/swarm safe call + StateStored // chunk stored locally + StateSeen // chunk previously seen + StateSent // chunk sent to neighbourhood + StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere ) // Tag represents info on the status of new chunks @@ -46,18 +46,18 @@ type Tag struct { Uid uint32 // a unique identifier for this tag Name string // a name tag for this tag Address Address // the associated swarm hash for this tag - total uint32 // total chunks belonging to a tag - split uint32 // number of chunks already processed by splitter for hashing - seen uint32 // number of chunks already seen - stored uint32 // number of chunks already stored locally - sent uint32 // number of chunks sent for push syncing - synced uint32 // number of chunks synced with proof + total int64 // total chunks belonging to a tag + split int64 // number of chunks already processed by splitter for hashing + seen int64 // number of chunks already seen + stored int64 // number of chunks already stored locally + sent int64 // number of chunks sent for push syncing + synced int64 // number of chunks synced with proof startedAt time.Time // tag started to calculate ETA } // New creates a new tag, stores it by the name and returns it // it returns an error if the tag with this name already exists -func NewTag(uid uint32, s string, total uint32) *Tag { +func NewTag(uid uint32, s string, total int64) *Tag { t := &Tag{ Uid: uid, Name: s, @@ -69,65 +69,65 @@ func NewTag(uid uint32, s string, total uint32) *Tag { // Inc increments the count for a state func (t *Tag) Inc(state State) { - var v *uint32 + var v *int64 switch state { - case SPLIT: + case StateSplit: v = &t.split - case STORED: + case StateStored: v = &t.stored - case SEEN: + case StateSeen: v = &t.seen - case SENT: + case StateSent: v = &t.sent - case SYNCED: + case StateSynced: v = &t.synced } - atomic.AddUint32(v, 1) + atomic.AddInt64(v, 1) } // Get returns the count for a state on a tag -func (t *Tag) Get(state State) int { - var v *uint32 +func (t *Tag) Get(state State) int64 { + var v *int64 switch state { - case SPLIT: + case StateSplit: v = &t.split - case STORED: + case StateStored: v = &t.stored - case SEEN: + case StateSeen: v = &t.seen - case SENT: + case StateSent: v = &t.sent - case SYNCED: + case StateSynced: v = &t.synced } - return int(atomic.LoadUint32(v)) + return atomic.LoadInt64(v) } // GetTotal returns the total count -func (t *Tag) Total() int { - return int(atomic.LoadUint32(&t.total)) +func (t *Tag) Total() int64 { + return atomic.LoadInt64(&t.total) } // DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag // is meant to be called when splitter finishes for input streams of unknown size -func (t *Tag) DoneSplit(address Address) int { - total := atomic.LoadUint32(&t.split) - atomic.StoreUint32(&t.total, total) +func (t *Tag) DoneSplit(address Address) int64 { + total := atomic.LoadInt64(&t.split) + atomic.StoreInt64(&t.total, total) t.Address = address - return int(total) + return total } // Status returns the value of state and the total count -func (t *Tag) Status(state State) (int, int, error) { - count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total)) +func (t *Tag) Status(state State) (int64, int64, error) { + count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total) if total == 0 { return count, total, errNA } switch state { - case SPLIT, STORED, SEEN: + case StateSplit, StateStored, StateSeen: return count, total, nil - case SENT, SYNCED: - stored := int(atomic.LoadUint32(&t.stored)) + case StateSent, StateSynced: + stored := atomic.LoadInt64(&t.stored) if stored < total { return count, total - seen, errNA } @@ -152,14 +152,14 @@ func (t *Tag) ETA(state State) (time.Time, error) { // MarshalBinary marshals the tag into a byte slice func (tag *Tag) MarshalBinary() (data []byte, err error) { - buffer := make([]byte, 0) - encodeUint32Append(&buffer, tag.Uid) - encodeUint32Append(&buffer, tag.total) - encodeUint32Append(&buffer, tag.split) - encodeUint32Append(&buffer, tag.seen) - encodeUint32Append(&buffer, tag.stored) - encodeUint32Append(&buffer, tag.sent) - encodeUint32Append(&buffer, tag.synced) + buffer := make([]byte, 4) + binary.BigEndian.PutUint32(buffer, tag.Uid) + encodeInt64Append(&buffer, tag.total) + encodeInt64Append(&buffer, tag.split) + encodeInt64Append(&buffer, tag.seen) + encodeInt64Append(&buffer, tag.stored) + encodeInt64Append(&buffer, tag.sent) + encodeInt64Append(&buffer, tag.synced) intBuffer := make([]byte, 8) @@ -181,14 +181,15 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error { if len(buffer) < 13 { return errors.New("buffer too short") } + tag.Uid = binary.BigEndian.Uint32(buffer) + buffer = buffer[4:] - tag.Uid = decodeUint32Splice(&buffer) - tag.total = decodeUint32Splice(&buffer) - tag.split = decodeUint32Splice(&buffer) - tag.seen = decodeUint32Splice(&buffer) - tag.stored = decodeUint32Splice(&buffer) - tag.sent = decodeUint32Splice(&buffer) - tag.synced = decodeUint32Splice(&buffer) + tag.total = decodeInt64Splice(&buffer) + tag.split = decodeInt64Splice(&buffer) + tag.seen = decodeInt64Splice(&buffer) + tag.stored = decodeInt64Splice(&buffer) + tag.sent = decodeInt64Splice(&buffer) + tag.synced = decodeInt64Splice(&buffer) t, n := binary.Varint(buffer) tag.startedAt = time.Unix(t, 0) @@ -202,17 +203,16 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error { tag.Name = string(buffer[t:]) return nil - } -func encodeUint32Append(buffer *[]byte, val uint32) { - intBuffer := make([]byte, 4) - binary.BigEndian.PutUint32(intBuffer, val) - *buffer = append(*buffer, intBuffer...) +func encodeInt64Append(buffer *[]byte, val int64) { + intBuffer := make([]byte, 8) + n := binary.PutVarint(intBuffer, val) + *buffer = append(*buffer, intBuffer[:n]...) } -func decodeUint32Splice(buffer *[]byte) uint32 { - val := binary.BigEndian.Uint32((*buffer)[:4]) - *buffer = (*buffer)[4:] +func decodeInt64Splice(buffer *[]byte) int64 { + val, n := binary.Varint((*buffer)) + *buffer = (*buffer)[n:] return val } diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go index b3f3be2ca..e6acfb185 100644 --- a/swarm/chunk/tag_test.go +++ b/swarm/chunk/tag_test.go @@ -24,7 +24,7 @@ import ( ) var ( - allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED} + allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced} ) // TestTagSingleIncrements tests if Inc increments the tag state value @@ -34,14 +34,14 @@ func TestTagSingleIncrements(t *testing.T) { tc := []struct { state uint32 inc int - expcount int - exptotal int + expcount int64 + exptotal int64 }{ - {state: SPLIT, inc: 10, expcount: 10, exptotal: 10}, - {state: STORED, inc: 9, expcount: 9, exptotal: 9}, - {state: SEEN, inc: 1, expcount: 1, exptotal: 10}, - {state: SENT, inc: 9, expcount: 9, exptotal: 9}, - {state: SYNCED, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSplit, inc: 10, expcount: 10, exptotal: 10}, + {state: StateStored, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSeen, inc: 1, expcount: 1, exptotal: 10}, + {state: StateSent, inc: 9, expcount: 9, exptotal: 9}, + {state: StateSynced, inc: 9, expcount: 9, exptotal: 9}, } for _, tc := range tc { @@ -60,24 +60,24 @@ func TestTagSingleIncrements(t *testing.T) { // TestTagStatus is a unit test to cover Tag.Status method functionality func TestTagStatus(t *testing.T) { tg := &Tag{total: 10} - tg.Inc(SEEN) - tg.Inc(SENT) - tg.Inc(SYNCED) + tg.Inc(StateSeen) + tg.Inc(StateSent) + tg.Inc(StateSynced) for i := 0; i < 10; i++ { - tg.Inc(SPLIT) - tg.Inc(STORED) + tg.Inc(StateSplit) + tg.Inc(StateStored) } for _, v := range []struct { state State - expVal int - expTotal int + expVal int64 + expTotal int64 }{ - {state: STORED, expVal: 10, expTotal: 10}, - {state: SPLIT, expVal: 10, expTotal: 10}, - {state: SEEN, expVal: 1, expTotal: 10}, - {state: SENT, expVal: 1, expTotal: 9}, - {state: SYNCED, expVal: 1, expTotal: 9}, + {state: StateStored, expVal: 10, expTotal: 10}, + {state: StateSplit, expVal: 10, expTotal: 10}, + {state: StateSeen, expVal: 1, expTotal: 10}, + {state: StateSent, expVal: 1, expTotal: 9}, + {state: StateSynced, expVal: 1, expTotal: 9}, } { val, total, err := tg.Status(v.state) if err != nil { @@ -98,8 +98,8 @@ func TestTagETA(t *testing.T) { maxDiff := 100000 // 100 microsecond tg := &Tag{total: 10, startedAt: now} time.Sleep(100 * time.Millisecond) - tg.Inc(SPLIT) - eta, err := tg.ETA(SPLIT) + tg.Inc(StateSplit) + eta, err := tg.ETA(StateSplit) if err != nil { t.Fatal(err) } @@ -128,7 +128,7 @@ func TestTagConcurrentIncrements(t *testing.T) { wg.Wait() for _, f := range allStates { v := tg.Get(f) - if v != n { + if v != int64(n) { t.Fatalf("expected state %v to be %v, got %v", f, n, v) } } @@ -142,7 +142,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { wg.Add(10 * 5 * n) for i := 0; i < 10; i++ { s := string([]byte{uint8(i)}) - tag, err := ts.New(s, n) + tag, err := ts.New(s, int64(n)) if err != nil { t.Fatal(err) } @@ -168,7 +168,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) { t.Fatal(err) } stateVal := tag.Get(f) - if stateVal != n { + if stateVal != int64(n) { t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v) } } diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go index 07f9b8cd7..435f5d706 100644 --- a/swarm/chunk/tags.go +++ b/swarm/chunk/tags.go @@ -42,12 +42,12 @@ func NewTags() *Tags { // New creates a new tag, stores it by the name and returns it // it returns an error if the tag with this name already exists -func (ts *Tags) New(s string, total int) (*Tag, error) { +func (ts *Tags) New(s string, total int64) (*Tag, error) { t := &Tag{ Uid: ts.rng.Uint32(), Name: s, startedAt: time.Now(), - total: uint32(total), + total: total, } if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded { return nil, errExists @@ -55,6 +55,18 @@ func (ts *Tags) New(s string, total int) (*Tag, error) { return t, nil } +// All returns all existing tags in Tags' sync.Map +// Note that tags are returned in no particular order +func (ts *Tags) All() (t []*Tag) { + ts.tags.Range(func(k, v interface{}) bool { + t = append(t, v.(*Tag)) + + return true + }) + + return t +} + // Get returns the undelying tag for the uid or an error if not found func (ts *Tags) Get(uid uint32) (*Tag, error) { t, ok := ts.tags.Load(uid) @@ -64,8 +76,8 @@ func (ts *Tags) Get(uid uint32) (*Tag, error) { return t.(*Tag), nil } -// GetContext gets a tag from the tag uid stored in the context -func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { +// GetFromContext gets a tag from the tag uid stored in the context +func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) { uid := sctx.GetTag(ctx) t, ok := ts.tags.Load(uid) if !ok { @@ -78,3 +90,7 @@ func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) { func (ts *Tags) Range(fn func(k, v interface{}) bool) { ts.tags.Range(fn) } + +func (ts *Tags) Delete(k interface{}) { + ts.tags.Delete(k) +} diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go new file mode 100644 index 000000000..f818c4c5c --- /dev/null +++ b/swarm/chunk/tags_test.go @@ -0,0 +1,48 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package chunk + +import "testing" + +func TestAll(t *testing.T) { + ts := NewTags() + + ts.New("1", 1) + ts.New("2", 1) + + all := ts.All() + + if len(all) != 2 { + t.Fatalf("expected length to be 2 got %d", len(all)) + } + + if n := all[0].Total(); n != 1 { + t.Fatalf("expected tag 0 total to be 1 got %d", n) + } + + if n := all[1].Total(); n != 1 { + t.Fatalf("expected tag 1 total to be 1 got %d", n) + } + + ts.New("3", 1) + all = ts.All() + + if len(all) != 3 { + t.Fatalf("expected length to be 3 got %d", len(all)) + } + +} -- cgit v1.2.3 From 84dfaea246dea179319db90a63afc1189cd09246 Mon Sep 17 00:00:00 2001 From: Elad Date: Thu, 9 May 2019 12:54:06 +0400 Subject: swarm: instrument setNextBatch swarm/storage/localstore: add gc metrics, disable flaky test --- swarm/chunk/chunk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'swarm/chunk') diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 17f49348b..c44292bb9 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -197,7 +197,7 @@ func (m ModeSet) String() string { const ( // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery ModeSetAccess ModeSet = iota - // ModeSetSync: when push sync receipt is received + // ModeSetSync: when a chunk is added to a pull sync batch or when a push sync receipt is received ModeSetSync // ModeSetRemove: when a chunk is removed ModeSetRemove -- cgit v1.2.3