diff options
-rw-r--r-- | Godeps/Godeps.json | 24 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go | 1 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go | 4 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go | 34 | ||||
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/hash.go | 28 | ||||
-rw-r--r-- | cmd/bootnode/main.go | 28 | ||||
-rw-r--r-- | eth/db_upgrade.go | 13 | ||||
-rw-r--r-- | eth/filters/api.go | 63 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 19 |
9 files changed, 146 insertions, 68 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index fc9523e53..edfa8ad1e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -152,51 +152,51 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/cache", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/comparer", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/errors", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/filter", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/iterator", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/journal", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/memdb", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/opt", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/storage", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/table", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb/util", - "Rev": "917f41c560270110ceb73c5b38be2a9127387071" + "Rev": "ab8b5dcf1042e818ab68e770d465112a899b668e" }, { "ImportPath": "golang.org/x/crypto/pbkdf2", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go index 652fa4124..501006717 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go @@ -158,6 +158,7 @@ func (b *Batch) append(p *Batch) { b.grow(len(p.data) - batchHdrLen) b.data = append(b.data, p.data[batchHdrLen:]...) b.rLen += p.rLen + b.bLen += p.bLen } if p.sync { b.sync = true diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 9664e64d0..659f00dc6 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -172,7 +172,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { disableBackoff = db.s.o.GetDisableCompactionBackoff() ) for n := 0; ; n++ { - // Check wether the DB is closed. + // Check whether the DB is closed. if db.isClosed() { db.logf("%s exiting", name) db.compactionExitTransact() @@ -688,7 +688,7 @@ func (db *DB) compTrigger(compC chan<- cCmd) { } } -// This will trigger auto compation and/or wait for all compaction to be done. +// This will trigger auto compaction and/or wait for all compaction to be done. func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { ch := make(chan error) defer close(ch) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go new file mode 100644 index 000000000..5545aeef2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go @@ -0,0 +1,34 @@ +// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// +build nacl + +package storage + +import ( + "os" + "syscall" +) + +func newFileLock(path string, readOnly bool) (fl fileLock, err error) { + return nil, syscall.ENOTSUP +} + +func setFileLock(f *os.File, readOnly, lock bool) error { + return syscall.ENOTSUP +} + +func rename(oldpath, newpath string) error { + return syscall.ENOTSUP +} + +func isErrInvalid(err error) bool { + return false +} + +func syncDir(name string) error { + return syscall.ENOTSUP +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/hash.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/hash.go index 54903660f..7f3fa4e2c 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/hash.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/hash.go @@ -7,38 +7,38 @@ package util import ( - "bytes" "encoding/binary" ) // Hash return hash of the given data. func Hash(data []byte, seed uint32) uint32 { // Similar to murmur hash - var m uint32 = 0xc6a4a793 - var r uint32 = 24 - h := seed ^ (uint32(len(data)) * m) + const ( + m = uint32(0xc6a4a793) + r = uint32(24) + ) + var ( + h = seed ^ (uint32(len(data)) * m) + i int + ) - buf := bytes.NewBuffer(data) - for buf.Len() >= 4 { - var w uint32 - binary.Read(buf, binary.LittleEndian, &w) - h += w + for n := len(data) - len(data)%4; i < n; i += 4 { + h += binary.LittleEndian.Uint32(data[i:]) h *= m h ^= (h >> 16) } - rest := buf.Bytes() - switch len(rest) { + switch len(data) - i { default: panic("not reached") case 3: - h += uint32(rest[2]) << 16 + h += uint32(data[i+2]) << 16 fallthrough case 2: - h += uint32(rest[1]) << 8 + h += uint32(data[i+1]) << 8 fallthrough case 1: - h += uint32(rest[0]) + h += uint32(data[i]) h *= m h ^= (h >> r) case 0: diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 7d3f9fdb3..40d3cdc17 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -20,6 +20,7 @@ package main import ( "crypto/ecdsa" "flag" + "fmt" "os" "github.com/ethereum/go-ethereum/cmd/utils" @@ -32,7 +33,8 @@ import ( func main() { var ( listenAddr = flag.String("addr", ":30301", "listen address") - genKey = flag.String("genkey", "", "generate a node key and quit") + genKey = flag.String("genkey", "", "generate a node key") + writeAddr = flag.Bool("writeaddress", false, "write out the node's pubkey hash and quit") nodeKeyFile = flag.String("nodekey", "", "private key filename") nodeKeyHex = flag.String("nodekeyhex", "", "private key as hex (for testing)") natdesc = flag.String("nat", "none", "port mapping mechanism (any|none|upnp|pmp|extip:<IP>)") @@ -45,22 +47,19 @@ func main() { glog.SetToStderr(true) flag.Parse() - if *genKey != "" { - key, err := crypto.GenerateKey() - if err != nil { - utils.Fatalf("could not generate key: %v", err) - } - if err := crypto.SaveECDSA(*genKey, key); err != nil { - utils.Fatalf("%v", err) - } - os.Exit(0) - } - natm, err := nat.Parse(*natdesc) if err != nil { utils.Fatalf("-nat: %v", err) } switch { + case *genKey != "": + nodeKey, err = crypto.GenerateKey() + if err != nil { + utils.Fatalf("could not generate key: %v", err) + } + if err = crypto.SaveECDSA(*genKey, nodeKey); err != nil { + utils.Fatalf("%v", err) + } case *nodeKeyFile == "" && *nodeKeyHex == "": utils.Fatalf("Use -nodekey or -nodekeyhex to specify a private key") case *nodeKeyFile != "" && *nodeKeyHex != "": @@ -75,6 +74,11 @@ func main() { } } + if *writeAddr { + fmt.Printf("%v\n", discover.PubkeyID(&nodeKey.PublicKey)) + os.Exit(0) + } + if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, ""); err != nil { utils.Fatalf("%v", err) } diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go index 12de60fe7..172bb0954 100644 --- a/eth/db_upgrade.go +++ b/eth/db_upgrade.go @@ -93,6 +93,9 @@ func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) { prefix := []byte("block-num-") it := db.(*ethdb.LDBDatabase).NewIterator() + defer func() { + it.Release() + }() it.Seek(prefix) cnt := 0 for bytes.HasPrefix(it.Key(), prefix) { @@ -100,6 +103,9 @@ func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (e if len(keyPtr) < 20 { cnt++ if cnt%100000 == 0 { + it.Release() + it = db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(keyPtr) glog.V(logger.Info).Infof("converting %d canonical numbers...", cnt) } number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() @@ -130,6 +136,9 @@ func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (e func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) { prefix := []byte("block-") it := db.(*ethdb.LDBDatabase).NewIterator() + defer func() { + it.Release() + }() it.Seek(prefix) cnt := 0 for bytes.HasPrefix(it.Key(), prefix) { @@ -137,6 +146,9 @@ func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool if len(keyPtr) >= 38 { cnt++ if cnt%10000 == 0 { + it.Release() + it = db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(keyPtr) glog.V(logger.Info).Infof("converting %d blocks...", cnt) } // convert header, body, td and block receipts @@ -175,6 +187,7 @@ func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) { prefix := []byte("receipts-block-") it := db.(*ethdb.LDBDatabase).NewIterator() + defer it.Release() it.Seek(prefix) cnt := 0 for bytes.HasPrefix(it.Key(), prefix) { diff --git a/eth/filters/api.go b/eth/filters/api.go index 393019f8b..65c5b9380 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -68,8 +68,6 @@ type PublicFilterAPI struct { transactionMu sync.RWMutex transactionQueue map[int]*hashQueue - - transactMu sync.Mutex } // NewPublicFilterAPI returns a new PublicFilterAPI instance. @@ -100,6 +98,7 @@ done: for { select { case <-timer.C: + s.filterManager.Lock() // lock order like filterLoop() s.logMu.Lock() for id, filter := range s.logQueue { if time.Since(filter.timeout) > filterTickerTime { @@ -126,6 +125,7 @@ done: } } s.transactionMu.Unlock() + s.filterManager.Unlock() case <-s.quit: break done } @@ -135,19 +135,24 @@ done: // NewBlockFilter create a new filter that returns blocks that are included into the canonical chain. func (s *PublicFilterAPI) NewBlockFilter() (string, error) { + // protect filterManager.Add() and setting of filter fields + s.filterManager.Lock() + defer s.filterManager.Unlock() + externalId, err := newFilterId() if err != nil { return "", err } - s.blockMu.Lock() filter := New(s.chainDb) id, err := s.filterManager.Add(filter, ChainFilter) if err != nil { return "", err } + s.blockMu.Lock() s.blockQueue[id] = &hashQueue{timeout: time.Now()} + s.blockMu.Unlock() filter.BlockCallback = func(block *types.Block, logs vm.Logs) { s.blockMu.Lock() @@ -158,8 +163,6 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) { } } - defer s.blockMu.Unlock() - s.filterMapMu.Lock() s.filterMapping[externalId] = id s.filterMapMu.Unlock() @@ -169,21 +172,24 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) { // NewPendingTransactionFilter creates a filter that returns new pending transactions. func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { + // protect filterManager.Add() and setting of filter fields + s.filterManager.Lock() + defer s.filterManager.Unlock() + externalId, err := newFilterId() if err != nil { return "", err } - s.transactionMu.Lock() - defer s.transactionMu.Unlock() - filter := New(s.chainDb) id, err := s.filterManager.Add(filter, PendingTxFilter) if err != nil { return "", err } + s.transactionMu.Lock() s.transactionQueue[id] = &hashQueue{timeout: time.Now()} + s.transactionMu.Unlock() filter.TransactionCallback = func(tx *types.Transaction) { s.transactionMu.Lock() @@ -203,8 +209,9 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { // newLogFilter creates a new log filter. func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { - s.logMu.Lock() - defer s.logMu.Unlock() + // protect filterManager.Add() and setting of filter fields + s.filterManager.Lock() + defer s.filterManager.Unlock() filter := New(s.chainDb) id, err := s.filterManager.Add(filter, LogFilter) @@ -212,7 +219,9 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo return 0, err } + s.logMu.Lock() s.logQueue[id] = &logQueue{timeout: time.Now()} + s.logMu.Unlock() filter.SetBeginBlock(earliest) filter.SetEndBlock(latest) @@ -443,35 +452,43 @@ func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog { // UninstallFilter removes the filter with the given filter id. func (s *PublicFilterAPI) UninstallFilter(filterId string) bool { - s.filterMapMu.Lock() - defer s.filterMapMu.Unlock() + s.filterManager.Lock() + defer s.filterManager.Unlock() + s.filterMapMu.Lock() id, ok := s.filterMapping[filterId] if !ok { + s.filterMapMu.Unlock() return false } - - defer s.filterManager.Remove(id) delete(s.filterMapping, filterId) + s.filterMapMu.Unlock() + s.filterManager.Remove(id) + + s.logMu.Lock() if _, ok := s.logQueue[id]; ok { - s.logMu.Lock() - defer s.logMu.Unlock() delete(s.logQueue, id) + s.logMu.Unlock() return true } + s.logMu.Unlock() + + s.blockMu.Lock() if _, ok := s.blockQueue[id]; ok { - s.blockMu.Lock() - defer s.blockMu.Unlock() delete(s.blockQueue, id) + s.blockMu.Unlock() return true } + s.blockMu.Unlock() + + s.transactionMu.Lock() if _, ok := s.transactionQueue[id]; ok { - s.transactionMu.Lock() - defer s.transactionMu.Unlock() delete(s.transactionQueue, id) + s.transactionMu.Unlock() return true } + s.transactionMu.Unlock() return false } @@ -525,7 +542,9 @@ func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog { // GetFilterLogs returns the logs for the filter with the given id. func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { + s.filterMapMu.RLock() id, ok := s.filterMapping[filterId] + s.filterMapMu.RUnlock() if !ok { return toRPCLogs(nil, false) } @@ -540,9 +559,9 @@ func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { // GetFilterChanges returns the logs for the filter with the given id since last time is was called. // This can be used for polling. func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} { - s.filterMapMu.Lock() + s.filterMapMu.RLock() id, ok := s.filterMapping[filterId] - s.filterMapMu.Unlock() + s.filterMapMu.RUnlock() if !ok { // filter not found return []interface{}{} diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 4343dfa21..256464213 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -82,11 +82,20 @@ func (fs *FilterSystem) Stop() { fs.sub.Unsubscribe() } -// Add adds a filter to the filter manager -func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { +// Acquire filter system maps lock, required to force lock acquisition +// sequence with filterMu acquired first to avoid deadlocks by callbacks +func (fs *FilterSystem) Lock() { fs.filterMu.Lock() - defer fs.filterMu.Unlock() +} + +// Release filter system maps lock +func (fs *FilterSystem) Unlock() { + fs.filterMu.Unlock() +} +// Add adds a filter to the filter manager +// Expects filterMu to be locked. +func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { id := fs.filterId filter.created = time.Now() @@ -110,10 +119,8 @@ func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) } // Remove removes a filter by filter id +// Expects filterMu to be locked. func (fs *FilterSystem) Remove(id int) { - fs.filterMu.Lock() - defer fs.filterMu.Unlock() - delete(fs.chainFilters, id) delete(fs.pendingTxFilters, id) delete(fs.logFilters, id) |