aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/types.go
diff options
context:
space:
mode:
authorBalint Gabor <balint.g@gmail.com>2018-09-13 17:42:19 +0800
committerGitHub <noreply@github.com>2018-09-13 17:42:19 +0800
commit3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch)
tree62a2896b3b824449595272f0b92dda877ba1c58d /swarm/storage/types.go
parentff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff)
downloadgo-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.bz2
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.lz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.xz
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst
go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org> Co-authored-by: Balint Gabor <balint.g@gmail.com> Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com> Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/storage/types.go')
-rw-r--r--swarm/storage/types.go189
1 files changed, 111 insertions, 78 deletions
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index 53e3af485..bc2af2cd7 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -25,16 +25,16 @@ import (
"fmt"
"hash"
"io"
- "sync"
+ "io/ioutil"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/swarm/bmt"
- "github.com/ethereum/go-ethereum/swarm/chunk"
+ ch "github.com/ethereum/go-ethereum/swarm/chunk"
)
const MaxPO = 16
-const KeyLength = 32
+const AddressLength = 32
type Hasher func() hash.Hash
type SwarmHasher func() SwarmHash
@@ -116,7 +116,7 @@ func MakeHashFunc(hash string) SwarmHasher {
return func() SwarmHash {
hasher := sha3.NewKeccak256
hasherSize := hasher().Size()
- segmentCount := chunk.DefaultSize / hasherSize
+ segmentCount := ch.DefaultSize / hasherSize
pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize)
return bmt.New(pool)
}
@@ -169,88 +169,88 @@ func (c AddressCollection) Swap(i, j int) {
c[i], c[j] = c[j], c[i]
}
-// Chunk also serves as a request object passed to ChunkStores
-// in case it is a retrieval request, Data is nil and Size is 0
-// Note that Size is not the size of the data chunk, which is Data.Size()
-// but the size of the subtree encoded in the chunk
-// 0 if request, to be supplied by the dpa
-type Chunk struct {
- Addr Address // always
- SData []byte // nil if request, to be supplied by dpa
- Size int64 // size of the data covered by the subtree encoded in this chunk
- //Source Peer // peer
- C chan bool // to signal data delivery by the dpa
- ReqC chan bool // to signal the request done
- dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore
- dbStored bool
- dbStoredMu *sync.Mutex
- errored error // flag which is set when the chunk request has errored or timeouted
- erroredMu sync.Mutex
+// Chunk interface implemented by context.Contexts and data chunks
+type Chunk interface {
+ Address() Address
+ Payload() []byte
+ SpanBytes() []byte
+ Span() int64
+ Data() []byte
}
-func (c *Chunk) SetErrored(err error) {
- c.erroredMu.Lock()
- defer c.erroredMu.Unlock()
+type chunk struct {
+ addr Address
+ sdata []byte
+ span int64
+}
- c.errored = err
+func NewChunk(addr Address, data []byte) *chunk {
+ return &chunk{
+ addr: addr,
+ sdata: data,
+ span: -1,
+ }
}
-func (c *Chunk) GetErrored() error {
- c.erroredMu.Lock()
- defer c.erroredMu.Unlock()
+func (c *chunk) Address() Address {
+ return c.addr
+}
- return c.errored
+func (c *chunk) SpanBytes() []byte {
+ return c.sdata[:8]
}
-func NewChunk(addr Address, reqC chan bool) *Chunk {
- return &Chunk{
- Addr: addr,
- ReqC: reqC,
- dbStoredC: make(chan bool),
- dbStoredMu: &sync.Mutex{},
+func (c *chunk) Span() int64 {
+ if c.span == -1 {
+ c.span = int64(binary.LittleEndian.Uint64(c.sdata[:8]))
}
+ return c.span
}
-func (c *Chunk) markAsStored() {
- c.dbStoredMu.Lock()
- defer c.dbStoredMu.Unlock()
-
- if !c.dbStored {
- close(c.dbStoredC)
- c.dbStored = true
- }
+func (c *chunk) Data() []byte {
+ return c.sdata
}
-func (c *Chunk) WaitToStore() error {
- <-c.dbStoredC
- return c.GetErrored()
+func (c *chunk) Payload() []byte {
+ return c.sdata[8:]
}
-func GenerateRandomChunk(dataSize int64) *Chunk {
- return GenerateRandomChunks(dataSize, 1)[0]
+// String() for pretty printing
+func (self *chunk) String() string {
+ return fmt.Sprintf("Address: %v TreeSize: %v Chunksize: %v", self.addr.Log(), self.span, len(self.sdata))
}
-func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) {
- var i int
+func GenerateRandomChunk(dataSize int64) Chunk {
hasher := MakeHashFunc(DefaultHash)()
- if dataSize > chunk.DefaultSize {
- dataSize = chunk.DefaultSize
- }
+ sdata := make([]byte, dataSize+8)
+ rand.Read(sdata[8:])
+ binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize))
+ hasher.ResetWithLength(sdata[:8])
+ hasher.Write(sdata[8:])
+ return NewChunk(hasher.Sum(nil), sdata)
+}
- for i = 0; i < count; i++ {
- chunks = append(chunks, NewChunk(nil, nil))
- chunks[i].SData = make([]byte, dataSize+8)
- rand.Read(chunks[i].SData)
- binary.LittleEndian.PutUint64(chunks[i].SData[:8], uint64(dataSize))
- hasher.ResetWithLength(chunks[i].SData[:8])
- hasher.Write(chunks[i].SData[8:])
- chunks[i].Addr = make([]byte, 32)
- copy(chunks[i].Addr, hasher.Sum(nil))
+func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) {
+ if dataSize > ch.DefaultSize {
+ dataSize = ch.DefaultSize
+ }
+ for i := 0; i < count; i++ {
+ ch := GenerateRandomChunk(ch.DefaultSize)
+ chunks = append(chunks, ch)
}
-
return chunks
}
+func GenerateRandomData(l int) (r io.Reader, slice []byte) {
+ slice, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(l)))
+ if err != nil {
+ panic("rand error")
+ }
+ // log.Warn("generate random data", "len", len(slice), "data", common.Bytes2Hex(slice))
+ r = io.LimitReader(bytes.NewReader(slice), int64(l))
+ return r, slice
+}
+
// Size, Seek, Read, ReadAt
type LazySectionReader interface {
Context() context.Context
@@ -273,18 +273,17 @@ func (r *LazyTestSectionReader) Context() context.Context {
}
type StoreParams struct {
- Hash SwarmHasher `toml:"-"`
- DbCapacity uint64
- CacheCapacity uint
- ChunkRequestsCacheCapacity uint
- BaseKey []byte
+ Hash SwarmHasher `toml:"-"`
+ DbCapacity uint64
+ CacheCapacity uint
+ BaseKey []byte
}
func NewDefaultStoreParams() *StoreParams {
- return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, defaultChunkRequestsCacheCapacity, nil, nil)
+ return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, nil, nil)
}
-func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHasher, basekey []byte) *StoreParams {
+func NewStoreParams(ldbCap uint64, cacheCap uint, hash SwarmHasher, basekey []byte) *StoreParams {
if basekey == nil {
basekey = make([]byte, 32)
}
@@ -292,11 +291,10 @@ func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHa
hash = MakeHashFunc(DefaultHash)
}
return &StoreParams{
- Hash: hash,
- DbCapacity: ldbCap,
- CacheCapacity: cacheCap,
- ChunkRequestsCacheCapacity: requestsCap,
- BaseKey: basekey,
+ Hash: hash,
+ DbCapacity: ldbCap,
+ CacheCapacity: cacheCap,
+ BaseKey: basekey,
}
}
@@ -321,8 +319,8 @@ type Getter interface {
}
// NOTE: this returns invalid data if chunk is encrypted
-func (c ChunkData) Size() int64 {
- return int64(binary.LittleEndian.Uint64(c[:8]))
+func (c ChunkData) Size() uint64 {
+ return binary.LittleEndian.Uint64(c[:8])
}
func (c ChunkData) Data() []byte {
@@ -348,7 +346,8 @@ func NewContentAddressValidator(hasher SwarmHasher) *ContentAddressValidator {
// Validate that the given key is a valid content address for the given data
func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool {
- if l := len(data); l < 9 || l > chunk.DefaultSize+8 {
+ if l := len(data); l < 9 || l > ch.DefaultSize+8 {
+ // log.Error("invalid chunk size", "chunk", addr.Hex(), "size", l)
return false
}
@@ -359,3 +358,37 @@ func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool {
return bytes.Equal(hash, addr[:])
}
+
+type ChunkStore interface {
+ Put(ctx context.Context, ch Chunk) (err error)
+ Get(rctx context.Context, ref Address) (ch Chunk, err error)
+ Close()
+}
+
+// SyncChunkStore is a ChunkStore which supports syncing
+type SyncChunkStore interface {
+ ChunkStore
+ BinIndex(po uint8) uint64
+ Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error
+ FetchFunc(ctx context.Context, ref Address) func(context.Context) error
+}
+
+// FakeChunkStore doesn't store anything, just implements the ChunkStore interface
+// It can be used to inject into a hasherStore if you don't want to actually store data just do the
+// hashing
+type FakeChunkStore struct {
+}
+
+// Put doesn't store anything it is just here to implement ChunkStore
+func (f *FakeChunkStore) Put(_ context.Context, ch Chunk) error {
+ return nil
+}
+
+// Gut doesn't store anything it is just here to implement ChunkStore
+func (f *FakeChunkStore) Get(_ context.Context, ref Address) (Chunk, error) {
+ panic("FakeChunkStore doesn't support Get")
+}
+
+// Close doesn't store anything it is just here to implement ChunkStore
+func (f *FakeChunkStore) Close() {
+}