From c207edf2a3a6f48b4fc78cc55982d648eedab198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jano=C5=A1=20Gulja=C5=A1?= Date: Mon, 26 Nov 2018 18:49:01 +0100 Subject: swarm: add database abstractions (shed package) (#18183) --- swarm/shed/db.go | 130 ++++++++++++ swarm/shed/db_test.go | 110 ++++++++++ swarm/shed/example_store_test.go | 332 ++++++++++++++++++++++++++++++ swarm/shed/field_string.go | 66 ++++++ swarm/shed/field_string_test.go | 110 ++++++++++ swarm/shed/field_struct.go | 71 +++++++ swarm/shed/field_struct_test.go | 127 ++++++++++++ swarm/shed/field_uint64.go | 108 ++++++++++ swarm/shed/field_uint64_test.go | 194 ++++++++++++++++++ swarm/shed/index.go | 264 ++++++++++++++++++++++++ swarm/shed/index_test.go | 426 +++++++++++++++++++++++++++++++++++++++ swarm/shed/schema.go | 134 ++++++++++++ swarm/shed/schema_test.go | 126 ++++++++++++ swarm/storage/mock/db/db.go | 7 + swarm/storage/mock/mem/mem.go | 16 ++ swarm/storage/mock/mock.go | 7 + swarm/storage/mock/rpc/rpc.go | 6 + swarm/storage/mock/test/test.go | 53 +++++ 18 files changed, 2287 insertions(+) create mode 100644 swarm/shed/db.go create mode 100644 swarm/shed/db_test.go create mode 100644 swarm/shed/example_store_test.go create mode 100644 swarm/shed/field_string.go create mode 100644 swarm/shed/field_string_test.go create mode 100644 swarm/shed/field_struct.go create mode 100644 swarm/shed/field_struct_test.go create mode 100644 swarm/shed/field_uint64.go create mode 100644 swarm/shed/field_uint64_test.go create mode 100644 swarm/shed/index.go create mode 100644 swarm/shed/index_test.go create mode 100644 swarm/shed/schema.go create mode 100644 swarm/shed/schema_test.go diff --git a/swarm/shed/db.go b/swarm/shed/db.go new file mode 100644 index 000000000..e128b8cbc --- /dev/null +++ b/swarm/shed/db.go @@ -0,0 +1,130 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package shed provides a simple abstraction components to compose +// more complex operations on storage data organized in fields and indexes. +// +// Only type which holds logical information about swarm storage chunks data +// and metadata is IndexItem. This part is not generalized mostly for +// performance reasons. +package shed + +import ( + "github.com/ethereum/go-ethereum/metrics" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +// The limit for LevelDB OpenFilesCacheCapacity. +const openFileLimit = 128 + +// DB provides abstractions over LevelDB in order to +// implement complex structures using fields and ordered indexes. +// It provides a schema functionality to store fields and indexes +// information about naming and types. +type DB struct { + ldb *leveldb.DB +} + +// NewDB constructs a new DB and validates the schema +// if it exists in database on the given path. +func NewDB(path string) (db *DB, err error) { + ldb, err := leveldb.OpenFile(path, &opt.Options{ + OpenFilesCacheCapacity: openFileLimit, + }) + if err != nil { + return nil, err + } + db = &DB{ + ldb: ldb, + } + + if _, err = db.getSchema(); err != nil { + if err == leveldb.ErrNotFound { + // save schema with initialized default fields + if err = db.putSchema(schema{ + Fields: make(map[string]fieldSpec), + Indexes: make(map[byte]indexSpec), + }); err != nil { + return nil, err + } + } else { + return nil, err + } + } + return db, nil +} + +// Put wraps LevelDB Put method to increment metrics counter. +func (db *DB) Put(key []byte, value []byte) (err error) { + err = db.ldb.Put(key, value, nil) + if err != nil { + metrics.GetOrRegisterCounter("DB.putFail", nil).Inc(1) + return err + } + metrics.GetOrRegisterCounter("DB.put", nil).Inc(1) + return nil +} + +// Get wraps LevelDB Get method to increment metrics counter. +func (db *DB) Get(key []byte) (value []byte, err error) { + value, err = db.ldb.Get(key, nil) + if err != nil { + if err == leveldb.ErrNotFound { + metrics.GetOrRegisterCounter("DB.getNotFound", nil).Inc(1) + } else { + metrics.GetOrRegisterCounter("DB.getFail", nil).Inc(1) + } + return nil, err + } + metrics.GetOrRegisterCounter("DB.get", nil).Inc(1) + return value, nil +} + +// Delete wraps LevelDB Delete method to increment metrics counter. +func (db *DB) Delete(key []byte) (err error) { + err = db.ldb.Delete(key, nil) + if err != nil { + metrics.GetOrRegisterCounter("DB.deleteFail", nil).Inc(1) + return err + } + metrics.GetOrRegisterCounter("DB.delete", nil).Inc(1) + return nil +} + +// NewIterator wraps LevelDB NewIterator method to increment metrics counter. +func (db *DB) NewIterator() iterator.Iterator { + metrics.GetOrRegisterCounter("DB.newiterator", nil).Inc(1) + + return db.ldb.NewIterator(nil, nil) +} + +// WriteBatch wraps LevelDB Write method to increment metrics counter. +func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) { + err = db.ldb.Write(batch, nil) + if err != nil { + metrics.GetOrRegisterCounter("DB.writebatchFail", nil).Inc(1) + return err + } + metrics.GetOrRegisterCounter("DB.writebatch", nil).Inc(1) + return nil +} + +// Close closes LevelDB database. +func (db *DB) Close() (err error) { + return db.ldb.Close() +} diff --git a/swarm/shed/db_test.go b/swarm/shed/db_test.go new file mode 100644 index 000000000..45325beeb --- /dev/null +++ b/swarm/shed/db_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "io/ioutil" + "os" + "testing" +) + +// TestNewDB constructs a new DB +// and validates if the schema is initialized properly. +func TestNewDB(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + s, err := db.getSchema() + if err != nil { + t.Fatal(err) + } + if s.Fields == nil { + t.Error("schema fields are empty") + } + if len(s.Fields) != 0 { + t.Errorf("got schema fields length %v, want %v", len(s.Fields), 0) + } + if s.Indexes == nil { + t.Error("schema indexes are empty") + } + if len(s.Indexes) != 0 { + t.Errorf("got schema indexes length %v, want %v", len(s.Indexes), 0) + } +} + +// TestDB_persistence creates one DB, saves a field and closes that DB. +// Then, it constructs another DB and trues to retrieve the saved value. +func TestDB_persistence(t *testing.T) { + dir, err := ioutil.TempDir("", "shed-test-persistence") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + db, err := NewDB(dir) + if err != nil { + t.Fatal(err) + } + stringField, err := db.NewStringField("preserve-me") + if err != nil { + t.Fatal(err) + } + want := "persistent value" + err = stringField.Put(want) + if err != nil { + t.Fatal(err) + } + err = db.Close() + if err != nil { + t.Fatal(err) + } + + db2, err := NewDB(dir) + if err != nil { + t.Fatal(err) + } + stringField2, err := db2.NewStringField("preserve-me") + if err != nil { + t.Fatal(err) + } + got, err := stringField2.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } +} + +// newTestDB is a helper function that constructs a +// temporary database and returns a cleanup function that must +// be called to remove the data. +func newTestDB(t *testing.T) (db *DB, cleanupFunc func()) { + t.Helper() + + dir, err := ioutil.TempDir("", "shed-test") + if err != nil { + t.Fatal(err) + } + cleanupFunc = func() { os.RemoveAll(dir) } + db, err = NewDB(dir) + if err != nil { + cleanupFunc() + t.Fatal(err) + } + return db, cleanupFunc +} diff --git a/swarm/shed/example_store_test.go b/swarm/shed/example_store_test.go new file mode 100644 index 000000000..2ed0be141 --- /dev/null +++ b/swarm/shed/example_store_test.go @@ -0,0 +1,332 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed_test + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io/ioutil" + "log" + "os" + "time" + + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// Store holds fields and indexes (including their encoding functions) +// and defines operations on them by composing data from them. +// It implements storage.ChunkStore interface. +// It is just an example without any support for parallel operations +// or real world implementation. +type Store struct { + db *shed.DB + + // fields and indexes + schemaName shed.StringField + sizeCounter shed.Uint64Field + accessCounter shed.Uint64Field + retrievalIndex shed.Index + accessIndex shed.Index + gcIndex shed.Index +} + +// New returns new Store. All fields and indexes are initialized +// and possible conflicts with schema from existing database is checked +// automatically. +func New(path string) (s *Store, err error) { + db, err := shed.NewDB(path) + if err != nil { + return nil, err + } + s = &Store{ + db: db, + } + // Identify current storage schema by arbitrary name. + s.schemaName, err = db.NewStringField("schema-name") + if err != nil { + return nil, err + } + // Global ever incrementing index of chunk accesses. + s.accessCounter, err = db.NewUint64Field("access-counter") + if err != nil { + return nil, err + } + // Index storing actual chunk address, data and store timestamp. + s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.Data = value[8:] + return e, nil + }, + }) + if err != nil { + return nil, err + } + // Index storing access timestamp for a particular address. + // It is needed in order to update gc index keys for iteration order. + s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) + return b, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) + return e, nil + }, + }) + if err != nil { + return nil, err + } + // Index with keys ordered by access timestamp for garbage collection prioritization. + s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + b := make([]byte, 16, 16+len(fields.Address)) + binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + key = append(b, fields.Address...) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) + e.Address = key[16:] + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + return s, nil +} + +// Put stores the chunk and sets it store timestamp. +func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) { + return s.retrievalIndex.Put(shed.IndexItem{ + Address: ch.Address(), + Data: ch.Data(), + StoreTimestamp: time.Now().UTC().UnixNano(), + }) +} + +// Get retrieves a chunk with the provided address. +// It updates access and gc indexes by removing the previous +// items from them and adding new items as keys of index entries +// are changed. +func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, err error) { + batch := new(leveldb.Batch) + + // Get the chunk data and storage timestamp. + item, err := s.retrievalIndex.Get(shed.IndexItem{ + Address: addr, + }) + if err != nil { + if err == leveldb.ErrNotFound { + return nil, storage.ErrChunkNotFound + } + return nil, err + } + + // Get the chunk access timestamp. + accessItem, err := s.accessIndex.Get(shed.IndexItem{ + Address: addr, + }) + switch err { + case nil: + // Remove gc index entry if access timestamp is found. + err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{ + Address: item.Address, + StoreTimestamp: accessItem.AccessTimestamp, + AccessTimestamp: item.StoreTimestamp, + }) + if err != nil { + return nil, err + } + case leveldb.ErrNotFound: + // Access timestamp is not found. Do not do anything. + // This is the firs get request. + default: + return nil, err + } + + // Specify new access timestamp + accessTimestamp := time.Now().UTC().UnixNano() + + // Put new access timestamp in access index. + err = s.accessIndex.PutInBatch(batch, shed.IndexItem{ + Address: addr, + AccessTimestamp: accessTimestamp, + }) + if err != nil { + return nil, err + } + + // Put new access timestamp in gc index. + err = s.gcIndex.PutInBatch(batch, shed.IndexItem{ + Address: item.Address, + AccessTimestamp: accessTimestamp, + StoreTimestamp: item.StoreTimestamp, + }) + if err != nil { + return nil, err + } + + // Increment access counter. + // Currently this information is not used anywhere. + _, err = s.accessCounter.IncInBatch(batch) + if err != nil { + return nil, err + } + + // Write the batch. + err = s.db.WriteBatch(batch) + if err != nil { + return nil, err + } + + // Return the chunk. + return storage.NewChunk(item.Address, item.Data), nil +} + +// CollectGarbage is an example of index iteration. +// It provides no reliable garbage collection functionality. +func (s *Store) CollectGarbage() (err error) { + const maxTrashSize = 100 + maxRounds := 10 // arbitrary number, needs to be calculated + + // Run a few gc rounds. + for roundCount := 0; roundCount < maxRounds; roundCount++ { + var garbageCount int + // New batch for a new cg round. + trash := new(leveldb.Batch) + // Iterate through all index items and break when needed. + err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) { + // Remove the chunk. + err = s.retrievalIndex.DeleteInBatch(trash, item) + if err != nil { + return false, err + } + // Remove the element in gc index. + err = s.gcIndex.DeleteInBatch(trash, item) + if err != nil { + return false, err + } + // Remove the relation in access index. + err = s.accessIndex.DeleteInBatch(trash, item) + if err != nil { + return false, err + } + garbageCount++ + if garbageCount >= maxTrashSize { + return true, nil + } + return false, nil + }) + if err != nil { + return err + } + if garbageCount == 0 { + return nil + } + err = s.db.WriteBatch(trash) + if err != nil { + return err + } + } + return nil +} + +// GetSchema is an example of retrieveing the most simple +// string from a database field. +func (s *Store) GetSchema() (name string, err error) { + name, err = s.schemaName.Get() + if err == leveldb.ErrNotFound { + return "", nil + } + return name, err +} + +// GetSchema is an example of storing the most simple +// string in a database field. +func (s *Store) PutSchema(name string) (err error) { + return s.schemaName.Put(name) +} + +// Close closes the underlying database. +func (s *Store) Close() error { + return s.db.Close() +} + +// Example_store constructs a simple storage implementation using shed package. +func Example_store() { + dir, err := ioutil.TempDir("", "ephemeral") + if err != nil { + log.Fatal(err) + } + defer os.RemoveAll(dir) + + s, err := New(dir) + if err != nil { + log.Fatal(err) + } + defer s.Close() + + ch := storage.GenerateRandomChunk(1024) + err = s.Put(context.Background(), ch) + if err != nil { + log.Fatal(err) + } + + got, err := s.Get(context.Background(), ch.Address()) + if err != nil { + log.Fatal(err) + } + + fmt.Println(bytes.Equal(got.Data(), ch.Data())) + + //Output: true +} diff --git a/swarm/shed/field_string.go b/swarm/shed/field_string.go new file mode 100644 index 000000000..a7e8f0c75 --- /dev/null +++ b/swarm/shed/field_string.go @@ -0,0 +1,66 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +// StringField is the most simple field implementation +// that stores an arbitrary string under a specific LevelDB key. +type StringField struct { + db *DB + key []byte +} + +// NewStringField retruns a new Instance of StringField. +// It validates its name and type against the database schema. +func (db *DB) NewStringField(name string) (f StringField, err error) { + key, err := db.schemaFieldKey(name, "string") + if err != nil { + return f, err + } + return StringField{ + db: db, + key: key, + }, nil +} + +// Get returns a string value from database. +// If the value is not found, an empty string is returned +// an no error. +func (f StringField) Get() (val string, err error) { + b, err := f.db.Get(f.key) + if err != nil { + if err == leveldb.ErrNotFound { + return "", nil + } + return "", err + } + return string(b), nil +} + +// Put stores a string in the database. +func (f StringField) Put(val string) (err error) { + return f.db.Put(f.key, []byte(val)) +} + +// PutInBatch stores a string in a batch that can be +// saved later in database. +func (f StringField) PutInBatch(batch *leveldb.Batch, val string) { + batch.Put(f.key, []byte(val)) +} diff --git a/swarm/shed/field_string_test.go b/swarm/shed/field_string_test.go new file mode 100644 index 000000000..4215075bc --- /dev/null +++ b/swarm/shed/field_string_test.go @@ -0,0 +1,110 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "testing" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestStringField validates put and get operations +// of the StringField. +func TestStringField(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + simpleString, err := db.NewStringField("simple-string") + if err != nil { + t.Fatal(err) + } + + t.Run("get empty", func(t *testing.T) { + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + want := "" + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + }) + + t.Run("put", func(t *testing.T) { + want := "simple string value" + err = simpleString.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + want := "overwritten string value" + err = simpleString.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + }) + }) + + t.Run("put in batch", func(t *testing.T) { + batch := new(leveldb.Batch) + want := "simple string batch value" + simpleString.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + batch := new(leveldb.Batch) + want := "overwritten string batch value" + simpleString.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := simpleString.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got string %q, want %q", got, want) + } + }) + }) +} diff --git a/swarm/shed/field_struct.go b/swarm/shed/field_struct.go new file mode 100644 index 000000000..90daee7fc --- /dev/null +++ b/swarm/shed/field_struct.go @@ -0,0 +1,71 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "github.com/ethereum/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" +) + +// StructField is a helper to store complex structure by +// encoding it in RLP format. +type StructField struct { + db *DB + key []byte +} + +// NewStructField returns a new StructField. +// It validates its name and type against the database schema. +func (db *DB) NewStructField(name string) (f StructField, err error) { + key, err := db.schemaFieldKey(name, "struct-rlp") + if err != nil { + return f, err + } + return StructField{ + db: db, + key: key, + }, nil +} + +// Get unmarshals data from the database to a provided val. +// If the data is not found leveldb.ErrNotFound is returned. +func (f StructField) Get(val interface{}) (err error) { + b, err := f.db.Get(f.key) + if err != nil { + return err + } + return rlp.DecodeBytes(b, val) +} + +// Put marshals provided val and saves it to the database. +func (f StructField) Put(val interface{}) (err error) { + b, err := rlp.EncodeToBytes(val) + if err != nil { + return err + } + return f.db.Put(f.key, b) +} + +// PutInBatch marshals provided val and puts it into the batch. +func (f StructField) PutInBatch(batch *leveldb.Batch, val interface{}) (err error) { + b, err := rlp.EncodeToBytes(val) + if err != nil { + return err + } + batch.Put(f.key, b) + return nil +} diff --git a/swarm/shed/field_struct_test.go b/swarm/shed/field_struct_test.go new file mode 100644 index 000000000..cc0be0186 --- /dev/null +++ b/swarm/shed/field_struct_test.go @@ -0,0 +1,127 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "testing" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestStructField validates put and get operations +// of the StructField. +func TestStructField(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + complexField, err := db.NewStructField("complex-field") + if err != nil { + t.Fatal(err) + } + + type complexStructure struct { + A string + } + + t.Run("get empty", func(t *testing.T) { + var s complexStructure + err := complexField.Get(&s) + if err != leveldb.ErrNotFound { + t.Fatalf("got error %v, want %v", err, leveldb.ErrNotFound) + } + want := "" + if s.A != want { + t.Errorf("got string %q, want %q", s.A, want) + } + }) + + t.Run("put", func(t *testing.T) { + want := complexStructure{ + A: "simple string value", + } + err = complexField.Put(want) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err = complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got.A, want.A) + } + + t.Run("overwrite", func(t *testing.T) { + want := complexStructure{ + A: "overwritten string value", + } + err = complexField.Put(want) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err = complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got.A, want.A) + } + }) + }) + + t.Run("put in batch", func(t *testing.T) { + batch := new(leveldb.Batch) + want := complexStructure{ + A: "simple string batch value", + } + complexField.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err := complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + batch := new(leveldb.Batch) + want := complexStructure{ + A: "overwritten string batch value", + } + complexField.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + var got complexStructure + err := complexField.Get(&got) + if err != nil { + t.Fatal(err) + } + if got.A != want.A { + t.Errorf("got string %q, want %q", got, want) + } + }) + }) +} diff --git a/swarm/shed/field_uint64.go b/swarm/shed/field_uint64.go new file mode 100644 index 000000000..80e0069ae --- /dev/null +++ b/swarm/shed/field_uint64.go @@ -0,0 +1,108 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "encoding/binary" + + "github.com/syndtr/goleveldb/leveldb" +) + +// Uint64Field provides a way to have a simple counter in the database. +// It transparently encodes uint64 type value to bytes. +type Uint64Field struct { + db *DB + key []byte +} + +// NewUint64Field returns a new Uint64Field. +// It validates its name and type against the database schema. +func (db *DB) NewUint64Field(name string) (f Uint64Field, err error) { + key, err := db.schemaFieldKey(name, "uint64") + if err != nil { + return f, err + } + return Uint64Field{ + db: db, + key: key, + }, nil +} + +// Get retrieves a uint64 value from the database. +// If the value is not found in the database a 0 value +// is returned and no error. +func (f Uint64Field) Get() (val uint64, err error) { + b, err := f.db.Get(f.key) + if err != nil { + if err == leveldb.ErrNotFound { + return 0, nil + } + return 0, err + } + return binary.BigEndian.Uint64(b), nil +} + +// Put encodes uin64 value and stores it in the database. +func (f Uint64Field) Put(val uint64) (err error) { + return f.db.Put(f.key, encodeUint64(val)) +} + +// PutInBatch stores a uint64 value in a batch +// that can be saved later in the database. +func (f Uint64Field) PutInBatch(batch *leveldb.Batch, val uint64) { + batch.Put(f.key, encodeUint64(val)) +} + +// Inc increments a uint64 value in the database. +// This operation is not goroutine save. +func (f Uint64Field) Inc() (val uint64, err error) { + val, err = f.Get() + if err != nil { + if err == leveldb.ErrNotFound { + val = 0 + } else { + return 0, err + } + } + val++ + return val, f.Put(val) +} + +// IncInBatch increments a uint64 value in the batch +// by retreiving a value from the database, not the same batch. +// This operation is not goroutine save. +func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) { + val, err = f.Get() + if err != nil { + if err == leveldb.ErrNotFound { + val = 0 + } else { + return 0, err + } + } + val++ + f.PutInBatch(batch, val) + return val, nil +} + +// encode transforms uint64 to 8 byte long +// slice in big endian encoding. +func encodeUint64(val uint64) (b []byte) { + b = make([]byte, 8) + binary.BigEndian.PutUint64(b, val) + return b +} diff --git a/swarm/shed/field_uint64_test.go b/swarm/shed/field_uint64_test.go new file mode 100644 index 000000000..69ade71ba --- /dev/null +++ b/swarm/shed/field_uint64_test.go @@ -0,0 +1,194 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "testing" + + "github.com/syndtr/goleveldb/leveldb" +) + +// TestUint64Field validates put and get operations +// of the Uint64Field. +func TestUint64Field(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + counter, err := db.NewUint64Field("counter") + if err != nil { + t.Fatal(err) + } + + t.Run("get empty", func(t *testing.T) { + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + var want uint64 + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + }) + + t.Run("put", func(t *testing.T) { + var want uint64 = 42 + err = counter.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + var want uint64 = 84 + err = counter.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + }) + }) + + t.Run("put in batch", func(t *testing.T) { + batch := new(leveldb.Batch) + var want uint64 = 42 + counter.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + t.Run("overwrite", func(t *testing.T) { + batch := new(leveldb.Batch) + var want uint64 = 84 + counter.PutInBatch(batch, want) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + }) + }) +} + +// TestUint64Field_Inc validates Inc operation +// of the Uint64Field. +func TestUint64Field_Inc(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + counter, err := db.NewUint64Field("counter") + if err != nil { + t.Fatal(err) + } + + var want uint64 = 1 + got, err := counter.Inc() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + want = 2 + got, err = counter.Inc() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } +} + +// TestUint64Field_IncInBatch validates IncInBatch operation +// of the Uint64Field. +func TestUint64Field_IncInBatch(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + counter, err := db.NewUint64Field("counter") + if err != nil { + t.Fatal(err) + } + + batch := new(leveldb.Batch) + var want uint64 = 1 + got, err := counter.IncInBatch(batch) + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err = counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + + batch2 := new(leveldb.Batch) + want = 2 + got, err = counter.IncInBatch(batch2) + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } + err = db.WriteBatch(batch2) + if err != nil { + t.Fatal(err) + } + got, err = counter.Get() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Errorf("got uint64 %v, want %v", got, want) + } +} diff --git a/swarm/shed/index.go b/swarm/shed/index.go new file mode 100644 index 000000000..ba803e3c2 --- /dev/null +++ b/swarm/shed/index.go @@ -0,0 +1,264 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "github.com/syndtr/goleveldb/leveldb" +) + +// IndexItem holds fields relevant to Swarm Chunk data and metadata. +// All information required for swarm storage and operations +// on that storage must be defined here. +// This structure is logically connected to swarm storage, +// the only part of this package that is not generalized, +// mostly for performance reasons. +// +// IndexItem is a type that is used for retrieving, storing and encoding +// chunk data and metadata. It is passed as an argument to Index encoding +// functions, get function and put function. +// But it is also returned with additional data from get function call +// and as the argument in iterator function definition. +type IndexItem struct { + Address []byte + Data []byte + AccessTimestamp int64 + StoreTimestamp int64 + // UseMockStore is a pointer to identify + // an unset state of the field in Join function. + UseMockStore *bool +} + +// Merge is a helper method to construct a new +// IndexItem by filling up fields with default values +// of a particular IndexItem with values from another one. +func (i IndexItem) Merge(i2 IndexItem) (new IndexItem) { + if i.Address == nil { + i.Address = i2.Address + } + if i.Data == nil { + i.Data = i2.Data + } + if i.AccessTimestamp == 0 { + i.AccessTimestamp = i2.AccessTimestamp + } + if i.StoreTimestamp == 0 { + i.StoreTimestamp = i2.StoreTimestamp + } + if i.UseMockStore == nil { + i.UseMockStore = i2.UseMockStore + } + return i +} + +// Index represents a set of LevelDB key value pairs that have common +// prefix. It holds functions for encoding and decoding keys and values +// to provide transparent actions on saved data which inclide: +// - getting a particular IndexItem +// - saving a particular IndexItem +// - iterating over a sorted LevelDB keys +// It implements IndexIteratorInterface interface. +type Index struct { + db *DB + prefix []byte + encodeKeyFunc func(fields IndexItem) (key []byte, err error) + decodeKeyFunc func(key []byte) (e IndexItem, err error) + encodeValueFunc func(fields IndexItem) (value []byte, err error) + decodeValueFunc func(value []byte) (e IndexItem, err error) +} + +// IndexFuncs structure defines functions for encoding and decoding +// LevelDB keys and values for a specific index. +type IndexFuncs struct { + EncodeKey func(fields IndexItem) (key []byte, err error) + DecodeKey func(key []byte) (e IndexItem, err error) + EncodeValue func(fields IndexItem) (value []byte, err error) + DecodeValue func(value []byte) (e IndexItem, err error) +} + +// NewIndex returns a new Index instance with defined name and +// encoding functions. The name must be unique and will be validated +// on database schema for a key prefix byte. +func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) { + id, err := db.schemaIndexPrefix(name) + if err != nil { + return f, err + } + prefix := []byte{id} + return Index{ + db: db, + prefix: prefix, + // This function adjusts Index LevelDB key + // by appending the provided index id byte. + // This is needed to avoid collisions between keys of different + // indexes as all index ids are unique. + encodeKeyFunc: func(e IndexItem) (key []byte, err error) { + key, err = funcs.EncodeKey(e) + if err != nil { + return nil, err + } + return append(append(make([]byte, 0, len(key)+1), prefix...), key...), nil + }, + // This function reverses the encodeKeyFunc constructed key + // to transparently work with index keys without their index ids. + // It assumes that index keys are prefixed with only one byte. + decodeKeyFunc: func(key []byte) (e IndexItem, err error) { + return funcs.DecodeKey(key[1:]) + }, + encodeValueFunc: funcs.EncodeValue, + decodeValueFunc: funcs.DecodeValue, + }, nil +} + +// Get accepts key fields represented as IndexItem to retrieve a +// value from the index and return maximum available information +// from the index represented as another IndexItem. +func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) { + key, err := f.encodeKeyFunc(keyFields) + if err != nil { + return out, err + } + value, err := f.db.Get(key) + if err != nil { + return out, err + } + out, err = f.decodeValueFunc(value) + if err != nil { + return out, err + } + return out.Merge(keyFields), nil +} + +// Put accepts IndexItem to encode information from it +// and save it to the database. +func (f Index) Put(i IndexItem) (err error) { + key, err := f.encodeKeyFunc(i) + if err != nil { + return err + } + value, err := f.encodeValueFunc(i) + if err != nil { + return err + } + return f.db.Put(key, value) +} + +// PutInBatch is the same as Put method, but it just +// saves the key/value pair to the batch instead +// directly to the database. +func (f Index) PutInBatch(batch *leveldb.Batch, i IndexItem) (err error) { + key, err := f.encodeKeyFunc(i) + if err != nil { + return err + } + value, err := f.encodeValueFunc(i) + if err != nil { + return err + } + batch.Put(key, value) + return nil +} + +// Delete accepts IndexItem to remove a key/value pair +// from the database based on its fields. +func (f Index) Delete(keyFields IndexItem) (err error) { + key, err := f.encodeKeyFunc(keyFields) + if err != nil { + return err + } + return f.db.Delete(key) +} + +// DeleteInBatch is the same as Delete just the operation +// is performed on the batch instead on the database. +func (f Index) DeleteInBatch(batch *leveldb.Batch, keyFields IndexItem) (err error) { + key, err := f.encodeKeyFunc(keyFields) + if err != nil { + return err + } + batch.Delete(key) + return nil +} + +// IndexIterFunc is a callback on every IndexItem that is decoded +// by iterating on an Index keys. +// By returning a true for stop variable, iteration will +// stop, and by returning the error, that error will be +// propagated to the called iterator method on Index. +type IndexIterFunc func(item IndexItem) (stop bool, err error) + +// IterateAll iterates over all keys of the Index. +func (f Index) IterateAll(fn IndexIterFunc) (err error) { + it := f.db.NewIterator() + defer it.Release() + + for ok := it.Seek(f.prefix); ok; ok = it.Next() { + key := it.Key() + if key[0] != f.prefix[0] { + break + } + keyIndexItem, err := f.decodeKeyFunc(key) + if err != nil { + return err + } + valueIndexItem, err := f.decodeValueFunc(it.Value()) + if err != nil { + return err + } + stop, err := fn(keyIndexItem.Merge(valueIndexItem)) + if err != nil { + return err + } + if stop { + break + } + } + return it.Error() +} + +// IterateFrom iterates over Index keys starting from the key +// encoded from the provided IndexItem. +func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) { + startKey, err := f.encodeKeyFunc(start) + if err != nil { + return err + } + it := f.db.NewIterator() + defer it.Release() + + for ok := it.Seek(startKey); ok; ok = it.Next() { + key := it.Key() + if key[0] != f.prefix[0] { + break + } + keyIndexItem, err := f.decodeKeyFunc(key) + if err != nil { + return err + } + valueIndexItem, err := f.decodeValueFunc(it.Value()) + if err != nil { + return err + } + stop, err := fn(keyIndexItem.Merge(valueIndexItem)) + if err != nil { + return err + } + if stop { + break + } + } + return it.Error() +} diff --git a/swarm/shed/index_test.go b/swarm/shed/index_test.go new file mode 100644 index 000000000..ba82216df --- /dev/null +++ b/swarm/shed/index_test.go @@ -0,0 +1,426 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "bytes" + "encoding/binary" + "fmt" + "sort" + "testing" + "time" + + "github.com/syndtr/goleveldb/leveldb" +) + +// Index functions for the index that is used in tests in this file. +var retrievalIndexFuncs = IndexFuncs{ + EncodeKey: func(fields IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields IndexItem) (value []byte, err error) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e IndexItem, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.Data = value[8:] + return e, nil + }, +} + +// TestIndex validates put, get and delete functions of the Index implementation. +func TestIndex(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + index, err := db.NewIndex("retrieval", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + t.Run("put", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err := index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + t.Run("overwrite", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-hash"), + Data: []byte("New DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err = index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + }) + }) + + t.Run("put in batch", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-in-batch-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + batch := new(leveldb.Batch) + index.PutInBatch(batch, want) + err := db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + t.Run("overwrite", func(t *testing.T) { + want := IndexItem{ + Address: []byte("put-in-batch-hash"), + Data: []byte("New DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + batch := new(leveldb.Batch) + index.PutInBatch(batch, want) + db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + }) + }) + + t.Run("put in batch twice", func(t *testing.T) { + // ensure that the last item of items with the same db keys + // is actually saved + batch := new(leveldb.Batch) + address := []byte("put-in-batch-twice-hash") + + // put the first item + index.PutInBatch(batch, IndexItem{ + Address: address, + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + }) + + want := IndexItem{ + Address: address, + Data: []byte("New DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + // then put the item that will produce the same key + // but different value in the database + index.PutInBatch(batch, want) + db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + }) + + t.Run("delete", func(t *testing.T) { + want := IndexItem{ + Address: []byte("delete-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err := index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + err = index.Delete(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + + wantErr := leveldb.ErrNotFound + got, err = index.Get(IndexItem{ + Address: want.Address, + }) + if err != wantErr { + t.Fatalf("got error %v, want %v", err, wantErr) + } + }) + + t.Run("delete in batch", func(t *testing.T) { + want := IndexItem{ + Address: []byte("delete-in-batch-hash"), + Data: []byte("DATA"), + StoreTimestamp: time.Now().UTC().UnixNano(), + } + + err := index.Put(want) + if err != nil { + t.Fatal(err) + } + got, err := index.Get(IndexItem{ + Address: want.Address, + }) + if err != nil { + t.Fatal(err) + } + checkIndexItem(t, got, want) + + batch := new(leveldb.Batch) + index.DeleteInBatch(batch, IndexItem{ + Address: want.Address, + }) + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + + wantErr := leveldb.ErrNotFound + got, err = index.Get(IndexItem{ + Address: want.Address, + }) + if err != wantErr { + t.Fatalf("got error %v, want %v", err, wantErr) + } + }) +} + +// TestIndex_iterate validates index iterator functions for correctness. +func TestIndex_iterate(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + index, err := db.NewIndex("retrieval", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + items := []IndexItem{ + { + Address: []byte("iterate-hash-01"), + Data: []byte("data80"), + }, + { + Address: []byte("iterate-hash-03"), + Data: []byte("data22"), + }, + { + Address: []byte("iterate-hash-05"), + Data: []byte("data41"), + }, + { + Address: []byte("iterate-hash-02"), + Data: []byte("data84"), + }, + { + Address: []byte("iterate-hash-06"), + Data: []byte("data1"), + }, + } + batch := new(leveldb.Batch) + for _, i := range items { + index.PutInBatch(batch, i) + } + err = db.WriteBatch(batch) + if err != nil { + t.Fatal(err) + } + item04 := IndexItem{ + Address: []byte("iterate-hash-04"), + Data: []byte("data0"), + } + err = index.Put(item04) + if err != nil { + t.Fatal(err) + } + items = append(items, item04) + + sort.SliceStable(items, func(i, j int) bool { + return bytes.Compare(items[i].Address, items[j].Address) < 0 + }) + + t.Run("all", func(t *testing.T) { + var i int + err := index.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("from", func(t *testing.T) { + startIndex := 2 + i := startIndex + err := index.IterateFrom(items[startIndex], func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("stop", func(t *testing.T) { + var i int + stopIndex := 3 + var count int + err := index.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + count++ + if i == stopIndex { + return true, nil + } + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + wantItemsCount := stopIndex + 1 + if count != wantItemsCount { + t.Errorf("got %v items, expected %v", count, wantItemsCount) + } + }) + + t.Run("no overflow", func(t *testing.T) { + secondIndex, err := db.NewIndex("second-index", retrievalIndexFuncs) + if err != nil { + t.Fatal(err) + } + + secondIndexItem := IndexItem{ + Address: []byte("iterate-hash-10"), + Data: []byte("data-second"), + } + err = secondIndex.Put(secondIndexItem) + if err != nil { + t.Fatal(err) + } + + var i int + err = index.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > len(items)-1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + want := items[i] + checkIndexItem(t, item, want) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + + i = 0 + err = secondIndex.IterateAll(func(item IndexItem) (stop bool, err error) { + if i > 1 { + return true, fmt.Errorf("got unexpected index item: %#v", item) + } + checkIndexItem(t, item, secondIndexItem) + i++ + return false, nil + }) + if err != nil { + t.Fatal(err) + } + }) +} + +// checkIndexItem is a test helper function that compares if two Index items are the same. +func checkIndexItem(t *testing.T, got, want IndexItem) { + t.Helper() + + if !bytes.Equal(got.Address, want.Address) { + t.Errorf("got hash %q, expected %q", string(got.Address), string(want.Address)) + } + if !bytes.Equal(got.Data, want.Data) { + t.Errorf("got data %q, expected %q", string(got.Data), string(want.Data)) + } + if got.StoreTimestamp != want.StoreTimestamp { + t.Errorf("got store timestamp %v, expected %v", got.StoreTimestamp, want.StoreTimestamp) + } + if got.AccessTimestamp != want.AccessTimestamp { + t.Errorf("got access timestamp %v, expected %v", got.AccessTimestamp, want.AccessTimestamp) + } +} diff --git a/swarm/shed/schema.go b/swarm/shed/schema.go new file mode 100644 index 000000000..cfb7c6d64 --- /dev/null +++ b/swarm/shed/schema.go @@ -0,0 +1,134 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "encoding/json" + "errors" + "fmt" +) + +var ( + // LevelDB key value for storing the schema. + keySchema = []byte{0} + // LevelDB key prefix for all field type. + // LevelDB keys will be constructed by appending name values to this prefix. + keyPrefixFields byte = 1 + // LevelDB key prefix from which indexing keys start. + // Every index has its own key prefix and this value defines the first one. + keyPrefixIndexStart byte = 2 // Q: or maybe a higher number like 7, to have more space for potential specific perfixes +) + +// schema is used to serialize known database structure information. +type schema struct { + Fields map[string]fieldSpec `json:"fields"` // keys are field names + Indexes map[byte]indexSpec `json:"indexes"` // keys are index prefix bytes +} + +// fieldSpec holds information about a particular field. +// It does not need Name field as it is contained in the +// schema.Field map key. +type fieldSpec struct { + Type string `json:"type"` +} + +// indxSpec holds information about a particular index. +// It does not contain index type, as indexes do not have type. +type indexSpec struct { + Name string `json:"name"` +} + +// schemaFieldKey retrives the complete LevelDB key for +// a particular field form the schema definition. +func (db *DB) schemaFieldKey(name, fieldType string) (key []byte, err error) { + if name == "" { + return nil, errors.New("field name can not be blank") + } + if fieldType == "" { + return nil, errors.New("field type can not be blank") + } + s, err := db.getSchema() + if err != nil { + return nil, err + } + var found bool + for n, f := range s.Fields { + if n == name { + if f.Type != fieldType { + return nil, fmt.Errorf("field %q of type %q stored as %q in db", name, fieldType, f.Type) + } + break + } + } + if !found { + s.Fields[name] = fieldSpec{ + Type: fieldType, + } + err := db.putSchema(s) + if err != nil { + return nil, err + } + } + return append([]byte{keyPrefixFields}, []byte(name)...), nil +} + +// schemaIndexID retrieves the complete LevelDB prefix for +// a particular index. +func (db *DB) schemaIndexPrefix(name string) (id byte, err error) { + if name == "" { + return 0, errors.New("index name can not be blank") + } + s, err := db.getSchema() + if err != nil { + return 0, err + } + nextID := keyPrefixIndexStart + for i, f := range s.Indexes { + if i >= nextID { + nextID = i + 1 + } + if f.Name == name { + return i, nil + } + } + id = nextID + s.Indexes[id] = indexSpec{ + Name: name, + } + return id, db.putSchema(s) +} + +// getSchema retrieves the complete schema from +// the database. +func (db *DB) getSchema() (s schema, err error) { + b, err := db.Get(keySchema) + if err != nil { + return s, err + } + err = json.Unmarshal(b, &s) + return s, err +} + +// putSchema stores the complete schema to +// the database. +func (db *DB) putSchema(s schema) (err error) { + b, err := json.Marshal(s) + if err != nil { + return err + } + return db.Put(keySchema, b) +} diff --git a/swarm/shed/schema_test.go b/swarm/shed/schema_test.go new file mode 100644 index 000000000..a0c1838c8 --- /dev/null +++ b/swarm/shed/schema_test.go @@ -0,0 +1,126 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package shed + +import ( + "bytes" + "testing" +) + +// TestDB_schemaFieldKey validates correctness of schemaFieldKey. +func TestDB_schemaFieldKey(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + t.Run("empty name or type", func(t *testing.T) { + _, err := db.schemaFieldKey("", "") + if err == nil { + t.Errorf("error not returned, but expected") + } + _, err = db.schemaFieldKey("", "type") + if err == nil { + t.Errorf("error not returned, but expected") + } + + _, err = db.schemaFieldKey("test", "") + if err == nil { + t.Errorf("error not returned, but expected") + } + }) + + t.Run("same field", func(t *testing.T) { + key1, err := db.schemaFieldKey("test", "undefined") + if err != nil { + t.Fatal(err) + } + + key2, err := db.schemaFieldKey("test", "undefined") + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(key1, key2) { + t.Errorf("schema keys for the same field name are not the same: %q, %q", string(key1), string(key2)) + } + }) + + t.Run("different fields", func(t *testing.T) { + key1, err := db.schemaFieldKey("test1", "undefined") + if err != nil { + t.Fatal(err) + } + + key2, err := db.schemaFieldKey("test2", "undefined") + if err != nil { + t.Fatal(err) + } + + if bytes.Equal(key1, key2) { + t.Error("schema keys for the same field name are the same, but must not be") + } + }) + + t.Run("same field name different types", func(t *testing.T) { + _, err := db.schemaFieldKey("the-field", "one-type") + if err != nil { + t.Fatal(err) + } + + _, err = db.schemaFieldKey("the-field", "another-type") + if err == nil { + t.Errorf("error not returned, but expected") + } + }) +} + +// TestDB_schemaIndexPrefix validates correctness of schemaIndexPrefix. +func TestDB_schemaIndexPrefix(t *testing.T) { + db, cleanupFunc := newTestDB(t) + defer cleanupFunc() + + t.Run("same name", func(t *testing.T) { + id1, err := db.schemaIndexPrefix("test") + if err != nil { + t.Fatal(err) + } + + id2, err := db.schemaIndexPrefix("test") + if err != nil { + t.Fatal(err) + } + + if id1 != id2 { + t.Errorf("schema keys for the same field name are not the same: %v, %v", id1, id2) + } + }) + + t.Run("different names", func(t *testing.T) { + id1, err := db.schemaIndexPrefix("test1") + if err != nil { + t.Fatal(err) + } + + id2, err := db.schemaIndexPrefix("test2") + if err != nil { + t.Fatal(err) + } + + if id1 == id2 { + t.Error("schema ids for the same index name are the same, but must not be") + } + }) +} diff --git a/swarm/storage/mock/db/db.go b/swarm/storage/mock/db/db.go index 43bfa24f0..73ae199e8 100644 --- a/swarm/storage/mock/db/db.go +++ b/swarm/storage/mock/db/db.go @@ -86,6 +86,13 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { return s.db.Write(batch, nil) } +// Delete removes the chunk reference to node with address addr. +func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + batch := new(leveldb.Batch) + batch.Delete(nodeDBKey(addr, key)) + return s.db.Write(batch, nil) +} + // HasKey returns whether a node with addr contains the key. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { has, err := s.db.Has(nodeDBKey(addr, key), nil) diff --git a/swarm/storage/mock/mem/mem.go b/swarm/storage/mock/mem/mem.go index 8878309d0..3a0a2beb8 100644 --- a/swarm/storage/mock/mem/mem.go +++ b/swarm/storage/mock/mem/mem.go @@ -83,6 +83,22 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { return nil } +// Delete removes the chunk data for node with address addr. +func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + + var count int + if _, ok := s.nodes[string(key)]; ok { + delete(s.nodes[string(key)], addr) + count = len(s.nodes[string(key)]) + } + if count == 0 { + delete(s.data, string(key)) + } + return nil +} + // HasKey returns whether a node with addr contains the key. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { s.mu.Lock() diff --git a/swarm/storage/mock/mock.go b/swarm/storage/mock/mock.go index 81340f927..1fb71b70a 100644 --- a/swarm/storage/mock/mock.go +++ b/swarm/storage/mock/mock.go @@ -70,6 +70,12 @@ func (n *NodeStore) Put(key []byte, data []byte) error { return n.store.Put(n.addr, key, data) } +// Delete removes chunk data for a key for a node that has the address +// provided on NodeStore initialization. +func (n *NodeStore) Delete(key []byte) error { + return n.store.Delete(n.addr, key) +} + // GlobalStorer defines methods for mock db store // that stores chunk data for all swarm nodes. // It is used in tests to construct mock NodeStores @@ -77,6 +83,7 @@ func (n *NodeStore) Put(key []byte, data []byte) error { type GlobalStorer interface { Get(addr common.Address, key []byte) (data []byte, err error) Put(addr common.Address, key []byte, data []byte) error + Delete(addr common.Address, key []byte) error HasKey(addr common.Address, key []byte) bool // NewNodeStore creates an instance of NodeStore // to be used by a single swarm node with diff --git a/swarm/storage/mock/rpc/rpc.go b/swarm/storage/mock/rpc/rpc.go index 6e735f698..8cd6c83a7 100644 --- a/swarm/storage/mock/rpc/rpc.go +++ b/swarm/storage/mock/rpc/rpc.go @@ -73,6 +73,12 @@ func (s *GlobalStore) Put(addr common.Address, key []byte, data []byte) error { return err } +// Delete calls a Delete method to RPC server. +func (s *GlobalStore) Delete(addr common.Address, key []byte) error { + err := s.client.Call(nil, "mockStore_delete", addr, key) + return err +} + // HasKey calls a HasKey method to RPC server. func (s *GlobalStore) HasKey(addr common.Address, key []byte) bool { var has bool diff --git a/swarm/storage/mock/test/test.go b/swarm/storage/mock/test/test.go index 02da3af55..10180985f 100644 --- a/swarm/storage/mock/test/test.go +++ b/swarm/storage/mock/test/test.go @@ -72,6 +72,31 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) { } } } + t.Run("delete", func(t *testing.T) { + chunkAddr := storage.Address([]byte("1234567890abcd")) + for _, addr := range addrs { + err := globalStore.Put(addr, chunkAddr, []byte("data")) + if err != nil { + t.Fatalf("put data to store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + firstNodeAddr := addrs[0] + if err := globalStore.Delete(firstNodeAddr, chunkAddr); err != nil { + t.Fatalf("delete from store %s key %s: %v", firstNodeAddr.Hex(), chunkAddr.Hex(), err) + } + for i, addr := range addrs { + _, err := globalStore.Get(addr, chunkAddr) + if i == 0 { + if err != mock.ErrNotFound { + t.Errorf("get data from store %s key %s: expected mock.ErrNotFound error, got %v", addr.Hex(), chunkAddr.Hex(), err) + } + } else { + if err != nil { + t.Errorf("get data from store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + } + }) }) t.Run("NodeStore", func(t *testing.T) { @@ -114,6 +139,34 @@ func MockStore(t *testing.T, globalStore mock.GlobalStorer, n int) { } } } + t.Run("delete", func(t *testing.T) { + chunkAddr := storage.Address([]byte("1234567890abcd")) + var chosenStore *mock.NodeStore + for addr, store := range nodes { + if chosenStore == nil { + chosenStore = store + } + err := store.Put(chunkAddr, []byte("data")) + if err != nil { + t.Fatalf("put data to store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + if err := chosenStore.Delete(chunkAddr); err != nil { + t.Fatalf("delete key %s: %v", chunkAddr.Hex(), err) + } + for addr, store := range nodes { + _, err := store.Get(chunkAddr) + if store == chosenStore { + if err != mock.ErrNotFound { + t.Errorf("get data from store %s key %s: expected mock.ErrNotFound error, got %v", addr.Hex(), chunkAddr.Hex(), err) + } + } else { + if err != nil { + t.Errorf("get data from store %s key %s: %v", addr.Hex(), chunkAddr.Hex(), err) + } + } + } + }) }) } -- cgit v1.2.3