diff options
author | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-10-12 23:58:51 +0800 |
---|---|---|
committer | Jeffrey Wilcke <geffobscura@gmail.com> | 2015-10-17 03:28:59 +0800 |
commit | 6dc14788a238f3e0ec786c6c04d476a3b957e645 (patch) | |
tree | 8f3f5f91506bc4c7532543043add1eaea3fd28e7 /core | |
parent | 30f057aaf9891fb37f82d94c24b8aa35d388e07b (diff) | |
download | dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.gz dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.bz2 dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.lz dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.xz dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.tar.zst dexon-6dc14788a238f3e0ec786c6c04d476a3b957e645.zip |
core, eth/filters, miner, xeth: Optimised log filtering
Log filtering is now using a MIPmap like approach where addresses of
logs are added to a mapped bloom bin. The current levels for the MIP are
in ranges of 1.000.000, 500.000, 100.000, 50.000, 1.000. Logs are
therefor filtered in batches of 1.000.
Diffstat (limited to 'core')
-rw-r--r-- | core/blockchain.go | 27 | ||||
-rw-r--r-- | core/chain_makers.go | 7 | ||||
-rw-r--r-- | core/chain_util.go | 44 | ||||
-rw-r--r-- | core/chain_util_test.go | 112 | ||||
-rw-r--r-- | core/transaction_util.go | 32 | ||||
-rw-r--r-- | core/types/bloom9.go | 41 | ||||
-rw-r--r-- | core/types/bloom9_test.go | 34 | ||||
-rw-r--r-- | core/types/common.go | 35 |
8 files changed, 271 insertions, 61 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 6c555e9ee..5cb800f1d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -654,10 +654,17 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { events = append(events, ChainEvent{block, block.Hash(), logs}) // This puts transactions in a extra db for rpc - PutTransactions(self.chainDb, block, block.Transactions()) + if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil { + return i, err + } // store the receipts - PutReceipts(self.chainDb, receipts) - + if err := PutReceipts(self.chainDb, receipts); err != nil { + return i, err + } + // Write map map bloom filters + if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil { + return i, err + } case SideStatTy: if glog.V(logger.Detail) { glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) @@ -743,8 +750,18 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // insert the block in the canonical way, re-writing history self.insert(block) // write canonical receipts and transactions - PutTransactions(self.chainDb, block, block.Transactions()) - PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash())) + if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil { + return err + } + receipts := GetBlockReceipts(self.chainDb, block.Hash()) + // write receipts + if err := PutReceipts(self.chainDb, receipts); err != nil { + return err + } + // Write map map bloom filters + if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil { + return err + } addedTxs = append(addedTxs, block.Transactions()...) } diff --git a/core/chain_makers.go b/core/chain_makers.go index c2871a097..5f2cfeb63 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -105,7 +105,12 @@ func (b *BlockGen) AddTx(tx *types.Transaction) { b.receipts = append(b.receipts, receipt) } -func (b *BlockGen) AddReceipt(receipt *types.Receipt) { +// AddUncheckedReceipts forcefully adds a receipts to the block without a +// backing transaction. +// +// AddUncheckedReceipts will cause consensus failures when used during real +// chain processing. This is best used in conjuction with raw block insertion. +func (b *BlockGen) AddUncheckedReceipt(receipt *types.Receipt) { b.receipts = append(b.receipts, receipt) } diff --git a/core/chain_util.go b/core/chain_util.go index 33d94cebd..42b6a5be2 100644 --- a/core/chain_util.go +++ b/core/chain_util.go @@ -18,6 +18,8 @@ package core import ( "bytes" + "encoding/binary" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -42,6 +44,9 @@ var ( ExpDiffPeriod = big.NewInt(100000) blockHashPre = []byte("block-hash-") // [deprecated by eth/63] + + mipmapPre = []byte("mipmap-log-bloom-") + MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000} ) // CalcDifficulty is the difficulty adjustment algorithm. It returns @@ -346,3 +351,42 @@ func GetBlockByHashOld(db ethdb.Database, hash common.Hash) *types.Block { } return (*types.Block)(&block) } + +// returns a formatted MIP mapped key by adding prefix, canonical number and level +// +// ex. fn(98, 1000) = (prefix || 1000 || 0) +func mipmapKey(num, level uint64) []byte { + lkey := make([]byte, 8) + binary.BigEndian.PutUint64(lkey, level) + key := new(big.Int).SetUint64(num / level * level) + + return append(mipmapPre, append(lkey, key.Bytes()...)...) +} + +// WriteMapmapBloom writes each address included in the receipts' logs to the +// MIP bloom bin. +func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error { + batch := db.NewBatch() + for _, level := range MIPMapLevels { + key := mipmapKey(number, level) + bloomDat, _ := db.Get(key) + bloom := types.BytesToBloom(bloomDat) + for _, receipt := range receipts { + for _, log := range receipt.Logs() { + bloom.Add(log.Address.Big()) + } + } + batch.Put(key, bloom.Bytes()) + } + if err := batch.Write(); err != nil { + return fmt.Errorf("mipmap write fail for: %d: %v", number, err) + } + return nil +} + +// GetMipmapBloom returns a bloom filter using the number and level as input +// parameters. For available levels see MIPMapLevels. +func GetMipmapBloom(db ethdb.Database, number, level uint64) types.Bloom { + bloomDat, _ := db.Get(mipmapKey(number, level)) + return types.BytesToBloom(bloomDat) +} diff --git a/core/chain_util_test.go b/core/chain_util_test.go index 3f0446715..62b73a064 100644 --- a/core/chain_util_test.go +++ b/core/chain_util_test.go @@ -18,12 +18,15 @@ package core import ( "encoding/json" + "io/ioutil" "math/big" "os" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -318,3 +321,112 @@ func TestHeadStorage(t *testing.T) { t.Fatalf("Head block hash mismatch: have %v, want %v", entry, blockFull.Hash()) } } + +func TestMipmapBloom(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + + receipt1 := new(types.Receipt) + receipt1.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test"))}, + &vm.Log{Address: common.BytesToAddress([]byte("address"))}, + }) + receipt2 := new(types.Receipt) + receipt2.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test"))}, + &vm.Log{Address: common.BytesToAddress([]byte("address1"))}, + }) + + WriteMipmapBloom(db, 1, types.Receipts{receipt1}) + WriteMipmapBloom(db, 2, types.Receipts{receipt2}) + + for _, level := range MIPMapLevels { + bloom := GetMipmapBloom(db, 2, level) + if !bloom.Test(new(big.Int).SetBytes([]byte("address1"))) { + t.Error("expected test to be included on level:", level) + } + } + + // reset + db, _ = ethdb.NewMemDatabase() + receipt := new(types.Receipt) + receipt.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test"))}, + }) + WriteMipmapBloom(db, 999, types.Receipts{receipt1}) + + receipt = new(types.Receipt) + receipt.SetLogs(vm.Logs{ + &vm.Log{Address: common.BytesToAddress([]byte("test 1"))}, + }) + WriteMipmapBloom(db, 1000, types.Receipts{receipt}) + + bloom := GetMipmapBloom(db, 1000, 1000) + if bloom.TestBytes([]byte("test")) { + t.Error("test should not have been included") + } +} + +func TestMipmapChain(t *testing.T) { + dir, err := ioutil.TempDir("", "mipmap") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + var ( + db, _ = ethdb.NewLDBDatabase(dir, 16) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + + hash1 = common.BytesToHash([]byte("topic1")) + ) + defer db.Close() + + genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)}) + chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) { + var receipts types.Receipts + switch i { + case 1: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{ + &vm.Log{ + Address: addr, + Topics: []common.Hash{hash1}, + }, + }) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + case 1000: + receipt := types.NewReceipt(nil, new(big.Int)) + receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}}) + gen.AddUncheckedReceipt(receipt) + receipts = types.Receipts{receipt} + + } + + // store the receipts + err := PutReceipts(db, receipts) + if err != nil { + t.Fatal(err) + } + WriteMipmapBloom(db, uint64(i+1), receipts) + }) + for _, block := range chain { + WriteBlock(db, block) + if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := WriteHeadBlockHash(db, block.Hash()); err != nil { + t.Fatalf("failed to insert block number: %v", err) + } + if err := PutBlockReceipts(db, block, block.Receipts()); err != nil { + t.Fatal("error writing block receipts:", err) + } + } + + bloom := GetMipmapBloom(db, 0, 1000) + if bloom.TestBytes(addr2[:]) { + t.Error("address was included in bloom and should not have") + } +} diff --git a/core/transaction_util.go b/core/transaction_util.go index ebe095abb..d55ed14da 100644 --- a/core/transaction_util.go +++ b/core/transaction_util.go @@ -17,6 +17,8 @@ package core import ( + "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -32,22 +34,16 @@ var ( ) // PutTransactions stores the transactions in the given database -func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) { - batch := new(leveldb.Batch) - _, batchWrite := db.(*ethdb.LDBDatabase) +func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) error { + batch := db.NewBatch() for i, tx := range block.Transactions() { rlpEnc, err := rlp.EncodeToBytes(tx) if err != nil { - glog.V(logger.Debug).Infoln("Failed encoding tx", err) - return + return fmt.Errorf("failed encoding tx: %v", err) } - if batchWrite { - batch.Put(tx.Hash().Bytes(), rlpEnc) - } else { - db.Put(tx.Hash().Bytes(), rlpEnc) - } + batch.Put(tx.Hash().Bytes(), rlpEnc) var txExtra struct { BlockHash common.Hash @@ -59,22 +55,16 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio txExtra.Index = uint64(i) rlpMeta, err := rlp.EncodeToBytes(txExtra) if err != nil { - glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err) - return + return fmt.Errorf("failed encoding tx meta data: %v", err) } - if batchWrite { - batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) - } else { - db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) - } + batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) } - if db, ok := db.(*ethdb.LDBDatabase); ok { - if err := db.LDB().Write(batch, nil); err != nil { - glog.V(logger.Error).Infoln("db write err:", err) - } + if err := batch.Write(); err != nil { + return fmt.Errorf("failed writing tx to db: %v", err) } + return nil } func DeleteTransaction(db ethdb.Database, txHash common.Hash) { diff --git a/core/types/bloom9.go b/core/types/bloom9.go index f87ae58e6..97db20ee9 100644 --- a/core/types/bloom9.go +++ b/core/types/bloom9.go @@ -17,6 +17,7 @@ package types import ( + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -28,6 +29,46 @@ type bytesBacked interface { Bytes() []byte } +const bloomLength = 256 + +type Bloom [bloomLength]byte + +func BytesToBloom(b []byte) Bloom { + var bloom Bloom + bloom.SetBytes(b) + return bloom +} + +func (b *Bloom) SetBytes(d []byte) { + if len(b) < len(d) { + panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d))) + } + + copy(b[bloomLength-len(d):], d) +} + +func (b *Bloom) Add(d *big.Int) { + bin := new(big.Int).SetBytes(b[:]) + bin.Or(bin, bloom9(d.Bytes())) + b.SetBytes(bin.Bytes()) +} + +func (b Bloom) Big() *big.Int { + return common.Bytes2Big(b[:]) +} + +func (b Bloom) Bytes() []byte { + return b[:] +} + +func (b Bloom) Test(test *big.Int) bool { + return BloomLookup(b, test) +} + +func (b Bloom) TestBytes(test []byte) bool { + return b.Test(common.BytesToBig(test)) +} + func CreateBloom(receipts Receipts) Bloom { bin := new(big.Int) for _, receipt := range receipts { diff --git a/core/types/bloom9_test.go b/core/types/bloom9_test.go index f020670b1..5744bec6c 100644 --- a/core/types/bloom9_test.go +++ b/core/types/bloom9_test.go @@ -16,6 +16,40 @@ package types +import ( + "math/big" + "testing" +) + +func TestBloom(t *testing.T) { + positive := []string{ + "testtest", + "test", + "hallo", + "other", + } + negative := []string{ + "tes", + "lo", + } + + var bloom Bloom + for _, data := range positive { + bloom.Add(new(big.Int).SetBytes([]byte(data))) + } + + for _, data := range positive { + if !bloom.Test(new(big.Int).SetBytes([]byte(data))) { + t.Error("expected", data, "to test true") + } + } + for _, data := range negative { + if bloom.Test(new(big.Int).SetBytes([]byte(data))) { + t.Error("did not expect", data, "to test true") + } + } +} + /* import ( "testing" diff --git a/core/types/common.go b/core/types/common.go index dc428c00c..29019a1b4 100644 --- a/core/types/common.go +++ b/core/types/common.go @@ -16,41 +16,8 @@ package types -import ( - "math/big" - - "fmt" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/vm" -) +import "github.com/ethereum/go-ethereum/core/vm" type BlockProcessor interface { Process(*Block) (vm.Logs, Receipts, error) } - -const bloomLength = 256 - -type Bloom [bloomLength]byte - -func BytesToBloom(b []byte) Bloom { - var bloom Bloom - bloom.SetBytes(b) - return bloom -} - -func (b *Bloom) SetBytes(d []byte) { - if len(b) < len(d) { - panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d))) - } - - copy(b[bloomLength-len(d):], d) -} - -func (b Bloom) Big() *big.Int { - return common.Bytes2Big(b[:]) -} - -func (b Bloom) Bytes() []byte { - return b[:] -} |