aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/types.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/storage/types.go')
-rw-r--r--swarm/storage/types.go338
1 files changed, 219 insertions, 119 deletions
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index d35f1f929..b75f64205 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -19,16 +19,21 @@ package storage
import (
"bytes"
"crypto"
+ "crypto/rand"
+ "encoding/binary"
"fmt"
"hash"
"io"
"sync"
- "github.com/ethereum/go-ethereum/bmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/swarm/bmt"
)
+const MaxPO = 16
+const KeyLength = 32
+
type Hasher func() hash.Hash
type SwarmHasher func() SwarmHash
@@ -36,48 +41,68 @@ type SwarmHasher func() SwarmHash
// should probably not be here? but network should wrap chunk object
type Peer interface{}
-type Key []byte
+type Address []byte
-func (x Key) Size() uint {
- return uint(len(x))
+func (a Address) Size() uint {
+ return uint(len(a))
}
-func (x Key) isEqual(y Key) bool {
- return bytes.Equal(x, y)
+func (a Address) isEqual(y Address) bool {
+ return bytes.Equal(a, y)
}
-func (h Key) bits(i, j uint) uint {
+func (a Address) bits(i, j uint) uint {
ii := i >> 3
jj := i & 7
- if ii >= h.Size() {
+ if ii >= a.Size() {
return 0
}
if jj+j <= 8 {
- return uint((h[ii] >> jj) & ((1 << j) - 1))
+ return uint((a[ii] >> jj) & ((1 << j) - 1))
}
- res := uint(h[ii] >> jj)
+ res := uint(a[ii] >> jj)
jj = 8 - jj
j -= jj
for j != 0 {
ii++
if j < 8 {
- res += uint(h[ii]&((1<<j)-1)) << jj
+ res += uint(a[ii]&((1<<j)-1)) << jj
return res
}
- res += uint(h[ii]) << jj
+ res += uint(a[ii]) << jj
jj += 8
j -= 8
}
return res
}
-func IsZeroKey(key Key) bool {
- return len(key) == 0 || bytes.Equal(key, ZeroKey)
+func Proximity(one, other []byte) (ret int) {
+ b := (MaxPO-1)/8 + 1
+ if b > len(one) {
+ b = len(one)
+ }
+ m := 8
+ for i := 0; i < b; i++ {
+ oxo := one[i] ^ other[i]
+ if i == b-1 {
+ m = MaxPO % 8
+ }
+ for j := 0; j < m; j++ {
+ if (oxo>>uint8(7-j))&0x01 != 0 {
+ return i*8 + j
+ }
+ }
+ }
+ return MaxPO
+}
+
+func IsZeroAddr(addr Address) bool {
+ return len(addr) == 0 || bytes.Equal(addr, ZeroAddr)
}
-var ZeroKey = Key(common.Hash{}.Bytes())
+var ZeroAddr = Address(common.Hash{}.Bytes())
func MakeHashFunc(hash string) SwarmHasher {
switch hash {
@@ -88,59 +113,56 @@ func MakeHashFunc(hash string) SwarmHasher {
case "BMT":
return func() SwarmHash {
hasher := sha3.NewKeccak256
- pool := bmt.NewTreePool(hasher, bmt.DefaultSegmentCount, bmt.DefaultPoolSize)
+ pool := bmt.NewTreePool(hasher, bmt.SegmentCount, bmt.PoolSize)
return bmt.New(pool)
}
}
return nil
}
-func (key Key) Hex() string {
- return fmt.Sprintf("%064x", []byte(key[:]))
+func (a Address) Hex() string {
+ return fmt.Sprintf("%064x", []byte(a[:]))
}
-func (key Key) Log() string {
- if len(key[:]) < 4 {
- return fmt.Sprintf("%x", []byte(key[:]))
+func (a Address) Log() string {
+ if len(a[:]) < 8 {
+ return fmt.Sprintf("%x", []byte(a[:]))
}
- return fmt.Sprintf("%08x", []byte(key[:4]))
+ return fmt.Sprintf("%016x", []byte(a[:8]))
}
-func (key Key) String() string {
- return fmt.Sprintf("%064x", []byte(key)[:])
+func (a Address) String() string {
+ return fmt.Sprintf("%064x", []byte(a)[:])
}
-func (key Key) MarshalJSON() (out []byte, err error) {
- return []byte(`"` + key.String() + `"`), nil
+func (a Address) MarshalJSON() (out []byte, err error) {
+ return []byte(`"` + a.String() + `"`), nil
}
-func (key *Key) UnmarshalJSON(value []byte) error {
+func (a *Address) UnmarshalJSON(value []byte) error {
s := string(value)
- *key = make([]byte, 32)
+ *a = make([]byte, 32)
h := common.Hex2Bytes(s[1 : len(s)-1])
- copy(*key, h)
+ copy(*a, h)
return nil
}
-// each chunk when first requested opens a record associated with the request
-// next time a request for the same chunk arrives, this record is updated
-// this request status keeps track of the request ID-s as well as the requesting
-// peers and has a channel that is closed when the chunk is retrieved. Multiple
-// local callers can wait on this channel (or combined with a timeout, block with a
-// select).
-type RequestStatus struct {
- Key Key
- Source Peer
- C chan bool
- Requesters map[uint64][]interface{}
-}
-
-func newRequestStatus(key Key) *RequestStatus {
- return &RequestStatus{
- Key: key,
- Requesters: make(map[uint64][]interface{}),
- C: make(chan bool),
- }
+type AddressCollection []Address
+
+func NewAddressCollection(l int) AddressCollection {
+ return make(AddressCollection, l)
+}
+
+func (c AddressCollection) Len() int {
+ return len(c)
+}
+
+func (c AddressCollection) Less(i, j int) bool {
+ return bytes.Compare(c[i], c[j]) == -1
+}
+
+func (c AddressCollection) Swap(i, j int) {
+ c[i], c[j] = c[j], c[i]
}
// Chunk also serves as a request object passed to ChunkStores
@@ -149,86 +171,80 @@ func newRequestStatus(key Key) *RequestStatus {
// but the size of the subtree encoded in the chunk
// 0 if request, to be supplied by the dpa
type Chunk struct {
- Key Key // always
- SData []byte // nil if request, to be supplied by dpa
- Size int64 // size of the data covered by the subtree encoded in this chunk
- Source Peer // peer
- C chan bool // to signal data delivery by the dpa
- Req *RequestStatus // request Status needed by netStore
- wg *sync.WaitGroup // wg to synchronize
- dbStored chan bool // never remove a chunk from memStore before it is written to dbStore
-}
-
-func NewChunk(key Key, rs *RequestStatus) *Chunk {
- return &Chunk{Key: key, Req: rs}
-}
-
-/*
-The ChunkStore interface is implemented by :
-
-- MemStore: a memory cache
-- DbStore: local disk/db store
-- LocalStore: a combination (sequence of) memStore and dbStore
-- NetStore: cloud storage abstraction layer
-- DPA: local requests for swarm storage and retrieval
-*/
-type ChunkStore interface {
- Put(*Chunk) // effectively there is no error even if there is an error
- Get(Key) (*Chunk, error)
- Close()
+ Addr Address // always
+ SData []byte // nil if request, to be supplied by dpa
+ Size int64 // size of the data covered by the subtree encoded in this chunk
+ //Source Peer // peer
+ C chan bool // to signal data delivery by the dpa
+ ReqC chan bool // to signal the request done
+ dbStoredC chan bool // never remove a chunk from memStore before it is written to dbStore
+ dbStored bool
+ dbStoredMu *sync.Mutex
+ errored error // flag which is set when the chunk request has errored or timeouted
+ erroredMu sync.Mutex
}
-/*
-Chunker is the interface to a component that is responsible for disassembling and assembling larger data and indended to be the dependency of a DPA storage system with fixed maximum chunksize.
+func (c *Chunk) SetErrored(err error) {
+ c.erroredMu.Lock()
+ defer c.erroredMu.Unlock()
+
+ c.errored = err
+}
-It relies on the underlying chunking model.
+func (c *Chunk) GetErrored() error {
+ c.erroredMu.Lock()
+ defer c.erroredMu.Unlock()
-When calling Split, the caller provides a channel (chan *Chunk) on which it receives chunks to store. The DPA delegates to storage layers (implementing ChunkStore interface).
+ return c.errored
+}
+
+func NewChunk(addr Address, reqC chan bool) *Chunk {
+ return &Chunk{
+ Addr: addr,
+ ReqC: reqC,
+ dbStoredC: make(chan bool),
+ dbStoredMu: &sync.Mutex{},
+ }
+}
-Split returns an error channel, which the caller can monitor.
-After getting notified that all the data has been split (the error channel is closed), the caller can safely read or save the root key. Optionally it times out if not all chunks get stored or not the entire stream of data has been processed. By inspecting the errc channel the caller can check if any explicit errors (typically IO read/write failures) occurred during splitting.
+func (c *Chunk) markAsStored() {
+ c.dbStoredMu.Lock()
+ defer c.dbStoredMu.Unlock()
-When calling Join with a root key, the caller gets returned a seekable lazy reader. The caller again provides a channel on which the caller receives placeholder chunks with missing data. The DPA is supposed to forward this to the chunk stores and notify the chunker if the data has been delivered (i.e. retrieved from memory cache, disk-persisted db or cloud based swarm delivery). As the seekable reader is used, the chunker then puts these together the relevant parts on demand.
-*/
-type Splitter interface {
- /*
- When splitting, data is given as a SectionReader, and the key is a hashSize long byte slice (Key), the root hash of the entire content will fill this once processing finishes.
- New chunks to store are coming to caller via the chunk storage channel, which the caller provides.
- wg is a Waitgroup (can be nil) that can be used to block until the local storage finishes
- The caller gets returned an error channel, if an error is encountered during splitting, it is fed to errC error channel.
- A closed error signals process completion at which point the key can be considered final if there were no errors.
- */
- Split(io.Reader, int64, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
+ if !c.dbStored {
+ close(c.dbStoredC)
+ c.dbStored = true
+ }
+}
- /* This is the first step in making files mutable (not chunks)..
- Append allows adding more data chunks to the end of the already existsing file.
- The key for the root chunk is supplied to load the respective tree.
- Rest of the parameters behave like Split.
- */
- Append(Key, io.Reader, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
+func (c *Chunk) WaitToStore() error {
+ <-c.dbStoredC
+ return c.GetErrored()
}
-type Joiner interface {
- /*
- Join reconstructs original content based on a root key.
- When joining, the caller gets returned a Lazy SectionReader, which is
- seekable and implements on-demand fetching of chunks as and where it is read.
- New chunks to retrieve are coming to caller via the Chunk channel, which the caller provides.
- If an error is encountered during joining, it appears as a reader error.
- The SectionReader.
- As a result, partial reads from a document are possible even if other parts
- are corrupt or lost.
- The chunks are not meant to be validated by the chunker when joining. This
- is because it is left to the DPA to decide which sources are trusted.
- */
- Join(key Key, chunkC chan *Chunk) LazySectionReader
+func GenerateRandomChunk(dataSize int64) *Chunk {
+ return GenerateRandomChunks(dataSize, 1)[0]
}
-type Chunker interface {
- Joiner
- Splitter
- // returns the key length
- // KeySize() int64
+func GenerateRandomChunks(dataSize int64, count int) (chunks []*Chunk) {
+ var i int
+ hasher := MakeHashFunc(DefaultHash)()
+ if dataSize > DefaultChunkSize {
+ dataSize = DefaultChunkSize
+ }
+
+ for i = 0; i < count; i++ {
+ chunks = append(chunks, NewChunk(nil, nil))
+ chunks[i].SData = make([]byte, dataSize+8)
+ rand.Read(chunks[i].SData)
+ binary.LittleEndian.PutUint64(chunks[i].SData[:8], uint64(dataSize))
+ hasher.ResetWithLength(chunks[i].SData[:8])
+ hasher.Write(chunks[i].SData[8:])
+ chunks[i].Addr = make([]byte, 32)
+ copy(chunks[i].Addr, hasher.Sum(nil))
+ }
+
+ return chunks
}
// Size, Seek, Read, ReadAt
@@ -243,6 +259,90 @@ type LazyTestSectionReader struct {
*io.SectionReader
}
-func (self *LazyTestSectionReader) Size(chan bool) (int64, error) {
- return self.SectionReader.Size(), nil
+func (r *LazyTestSectionReader) Size(chan bool) (int64, error) {
+ return r.SectionReader.Size(), nil
+}
+
+type StoreParams struct {
+ Hash SwarmHasher `toml:"-"`
+ DbCapacity uint64
+ CacheCapacity uint
+ ChunkRequestsCacheCapacity uint
+ BaseKey []byte
+}
+
+func NewDefaultStoreParams() *StoreParams {
+ return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, defaultChunkRequestsCacheCapacity, nil, nil)
+}
+
+func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHasher, basekey []byte) *StoreParams {
+ if basekey == nil {
+ basekey = make([]byte, 32)
+ }
+ if hash == nil {
+ hash = MakeHashFunc(DefaultHash)
+ }
+ return &StoreParams{
+ Hash: hash,
+ DbCapacity: ldbCap,
+ CacheCapacity: cacheCap,
+ ChunkRequestsCacheCapacity: requestsCap,
+ BaseKey: basekey,
+ }
+}
+
+type ChunkData []byte
+
+type Reference []byte
+
+// Putter is responsible to store data and create a reference for it
+type Putter interface {
+ Put(ChunkData) (Reference, error)
+ // RefSize returns the length of the Reference created by this Putter
+ RefSize() int64
+ // Close is to indicate that no more chunk data will be Put on this Putter
+ Close()
+ // Wait returns if all data has been store and the Close() was called.
+ Wait()
+}
+
+// Getter is an interface to retrieve a chunk's data by its reference
+type Getter interface {
+ Get(Reference) (ChunkData, error)
+}
+
+// NOTE: this returns invalid data if chunk is encrypted
+func (c ChunkData) Size() int64 {
+ return int64(binary.LittleEndian.Uint64(c[:8]))
+}
+
+func (c ChunkData) Data() []byte {
+ return c[8:]
+}
+
+type ChunkValidator interface {
+ Validate(addr Address, data []byte) bool
+}
+
+// Provides method for validation of content address in chunks
+// Holds the corresponding hasher to create the address
+type ContentAddressValidator struct {
+ Hasher SwarmHasher
+}
+
+// Constructor
+func NewContentAddressValidator(hasher SwarmHasher) *ContentAddressValidator {
+ return &ContentAddressValidator{
+ Hasher: hasher,
+ }
+}
+
+// Validate that the given key is a valid content address for the given data
+func (v *ContentAddressValidator) Validate(addr Address, data []byte) bool {
+ hasher := v.Hasher()
+ hasher.ResetWithLength(data[:8])
+ hasher.Write(data[8:])
+ hash := hasher.Sum(nil)
+
+ return bytes.Equal(hash, addr[:])
}