aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/bmt
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2018-07-09 20:11:49 +0800
committerBalint Gabor <balint.g@gmail.com>2018-07-09 20:11:49 +0800
commitb3711af05176f446fad5ee90e2be4bd09c4086a2 (patch)
tree036eb23e423c385c0be00e3f8d3d97dea7040f8c /swarm/bmt
parent30bdf817a0d0afb33f3635f1de877f9caf09be05 (diff)
downloaddexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.gz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.bz2
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.lz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.xz
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.tar.zst
dexon-b3711af05176f446fad5ee90e2be4bd09c4086a2.zip
swarm: ctx propagation; bmt fixes; pss generic notification framework (#17150)
* cmd/swarm: minor cli flag text adjustments * swarm/api/http: sticky footer for swarm landing page using flex * swarm/api/http: sticky footer for error pages and fix for multiple choices * cmd/swarm, swarm/storage, swarm: fix mingw on windows test issues * cmd/swarm: update description of swarm cmd * swarm: added network ID test * cmd/swarm: support for smoke tests on the production swarm cluster * cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion * swarm: propagate ctx to internal apis (#754) * swarm/metrics: collect disk measurements * swarm/bmt: fix io.Writer interface * Write now tolerates arbitrary variable buffers * added variable buffer tests * Write loop and finalise optimisation * refactor / rename * add tests for empty input * swarm/pss: (UPDATE) Generic notifications package (#744) swarm/pss: Generic package for creating pss notification svcs * swarm: Adding context to more functions * swarm/api: change colour of landing page in templates * swarm/api: change landing page to react to enter keypress
Diffstat (limited to 'swarm/bmt')
-rw-r--r--swarm/bmt/bmt.go220
-rw-r--r--swarm/bmt/bmt_r.go3
-rw-r--r--swarm/bmt/bmt_test.go122
3 files changed, 220 insertions, 125 deletions
diff --git a/swarm/bmt/bmt.go b/swarm/bmt/bmt.go
index 71aee2495..835587020 100644
--- a/swarm/bmt/bmt.go
+++ b/swarm/bmt/bmt.go
@@ -117,10 +117,7 @@ func NewTreePool(hasher BaseHasherFunc, segmentCount, capacity int) *TreePool {
zerohashes[0] = zeros
h := hasher()
for i := 1; i < depth; i++ {
- h.Reset()
- h.Write(zeros)
- h.Write(zeros)
- zeros = h.Sum(nil)
+ zeros = doHash(h, nil, zeros, zeros)
zerohashes[i] = zeros
}
return &TreePool{
@@ -318,41 +315,19 @@ func (h *Hasher) Sum(b []byte) (r []byte) {
// * if sequential write is used (can read sections)
func (h *Hasher) sum(b []byte, release, section bool) (r []byte) {
t := h.bmt
- h.finalise(section)
- if t.offset > 0 { // get the last node (double segment)
-
- // padding the segment with zero
- copy(t.segment[t.offset:], h.pool.zerohashes[0])
- }
- if section {
- if t.cur%2 == 1 {
- // if just finished current segment, copy it to the right half of the chunk
- copy(t.section[h.pool.SegmentSize:], t.segment)
- } else {
- // copy segment to front of section, zero pad the right half
- copy(t.section, t.segment)
- copy(t.section[h.pool.SegmentSize:], h.pool.zerohashes[0])
- }
- h.writeSection(t.cur, t.section)
- } else {
- // TODO: h.writeSegment(t.cur, t.segment)
- panic("SegmentWriter not implemented")
- }
+ bh := h.pool.hasher()
+ go h.writeSection(t.cur, t.section, true)
bmtHash := <-t.result
span := t.span
-
+ // fmt.Println(t.draw(bmtHash))
if release {
h.releaseTree()
}
- // sha3(span + BMT(pure_chunk))
+ // b + sha3(span + BMT(pure_chunk))
if span == nil {
- return bmtHash
+ return append(b, bmtHash...)
}
- bh := h.pool.hasher()
- bh.Reset()
- bh.Write(span)
- bh.Write(bmtHash)
- return bh.Sum(b)
+ return doHash(bh, b, span, bmtHash)
}
// Hasher implements the SwarmHash interface
@@ -367,37 +342,41 @@ func (h *Hasher) Write(b []byte) (int, error) {
return 0, nil
}
t := h.bmt
- need := (h.pool.SegmentCount - t.cur) * h.pool.SegmentSize
- if l < need {
- need = l
- }
- // calculate missing bit to complete current open segment
- rest := h.pool.SegmentSize - t.offset
- if need < rest {
- rest = need
- }
- copy(t.segment[t.offset:], b[:rest])
- need -= rest
- size := (t.offset + rest) % h.pool.SegmentSize
- // read full segments and the last possibly partial segment
- for need > 0 {
- // push all finished chunks we read
- if t.cur%2 == 0 {
- copy(t.section, t.segment)
- } else {
- copy(t.section[h.pool.SegmentSize:], t.segment)
- h.writeSection(t.cur, t.section)
+ secsize := 2 * h.pool.SegmentSize
+ // calculate length of missing bit to complete current open section
+ smax := secsize - t.offset
+ // if at the beginning of chunk or middle of the section
+ if t.offset < secsize {
+ // fill up current segment from buffer
+ copy(t.section[t.offset:], b)
+ // if input buffer consumed and open section not complete, then
+ // advance offset and return
+ if smax == 0 {
+ smax = secsize
+ }
+ if l <= smax {
+ t.offset += l
+ return l, nil
}
- size = h.pool.SegmentSize
- if need < size {
- size = need
+ } else {
+ if t.cur == h.pool.SegmentCount*2 {
+ return 0, nil
}
- copy(t.segment, b[rest:rest+size])
- need -= size
- rest += size
+ }
+ // read full segments and the last possibly partial segment from the input buffer
+ for smax < l {
+ // section complete; push to tree asynchronously
+ go h.writeSection(t.cur, t.section, false)
+ // reset section
+ t.section = make([]byte, secsize)
+ // copy from imput buffer at smax to right half of section
+ copy(t.section, b[smax:])
+ // advance cursor
t.cur++
+ // smax here represents successive offsets in the input buffer
+ smax += secsize
}
- t.offset = size % h.pool.SegmentSize
+ t.offset = l - smax + secsize
return l, nil
}
@@ -426,6 +405,8 @@ func (h *Hasher) releaseTree() {
t.span = nil
t.hash = nil
h.bmt = nil
+ t.section = make([]byte, h.pool.SegmentSize*2)
+ t.segment = make([]byte, h.pool.SegmentSize)
h.pool.release(t)
}
}
@@ -435,29 +416,37 @@ func (h *Hasher) releaseTree() {
// go h.run(h.bmt.leaves[i/2], h.pool.hasher(), i%2 == 0, s)
// }
-// writeSection writes the hash of i/2-th segction into right level 1 node of the BMT tree
-func (h *Hasher) writeSection(i int, section []byte) {
- n := h.bmt.leaves[i/2]
+// writeSection writes the hash of i-th section into level 1 node of the BMT tree
+func (h *Hasher) writeSection(i int, section []byte, final bool) {
+ // select the leaf node for the section
+ n := h.bmt.leaves[i]
isLeft := n.isLeft
n = n.parent
bh := h.pool.hasher()
- bh.Write(section)
- go func() {
- sum := bh.Sum(nil)
- if n == nil {
- h.bmt.result <- sum
- return
- }
- h.run(n, bh, isLeft, sum)
- }()
+ // hash the section
+ s := doHash(bh, nil, section)
+ // write hash into parent node
+ if final {
+ // for the last segment use writeFinalNode
+ h.writeFinalNode(1, n, bh, isLeft, s)
+ } else {
+ h.writeNode(n, bh, isLeft, s)
+ }
}
-// run pushes the data to the node
+// writeNode pushes the data to the node
// if it is the first of 2 sisters written the routine returns
// if it is the second, it calculates the hash and writes it
// to the parent node recursively
-func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) {
+func (h *Hasher) writeNode(n *node, bh hash.Hash, isLeft bool, s []byte) {
+ level := 1
for {
+ // at the root of the bmt just write the result to the result channel
+ if n == nil {
+ h.bmt.result <- s
+ return
+ }
+ // otherwise assign child hash to branc
if isLeft {
n.left = s
} else {
@@ -467,44 +456,68 @@ func (h *Hasher) run(n *node, bh hash.Hash, isLeft bool, s []byte) {
if n.toggle() {
return
}
- // the second thread now can be sure both left and right children are written
- // it calculates the hash of left|right and take it to the next level
- bh.Reset()
- bh.Write(n.left)
- bh.Write(n.right)
- s = bh.Sum(nil)
-
- // at the root of the bmt just write the result to the result channel
- if n.parent == nil {
- h.bmt.result <- s
- return
- }
-
- // otherwise iterate on parent
+ // the thread coming later now can be sure both left and right children are written
+ // it calculates the hash of left|right and pushes it to the parent
+ s = doHash(bh, nil, n.left, n.right)
isLeft = n.isLeft
n = n.parent
+ level++
}
}
-// finalise is following the path starting from the final datasegment to the
+// writeFinalNode is following the path starting from the final datasegment to the
// BMT root via parents
// for unbalanced trees it fills in the missing right sister nodes using
// the pool's lookup table for BMT subtree root hashes for all-zero sections
-func (h *Hasher) finalise(skip bool) {
- t := h.bmt
- isLeft := t.cur%2 == 0
- n := t.leaves[t.cur/2]
- for level := 0; n != nil; level++ {
- // when the final segment's path is going via left child node
- // we include an all-zero subtree hash for the right level and toggle the node.
- // when the path is going through right child node, nothing to do
- if isLeft && !skip {
+// otherwise behaves like `writeNode`
+func (h *Hasher) writeFinalNode(level int, n *node, bh hash.Hash, isLeft bool, s []byte) {
+
+ for {
+ // at the root of the bmt just write the result to the result channel
+ if n == nil {
+ if s != nil {
+ h.bmt.result <- s
+ }
+ return
+ }
+ var noHash bool
+ if isLeft {
+ // coming from left sister branch
+ // when the final section's path is going via left child node
+ // we include an all-zero subtree hash for the right level and toggle the node.
+ // when the path is going through right child node, nothing to do
n.right = h.pool.zerohashes[level]
- n.toggle()
+ if s != nil {
+ n.left = s
+ // if a left final node carries a hash, it must be the first (and only thread)
+ // so the toggle is already in passive state no need no call
+ // yet thread needs to carry on pushing hash to parent
+ } else {
+ // if again first thread then propagate nil and calculate no hash
+ noHash = n.toggle()
+ }
+ } else {
+ // right sister branch
+ // if s is nil, then thread arrived first at previous node and here there will be two,
+ // so no need to do anything
+ if s != nil {
+ n.right = s
+ noHash = n.toggle()
+ } else {
+ noHash = true
+ }
+ }
+ // the child-thread first arriving will just continue resetting s to nil
+ // the second thread now can be sure both left and right children are written
+ // it calculates the hash of left|right and pushes it to the parent
+ if noHash {
+ s = nil
+ } else {
+ s = doHash(bh, nil, n.left, n.right)
}
- skip = false
isLeft = n.isLeft
n = n.parent
+ level++
}
}
@@ -525,6 +538,15 @@ func (n *node) toggle() bool {
return atomic.AddInt32(&n.state, 1)%2 == 1
}
+// calculates the hash of the data using hash.Hash
+func doHash(h hash.Hash, b []byte, data ...[]byte) []byte {
+ h.Reset()
+ for _, v := range data {
+ h.Write(v)
+ }
+ return h.Sum(b)
+}
+
func hashstr(b []byte) string {
end := len(b)
if end > 4 {
diff --git a/swarm/bmt/bmt_r.go b/swarm/bmt/bmt_r.go
index c61d2dc73..0cb6c146f 100644
--- a/swarm/bmt/bmt_r.go
+++ b/swarm/bmt/bmt_r.go
@@ -80,6 +80,5 @@ func (rh *RefHasher) hash(data []byte, length int) []byte {
}
rh.hasher.Reset()
rh.hasher.Write(section)
- s := rh.hasher.Sum(nil)
- return s
+ return rh.hasher.Sum(nil)
}
diff --git a/swarm/bmt/bmt_test.go b/swarm/bmt/bmt_test.go
index e074d90e7..ae40eadab 100644
--- a/swarm/bmt/bmt_test.go
+++ b/swarm/bmt/bmt_test.go
@@ -34,12 +34,12 @@ import (
// the actual data length generated (could be longer than max datalength of the BMT)
const BufferSize = 4128
+var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}
+
+// calculates the Keccak256 SHA3 hash of the data
func sha3hash(data ...[]byte) []byte {
h := sha3.NewKeccak256()
- for _, v := range data {
- h.Write(v)
- }
- return h.Sum(nil)
+ return doHash(h, nil, data...)
}
// TestRefHasher tests that the RefHasher computes the expected BMT hash for
@@ -129,31 +129,48 @@ func TestRefHasher(t *testing.T) {
}
}
-func TestHasherCorrectness(t *testing.T) {
- err := testHasher(testBaseHasher)
- if err != nil {
- t.Fatal(err)
+// tests if hasher responds with correct hash
+func TestHasherEmptyData(t *testing.T) {
+ hasher := sha3.NewKeccak256
+ var data []byte
+ for _, count := range counts {
+ t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) {
+ pool := NewTreePool(hasher, count, PoolSize)
+ defer pool.Drain(0)
+ bmt := New(pool)
+ rbmt := NewRefHasher(hasher, count)
+ refHash := rbmt.Hash(data)
+ expHash := Hash(bmt, nil, data)
+ if !bytes.Equal(expHash, refHash) {
+ t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
+ }
+ })
}
}
-func testHasher(f func(BaseHasherFunc, []byte, int, int) error) error {
+func TestHasherCorrectness(t *testing.T) {
data := newData(BufferSize)
hasher := sha3.NewKeccak256
size := hasher().Size()
- counts := []int{1, 2, 3, 4, 5, 8, 16, 32, 64, 128}
var err error
for _, count := range counts {
- max := count * size
- incr := 1
- for n := 1; n <= max; n += incr {
- err = f(hasher, data, n, count)
- if err != nil {
- return err
+ t.Run(fmt.Sprintf("segments_%v", count), func(t *testing.T) {
+ max := count * size
+ incr := 1
+ capacity := 1
+ pool := NewTreePool(hasher, count, capacity)
+ defer pool.Drain(0)
+ for n := 0; n <= max; n += incr {
+ incr = 1 + rand.Intn(5)
+ bmt := New(pool)
+ err = testHasherCorrectness(bmt, hasher, data, n, count)
+ if err != nil {
+ t.Fatal(err)
+ }
}
- }
+ })
}
- return nil
}
// Tests that the BMT hasher can be synchronously reused with poolsizes 1 and PoolSize
@@ -215,12 +232,69 @@ LOOP:
}
}
-// helper function that creates a tree pool
-func testBaseHasher(hasher BaseHasherFunc, d []byte, n, count int) error {
- pool := NewTreePool(hasher, count, 1)
- defer pool.Drain(0)
- bmt := New(pool)
- return testHasherCorrectness(bmt, hasher, d, n, count)
+// Tests BMT Hasher io.Writer interface is working correctly
+// even multiple short random write buffers
+func TestBMTHasherWriterBuffers(t *testing.T) {
+ hasher := sha3.NewKeccak256
+
+ for _, count := range counts {
+ t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) {
+ errc := make(chan error)
+ pool := NewTreePool(hasher, count, PoolSize)
+ defer pool.Drain(0)
+ n := count * 32
+ bmt := New(pool)
+ data := newData(n)
+ rbmt := NewRefHasher(hasher, count)
+ refHash := rbmt.Hash(data)
+ expHash := Hash(bmt, nil, data)
+ if !bytes.Equal(expHash, refHash) {
+ t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
+ }
+ attempts := 10
+ f := func() error {
+ bmt := New(pool)
+ bmt.Reset()
+ var buflen int
+ for offset := 0; offset < n; offset += buflen {
+ buflen = rand.Intn(n-offset) + 1
+ read, err := bmt.Write(data[offset : offset+buflen])
+ if err != nil {
+ return err
+ }
+ if read != buflen {
+ return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read)
+ }
+ }
+ hash := bmt.Sum(nil)
+ if !bytes.Equal(hash, expHash) {
+ return fmt.Errorf("hash mismatch. expected %x, got %x", hash, expHash)
+ }
+ return nil
+ }
+
+ for j := 0; j < attempts; j++ {
+ go func() {
+ errc <- f()
+ }()
+ }
+ timeout := time.NewTimer(2 * time.Second)
+ for {
+ select {
+ case err := <-errc:
+ if err != nil {
+ t.Fatal(err)
+ }
+ attempts--
+ if attempts == 0 {
+ return
+ }
+ case <-timeout.C:
+ t.Fatalf("timeout")
+ }
+ }
+ })
+ }
}
// helper function that compares reference and optimised implementations on