aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/chunk
diff options
context:
space:
mode:
authorElad <theman@elad.im>2019-05-06 02:34:22 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2019-05-10 18:26:52 +0800
commitad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1 (patch)
tree396077f7f33307321110f17bc3a19abe115e5d47 /swarm/chunk
parent3030893a21b17a0e90ddd0047d0f310fee8335a0 (diff)
downloadgo-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.gz
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.bz2
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.lz
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.xz
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.tar.zst
go-tangerine-ad6c39012fc761dd02a6b1a7fbfdcf2478abe2a1.zip
swarm: push tags integration - request flow
swarm/api: integrate tags to count chunks being split and stored swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package
Diffstat (limited to 'swarm/chunk')
-rw-r--r--swarm/chunk/tag.go122
-rw-r--r--swarm/chunk/tag_test.go50
-rw-r--r--swarm/chunk/tags.go24
-rw-r--r--swarm/chunk/tags_test.go48
4 files changed, 154 insertions, 90 deletions
diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go
index 359ac11ac..ee700d22b 100644
--- a/swarm/chunk/tag.go
+++ b/swarm/chunk/tag.go
@@ -34,11 +34,11 @@ var (
type State = uint32
const (
- SPLIT State = iota // chunk has been processed by filehasher/swarm safe call
- STORED // chunk stored locally
- SEEN // chunk previously seen
- SENT // chunk sent to neighbourhood
- SYNCED // proof is received; chunk removed from sync db; chunk is available everywhere
+ StateSplit State = iota // chunk has been processed by filehasher/swarm safe call
+ StateStored // chunk stored locally
+ StateSeen // chunk previously seen
+ StateSent // chunk sent to neighbourhood
+ StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere
)
// Tag represents info on the status of new chunks
@@ -46,18 +46,18 @@ type Tag struct {
Uid uint32 // a unique identifier for this tag
Name string // a name tag for this tag
Address Address // the associated swarm hash for this tag
- total uint32 // total chunks belonging to a tag
- split uint32 // number of chunks already processed by splitter for hashing
- seen uint32 // number of chunks already seen
- stored uint32 // number of chunks already stored locally
- sent uint32 // number of chunks sent for push syncing
- synced uint32 // number of chunks synced with proof
+ total int64 // total chunks belonging to a tag
+ split int64 // number of chunks already processed by splitter for hashing
+ seen int64 // number of chunks already seen
+ stored int64 // number of chunks already stored locally
+ sent int64 // number of chunks sent for push syncing
+ synced int64 // number of chunks synced with proof
startedAt time.Time // tag started to calculate ETA
}
// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
-func NewTag(uid uint32, s string, total uint32) *Tag {
+func NewTag(uid uint32, s string, total int64) *Tag {
t := &Tag{
Uid: uid,
Name: s,
@@ -69,65 +69,65 @@ func NewTag(uid uint32, s string, total uint32) *Tag {
// Inc increments the count for a state
func (t *Tag) Inc(state State) {
- var v *uint32
+ var v *int64
switch state {
- case SPLIT:
+ case StateSplit:
v = &t.split
- case STORED:
+ case StateStored:
v = &t.stored
- case SEEN:
+ case StateSeen:
v = &t.seen
- case SENT:
+ case StateSent:
v = &t.sent
- case SYNCED:
+ case StateSynced:
v = &t.synced
}
- atomic.AddUint32(v, 1)
+ atomic.AddInt64(v, 1)
}
// Get returns the count for a state on a tag
-func (t *Tag) Get(state State) int {
- var v *uint32
+func (t *Tag) Get(state State) int64 {
+ var v *int64
switch state {
- case SPLIT:
+ case StateSplit:
v = &t.split
- case STORED:
+ case StateStored:
v = &t.stored
- case SEEN:
+ case StateSeen:
v = &t.seen
- case SENT:
+ case StateSent:
v = &t.sent
- case SYNCED:
+ case StateSynced:
v = &t.synced
}
- return int(atomic.LoadUint32(v))
+ return atomic.LoadInt64(v)
}
// GetTotal returns the total count
-func (t *Tag) Total() int {
- return int(atomic.LoadUint32(&t.total))
+func (t *Tag) Total() int64 {
+ return atomic.LoadInt64(&t.total)
}
// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
-func (t *Tag) DoneSplit(address Address) int {
- total := atomic.LoadUint32(&t.split)
- atomic.StoreUint32(&t.total, total)
+func (t *Tag) DoneSplit(address Address) int64 {
+ total := atomic.LoadInt64(&t.split)
+ atomic.StoreInt64(&t.total, total)
t.Address = address
- return int(total)
+ return total
}
// Status returns the value of state and the total count
-func (t *Tag) Status(state State) (int, int, error) {
- count, seen, total := t.Get(state), int(atomic.LoadUint32(&t.seen)), int(atomic.LoadUint32(&t.total))
+func (t *Tag) Status(state State) (int64, int64, error) {
+ count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total)
if total == 0 {
return count, total, errNA
}
switch state {
- case SPLIT, STORED, SEEN:
+ case StateSplit, StateStored, StateSeen:
return count, total, nil
- case SENT, SYNCED:
- stored := int(atomic.LoadUint32(&t.stored))
+ case StateSent, StateSynced:
+ stored := atomic.LoadInt64(&t.stored)
if stored < total {
return count, total - seen, errNA
}
@@ -152,14 +152,14 @@ func (t *Tag) ETA(state State) (time.Time, error) {
// MarshalBinary marshals the tag into a byte slice
func (tag *Tag) MarshalBinary() (data []byte, err error) {
- buffer := make([]byte, 0)
- encodeUint32Append(&buffer, tag.Uid)
- encodeUint32Append(&buffer, tag.total)
- encodeUint32Append(&buffer, tag.split)
- encodeUint32Append(&buffer, tag.seen)
- encodeUint32Append(&buffer, tag.stored)
- encodeUint32Append(&buffer, tag.sent)
- encodeUint32Append(&buffer, tag.synced)
+ buffer := make([]byte, 4)
+ binary.BigEndian.PutUint32(buffer, tag.Uid)
+ encodeInt64Append(&buffer, tag.total)
+ encodeInt64Append(&buffer, tag.split)
+ encodeInt64Append(&buffer, tag.seen)
+ encodeInt64Append(&buffer, tag.stored)
+ encodeInt64Append(&buffer, tag.sent)
+ encodeInt64Append(&buffer, tag.synced)
intBuffer := make([]byte, 8)
@@ -181,14 +181,15 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error {
if len(buffer) < 13 {
return errors.New("buffer too short")
}
+ tag.Uid = binary.BigEndian.Uint32(buffer)
+ buffer = buffer[4:]
- tag.Uid = decodeUint32Splice(&buffer)
- tag.total = decodeUint32Splice(&buffer)
- tag.split = decodeUint32Splice(&buffer)
- tag.seen = decodeUint32Splice(&buffer)
- tag.stored = decodeUint32Splice(&buffer)
- tag.sent = decodeUint32Splice(&buffer)
- tag.synced = decodeUint32Splice(&buffer)
+ tag.total = decodeInt64Splice(&buffer)
+ tag.split = decodeInt64Splice(&buffer)
+ tag.seen = decodeInt64Splice(&buffer)
+ tag.stored = decodeInt64Splice(&buffer)
+ tag.sent = decodeInt64Splice(&buffer)
+ tag.synced = decodeInt64Splice(&buffer)
t, n := binary.Varint(buffer)
tag.startedAt = time.Unix(t, 0)
@@ -202,17 +203,16 @@ func (tag *Tag) UnmarshalBinary(buffer []byte) error {
tag.Name = string(buffer[t:])
return nil
-
}
-func encodeUint32Append(buffer *[]byte, val uint32) {
- intBuffer := make([]byte, 4)
- binary.BigEndian.PutUint32(intBuffer, val)
- *buffer = append(*buffer, intBuffer...)
+func encodeInt64Append(buffer *[]byte, val int64) {
+ intBuffer := make([]byte, 8)
+ n := binary.PutVarint(intBuffer, val)
+ *buffer = append(*buffer, intBuffer[:n]...)
}
-func decodeUint32Splice(buffer *[]byte) uint32 {
- val := binary.BigEndian.Uint32((*buffer)[:4])
- *buffer = (*buffer)[4:]
+func decodeInt64Splice(buffer *[]byte) int64 {
+ val, n := binary.Varint((*buffer))
+ *buffer = (*buffer)[n:]
return val
}
diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go
index b3f3be2ca..e6acfb185 100644
--- a/swarm/chunk/tag_test.go
+++ b/swarm/chunk/tag_test.go
@@ -24,7 +24,7 @@ import (
)
var (
- allStates = []State{SPLIT, STORED, SEEN, SENT, SYNCED}
+ allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced}
)
// TestTagSingleIncrements tests if Inc increments the tag state value
@@ -34,14 +34,14 @@ func TestTagSingleIncrements(t *testing.T) {
tc := []struct {
state uint32
inc int
- expcount int
- exptotal int
+ expcount int64
+ exptotal int64
}{
- {state: SPLIT, inc: 10, expcount: 10, exptotal: 10},
- {state: STORED, inc: 9, expcount: 9, exptotal: 9},
- {state: SEEN, inc: 1, expcount: 1, exptotal: 10},
- {state: SENT, inc: 9, expcount: 9, exptotal: 9},
- {state: SYNCED, inc: 9, expcount: 9, exptotal: 9},
+ {state: StateSplit, inc: 10, expcount: 10, exptotal: 10},
+ {state: StateStored, inc: 9, expcount: 9, exptotal: 9},
+ {state: StateSeen, inc: 1, expcount: 1, exptotal: 10},
+ {state: StateSent, inc: 9, expcount: 9, exptotal: 9},
+ {state: StateSynced, inc: 9, expcount: 9, exptotal: 9},
}
for _, tc := range tc {
@@ -60,24 +60,24 @@ func TestTagSingleIncrements(t *testing.T) {
// TestTagStatus is a unit test to cover Tag.Status method functionality
func TestTagStatus(t *testing.T) {
tg := &Tag{total: 10}
- tg.Inc(SEEN)
- tg.Inc(SENT)
- tg.Inc(SYNCED)
+ tg.Inc(StateSeen)
+ tg.Inc(StateSent)
+ tg.Inc(StateSynced)
for i := 0; i < 10; i++ {
- tg.Inc(SPLIT)
- tg.Inc(STORED)
+ tg.Inc(StateSplit)
+ tg.Inc(StateStored)
}
for _, v := range []struct {
state State
- expVal int
- expTotal int
+ expVal int64
+ expTotal int64
}{
- {state: STORED, expVal: 10, expTotal: 10},
- {state: SPLIT, expVal: 10, expTotal: 10},
- {state: SEEN, expVal: 1, expTotal: 10},
- {state: SENT, expVal: 1, expTotal: 9},
- {state: SYNCED, expVal: 1, expTotal: 9},
+ {state: StateStored, expVal: 10, expTotal: 10},
+ {state: StateSplit, expVal: 10, expTotal: 10},
+ {state: StateSeen, expVal: 1, expTotal: 10},
+ {state: StateSent, expVal: 1, expTotal: 9},
+ {state: StateSynced, expVal: 1, expTotal: 9},
} {
val, total, err := tg.Status(v.state)
if err != nil {
@@ -98,8 +98,8 @@ func TestTagETA(t *testing.T) {
maxDiff := 100000 // 100 microsecond
tg := &Tag{total: 10, startedAt: now}
time.Sleep(100 * time.Millisecond)
- tg.Inc(SPLIT)
- eta, err := tg.ETA(SPLIT)
+ tg.Inc(StateSplit)
+ eta, err := tg.ETA(StateSplit)
if err != nil {
t.Fatal(err)
}
@@ -128,7 +128,7 @@ func TestTagConcurrentIncrements(t *testing.T) {
wg.Wait()
for _, f := range allStates {
v := tg.Get(f)
- if v != n {
+ if v != int64(n) {
t.Fatalf("expected state %v to be %v, got %v", f, n, v)
}
}
@@ -142,7 +142,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
wg.Add(10 * 5 * n)
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
- tag, err := ts.New(s, n)
+ tag, err := ts.New(s, int64(n))
if err != nil {
t.Fatal(err)
}
@@ -168,7 +168,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
t.Fatal(err)
}
stateVal := tag.Get(f)
- if stateVal != n {
+ if stateVal != int64(n) {
t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v)
}
}
diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go
index 07f9b8cd7..435f5d706 100644
--- a/swarm/chunk/tags.go
+++ b/swarm/chunk/tags.go
@@ -42,12 +42,12 @@ func NewTags() *Tags {
// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
-func (ts *Tags) New(s string, total int) (*Tag, error) {
+func (ts *Tags) New(s string, total int64) (*Tag, error) {
t := &Tag{
Uid: ts.rng.Uint32(),
Name: s,
startedAt: time.Now(),
- total: uint32(total),
+ total: total,
}
if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
return nil, errExists
@@ -55,6 +55,18 @@ func (ts *Tags) New(s string, total int) (*Tag, error) {
return t, nil
}
+// All returns all existing tags in Tags' sync.Map
+// Note that tags are returned in no particular order
+func (ts *Tags) All() (t []*Tag) {
+ ts.tags.Range(func(k, v interface{}) bool {
+ t = append(t, v.(*Tag))
+
+ return true
+ })
+
+ return t
+}
+
// Get returns the undelying tag for the uid or an error if not found
func (ts *Tags) Get(uid uint32) (*Tag, error) {
t, ok := ts.tags.Load(uid)
@@ -64,8 +76,8 @@ func (ts *Tags) Get(uid uint32) (*Tag, error) {
return t.(*Tag), nil
}
-// GetContext gets a tag from the tag uid stored in the context
-func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) {
+// GetFromContext gets a tag from the tag uid stored in the context
+func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) {
uid := sctx.GetTag(ctx)
t, ok := ts.tags.Load(uid)
if !ok {
@@ -78,3 +90,7 @@ func (ts *Tags) GetContext(ctx context.Context) (*Tag, error) {
func (ts *Tags) Range(fn func(k, v interface{}) bool) {
ts.tags.Range(fn)
}
+
+func (ts *Tags) Delete(k interface{}) {
+ ts.tags.Delete(k)
+}
diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go
new file mode 100644
index 000000000..f818c4c5c
--- /dev/null
+++ b/swarm/chunk/tags_test.go
@@ -0,0 +1,48 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package chunk
+
+import "testing"
+
+func TestAll(t *testing.T) {
+ ts := NewTags()
+
+ ts.New("1", 1)
+ ts.New("2", 1)
+
+ all := ts.All()
+
+ if len(all) != 2 {
+ t.Fatalf("expected length to be 2 got %d", len(all))
+ }
+
+ if n := all[0].Total(); n != 1 {
+ t.Fatalf("expected tag 0 total to be 1 got %d", n)
+ }
+
+ if n := all[1].Total(); n != 1 {
+ t.Fatalf("expected tag 1 total to be 1 got %d", n)
+ }
+
+ ts.New("3", 1)
+ all = ts.All()
+
+ if len(all) != 3 {
+ t.Fatalf("expected length to be 3 got %d", len(all))
+ }
+
+}