diff options
author | Balint Gabor <balint.g@gmail.com> | 2018-09-13 17:42:19 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-13 17:42:19 +0800 |
commit | 3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e (patch) | |
tree | 62a2896b3b824449595272f0b92dda877ba1c58d /swarm/storage/types.go | |
parent | ff3a5d24d2e40fd66f7813173e9cfc31144f3c53 (diff) | |
download | go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.gz go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.bz2 go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.lz go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.xz go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.tar.zst go-tangerine-3ff2f756368f2d8ec0d1d9d25f6ba9cdabd7383e.zip |
swarm: Chunk refactor (#17659)
Co-authored-by: Janos Guljas <janos@resenje.org>
Co-authored-by: Balint Gabor <balint.g@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Viktor TrĂ³n <viktor.tron@gmail.com>
Diffstat (limited to 'swarm/storage/types.go')
-rw-r--r-- | swarm/storage/types.go | 189 |
1 files changed, 111 insertions, 78 deletions
diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 53e3af485..bc2af2cd7 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -25,16 +25,16 @@ import ( "fmt" "hash" "io" - "sync" + "io/ioutil" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/swarm/bmt" - "github.com/ethereum/go-ethereum/swarm/chunk" + ch "github.com/ethereum/go-ethereum/swarm/chunk" ) const MaxPO = 16 -const KeyLength = 32 +const AddressLength = 32 type Hasher func() hash.Hash type SwarmHasher func() SwarmHash @@ -116,7 +116,7 @@ func MakeHashFunc(hash string) SwarmHasher { return func() SwarmHash { hasher := sha3.NewKeccak256 hasherSize := hasher().Size() - segmentCount := chunk.DefaultSize / hasherSize + segmentCount := ch.DefaultSize / hasherSize pool := bmt.NewTreePool(hasher, segmentCount, bmt.PoolSize) return bmt.New(pool) } @@ -169,88 +169,88 @@ func (c AddressCollection) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -// Chunk also serves as a request object passed to ChunkStores -// in case it is a retrieval request, Data is nil and Size is 0 -// Note that Size is not the size of the data chunk, which is Data.Size() -// but the size of the subtree encoded in the chunk -// 0 if request, to be supplied by the dpa -type Chunk struct { - Addr Address // always - SData []byte // nil if request, to be supplied by dpa - Size int64 // size of the data covered by the subtree encoded in this chunk - //Source Peer // peer - C chan bool // to signal data delivery by the dpa - ReqC chan bool // to signal the request done - dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore - dbStored bool - dbStoredMu *sync.Mutex - errored error // flag which is set when the chunk request has errored or timeouted - erroredMu sync.Mutex +// Chunk interface implemented by context.Contexts and data chunks +type Chunk interface { + Address() Address + Payload() []byte + SpanBytes() []byte + Span() int64 + Data() []byte } -func (c *Chunk) SetErrored(err error) { - c.erroredMu.Lock() - defer c.erroredMu.Unlock() +type chunk struct { + addr Address + sdata []byte + span int64 +} - c.errored = err +func NewChunk(addr Address, data []byte) *chunk { + return &chunk{ + addr: addr, + sdata: data, + span: -1, + } } -func (c *Chunk) GetErrored() error { - c.erroredMu.Lock() - defer c.erroredMu.Unlock() +func (c *chunk) Address() Address { + return c.addr +} - return c.errored +func (c *chunk) SpanBytes() []byte { + return c.sdata[:8] } -func NewChunk(addr Address, reqC chan bool) *Chunk { - return &Chunk{ - Addr: addr, - ReqC: reqC, - dbStoredC: make(chan bool), - dbStoredMu: &sync.Mutex{}, +func (c *chunk) Span() int64 { + if c.span == -1 { + c.span = int64(binary.LittleEndian.Uint64(c.sdata[:8])) } + return c.span } -func (c *Chunk) markAsStored() { - c.dbStoredMu.Lock() - defer c.dbStoredMu.Unlock() - - if !c.dbStored { - close(c.dbStoredC) - c.dbStored = true - } +func (c *chunk) Data() []byte { + return c.sdata } -func (c *Chunk) WaitToStore() error { - <-c.dbStoredC - return c.GetErrored() +func (c *chunk) Payload() []byte { + return c.sdata[8:] } -func GenerateRandomChunk(dataSize int64) *Chunk { - return GenerateRandomChunks(dataSize, 1)[0] +// String() for pretty printing +func (self *chunk) String() string { + return fmt.Sprintf("Address: %v TreeSize: %v Chunksize: %v", self.addr.Log(), self.span, len(self.sdata)) } -func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) { - var i int +func GenerateRandomChunk(dataSize int64) Chunk { hasher := MakeHashFunc(DefaultHash)() - if dataSize > chunk.DefaultSize { - dataSize = chunk.DefaultSize - } + sdata := make([]byte, dataSize+8) + rand.Read(sdata[8:]) + binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize)) + hasher.ResetWithLength(sdata[:8]) + hasher.Write(sdata[8:]) + return NewChunk(hasher.Sum(nil), sdata) +} - for i = 0; i < count; i++ { - chunks = append(chunks, NewChunk(nil, nil)) - chunks[i].SData = make([]byte, dataSize+8) - rand.Read(chunks[i].SData) - binary.LittleEndian.PutUint64(chunks[i].SData[:8], uint64(dataSize)) - hasher.ResetWithLength(chunks[i].SData[:8]) - hasher.Write(chunks[i].SData[8:]) - chunks[i].Addr = make([]byte, 32) - copy(chunks[i].Addr, hasher.Sum(nil)) +func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) { + if dataSize > ch.DefaultSize { + dataSize = ch.DefaultSize + } + for i := 0; i < count; i++ { + ch := GenerateRandomChunk(ch.DefaultSize) + chunks = append(chunks, ch) } - return chunks } +func GenerateRandomData(l int) (r io.Reader, slice []byte) { + slice, err := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(l))) + if err != nil { + panic("rand error") + } + // log.Warn("generate random data", "len", len(slice), "data", common.Bytes2Hex(slice)) + r = io.LimitReader(bytes.NewReader(slice), int64(l)) + return r, slice +} + // Size, Seek, Read, ReadAt type LazySectionReader interface { Context() context.Context @@ -273,18 +273,17 @@ func (r *LazyTestSectionReader) Context() context.Context { } type StoreParams struct { - Hash SwarmHasher `toml:"-"` - DbCapacity uint64 - CacheCapacity uint - ChunkRequestsCacheCapacity uint - BaseKey []byte + Hash SwarmHasher `toml:"-"` + DbCapacity uint64 + CacheCapacity uint + BaseKey []byte } func NewDefaultStoreParams() *StoreParams { - return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, defaultChunkRequestsCacheCapacity, nil, nil) + return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, nil, nil) } -func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHasher, basekey []byte) *StoreParams { +func NewStoreParams(ldbCap uint64, cacheCap uint, hash SwarmHasher, basekey []byte) *StoreParams { if basekey == nil { basekey = make([]byte, 32) } @@ -292,11 +291,10 @@ func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHa hash = MakeHashFunc(DefaultHash) } return &StoreParams{ - Hash: hash, - DbCapacity: ldbCap, - CacheCapacity: cacheCap, - ChunkRequestsCacheCapacity: requestsCap, - BaseKey: basekey, + Hash: hash, + DbCapacity: ldbCap, + CacheCapacity: cacheCap, + BaseKey: basekey, } } @@ -321,8 +319,8 @@ type Getter interface { } // NOTE: this returns invalid data if chunk is encrypted -func (c ChunkData) Size() int64 { - return int64(binary.LittleEndian.Uint64(c[:8])) +func (c ChunkData) Size() uint64 { + return binary.LittleEndian.Uint64(c[:8]) } func (c ChunkData) Data() []byte { @@ -348,7 +346,8 @@ func NewContentAddressValidator(hasher SwarmHasher) *ContentAddressValidator { // Validate that the given key is a valid content address for the given data func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool { - if l := len(data); l < 9 || l > chunk.DefaultSize+8 { + if l := len(data); l < 9 || l > ch.DefaultSize+8 { + // log.Error("invalid chunk size", "chunk", addr.Hex(), "size", l) return false } @@ -359,3 +358,37 @@ func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool { return bytes.Equal(hash, addr[:]) } + +type ChunkStore interface { + Put(ctx context.Context, ch Chunk) (err error) + Get(rctx context.Context, ref Address) (ch Chunk, err error) + Close() +} + +// SyncChunkStore is a ChunkStore which supports syncing +type SyncChunkStore interface { + ChunkStore + BinIndex(po uint8) uint64 + Iterator(from uint64, to uint64, po uint8, f func(Address, uint64) bool) error + FetchFunc(ctx context.Context, ref Address) func(context.Context) error +} + +// FakeChunkStore doesn't store anything, just implements the ChunkStore interface +// It can be used to inject into a hasherStore if you don't want to actually store data just do the +// hashing +type FakeChunkStore struct { +} + +// Put doesn't store anything it is just here to implement ChunkStore +func (f *FakeChunkStore) Put(_ context.Context, ch Chunk) error { + return nil +} + +// Gut doesn't store anything it is just here to implement ChunkStore +func (f *FakeChunkStore) Get(_ context.Context, ref Address) (Chunk, error) { + panic("FakeChunkStore doesn't support Get") +} + +// Close doesn't store anything it is just here to implement ChunkStore +func (f *FakeChunkStore) Close() { +} |