aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZahoor Mohamed <zahoor@zahoor.in>2017-09-22 04:22:51 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-09-22 04:22:51 +0800
commitd558a595adf4e89bab5b28ffde1448dc1e5768b0 (patch)
tree1316cca927bfd4dfc4a8673ae0b9c2f75724f07e
parent3c8656347f67dbc8e57c663ec5c26d24c4151678 (diff)
downloadgo-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar
go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.gz
go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.bz2
go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.lz
go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.xz
go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.tar.zst
go-tangerine-d558a595adf4e89bab5b28ffde1448dc1e5768b0.zip
swarm/storage: pyramid chunker re-write (#14382)
-rw-r--r--.gitignore3
-rw-r--r--swarm/network/depo.go4
-rw-r--r--swarm/storage/chunker.go93
-rw-r--r--swarm/storage/chunker_test.go388
-rw-r--r--swarm/storage/common_test.go2
-rw-r--r--swarm/storage/dbstore.go4
-rw-r--r--swarm/storage/dbstore_test.go2
-rw-r--r--swarm/storage/localstore.go2
-rw-r--r--swarm/storage/netstore.go4
-rw-r--r--swarm/storage/pyramid.go681
-rw-r--r--swarm/storage/swarmhasher.go40
-rw-r--r--swarm/storage/types.go22
12 files changed, 1010 insertions, 235 deletions
diff --git a/.gitignore b/.gitignore
index e53e461dc..cb2c2d14d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,6 @@ build/_vendor/pkg
# travis
profile.tmp
profile.cov
+
+# IdeaIDE
+.idea
diff --git a/swarm/network/depo.go b/swarm/network/depo.go
index e76bfa66c..8695bf5d9 100644
--- a/swarm/network/depo.go
+++ b/swarm/network/depo.go
@@ -29,12 +29,12 @@ import (
// Handler for storage/retrieval related protocol requests
// implements the StorageHandler interface used by the bzz protocol
type Depo struct {
- hashfunc storage.Hasher
+ hashfunc storage.SwarmHasher
localStore storage.ChunkStore
netStore storage.ChunkStore
}
-func NewDepo(hash storage.Hasher, localStore, remoteStore storage.ChunkStore) *Depo {
+func NewDepo(hash storage.SwarmHasher, localStore, remoteStore storage.ChunkStore) *Depo {
return &Depo{
hashfunc: hash,
localStore: localStore,
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index ca85e4333..0454828b9 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -20,9 +20,9 @@ import (
"encoding/binary"
"errors"
"fmt"
- "hash"
"io"
"sync"
+ "time"
)
/*
@@ -50,14 +50,6 @@ data_{i} := size(subtree_{i}) || key_{j} || key_{j+1} .... || key_{j+n-1}
The underlying hash function is configurable
*/
-const (
- defaultHash = "SHA3"
- // defaultHash = "BMTSHA3" // http://golang.org/pkg/hash/#Hash
- // defaultHash = "SHA256" // http://golang.org/pkg/hash/#Hash
- defaultBranches int64 = 128
- // hashSize int64 = hasherfunc.New().Size() // hasher knows about its own length in bytes
- // chunksize int64 = branches * hashSize // chunk is defined as this
-)
/*
Tree chunker is a concrete implementation of data chunking.
@@ -67,25 +59,19 @@ If all is well it is possible to implement this by simply composing readers so t
The hashing itself does use extra copies and allocation though, since it does need it.
*/
-type ChunkerParams struct {
- Branches int64
- Hash string
-}
-
-func NewChunkerParams() *ChunkerParams {
- return &ChunkerParams{
- Branches: defaultBranches,
- Hash: defaultHash,
- }
-}
+var (
+ errAppendOppNotSuported = errors.New("Append operation not supported")
+ errOperationTimedOut = errors.New("operation timed out")
+)
type TreeChunker struct {
branches int64
- hashFunc Hasher
+ hashFunc SwarmHasher
// calculated
hashSize int64 // self.hashFunc.New().Size()
chunkSize int64 // hashSize* branches
- workerCount int
+ workerCount int64 // the number of worker routines used
+ workerLock sync.RWMutex // lock for the worker count
}
func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
@@ -94,7 +80,8 @@ func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
self.branches = params.Branches
self.hashSize = int64(self.hashFunc().Size())
self.chunkSize = self.hashSize * self.branches
- self.workerCount = 1
+ self.workerCount = 0
+
return
}
@@ -114,13 +101,31 @@ type hashJob struct {
parentWg *sync.WaitGroup
}
-func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+func (self *TreeChunker) incrementWorkerCount() {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ self.workerCount += 1
+}
+
+func (self *TreeChunker) getWorkerCount() int64 {
+ self.workerLock.RLock()
+ defer self.workerLock.RUnlock()
+ return self.workerCount
+}
+func (self *TreeChunker) decrementWorkerCount() {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ self.workerCount -= 1
+}
+
+func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
if self.chunkSize <= 0 {
panic("chunker must be initialised")
}
- jobC := make(chan *hashJob, 2*processors)
+
+ jobC := make(chan *hashJob, 2*ChunkProcessors)
wg := &sync.WaitGroup{}
errC := make(chan error)
quitC := make(chan bool)
@@ -129,6 +134,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
if wwg != nil {
wwg.Add(1)
}
+
+ self.incrementWorkerCount()
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
depth := 0
@@ -157,10 +164,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
close(errC)
}()
- //TODO: add a timeout
- if err := <-errC; err != nil {
- close(quitC)
- return nil, err
+
+ defer close(quitC)
+ select {
+ case err := <-errC:
+ if err != nil {
+ return nil, err
+ }
+ case <-time.NewTimer(splitTimeout).C:
+ return nil,errOperationTimedOut
}
return key, nil
@@ -168,6 +180,8 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
+ //
+
for depth > 0 && size < treeSize {
treeSize /= self.branches
depth--
@@ -223,12 +237,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
// parentWg.Add(1)
// go func() {
childrenWg.Wait()
- if len(jobC) > self.workerCount && self.workerCount < processors {
+
+ worker := self.getWorkerCount()
+ if int64(len(jobC)) > worker && worker < ChunkProcessors {
if wwg != nil {
wwg.Add(1)
}
- self.workerCount++
+ self.incrementWorkerCount()
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
+
}
select {
case jobC <- &hashJob{key, chunk, size, parentWg}:
@@ -237,6 +254,8 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
}
func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
+ defer self.decrementWorkerCount()
+
hasher := self.hashFunc()
if wwg != nil {
defer wwg.Done()
@@ -249,7 +268,6 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
return
}
// now we got the hashes in the chunk, then hash the chunks
- hasher.Reset()
self.hashChunk(hasher, job, chunkC, swg)
case <-quitC:
return
@@ -260,9 +278,11 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
// The treeChunkers own Hash hashes together
// - the size (of the subtree encoded in the Chunk)
// - the Chunk, ie. the contents read from the input reader
-func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
- hasher.Write(job.chunk)
+func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
+ hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
+ hasher.Write(job.chunk[8:]) // minus 8 []byte length
h := hasher.Sum(nil)
+
newChunk := &Chunk{
Key: h,
SData: job.chunk,
@@ -285,6 +305,10 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *
}
}
+func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+ return nil, errAppendOppNotSuported
+}
+
// LazyChunkReader implements LazySectionReader
type LazyChunkReader struct {
key Key // root key
@@ -298,7 +322,6 @@ type LazyChunkReader struct {
// implements the Joiner interface
func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
-
return &LazyChunkReader{
key: key,
chunkC: chunkC,
diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go
index 426074e59..b41d7dd33 100644
--- a/swarm/storage/chunker_test.go
+++ b/swarm/storage/chunker_test.go
@@ -20,12 +20,14 @@ import (
"bytes"
"crypto/rand"
"encoding/binary"
+ "errors"
"fmt"
"io"
- "runtime"
"sync"
"testing"
"time"
+
+ "github.com/ethereum/go-ethereum/crypto/sha3"
)
/*
@@ -43,7 +45,7 @@ type chunkerTester struct {
t test
}
-func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key) {
+func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key, err error) {
// reset
self.chunks = make(map[string]*Chunk)
@@ -54,13 +56,13 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
quitC := make(chan bool)
timeout := time.After(600 * time.Second)
if chunkC != nil {
- go func() {
+ go func() error {
for {
select {
case <-timeout:
- self.t.Fatalf("Join timeout error")
+ return errors.New(("Split timeout error"))
case <-quitC:
- return
+ return nil
case chunk := <-chunkC:
// self.chunks = append(self.chunks, chunk)
self.chunks[chunk.Key.String()] = chunk
@@ -68,22 +70,69 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c
chunk.wg.Done()
}
}
+
}
}()
}
- key, err := chunker.Split(data, size, chunkC, swg, nil)
+
+ key, err = chunker.Split(data, size, chunkC, swg, nil)
if err != nil && expectedError == nil {
- self.t.Fatalf("Split error: %v", err)
- } else if expectedError != nil && (err == nil || err.Error() != expectedError.Error()) {
- self.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
+ err = errors.New(fmt.Sprintf("Split error: %v", err))
}
+
if chunkC != nil {
if swg != nil {
swg.Wait()
}
close(quitC)
}
- return
+ return key, err
+}
+
+func (self *chunkerTester) Append(chunker Splitter, rootKey Key, data io.Reader, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key, err error) {
+ quitC := make(chan bool)
+ timeout := time.After(60 * time.Second)
+ if chunkC != nil {
+ go func() error {
+ for {
+ select {
+ case <-timeout:
+ return errors.New(("Append timeout error"))
+ case <-quitC:
+ return nil
+ case chunk := <-chunkC:
+ if chunk != nil {
+ stored, success := self.chunks[chunk.Key.String()]
+ if !success {
+ // Requesting data
+ self.chunks[chunk.Key.String()] = chunk
+ if chunk.wg != nil {
+ chunk.wg.Done()
+ }
+ } else {
+ // getting data
+ chunk.SData = stored.SData
+ chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
+ close(chunk.C)
+ }
+ }
+ }
+ }
+ }()
+ }
+
+ key, err = chunker.Append(rootKey, data, chunkC, swg, nil)
+ if err != nil && expectedError == nil {
+ err = errors.New(fmt.Sprintf("Append error: %v", err))
+ }
+
+ if chunkC != nil {
+ if swg != nil {
+ swg.Wait()
+ }
+ close(quitC)
+ }
+ return key, err
}
func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader {
@@ -93,22 +142,20 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch
timeout := time.After(600 * time.Second)
i := 0
- go func() {
+ go func() error {
for {
select {
case <-timeout:
- self.t.Fatalf("Join timeout error")
-
+ return errors.New(("Join timeout error"))
case chunk, ok := <-chunkC:
if !ok {
close(quitC)
- return
+ return nil
}
// this just mocks the behaviour of a chunk store retrieval
stored, success := self.chunks[chunk.Key.String()]
if !success {
- self.t.Fatalf("not found")
- return
+ return errors.New(("Not found"))
}
chunk.SData = stored.SData
chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
@@ -136,11 +183,15 @@ func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) {
chunkC := make(chan *Chunk, 1000)
swg := &sync.WaitGroup{}
- key := tester.Split(splitter, brokendata, int64(n), chunkC, swg, fmt.Errorf("Broken reader"))
+ expectedError := fmt.Errorf("Broken reader")
+ key, err := tester.Split(splitter, brokendata, int64(n), chunkC, swg, expectedError)
+ if err == nil || err.Error() != expectedError.Error() {
+ tester.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err)
+ }
tester.t.Logf(" Key = %v\n", key)
}
-func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
+func testRandomData(splitter Splitter, n int, tester *chunkerTester) Key {
if tester.inputs == nil {
tester.inputs = make(map[uint64][]byte)
}
@@ -156,7 +207,10 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
chunkC := make(chan *Chunk, 1000)
swg := &sync.WaitGroup{}
- key := tester.Split(splitter, data, int64(n), chunkC, swg, nil)
+ key, err := tester.Split(splitter, data, int64(n), chunkC, swg, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
tester.t.Logf(" Key = %v\n", key)
chunkC = make(chan *Chunk, 1000)
@@ -176,29 +230,145 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) {
}
close(chunkC)
<-quitC
+
+ return key
+}
+
+func testRandomDataAppend(splitter Splitter, n, m int, tester *chunkerTester) {
+ if tester.inputs == nil {
+ tester.inputs = make(map[uint64][]byte)
+ }
+ input, found := tester.inputs[uint64(n)]
+ var data io.Reader
+ if !found {
+ data, input = testDataReaderAndSlice(n)
+ tester.inputs[uint64(n)] = input
+ } else {
+ data = io.LimitReader(bytes.NewReader(input), int64(n))
+ }
+
+ chunkC := make(chan *Chunk, 1000)
+ swg := &sync.WaitGroup{}
+
+ key, err := tester.Split(splitter, data, int64(n), chunkC, swg, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ tester.t.Logf(" Key = %v\n", key)
+
+ //create a append data stream
+ appendInput, found := tester.inputs[uint64(m)]
+ var appendData io.Reader
+ if !found {
+ appendData, appendInput = testDataReaderAndSlice(m)
+ tester.inputs[uint64(m)] = appendInput
+ } else {
+ appendData = io.LimitReader(bytes.NewReader(appendInput), int64(m))
+ }
+
+ chunkC = make(chan *Chunk, 1000)
+ swg = &sync.WaitGroup{}
+
+ newKey, err := tester.Append(splitter, key, appendData, chunkC, swg, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ tester.t.Logf(" NewKey = %v\n", newKey)
+
+ chunkC = make(chan *Chunk, 1000)
+ quitC := make(chan bool)
+
+ chunker := NewTreeChunker(NewChunkerParams())
+ reader := tester.Join(chunker, newKey, 0, chunkC, quitC)
+ newOutput := make([]byte, n+m)
+ r, err := reader.Read(newOutput)
+ if r != (n + m) {
+ tester.t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err)
+ }
+
+ newInput := append(input, appendInput...)
+ if !bytes.Equal(newOutput, newInput) {
+ tester.t.Fatalf("input and output mismatch\n IN: %v\nOUT: %v\n", newInput, newOutput)
+ }
+
+ close(chunkC)
+}
+
+func TestSha3ForCorrectness(t *testing.T) {
+ tester := &chunkerTester{t: t}
+
+ size := 4096
+ input := make([]byte, size+8)
+ binary.LittleEndian.PutUint64(input[:8], uint64(size))
+
+ io.LimitReader(bytes.NewReader(input[8:]), int64(size))
+
+ rawSha3 := sha3.NewKeccak256()
+ rawSha3.Reset()
+ rawSha3.Write(input)
+ rawSha3Output := rawSha3.Sum(nil)
+
+ sha3FromMakeFunc := MakeHashFunc(SHA3Hash)()
+ sha3FromMakeFunc.ResetWithLength(input[:8])
+ sha3FromMakeFunc.Write(input[8:])
+ sha3FromMakeFuncOutput := sha3FromMakeFunc.Sum(nil)
+
+ if len(rawSha3Output) != len(sha3FromMakeFuncOutput) {
+ tester.t.Fatalf("Original SHA3 and abstracted Sha3 has different length %v:%v\n", len(rawSha3Output), len(sha3FromMakeFuncOutput))
+ }
+
+ if !bytes.Equal(rawSha3Output, sha3FromMakeFuncOutput) {
+ tester.t.Fatalf("Original SHA3 and abstracted Sha3 mismatch %v:%v\n", rawSha3Output, sha3FromMakeFuncOutput)
+ }
+
+}
+
+func TestDataAppend(t *testing.T) {
+ sizes := []int{1, 1, 1, 4095, 4096, 4097, 1, 1, 1, 123456, 2345678, 2345678}
+ appendSizes := []int{4095, 4096, 4097, 1, 1, 1, 8191, 8192, 8193, 9000, 3000, 5000}
+
+ tester := &chunkerTester{t: t}
+ chunker := NewPyramidChunker(NewChunkerParams())
+ for i, s := range sizes {
+ testRandomDataAppend(chunker, s, appendSizes[i], tester)
+
+ }
}
func TestRandomData(t *testing.T) {
- // sizes := []int{123456}
- sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678}
+ sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 12287, 12288, 12289, 123456, 2345678}
tester := &chunkerTester{t: t}
+
chunker := NewTreeChunker(NewChunkerParams())
+ pyramid := NewPyramidChunker(NewChunkerParams())
for _, s := range sizes {
- testRandomData(chunker, s, tester)
+ treeChunkerKey := testRandomData(chunker, s, tester)
+ pyramidChunkerKey := testRandomData(pyramid, s, tester)
+ if treeChunkerKey.String() != pyramidChunkerKey.String() {
+ tester.t.Fatalf("tree chunker and pyramid chunker key mismatch for size %v\n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String())
+ }
}
- pyramid := NewPyramidChunker(NewChunkerParams())
+
+ cp := NewChunkerParams()
+ cp.Hash = BMTHash
+ chunker = NewTreeChunker(cp)
+ pyramid = NewPyramidChunker(cp)
for _, s := range sizes {
- testRandomData(pyramid, s, tester)
+ treeChunkerKey := testRandomData(chunker, s, tester)
+ pyramidChunkerKey := testRandomData(pyramid, s, tester)
+ if treeChunkerKey.String() != pyramidChunkerKey.String() {
+ tester.t.Fatalf("tree chunker BMT and pyramid chunker BMT key mismatch for size %v \n TC: %v\n PC: %v\n", s, treeChunkerKey.String(), pyramidChunkerKey.String())
+ }
}
+
}
func TestRandomBrokenData(t *testing.T) {
- sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678}
+ sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 12287, 12288, 12289, 123456, 2345678}
tester := &chunkerTester{t: t}
chunker := NewTreeChunker(NewChunkerParams())
for _, s := range sizes {
testRandomBrokenData(chunker, s, tester)
- t.Logf("done size: %v", s)
}
}
@@ -220,45 +390,100 @@ func benchmarkJoin(n int, t *testing.B) {
chunkC := make(chan *Chunk, 1000)
swg := &sync.WaitGroup{}
- key := tester.Split(chunker, data, int64(n), chunkC, swg, nil)
- // t.StartTimer()
+ key, err := tester.Split(chunker, data, int64(n), chunkC, swg, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
chunkC = make(chan *Chunk, 1000)
quitC := make(chan bool)
reader := tester.Join(chunker, key, i, chunkC, quitC)
benchReadAll(reader)
close(chunkC)
<-quitC
- // t.StopTimer()
}
- stats := new(runtime.MemStats)
- runtime.ReadMemStats(stats)
- fmt.Println(stats.Sys)
}
-func benchmarkSplitTree(n int, t *testing.B) {
+func benchmarkSplitTreeSHA3(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
chunker := NewTreeChunker(NewChunkerParams())
tester := &chunkerTester{t: t}
data := testDataReader(n)
- tester.Split(chunker, data, int64(n), nil, nil, nil)
+ _, err := tester.Split(chunker, data, int64(n), nil, nil, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
}
- stats := new(runtime.MemStats)
- runtime.ReadMemStats(stats)
- fmt.Println(stats.Sys)
}
-func benchmarkSplitPyramid(n int, t *testing.B) {
+func benchmarkSplitTreeBMT(n int, t *testing.B) {
+ t.ReportAllocs()
+ for i := 0; i < t.N; i++ {
+ cp := NewChunkerParams()
+ cp.Hash = BMTHash
+ chunker := NewTreeChunker(cp)
+ tester := &chunkerTester{t: t}
+ data := testDataReader(n)
+ _, err := tester.Split(chunker, data, int64(n), nil, nil, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ }
+}
+
+func benchmarkSplitPyramidSHA3(n int, t *testing.B) {
t.ReportAllocs()
for i := 0; i < t.N; i++ {
splitter := NewPyramidChunker(NewChunkerParams())
tester := &chunkerTester{t: t}
data := testDataReader(n)
- tester.Split(splitter, data, int64(n), nil, nil, nil)
+ _, err := tester.Split(splitter, data, int64(n), nil, nil, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ }
+}
+
+func benchmarkSplitPyramidBMT(n int, t *testing.B) {
+ t.ReportAllocs()
+ for i := 0; i < t.N; i++ {
+ cp := NewChunkerParams()
+ cp.Hash = BMTHash
+ splitter := NewPyramidChunker(cp)
+ tester := &chunkerTester{t: t}
+ data := testDataReader(n)
+ _, err := tester.Split(splitter, data, int64(n), nil, nil, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+ }
+}
+
+func benchmarkAppendPyramid(n, m int, t *testing.B) {
+ t.ReportAllocs()
+ for i := 0; i < t.N; i++ {
+ chunker := NewPyramidChunker(NewChunkerParams())
+ tester := &chunkerTester{t: t}
+ data := testDataReader(n)
+ data1 := testDataReader(m)
+
+ chunkC := make(chan *Chunk, 1000)
+ swg := &sync.WaitGroup{}
+ key, err := tester.Split(chunker, data, int64(n), chunkC, swg, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+
+ chunkC = make(chan *Chunk, 1000)
+ swg = &sync.WaitGroup{}
+
+ _, err = tester.Append(chunker, key, data1, chunkC, swg, nil)
+ if err != nil {
+ tester.t.Fatalf(err.Error())
+ }
+
+ close(chunkC)
}
- stats := new(runtime.MemStats)
- runtime.ReadMemStats(stats)
- fmt.Println(stats.Sys)
}
func BenchmarkJoin_2(t *testing.B) { benchmarkJoin(100, t) }
@@ -269,26 +494,59 @@ func BenchmarkJoin_6(t *testing.B) { benchmarkJoin(1000000, t) }
func BenchmarkJoin_7(t *testing.B) { benchmarkJoin(10000000, t) }
func BenchmarkJoin_8(t *testing.B) { benchmarkJoin(100000000, t) }
-func BenchmarkSplitTree_2(t *testing.B) { benchmarkSplitTree(100, t) }
-func BenchmarkSplitTree_2h(t *testing.B) { benchmarkSplitTree(500, t) }
-func BenchmarkSplitTree_3(t *testing.B) { benchmarkSplitTree(1000, t) }
-func BenchmarkSplitTree_3h(t *testing.B) { benchmarkSplitTree(5000, t) }
-func BenchmarkSplitTree_4(t *testing.B) { benchmarkSplitTree(10000, t) }
-func BenchmarkSplitTree_4h(t *testing.B) { benchmarkSplitTree(50000, t) }
-func BenchmarkSplitTree_5(t *testing.B) { benchmarkSplitTree(100000, t) }
-func BenchmarkSplitTree_6(t *testing.B) { benchmarkSplitTree(1000000, t) }
-func BenchmarkSplitTree_7(t *testing.B) { benchmarkSplitTree(10000000, t) }
-func BenchmarkSplitTree_8(t *testing.B) { benchmarkSplitTree(100000000, t) }
-
-func BenchmarkSplitPyramid_2(t *testing.B) { benchmarkSplitPyramid(100, t) }
-func BenchmarkSplitPyramid_2h(t *testing.B) { benchmarkSplitPyramid(500, t) }
-func BenchmarkSplitPyramid_3(t *testing.B) { benchmarkSplitPyramid(1000, t) }
-func BenchmarkSplitPyramid_3h(t *testing.B) { benchmarkSplitPyramid(5000, t) }
-func BenchmarkSplitPyramid_4(t *testing.B) { benchmarkSplitPyramid(10000, t) }
-func BenchmarkSplitPyramid_4h(t *testing.B) { benchmarkSplitPyramid(50000, t) }
-func BenchmarkSplitPyramid_5(t *testing.B) { benchmarkSplitPyramid(100000, t) }
-func BenchmarkSplitPyramid_6(t *testing.B) { benchmarkSplitPyramid(1000000, t) }
-func BenchmarkSplitPyramid_7(t *testing.B) { benchmarkSplitPyramid(10000000, t) }
-func BenchmarkSplitPyramid_8(t *testing.B) { benchmarkSplitPyramid(100000000, t) }
-
-// godep go test -bench ./swarm/storage -cpuprofile cpu.out -memprofile mem.out
+func BenchmarkSplitTreeSHA3_2(t *testing.B) { benchmarkSplitTreeSHA3(100, t) }
+func BenchmarkSplitTreeSHA3_2h(t *testing.B) { benchmarkSplitTreeSHA3(500, t) }
+func BenchmarkSplitTreeSHA3_3(t *testing.B) { benchmarkSplitTreeSHA3(1000, t) }
+func BenchmarkSplitTreeSHA3_3h(t *testing.B) { benchmarkSplitTreeSHA3(5000, t) }
+func BenchmarkSplitTreeSHA3_4(t *testing.B) { benchmarkSplitTreeSHA3(10000, t) }
+func BenchmarkSplitTreeSHA3_4h(t *testing.B) { benchmarkSplitTreeSHA3(50000, t) }
+func BenchmarkSplitTreeSHA3_5(t *testing.B) { benchmarkSplitTreeSHA3(100000, t) }
+func BenchmarkSplitTreeSHA3_6(t *testing.B) { benchmarkSplitTreeSHA3(1000000, t) }
+func BenchmarkSplitTreeSHA3_7(t *testing.B) { benchmarkSplitTreeSHA3(10000000, t) }
+func BenchmarkSplitTreeSHA3_8(t *testing.B) { benchmarkSplitTreeSHA3(100000000, t) }
+
+func BenchmarkSplitTreeBMT_2(t *testing.B) { benchmarkSplitTreeBMT(100, t) }
+func BenchmarkSplitTreeBMT_2h(t *testing.B) { benchmarkSplitTreeBMT(500, t) }
+func BenchmarkSplitTreeBMT_3(t *testing.B) { benchmarkSplitTreeBMT(1000, t) }
+func BenchmarkSplitTreeBMT_3h(t *testing.B) { benchmarkSplitTreeBMT(5000, t) }
+func BenchmarkSplitTreeBMT_4(t *testing.B) { benchmarkSplitTreeBMT(10000, t) }
+func BenchmarkSplitTreeBMT_4h(t *testing.B) { benchmarkSplitTreeBMT(50000, t) }
+func BenchmarkSplitTreeBMT_5(t *testing.B) { benchmarkSplitTreeBMT(100000, t) }
+func BenchmarkSplitTreeBMT_6(t *testing.B) { benchmarkSplitTreeBMT(1000000, t) }
+func BenchmarkSplitTreeBMT_7(t *testing.B) { benchmarkSplitTreeBMT(10000000, t) }
+func BenchmarkSplitTreeBMT_8(t *testing.B) { benchmarkSplitTreeBMT(100000000, t) }
+
+func BenchmarkSplitPyramidSHA3_2(t *testing.B) { benchmarkSplitPyramidSHA3(100, t) }
+func BenchmarkSplitPyramidSHA3_2h(t *testing.B) { benchmarkSplitPyramidSHA3(500, t) }
+func BenchmarkSplitPyramidSHA3_3(t *testing.B) { benchmarkSplitPyramidSHA3(1000, t) }
+func BenchmarkSplitPyramidSHA3_3h(t *testing.B) { benchmarkSplitPyramidSHA3(5000, t) }
+func BenchmarkSplitPyramidSHA3_4(t *testing.B) { benchmarkSplitPyramidSHA3(10000, t) }
+func BenchmarkSplitPyramidSHA3_4h(t *testing.B) { benchmarkSplitPyramidSHA3(50000, t) }
+func BenchmarkSplitPyramidSHA3_5(t *testing.B) { benchmarkSplitPyramidSHA3(100000, t) }
+func BenchmarkSplitPyramidSHA3_6(t *testing.B) { benchmarkSplitPyramidSHA3(1000000, t) }
+func BenchmarkSplitPyramidSHA3_7(t *testing.B) { benchmarkSplitPyramidSHA3(10000000, t) }
+func BenchmarkSplitPyramidSHA3_8(t *testing.B) { benchmarkSplitPyramidSHA3(100000000, t) }
+
+func BenchmarkSplitPyramidBMT_2(t *testing.B) { benchmarkSplitPyramidBMT(100, t) }
+func BenchmarkSplitPyramidBMT_2h(t *testing.B) { benchmarkSplitPyramidBMT(500, t) }
+func BenchmarkSplitPyramidBMT_3(t *testing.B) { benchmarkSplitPyramidBMT(1000, t) }
+func BenchmarkSplitPyramidBMT_3h(t *testing.B) { benchmarkSplitPyramidBMT(5000, t) }
+func BenchmarkSplitPyramidBMT_4(t *testing.B) { benchmarkSplitPyramidBMT(10000, t) }
+func BenchmarkSplitPyramidBMT_4h(t *testing.B) { benchmarkSplitPyramidBMT(50000, t) }
+func BenchmarkSplitPyramidBMT_5(t *testing.B) { benchmarkSplitPyramidBMT(100000, t) }
+func BenchmarkSplitPyramidBMT_6(t *testing.B) { benchmarkSplitPyramidBMT(1000000, t) }
+func BenchmarkSplitPyramidBMT_7(t *testing.B) { benchmarkSplitPyramidBMT(10000000, t) }
+func BenchmarkSplitPyramidBMT_8(t *testing.B) { benchmarkSplitPyramidBMT(100000000, t) }
+
+func BenchmarkAppendPyramid_2(t *testing.B) { benchmarkAppendPyramid(100, 1000, t) }
+func BenchmarkAppendPyramid_2h(t *testing.B) { benchmarkAppendPyramid(500, 1000, t) }
+func BenchmarkAppendPyramid_3(t *testing.B) { benchmarkAppendPyramid(1000, 1000, t) }
+func BenchmarkAppendPyramid_4(t *testing.B) { benchmarkAppendPyramid(10000, 1000, t) }
+func BenchmarkAppendPyramid_4h(t *testing.B) { benchmarkAppendPyramid(50000, 1000, t) }
+func BenchmarkAppendPyramid_5(t *testing.B) { benchmarkAppendPyramid(1000000, 1000, t) }
+func BenchmarkAppendPyramid_6(t *testing.B) { benchmarkAppendPyramid(1000000, 1000, t) }
+func BenchmarkAppendPyramid_7(t *testing.B) { benchmarkAppendPyramid(10000000, 1000, t) }
+func BenchmarkAppendPyramid_8(t *testing.B) { benchmarkAppendPyramid(100000000, 1000, t) }
+
+// go test -timeout 20m -cpu 4 -bench=./swarm/storage -run no
+// If you dont add the timeout argument above .. the benchmark will timeout and dump
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index 44d1dd1f7..cd4c2ef13 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -76,7 +76,7 @@ func testStore(m ChunkStore, l int64, branches int64, t *testing.T) {
}()
chunker := NewTreeChunker(&ChunkerParams{
Branches: branches,
- Hash: defaultHash,
+ Hash: SHA3Hash,
})
swg := &sync.WaitGroup{}
key, _ := chunker.Split(rand.Reader, l, chunkC, swg, nil)
diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go
index cbeddb8cb..46a5c16cc 100644
--- a/swarm/storage/dbstore.go
+++ b/swarm/storage/dbstore.go
@@ -72,12 +72,12 @@ type DbStore struct {
gcPos, gcStartPos []byte
gcArray []*gcItem
- hashfunc Hasher
+ hashfunc SwarmHasher
lock sync.Mutex
}
-func NewDbStore(path string, hash Hasher, capacity uint64, radius int) (s *DbStore, err error) {
+func NewDbStore(path string, hash SwarmHasher, capacity uint64, radius int) (s *DbStore, err error) {
s = new(DbStore)
s.hashfunc = hash
diff --git a/swarm/storage/dbstore_test.go b/swarm/storage/dbstore_test.go
index ddce7ccfe..dd165b576 100644
--- a/swarm/storage/dbstore_test.go
+++ b/swarm/storage/dbstore_test.go
@@ -29,7 +29,7 @@ func initDbStore(t *testing.T) *DbStore {
if err != nil {
t.Fatal(err)
}
- m, err := NewDbStore(dir, MakeHashFunc(defaultHash), defaultDbCapacity, defaultRadius)
+ m, err := NewDbStore(dir, MakeHashFunc(SHA3Hash), defaultDbCapacity, defaultRadius)
if err != nil {
t.Fatal("can't create store:", err)
}
diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go
index 58f59d0a2..b442e6cc5 100644
--- a/swarm/storage/localstore.go
+++ b/swarm/storage/localstore.go
@@ -28,7 +28,7 @@ type LocalStore struct {
}
// This constructor uses MemStore and DbStore as components
-func NewLocalStore(hash Hasher, params *StoreParams) (*LocalStore, error) {
+func NewLocalStore(hash SwarmHasher, params *StoreParams) (*LocalStore, error) {
dbStore, err := NewDbStore(params.ChunkDbPath, hash, params.DbCapacity, params.Radius)
if err != nil {
return nil, err
diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go
index 746dd85f6..7b0612edc 100644
--- a/swarm/storage/netstore.go
+++ b/swarm/storage/netstore.go
@@ -36,7 +36,7 @@ NetStore falls back to a backend (CloudStorage interface)
implemented by bzz/network/forwarder. forwarder or IPFS or IPΞS
*/
type NetStore struct {
- hashfunc Hasher
+ hashfunc SwarmHasher
localStore *LocalStore
cloud CloudStore
}
@@ -69,7 +69,7 @@ func NewStoreParams(path string) (self *StoreParams) {
// netstore contructor, takes path argument that is used to initialise dbStore,
// the persistent (disk) storage component of LocalStore
// the second argument is the hive, the connection/logistics manager for the node
-func NewNetStore(hash Hasher, lstore *LocalStore, cloud CloudStore, params *StoreParams) *NetStore {
+func NewNetStore(hash SwarmHasher, lstore *LocalStore, cloud CloudStore, params *StoreParams) *NetStore {
return &NetStore{
hashfunc: hash,
localStore: lstore,
diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go
index 74e00a497..e3be2a987 100644
--- a/swarm/storage/pyramid.go
+++ b/swarm/storage/pyramid.go
@@ -18,53 +18,112 @@ package storage
import (
"encoding/binary"
- "fmt"
+ "errors"
"io"
- "math"
- "strings"
"sync"
+ "time"
+)
+
+/*
+ The main idea of a pyramid chunker is to process the input data without knowing the entire size apriori.
+ For this to be achieved, the chunker tree is built from the ground up until the data is exhausted.
+ This opens up new aveneus such as easy append and other sort of modifications to the tree therby avoiding
+ duplication of data chunks.
+
+
+ Below is an example of a two level chunks tree. The leaf chunks are called data chunks and all the above
+ chunks are called tree chunks. The tree chunk above data chunks is level 0 and so on until it reaches
+ the root tree chunk.
+
+
+
+ T10 <- Tree chunk lvl1
+ |
+ __________________________|_____________________________
+ / | | \
+ / | \ \
+ __T00__ ___T01__ ___T02__ ___T03__ <- Tree chunks lvl 0
+ / / \ / / \ / / \ / / \
+ / / \ / / \ / / \ / / \
+ D1 D2 ... D128 D1 D2 ... D128 D1 D2 ... D128 D1 D2 ... D128 <- Data Chunks
+
+
+ The split function continuously read the data and creates data chunks and send them to storage.
+ When certain no of data chunks are created (defaultBranches), a signal is sent to create a tree
+ entry. When the level 0 tree entries reaches certain threshold (defaultBranches), another signal
+ is sent to a tree entry one level up.. and so on... until only the data is exhausted AND only one
+ tree entry is present in certain level. The key of tree entry is given out as the rootKey of the file.
+
+*/
+
+var (
+ errLoadingTreeRootChunk = errors.New("LoadTree Error: Could not load root chunk")
+ errLoadingTreeChunk = errors.New("LoadTree Error: Could not load chunk")
+)
- "github.com/ethereum/go-ethereum/common"
+const (
+ ChunkProcessors = 8
+ DefaultBranches int64 = 128
+ splitTimeout = time.Minute * 5
)
const (
- processors = 8
+ DataChunk = 0
+ TreeChunk = 1
)
-type Tree struct {
- Chunks int64
- Levels []map[int64]*Node
- Lock sync.RWMutex
+type ChunkerParams struct {
+ Branches int64
+ Hash string
+}
+
+func NewChunkerParams() *ChunkerParams {
+ return &ChunkerParams{
+ Branches: DefaultBranches,
+ Hash: SHA3Hash,
+ }
}
-type Node struct {
- Pending int64
- Size uint64
- Children []common.Hash
- Last bool
+// Entry to create a tree node
+type TreeEntry struct {
+ level int
+ branchCount int64
+ subtreeSize uint64
+ chunk []byte
+ key []byte
+ index int // used in append to indicate the index of existing tree entry
+ updatePending bool // indicates if the entry is loaded from existing tree
}
-func (self *Node) String() string {
- var children []string
- for _, node := range self.Children {
- children = append(children, node.Hex())
+func NewTreeEntry(pyramid *PyramidChunker) *TreeEntry {
+ return &TreeEntry{
+ level: 0,
+ branchCount: 0,
+ subtreeSize: 0,
+ chunk: make([]byte, pyramid.chunkSize+8),
+ key: make([]byte, pyramid.hashSize),
+ index: 0,
+ updatePending: false,
}
- return fmt.Sprintf("pending: %v, size: %v, last :%v, children: %v", self.Pending, self.Size, self.Last, strings.Join(children, ", "))
}
-type Task struct {
- Index int64 // Index of the chunk being processed
- Size uint64
- Data []byte // Binary blob of the chunk
- Last bool
+// Used by the hash processor to create a data/tree chunk and send to storage
+type chunkJob struct {
+ key Key
+ chunk []byte
+ size int64
+ parentWg *sync.WaitGroup
+ chunkType int // used to identify the tree related chunks for debugging
+ chunkLvl int // leaf-1 is level 0 and goes upwards until it reaches root
}
type PyramidChunker struct {
- hashFunc Hasher
+ hashFunc SwarmHasher
chunkSize int64
hashSize int64
branches int64
- workerCount int
+ workerCount int64
+ workerLock sync.RWMutex
}
func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
@@ -73,128 +132,506 @@ func NewPyramidChunker(params *ChunkerParams) (self *PyramidChunker) {
self.branches = params.Branches
self.hashSize = int64(self.hashFunc().Size())
self.chunkSize = self.hashSize * self.branches
- self.workerCount = 1
+ self.workerCount = 0
return
}
-func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
+func (self *PyramidChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
+ return &LazyChunkReader{
+ key: key,
+ chunkC: chunkC,
+ chunkSize: self.chunkSize,
+ branches: self.branches,
+ hashSize: self.hashSize,
+ }
+}
- chunks := (size + self.chunkSize - 1) / self.chunkSize
- depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1
+func (self *PyramidChunker) incrementWorkerCount() {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ self.workerCount += 1
+}
- results := Tree{
- Chunks: chunks,
- Levels: make([]map[int64]*Node, depth),
+func (self *PyramidChunker) getWorkerCount() int64 {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ return self.workerCount
+}
+
+func (self *PyramidChunker) decrementWorkerCount() {
+ self.workerLock.Lock()
+ defer self.workerLock.Unlock()
+ self.workerCount -= 1
+}
+
+func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) {
+ jobC := make(chan *chunkJob, 2*ChunkProcessors)
+ wg := &sync.WaitGroup{}
+ errC := make(chan error)
+ quitC := make(chan bool)
+ rootKey := make([]byte, self.hashSize)
+ chunkLevel := make([][]*TreeEntry, self.branches)
+
+ wg.Add(1)
+ go self.prepareChunks(false, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG)
+
+ // closes internal error channel if all subprocesses in the workgroup finished
+ go func() {
+
+ // waiting for all chunks to finish
+ wg.Wait()
+
+ // if storage waitgroup is non-nil, we wait for storage to finish too
+ if storageWG != nil {
+ storageWG.Wait()
+ }
+ //We close errC here because this is passed down to 8 parallel routines underneath.
+ // if a error happens in one of them.. that particular routine raises error...
+ // once they all complete successfully, the control comes back and we can safely close this here.
+ close(errC)
+ }()
+
+ defer close(quitC)
+
+ select {
+ case err := <-errC:
+ if err != nil {
+ return nil, err
+ }
+ case <-time.NewTimer(splitTimeout).C:
}
- for i := 0; i < depth; i++ {
- results.Levels[i] = make(map[int64]*Node)
+ return rootKey, nil
+
+}
+
+func (self *PyramidChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, storageWG, processorWG *sync.WaitGroup) (Key, error) {
+ quitC := make(chan bool)
+ rootKey := make([]byte, self.hashSize)
+ chunkLevel := make([][]*TreeEntry, self.branches)
+
+ // Load the right most unfinished tree chunks in every level
+ self.loadTree(chunkLevel, key, chunkC, quitC)
+
+ jobC := make(chan *chunkJob, 2*ChunkProcessors)
+ wg := &sync.WaitGroup{}
+ errC := make(chan error)
+
+ wg.Add(1)
+ go self.prepareChunks(true, chunkLevel, data, rootKey, quitC, wg, jobC, processorWG, chunkC, errC, storageWG)
+
+ // closes internal error channel if all subprocesses in the workgroup finished
+ go func() {
+
+ // waiting for all chunks to finish
+ wg.Wait()
+
+ // if storage waitgroup is non-nil, we wait for storage to finish too
+ if storageWG != nil {
+ storageWG.Wait()
+ }
+ close(errC)
+ }()
+
+ defer close(quitC)
+
+ select {
+ case err := <-errC:
+ if err != nil {
+ return nil, err
+ }
+ case <-time.NewTimer(splitTimeout).C:
}
- // Create a pool of workers to crunch through the file
- tasks := make(chan *Task, 2*processors)
- pend := new(sync.WaitGroup)
- abortC := make(chan bool)
- for i := 0; i < processors; i++ {
- pend.Add(1)
- go self.processor(pend, swg, tasks, chunkC, &results)
+ return rootKey, nil
+
+}
+
+func (self *PyramidChunker) processor(id int64, jobC chan *chunkJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
+ defer self.decrementWorkerCount()
+
+ hasher := self.hashFunc()
+ if wwg != nil {
+ defer wwg.Done()
}
- // Feed the chunks into the task pool
- read := 0
- for index := 0; ; index++ {
- buffer := make([]byte, self.chunkSize+8)
- n, err := data.Read(buffer[8:])
- read += n
- last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF
- if err != nil && !last {
- close(abortC)
- break
- }
- binary.LittleEndian.PutUint64(buffer[:8], uint64(n))
- pend.Add(1)
+ for {
select {
- case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}:
- case <-abortC:
- return nil, err
+
+ case job, ok := <-jobC:
+ if !ok {
+ return
+ }
+ self.processChunk(id, hasher, job, chunkC, swg)
+ case <-quitC:
+ return
}
- if last {
- break
+ }
+}
+
+func (self *PyramidChunker) processChunk(id int64, hasher SwarmHash, job *chunkJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
+ hasher.ResetWithLength(job.chunk[:8]) // 8 bytes of length
+ hasher.Write(job.chunk[8:]) // minus 8 []byte length
+ h := hasher.Sum(nil)
+
+ newChunk := &Chunk{
+ Key: h,
+ SData: job.chunk,
+ Size: job.size,
+ wg: swg,
+ }
+
+ // report hash of this chunk one level up (keys corresponds to the proper subslice of the parent chunk)
+ copy(job.key, h)
+
+ // send off new chunk to storage
+ if chunkC != nil {
+ if swg != nil {
+ swg.Add(1)
}
}
- // Wait for the workers and return
- close(tasks)
- pend.Wait()
+ job.parentWg.Done()
- key := results.Levels[0][0].Children[0][:]
- return key, nil
+ if chunkC != nil {
+ chunkC <- newChunk
+ }
}
-func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) {
- defer pend.Done()
+func (self *PyramidChunker) loadTree(chunkLevel [][]*TreeEntry, key Key, chunkC chan *Chunk, quitC chan bool) error {
+ // Get the root chunk to get the total size
+ chunk := retrieve(key, chunkC, quitC)
+ if chunk == nil {
+ return errLoadingTreeRootChunk
+ }
- // Start processing leaf chunks ad infinitum
- hasher := self.hashFunc()
- for task := range tasks {
- depth, pow := len(results.Levels)-1, self.branches
- size := task.Size
- data := task.Data
- var node *Node
- for depth >= 0 {
- // New chunk received, reset the hasher and start processing
- hasher.Reset()
- if node == nil { // Leaf node, hash the data chunk
- hasher.Write(task.Data)
- } else { // Internal node, hash the children
- size = node.Size
- data = make([]byte, hasher.Size()*len(node.Children)+8)
- binary.LittleEndian.PutUint64(data[:8], size)
-
- hasher.Write(data[:8])
- for i, hash := range node.Children {
- copy(data[i*hasher.Size()+8:], hash[:])
- hasher.Write(hash[:])
+ //if data size is less than a chunk... add a parent with update as pending
+ if chunk.Size <= self.chunkSize {
+ newEntry := &TreeEntry{
+ level: 0,
+ branchCount: 1,
+ subtreeSize: uint64(chunk.Size),
+ chunk: make([]byte, self.chunkSize+8),
+ key: make([]byte, self.hashSize),
+ index: 0,
+ updatePending: true,
+ }
+ copy(newEntry.chunk[8:], chunk.Key)
+ chunkLevel[0] = append(chunkLevel[0], newEntry)
+ return nil
+ }
+
+ var treeSize int64
+ var depth int
+ treeSize = self.chunkSize
+ for ; treeSize < chunk.Size; treeSize *= self.branches {
+ depth++
+ }
+
+ // Add the root chunk entry
+ branchCount := int64(len(chunk.SData)-8) / self.hashSize
+ newEntry := &TreeEntry{
+ level: int(depth - 1),
+ branchCount: branchCount,
+ subtreeSize: uint64(chunk.Size),
+ chunk: chunk.SData,
+ key: key,
+ index: 0,
+ updatePending: true,
+ }
+ chunkLevel[depth-1] = append(chunkLevel[depth-1], newEntry)
+
+ // Add the rest of the tree
+ for lvl := (depth - 1); lvl >= 1; lvl-- {
+
+ //TODO(jmozah): instead of loading finished branches and then trim in the end,
+ //avoid loading them in the first place
+ for _, ent := range chunkLevel[lvl] {
+ branchCount = int64(len(ent.chunk)-8) / self.hashSize
+ for i := int64(0); i < branchCount; i++ {
+ key := ent.chunk[8+(i*self.hashSize) : 8+((i+1)*self.hashSize)]
+ newChunk := retrieve(key, chunkC, quitC)
+ if newChunk == nil {
+ return errLoadingTreeChunk
}
- }
- hash := hasher.Sum(nil)
- last := task.Last || (node != nil) && node.Last
- // Insert the subresult into the memoization tree
- results.Lock.Lock()
- if node = results.Levels[depth][task.Index/pow]; node == nil {
- // Figure out the pending tasks
- pending := self.branches
- if task.Index/pow == results.Chunks/pow {
- pending = (results.Chunks + pow/self.branches - 1) / (pow / self.branches) % self.branches
+ bewBranchCount := int64(len(newChunk.SData)-8) / self.hashSize
+ newEntry := &TreeEntry{
+ level: int(lvl - 1),
+ branchCount: bewBranchCount,
+ subtreeSize: uint64(newChunk.Size),
+ chunk: newChunk.SData,
+ key: key,
+ index: 0,
+ updatePending: true,
}
- node = &Node{pending, 0, make([]common.Hash, pending), last}
- results.Levels[depth][task.Index/pow] = node
+ chunkLevel[lvl-1] = append(chunkLevel[lvl-1], newEntry)
+
}
- node.Pending--
- i := task.Index / (pow / self.branches) % self.branches
- if last {
- node.Last = true
+
+ // We need to get only the right most unfinished branch.. so trim all finished branches
+ if int64(len(chunkLevel[lvl-1])) >= self.branches {
+ chunkLevel[lvl-1] = nil
}
- copy(node.Children[i][:], hash)
- node.Size += size
- left := node.Pending
- if chunkC != nil {
- if swg != nil {
- swg.Add(1)
- }
+ }
+ }
+
+ return nil
+}
+
+func (self *PyramidChunker) prepareChunks(isAppend bool, chunkLevel [][]*TreeEntry, data io.Reader, rootKey []byte, quitC chan bool, wg *sync.WaitGroup, jobC chan *chunkJob, processorWG *sync.WaitGroup, chunkC chan *Chunk, errC chan error, storageWG *sync.WaitGroup) {
+ defer wg.Done()
+
+ chunkWG := &sync.WaitGroup{}
+ totalDataSize := 0
- chunkC <- &Chunk{Key: hash, SData: data, wg: swg}
- // TODO: consider selecting on self.quitC to avoid blocking forever on shutdown
+ // processorWG keeps track of workers spawned for hashing chunks
+ if processorWG != nil {
+ processorWG.Add(1)
+ }
+
+ self.incrementWorkerCount()
+ go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG, processorWG)
+
+ parent := NewTreeEntry(self)
+ var unFinishedChunk *Chunk
+
+ if isAppend == true && len(chunkLevel[0]) != 0 {
+
+ lastIndex := len(chunkLevel[0]) - 1
+ ent := chunkLevel[0][lastIndex]
+
+ if ent.branchCount < self.branches {
+ parent = &TreeEntry{
+ level: 0,
+ branchCount: ent.branchCount,
+ subtreeSize: ent.subtreeSize,
+ chunk: ent.chunk,
+ key: ent.key,
+ index: lastIndex,
+ updatePending: true,
}
- if depth+1 < len(results.Levels) {
- delete(results.Levels[depth+1], task.Index/(pow/self.branches))
+
+ lastBranch := parent.branchCount - 1
+ lastKey := parent.chunk[8+lastBranch*self.hashSize : 8+(lastBranch+1)*self.hashSize]
+
+ unFinishedChunk = retrieve(lastKey, chunkC, quitC)
+ if unFinishedChunk.Size < self.chunkSize {
+
+ parent.subtreeSize = parent.subtreeSize - uint64(unFinishedChunk.Size)
+ parent.branchCount = parent.branchCount - 1
+ } else {
+ unFinishedChunk = nil
}
+ }
+ }
- results.Lock.Unlock()
- // If there's more work to be done, leave for others
- if left > 0 {
+ for index := 0; ; index++ {
+
+ var n int
+ var err error
+ chunkData := make([]byte, self.chunkSize+8)
+ if unFinishedChunk != nil {
+ copy(chunkData, unFinishedChunk.SData)
+ n, err = data.Read(chunkData[8+unFinishedChunk.Size:])
+ n += int(unFinishedChunk.Size)
+ unFinishedChunk = nil
+ } else {
+ n, err = data.Read(chunkData[8:])
+ }
+
+ totalDataSize += n
+ if err != nil {
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ if parent.branchCount == 1 {
+ // Data is exactly one chunk.. pick the last chunk key as root
+ chunkWG.Wait()
+ lastChunksKey := parent.chunk[8 : 8+self.hashSize]
+ copy(rootKey, lastChunksKey)
+ break
+ }
+ } else {
+ close(quitC)
break
}
- // We're the last ones in this batch, merge the children together
- depth--
- pow *= self.branches
}
- pend.Done()
+
+ // Data ended in chunk boundry.. just signal to start bulding tree
+ if n == 0 {
+ self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey)
+ break
+ } else {
+
+ pkey := self.enqueueDataChunk(chunkData, uint64(n), parent, chunkWG, jobC, quitC)
+
+ // update tree related parent data structures
+ parent.subtreeSize += uint64(n)
+ parent.branchCount++
+
+ // Data got exhausted... signal to send any parent tree related chunks
+ if int64(n) < self.chunkSize {
+
+ // only one data chunk .. so dont add any parent chunk
+ if parent.branchCount <= 1 {
+ chunkWG.Wait()
+ copy(rootKey, pkey)
+ break
+ }
+
+ self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, true, rootKey)
+ break
+ }
+
+ if parent.branchCount == self.branches {
+ self.buildTree(isAppend, chunkLevel, parent, chunkWG, jobC, quitC, false, rootKey)
+ parent = NewTreeEntry(self)
+ }
+
+ }
+
+ workers := self.getWorkerCount()
+ if int64(len(jobC)) > workers && workers < ChunkProcessors {
+ if processorWG != nil {
+ processorWG.Add(1)
+ }
+ self.incrementWorkerCount()
+ go self.processor(self.workerCount, jobC, chunkC, errC, quitC, storageWG, processorWG)
+ }
+
+ }
+
+}
+
+func (self *PyramidChunker) buildTree(isAppend bool, chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool, rootKey []byte) {
+ chunkWG.Wait()
+ self.enqueueTreeChunk(chunkLevel, ent, chunkWG, jobC, quitC, last)
+
+ compress := false
+ endLvl := self.branches
+ for lvl := int64(0); lvl < self.branches; lvl++ {
+ lvlCount := int64(len(chunkLevel[lvl]))
+ if lvlCount >= self.branches {
+ endLvl = lvl + 1
+ compress = true
+ break
+ }
+ }
+
+ if compress == false && last == false {
+ return
+ }
+
+ // Wait for all the keys to be processed before compressing the tree
+ chunkWG.Wait()
+
+ for lvl := int64(ent.level); lvl < endLvl; lvl++ {
+
+ lvlCount := int64(len(chunkLevel[lvl]))
+ if lvlCount == 1 && last == true {
+ copy(rootKey, chunkLevel[lvl][0].key)
+ return
+ }
+
+ for startCount := int64(0); startCount < lvlCount; startCount += self.branches {
+
+ endCount := startCount + self.branches
+ if endCount > lvlCount {
+ endCount = lvlCount
+ }
+
+ var nextLvlCount int64
+ var tempEntry *TreeEntry
+ if len(chunkLevel[lvl+1]) > 0 {
+ nextLvlCount = int64(len(chunkLevel[lvl+1]) - 1)
+ tempEntry = chunkLevel[lvl+1][nextLvlCount]
+ }
+ if isAppend == true && tempEntry != nil && tempEntry.updatePending == true {
+ updateEntry := &TreeEntry{
+ level: int(lvl + 1),
+ branchCount: 0,
+ subtreeSize: 0,
+ chunk: make([]byte, self.chunkSize+8),
+ key: make([]byte, self.hashSize),
+ index: int(nextLvlCount),
+ updatePending: true,
+ }
+ for index := int64(0); index < lvlCount; index++ {
+ updateEntry.branchCount++
+ updateEntry.subtreeSize += chunkLevel[lvl][index].subtreeSize
+ copy(updateEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], chunkLevel[lvl][index].key[:self.hashSize])
+ }
+
+ self.enqueueTreeChunk(chunkLevel, updateEntry, chunkWG, jobC, quitC, last)
+
+ } else {
+
+ noOfBranches := endCount - startCount
+ newEntry := &TreeEntry{
+ level: int(lvl + 1),
+ branchCount: noOfBranches,
+ subtreeSize: 0,
+ chunk: make([]byte, (noOfBranches*self.hashSize)+8),
+ key: make([]byte, self.hashSize),
+ index: int(nextLvlCount),
+ updatePending: false,
+ }
+
+ index := int64(0)
+ for i := startCount; i < endCount; i++ {
+ entry := chunkLevel[lvl][i]
+ newEntry.subtreeSize += entry.subtreeSize
+ copy(newEntry.chunk[8+(index*self.hashSize):8+((index+1)*self.hashSize)], entry.key[:self.hashSize])
+ index++
+ }
+
+ self.enqueueTreeChunk(chunkLevel, newEntry, chunkWG, jobC, quitC, last)
+
+ }
+
+ }
+
+ if isAppend == false {
+ chunkWG.Wait()
+ if compress == true {
+ chunkLevel[lvl] = nil
+ }
+ }
}
+
}
+
+func (self *PyramidChunker) enqueueTreeChunk(chunkLevel [][]*TreeEntry, ent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool, last bool) {
+ if ent != nil {
+
+ // wait for data chunks to get over before processing the tree chunk
+ if last == true {
+ chunkWG.Wait()
+ }
+
+ binary.LittleEndian.PutUint64(ent.chunk[:8], ent.subtreeSize)
+ ent.key = make([]byte, self.hashSize)
+ chunkWG.Add(1)
+ select {
+ case jobC <- &chunkJob{ent.key, ent.chunk[:ent.branchCount*self.hashSize+8], int64(ent.subtreeSize), chunkWG, TreeChunk, 0}:
+ case <-quitC:
+ }
+
+ // Update or append based on weather it is a new entry or being reused
+ if ent.updatePending == true {
+ chunkWG.Wait()
+ chunkLevel[ent.level][ent.index] = ent
+ } else {
+ chunkLevel[ent.level] = append(chunkLevel[ent.level], ent)
+ }
+
+ }
+}
+
+func (self *PyramidChunker) enqueueDataChunk(chunkData []byte, size uint64, parent *TreeEntry, chunkWG *sync.WaitGroup, jobC chan *chunkJob, quitC chan bool) Key {
+ binary.LittleEndian.PutUint64(chunkData[:8], size)
+ pkey := parent.chunk[8+parent.branchCount*self.hashSize : 8+(parent.branchCount+1)*self.hashSize]
+
+ chunkWG.Add(1)
+ select {
+ case jobC <- &chunkJob{pkey, chunkData[:size+8], int64(size), chunkWG, DataChunk, -1}:
+ case <-quitC:
+ }
+
+ return pkey
+
+} \ No newline at end of file
diff --git a/swarm/storage/swarmhasher.go b/swarm/storage/swarmhasher.go
new file mode 100644
index 000000000..38b86373c
--- /dev/null
+++ b/swarm/storage/swarmhasher.go
@@ -0,0 +1,40 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package storage
+
+import (
+ "hash"
+)
+
+const (
+ BMTHash = "BMT"
+ SHA3Hash = "SHA3" // http://golang.org/pkg/hash/#Hash
+)
+
+type SwarmHash interface {
+ hash.Hash
+ ResetWithLength([]byte)
+}
+
+type HashWithLength struct {
+ hash.Hash
+}
+
+func (self *HashWithLength) ResetWithLength(length []byte) {
+ self.Reset()
+ self.Write(length)
+}
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index a9de23c93..d35f1f929 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -24,12 +24,13 @@ import (
"io"
"sync"
- // "github.com/ethereum/go-ethereum/bmt"
+ "github.com/ethereum/go-ethereum/bmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/sha3"
)
type Hasher func() hash.Hash
+type SwarmHasher func() SwarmHash
// Peer is the recorded as Source on the chunk
// should probably not be here? but network should wrap chunk object
@@ -78,12 +79,18 @@ func IsZeroKey(key Key) bool {
var ZeroKey = Key(common.Hash{}.Bytes())
-func MakeHashFunc(hash string) Hasher {
+func MakeHashFunc(hash string) SwarmHasher {
switch hash {
case "SHA256":
- return crypto.SHA256.New
+ return func() SwarmHash { return &HashWithLength{crypto.SHA256.New()} }
case "SHA3":
- return sha3.NewKeccak256
+ return func() SwarmHash { return &HashWithLength{sha3.NewKeccak256()} }
+ case "BMT":
+ return func() SwarmHash {
+ hasher := sha3.NewKeccak256
+ pool := bmt.NewTreePool(hasher, bmt.DefaultSegmentCount, bmt.DefaultPoolSize)
+ return bmt.New(pool)
+ }
}
return nil
}
@@ -192,6 +199,13 @@ type Splitter interface {
A closed error signals process completion at which point the key can be considered final if there were no errors.
*/
Split(io.Reader, int64, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
+
+ /* This is the first step in making files mutable (not chunks)..
+ Append allows adding more data chunks to the end of the already existsing file.
+ The key for the root chunk is supplied to load the respective tree.
+ Rest of the parameters behave like Split.
+ */
+ Append(Key, io.Reader, chan *Chunk, *sync.WaitGroup, *sync.WaitGroup) (Key, error)
}
type Joiner interface {