aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/chunk
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2019-05-10 19:09:01 +0800
committerGitHub <noreply@github.com>2019-05-10 19:09:01 +0800
commit494f5d448a1685d5de4cb1524b863cd1fc9a13b0 (patch)
tree4db9d1afe4910c888f3488cd93e8537501d88314 /swarm/chunk
parentc94d582aa781b26412ba7d570f6707d193303a02 (diff)
parent9b1543c282f39d452f611eeee0307bdf828e8bc2 (diff)
downloadgo-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar
go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.gz
go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.bz2
go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.lz
go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.xz
go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.tar.zst
go-tangerine-494f5d448a1685d5de4cb1524b863cd1fc9a13b0.zip
Merge pull request #19550 from ethersphere/swarm-rather-stable
swarm v0.4-rc1
Diffstat (limited to 'swarm/chunk')
-rw-r--r--swarm/chunk/chunk.go154
-rw-r--r--swarm/chunk/tag.go218
-rw-r--r--swarm/chunk/tag_test.go273
-rw-r--r--swarm/chunk/tags.go96
-rw-r--r--swarm/chunk/tags_test.go48
5 files changed, 788 insertions, 1 deletions
diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go
index 7540af8ce..c44292bb9 100644
--- a/swarm/chunk/chunk.go
+++ b/swarm/chunk/chunk.go
@@ -1,6 +1,23 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
package chunk
import (
+ "context"
"errors"
"fmt"
@@ -28,7 +45,7 @@ type chunk struct {
sdata []byte
}
-func NewChunk(addr Address, data []byte) *chunk {
+func NewChunk(addr Address, data []byte) Chunk {
return &chunk{
addr: addr,
sdata: data,
@@ -107,3 +124,138 @@ func Proximity(one, other []byte) (ret int) {
}
return MaxPO
}
+
+// ModeGet enumerates different Getter modes.
+type ModeGet int
+
+func (m ModeGet) String() string {
+ switch m {
+ case ModeGetRequest:
+ return "Request"
+ case ModeGetSync:
+ return "Sync"
+ case ModeGetLookup:
+ return "Lookup"
+ default:
+ return "Unknown"
+ }
+}
+
+// Getter modes.
+const (
+ // ModeGetRequest: when accessed for retrieval
+ ModeGetRequest ModeGet = iota
+ // ModeGetSync: when accessed for syncing or proof of custody request
+ ModeGetSync
+ // ModeGetLookup: when accessed to lookup a a chunk in feeds or other places
+ ModeGetLookup
+)
+
+// ModePut enumerates different Putter modes.
+type ModePut int
+
+func (m ModePut) String() string {
+ switch m {
+ case ModePutRequest:
+ return "Request"
+ case ModePutSync:
+ return "Sync"
+ case ModePutUpload:
+ return "Upload"
+ default:
+ return "Unknown"
+ }
+}
+
+// Putter modes.
+const (
+ // ModePutRequest: when a chunk is received as a result of retrieve request and delivery
+ ModePutRequest ModePut = iota
+ // ModePutSync: when a chunk is received via syncing
+ ModePutSync
+ // ModePutUpload: when a chunk is created by local upload
+ ModePutUpload
+)
+
+// ModeSet enumerates different Setter modes.
+type ModeSet int
+
+func (m ModeSet) String() string {
+ switch m {
+ case ModeSetAccess:
+ return "Access"
+ case ModeSetSync:
+ return "Sync"
+ case ModeSetRemove:
+ return "Remove"
+ default:
+ return "Unknown"
+ }
+}
+
+// Setter modes.
+const (
+ // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
+ ModeSetAccess ModeSet = iota
+ // ModeSetSync: when a chunk is added to a pull sync batch or when a push sync receipt is received
+ ModeSetSync
+ // ModeSetRemove: when a chunk is removed
+ ModeSetRemove
+)
+
+// Descriptor holds information required for Pull syncing. This struct
+// is provided by subscribing to pull index.
+type Descriptor struct {
+ Address Address
+ BinID uint64
+}
+
+func (d *Descriptor) String() string {
+ if d == nil {
+ return ""
+ }
+ return fmt.Sprintf("%s bin id %v", d.Address.Hex(), d.BinID)
+}
+
+type Store interface {
+ Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
+ Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error)
+ Has(ctx context.Context, addr Address) (yes bool, err error)
+ Set(ctx context.Context, mode ModeSet, addr Address) (err error)
+ LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
+ SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func())
+ Close() (err error)
+}
+
+// Validator validates a chunk.
+type Validator interface {
+ Validate(ch Chunk) bool
+}
+
+// ValidatorStore encapsulates Store by decorting the Put method
+// with validators check.
+type ValidatorStore struct {
+ Store
+ validators []Validator
+}
+
+// NewValidatorStore returns a new ValidatorStore which uses
+// provided validators to validate chunks on Put.
+func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore) {
+ return &ValidatorStore{
+ Store: store,
+ validators: validators,
+ }
+}
+
+// Put overrides Store put method with validators check. If one of the validators
+// return true, the chunk is considered valid and Store Put method is called.
+// If all validators return false, ErrChunkInvalid is returned.
+func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) {
+ for _, v := range s.validators {
+ if v.Validate(ch) {
+ return s.Store.Put(ctx, mode, ch)
+ }
+ }
+ return false, ErrChunkInvalid
+}
diff --git a/swarm/chunk/tag.go b/swarm/chunk/tag.go
new file mode 100644
index 000000000..ee700d22b
--- /dev/null
+++ b/swarm/chunk/tag.go
@@ -0,0 +1,218 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package chunk
+
+import (
+ "encoding/binary"
+ "errors"
+ "sync/atomic"
+ "time"
+)
+
+var (
+ errExists = errors.New("already exists")
+ errNA = errors.New("not available yet")
+ errNoETA = errors.New("unable to calculate ETA")
+ errTagNotFound = errors.New("tag not found")
+)
+
+// State is the enum type for chunk states
+type State = uint32
+
+const (
+ StateSplit State = iota // chunk has been processed by filehasher/swarm safe call
+ StateStored // chunk stored locally
+ StateSeen // chunk previously seen
+ StateSent // chunk sent to neighbourhood
+ StateSynced // proof is received; chunk removed from sync db; chunk is available everywhere
+)
+
+// Tag represents info on the status of new chunks
+type Tag struct {
+ Uid uint32 // a unique identifier for this tag
+ Name string // a name tag for this tag
+ Address Address // the associated swarm hash for this tag
+ total int64 // total chunks belonging to a tag
+ split int64 // number of chunks already processed by splitter for hashing
+ seen int64 // number of chunks already seen
+ stored int64 // number of chunks already stored locally
+ sent int64 // number of chunks sent for push syncing
+ synced int64 // number of chunks synced with proof
+ startedAt time.Time // tag started to calculate ETA
+}
+
+// New creates a new tag, stores it by the name and returns it
+// it returns an error if the tag with this name already exists
+func NewTag(uid uint32, s string, total int64) *Tag {
+ t := &Tag{
+ Uid: uid,
+ Name: s,
+ startedAt: time.Now(),
+ total: total,
+ }
+ return t
+}
+
+// Inc increments the count for a state
+func (t *Tag) Inc(state State) {
+ var v *int64
+ switch state {
+ case StateSplit:
+ v = &t.split
+ case StateStored:
+ v = &t.stored
+ case StateSeen:
+ v = &t.seen
+ case StateSent:
+ v = &t.sent
+ case StateSynced:
+ v = &t.synced
+ }
+ atomic.AddInt64(v, 1)
+}
+
+// Get returns the count for a state on a tag
+func (t *Tag) Get(state State) int64 {
+ var v *int64
+ switch state {
+ case StateSplit:
+ v = &t.split
+ case StateStored:
+ v = &t.stored
+ case StateSeen:
+ v = &t.seen
+ case StateSent:
+ v = &t.sent
+ case StateSynced:
+ v = &t.synced
+ }
+ return atomic.LoadInt64(v)
+}
+
+// GetTotal returns the total count
+func (t *Tag) Total() int64 {
+ return atomic.LoadInt64(&t.total)
+}
+
+// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
+// is meant to be called when splitter finishes for input streams of unknown size
+func (t *Tag) DoneSplit(address Address) int64 {
+ total := atomic.LoadInt64(&t.split)
+ atomic.StoreInt64(&t.total, total)
+ t.Address = address
+ return total
+}
+
+// Status returns the value of state and the total count
+func (t *Tag) Status(state State) (int64, int64, error) {
+ count, seen, total := t.Get(state), atomic.LoadInt64(&t.seen), atomic.LoadInt64(&t.total)
+ if total == 0 {
+ return count, total, errNA
+ }
+ switch state {
+ case StateSplit, StateStored, StateSeen:
+ return count, total, nil
+ case StateSent, StateSynced:
+ stored := atomic.LoadInt64(&t.stored)
+ if stored < total {
+ return count, total - seen, errNA
+ }
+ return count, total - seen, nil
+ }
+ return count, total, errNA
+}
+
+// ETA returns the time of completion estimated based on time passed and rate of completion
+func (t *Tag) ETA(state State) (time.Time, error) {
+ cnt, total, err := t.Status(state)
+ if err != nil {
+ return time.Time{}, err
+ }
+ if cnt == 0 || total == 0 {
+ return time.Time{}, errNoETA
+ }
+ diff := time.Since(t.startedAt)
+ dur := time.Duration(total) * diff / time.Duration(cnt)
+ return t.startedAt.Add(dur), nil
+}
+
+// MarshalBinary marshals the tag into a byte slice
+func (tag *Tag) MarshalBinary() (data []byte, err error) {
+ buffer := make([]byte, 4)
+ binary.BigEndian.PutUint32(buffer, tag.Uid)
+ encodeInt64Append(&buffer, tag.total)
+ encodeInt64Append(&buffer, tag.split)
+ encodeInt64Append(&buffer, tag.seen)
+ encodeInt64Append(&buffer, tag.stored)
+ encodeInt64Append(&buffer, tag.sent)
+ encodeInt64Append(&buffer, tag.synced)
+
+ intBuffer := make([]byte, 8)
+
+ n := binary.PutVarint(intBuffer, tag.startedAt.Unix())
+ buffer = append(buffer, intBuffer[:n]...)
+
+ n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
+ buffer = append(buffer, intBuffer[:n]...)
+
+ buffer = append(buffer, tag.Address[:]...)
+
+ buffer = append(buffer, []byte(tag.Name)...)
+
+ return buffer, nil
+}
+
+// UnmarshalBinary unmarshals a byte slice into a tag
+func (tag *Tag) UnmarshalBinary(buffer []byte) error {
+ if len(buffer) < 13 {
+ return errors.New("buffer too short")
+ }
+ tag.Uid = binary.BigEndian.Uint32(buffer)
+ buffer = buffer[4:]
+
+ tag.total = decodeInt64Splice(&buffer)
+ tag.split = decodeInt64Splice(&buffer)
+ tag.seen = decodeInt64Splice(&buffer)
+ tag.stored = decodeInt64Splice(&buffer)
+ tag.sent = decodeInt64Splice(&buffer)
+ tag.synced = decodeInt64Splice(&buffer)
+
+ t, n := binary.Varint(buffer)
+ tag.startedAt = time.Unix(t, 0)
+ buffer = buffer[n:]
+
+ t, n = binary.Varint(buffer)
+ buffer = buffer[n:]
+ if t > 0 {
+ tag.Address = buffer[:t]
+ }
+ tag.Name = string(buffer[t:])
+
+ return nil
+}
+
+func encodeInt64Append(buffer *[]byte, val int64) {
+ intBuffer := make([]byte, 8)
+ n := binary.PutVarint(intBuffer, val)
+ *buffer = append(*buffer, intBuffer[:n]...)
+}
+
+func decodeInt64Splice(buffer *[]byte) int64 {
+ val, n := binary.Varint((*buffer))
+ *buffer = (*buffer)[n:]
+ return val
+}
diff --git a/swarm/chunk/tag_test.go b/swarm/chunk/tag_test.go
new file mode 100644
index 000000000..e6acfb185
--- /dev/null
+++ b/swarm/chunk/tag_test.go
@@ -0,0 +1,273 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package chunk
+
+import (
+ "bytes"
+ "sync"
+ "testing"
+ "time"
+)
+
+var (
+ allStates = []State{StateSplit, StateStored, StateSeen, StateSent, StateSynced}
+)
+
+// TestTagSingleIncrements tests if Inc increments the tag state value
+func TestTagSingleIncrements(t *testing.T) {
+ tg := &Tag{total: 10}
+
+ tc := []struct {
+ state uint32
+ inc int
+ expcount int64
+ exptotal int64
+ }{
+ {state: StateSplit, inc: 10, expcount: 10, exptotal: 10},
+ {state: StateStored, inc: 9, expcount: 9, exptotal: 9},
+ {state: StateSeen, inc: 1, expcount: 1, exptotal: 10},
+ {state: StateSent, inc: 9, expcount: 9, exptotal: 9},
+ {state: StateSynced, inc: 9, expcount: 9, exptotal: 9},
+ }
+
+ for _, tc := range tc {
+ for i := 0; i < tc.inc; i++ {
+ tg.Inc(tc.state)
+ }
+ }
+
+ for _, tc := range tc {
+ if tg.Get(tc.state) != tc.expcount {
+ t.Fatalf("not incremented")
+ }
+ }
+}
+
+// TestTagStatus is a unit test to cover Tag.Status method functionality
+func TestTagStatus(t *testing.T) {
+ tg := &Tag{total: 10}
+ tg.Inc(StateSeen)
+ tg.Inc(StateSent)
+ tg.Inc(StateSynced)
+
+ for i := 0; i < 10; i++ {
+ tg.Inc(StateSplit)
+ tg.Inc(StateStored)
+ }
+ for _, v := range []struct {
+ state State
+ expVal int64
+ expTotal int64
+ }{
+ {state: StateStored, expVal: 10, expTotal: 10},
+ {state: StateSplit, expVal: 10, expTotal: 10},
+ {state: StateSeen, expVal: 1, expTotal: 10},
+ {state: StateSent, expVal: 1, expTotal: 9},
+ {state: StateSynced, expVal: 1, expTotal: 9},
+ } {
+ val, total, err := tg.Status(v.state)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if val != v.expVal {
+ t.Fatalf("should be %d, got %d", v.expVal, val)
+ }
+ if total != v.expTotal {
+ t.Fatalf("expected total to be %d, got %d", v.expTotal, total)
+ }
+ }
+}
+
+// tests ETA is precise
+func TestTagETA(t *testing.T) {
+ now := time.Now()
+ maxDiff := 100000 // 100 microsecond
+ tg := &Tag{total: 10, startedAt: now}
+ time.Sleep(100 * time.Millisecond)
+ tg.Inc(StateSplit)
+ eta, err := tg.ETA(StateSplit)
+ if err != nil {
+ t.Fatal(err)
+ }
+ diff := time.Until(eta) - 9*time.Since(now)
+ if int(diff) > maxDiff {
+ t.Fatalf("ETA is not precise, got diff %v > .1ms", diff)
+ }
+}
+
+// TestTagConcurrentIncrements tests Inc calls concurrently
+func TestTagConcurrentIncrements(t *testing.T) {
+ tg := &Tag{}
+ n := 1000
+ wg := sync.WaitGroup{}
+ wg.Add(5 * n)
+ for _, f := range allStates {
+ go func(f State) {
+ for j := 0; j < n; j++ {
+ go func() {
+ tg.Inc(f)
+ wg.Done()
+ }()
+ }
+ }(f)
+ }
+ wg.Wait()
+ for _, f := range allStates {
+ v := tg.Get(f)
+ if v != int64(n) {
+ t.Fatalf("expected state %v to be %v, got %v", f, n, v)
+ }
+ }
+}
+
+// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
+func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
+ ts := NewTags()
+ n := 100
+ wg := sync.WaitGroup{}
+ wg.Add(10 * 5 * n)
+ for i := 0; i < 10; i++ {
+ s := string([]byte{uint8(i)})
+ tag, err := ts.New(s, int64(n))
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, f := range allStates {
+ go func(tag *Tag, f State) {
+ for j := 0; j < n; j++ {
+ go func() {
+ tag.Inc(f)
+ wg.Done()
+ }()
+ }
+ }(tag, f)
+ }
+ }
+ wg.Wait()
+ i := 0
+ ts.Range(func(k, v interface{}) bool {
+ i++
+ uid := k.(uint32)
+ for _, f := range allStates {
+ tag, err := ts.Get(uid)
+ if err != nil {
+ t.Fatal(err)
+ }
+ stateVal := tag.Get(f)
+ if stateVal != int64(n) {
+ t.Fatalf("expected tag %v state %v to be %v, got %v", uid, f, n, v)
+ }
+ }
+ return true
+
+ })
+ if i != 10 {
+ t.Fatal("not enough tagz")
+ }
+}
+
+// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the
+// tag Address (byte slice) contains some arbitrary value
+func TestMarshallingWithAddr(t *testing.T) {
+ tg := NewTag(111, "test/tag", 10)
+ tg.Address = []byte{0, 1, 2, 3, 4, 5, 6}
+
+ for _, f := range allStates {
+ tg.Inc(f)
+ }
+
+ b, err := tg.MarshalBinary()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ unmarshalledTag := &Tag{}
+ err = unmarshalledTag.UnmarshalBinary(b)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if unmarshalledTag.Uid != tg.Uid {
+ t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid)
+ }
+
+ if unmarshalledTag.Name != tg.Name {
+ t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
+ }
+
+ for _, state := range allStates {
+ uv, tv := unmarshalledTag.Get(state), tg.Get(state)
+ if uv != tv {
+ t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv)
+ }
+ }
+
+ if unmarshalledTag.Total() != tg.Total() {
+ t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total())
+ }
+
+ if len(unmarshalledTag.Address) != len(tg.Address) {
+ t.Fatalf("tag addresses length mismatch, want %d, got %d", len(tg.Address), len(unmarshalledTag.Address))
+ }
+
+ if !bytes.Equal(unmarshalledTag.Address, tg.Address) {
+ t.Fatalf("expected tag address to be %v got %v", unmarshalledTag.Address, tg.Address)
+ }
+}
+
+// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly
+// when the tag Address (byte slice) is empty in this case
+func TestMarshallingNoAddr(t *testing.T) {
+ tg := NewTag(111, "test/tag", 10)
+ for _, f := range allStates {
+ tg.Inc(f)
+ }
+
+ b, err := tg.MarshalBinary()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ unmarshalledTag := &Tag{}
+ err = unmarshalledTag.UnmarshalBinary(b)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if unmarshalledTag.Uid != tg.Uid {
+ t.Fatalf("tag uids not equal. want %d got %d", tg.Uid, unmarshalledTag.Uid)
+ }
+
+ if unmarshalledTag.Name != tg.Name {
+ t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
+ }
+
+ for _, state := range allStates {
+ uv, tv := unmarshalledTag.Get(state), tg.Get(state)
+ if uv != tv {
+ t.Fatalf("state %d inconsistent. expected %d to equal %d", state, uv, tv)
+ }
+ }
+
+ if unmarshalledTag.Total() != tg.Total() {
+ t.Fatalf("tag names not equal. want %d got %d", tg.Total(), unmarshalledTag.Total())
+ }
+
+ if len(unmarshalledTag.Address) != len(tg.Address) {
+ t.Fatalf("expected tag addresses to be equal length")
+ }
+}
diff --git a/swarm/chunk/tags.go b/swarm/chunk/tags.go
new file mode 100644
index 000000000..435f5d706
--- /dev/null
+++ b/swarm/chunk/tags.go
@@ -0,0 +1,96 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package chunk
+
+import (
+ "context"
+ "errors"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/swarm/sctx"
+)
+
+// Tags hold tag information indexed by a unique random uint32
+type Tags struct {
+ tags *sync.Map
+ rng *rand.Rand
+}
+
+// NewTags creates a tags object
+func NewTags() *Tags {
+ return &Tags{
+ tags: &sync.Map{},
+ rng: rand.New(rand.NewSource(time.Now().Unix())),
+ }
+}
+
+// New creates a new tag, stores it by the name and returns it
+// it returns an error if the tag with this name already exists
+func (ts *Tags) New(s string, total int64) (*Tag, error) {
+ t := &Tag{
+ Uid: ts.rng.Uint32(),
+ Name: s,
+ startedAt: time.Now(),
+ total: total,
+ }
+ if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
+ return nil, errExists
+ }
+ return t, nil
+}
+
+// All returns all existing tags in Tags' sync.Map
+// Note that tags are returned in no particular order
+func (ts *Tags) All() (t []*Tag) {
+ ts.tags.Range(func(k, v interface{}) bool {
+ t = append(t, v.(*Tag))
+
+ return true
+ })
+
+ return t
+}
+
+// Get returns the undelying tag for the uid or an error if not found
+func (ts *Tags) Get(uid uint32) (*Tag, error) {
+ t, ok := ts.tags.Load(uid)
+ if !ok {
+ return nil, errors.New("tag not found")
+ }
+ return t.(*Tag), nil
+}
+
+// GetFromContext gets a tag from the tag uid stored in the context
+func (ts *Tags) GetFromContext(ctx context.Context) (*Tag, error) {
+ uid := sctx.GetTag(ctx)
+ t, ok := ts.tags.Load(uid)
+ if !ok {
+ return nil, errTagNotFound
+ }
+ return t.(*Tag), nil
+}
+
+// Range exposes sync.Map's iterator
+func (ts *Tags) Range(fn func(k, v interface{}) bool) {
+ ts.tags.Range(fn)
+}
+
+func (ts *Tags) Delete(k interface{}) {
+ ts.tags.Delete(k)
+}
diff --git a/swarm/chunk/tags_test.go b/swarm/chunk/tags_test.go
new file mode 100644
index 000000000..f818c4c5c
--- /dev/null
+++ b/swarm/chunk/tags_test.go
@@ -0,0 +1,48 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package chunk
+
+import "testing"
+
+func TestAll(t *testing.T) {
+ ts := NewTags()
+
+ ts.New("1", 1)
+ ts.New("2", 1)
+
+ all := ts.All()
+
+ if len(all) != 2 {
+ t.Fatalf("expected length to be 2 got %d", len(all))
+ }
+
+ if n := all[0].Total(); n != 1 {
+ t.Fatalf("expected tag 0 total to be 1 got %d", n)
+ }
+
+ if n := all[1].Total(); n != 1 {
+ t.Fatalf("expected tag 1 total to be 1 got %d", n)
+ }
+
+ ts.New("3", 1)
+ all = ts.All()
+
+ if len(all) != 3 {
+ t.Fatalf("expected length to be 3 got %d", len(all))
+ }
+
+}