diff options
-rw-r--r-- | eth/api.go | 20 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 19 | ||||
-rw-r--r-- | eth/filters/api.go | 1 | ||||
-rw-r--r-- | eth/filters/filter.go | 3 | ||||
-rw-r--r-- | eth/filters/filter_system.go | 1 | ||||
-rw-r--r-- | eth/filters/filter_system_test.go | 7 | ||||
-rw-r--r-- | eth/sync.go | 6 | ||||
-rw-r--r-- | ethdb/database_test.go | 161 | ||||
-rw-r--r-- | rpc/server.go | 29 | ||||
-rw-r--r-- | rpc/subscription.go | 1 | ||||
-rw-r--r-- | rpc/subscription_test.go | 16 | ||||
-rw-r--r-- | rpc/types.go | 13 | ||||
-rw-r--r-- | rpc/utils.go | 27 |
13 files changed, 193 insertions, 111 deletions
diff --git a/eth/api.go b/eth/api.go index 0d90759b6..f5214fc37 100644 --- a/eth/api.go +++ b/eth/api.go @@ -465,26 +465,6 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf return true, structLogger.StructLogs(), nil } -// callmsg is the message type used for call transitions. -type callmsg struct { - addr common.Address - to *common.Address - gas, gasPrice *big.Int - value *big.Int - data []byte -} - -// accessor boilerplate to implement core.Message -func (m callmsg) From() (common.Address, error) { return m.addr, nil } -func (m callmsg) FromFrontier() (common.Address, error) { return m.addr, nil } -func (m callmsg) Nonce() uint64 { return 0 } -func (m callmsg) CheckNonce() bool { return false } -func (m callmsg) To() *common.Address { return m.to } -func (m callmsg) GasPrice() *big.Int { return m.gasPrice } -func (m callmsg) Gas() *big.Int { return m.gas } -func (m callmsg) Value() *big.Int { return m.value } -func (m callmsg) Data() []byte { return m.data } - // formatError formats a Go error into either an empty string or the data content // of the error itself. func formatError(err error) string { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index b354682a1..36e8c800f 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -403,8 +403,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha dl.lock.Lock() defer dl.lock.Unlock() - var err error - err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl, id, delay}) + var err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl, id, delay}) if err == nil { // Assign the owned hashes, headers and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -1381,7 +1380,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("peer-half", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1398,7 +1397,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("peer-full", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1454,7 +1453,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("fork A", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1474,7 +1473,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("fork B", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1535,7 +1534,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("faulty", nil, mode); err == nil { - t.Fatalf("succeeded faulty synchronisation") + panic("succeeded faulty synchronisation") } }() <-starting @@ -1552,7 +1551,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("valid", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting @@ -1613,7 +1612,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("attack", nil, mode); err == nil { - t.Fatalf("succeeded attacker synchronisation") + panic("succeeded attacker synchronisation") } }() <-starting @@ -1630,7 +1629,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { go func() { defer pending.Done() if err := tester.sync("valid", nil, mode); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + panic(fmt.Sprintf("failed to synchronise blocks: %v", err)) } }() <-starting diff --git a/eth/filters/api.go b/eth/filters/api.go index 61647a5d0..fff58a268 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -54,7 +54,6 @@ type PublicFilterAPI struct { backend Backend useMipMap bool mux *event.TypeMux - quit chan struct{} chainDb ethdb.Database events *EventSystem filtersMu sync.Mutex diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0a0b81224..f27b76929 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -20,7 +20,6 @@ import ( "context" "math" "math/big" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -42,8 +41,6 @@ type Filter struct { backend Backend useMipMap bool - created time.Time - db ethdb.Database begin, end int64 addresses []common.Address diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 7abace1e6..ab0b7473e 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -74,7 +74,6 @@ type subscription struct { // subscription which match the subscription criteria. type EventSystem struct { mux *event.TypeMux - sub *event.TypeMuxSubscription backend Backend lightMode bool lastHead *types.Header diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 822580b56..23e6d66e1 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -18,6 +18,7 @@ package filters import ( "context" + "fmt" "math/big" "reflect" "testing" @@ -439,15 +440,15 @@ func TestPendingLogsSubscription(t *testing.T) { } if len(fetched) != len(tt.expected) { - t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))) } for l := range fetched { if fetched[l].Removed { - t.Errorf("expected log not to be removed for log %d in case %d", l, i) + panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i)) } if !reflect.DeepEqual(fetched[l], tt.expected[l]) { - t.Errorf("invalid log on index %d for case %d", l, i) + panic(fmt.Sprintf("invalid log on index %d for case %d", l, i)) } } }() diff --git a/eth/sync.go b/eth/sync.go index 8784b225d..7442f912c 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -138,7 +138,9 @@ func (pm *ProtocolManager) syncer() { defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations - forceSync := time.Tick(forceSyncCycle) + forceSync := time.NewTicker(forceSyncCycle) + defer forceSync.Stop() + for { select { case <-pm.newPeerCh: @@ -148,7 +150,7 @@ func (pm *ProtocolManager) syncer() { } go pm.synchronise(pm.peers.BestPeer()) - case <-forceSync: + case <-forceSync.C: // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) diff --git a/ethdb/database_test.go b/ethdb/database_test.go index 0e69a1218..4740cdaed 100644 --- a/ethdb/database_test.go +++ b/ethdb/database_test.go @@ -14,21 +14,164 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -package ethdb +package ethdb_test import ( + "bytes" + "fmt" + "io/ioutil" "os" - "path/filepath" + "strconv" + "sync" + "testing" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" ) -func newDb() *LDBDatabase { - file := filepath.Join("/", "tmp", "ldbtesttmpfile") - if common.FileExist(file) { - os.RemoveAll(file) +func newTestLDB() (*ethdb.LDBDatabase, func()) { + dirname, err := ioutil.TempDir(os.TempDir(), "ethdb_test_") + if err != nil { + panic("failed to create test file: " + err.Error()) + } + db, err := ethdb.NewLDBDatabase(dirname, 0, 0) + if err != nil { + panic("failed to create test database: " + err.Error()) + } + + return db, func() { + db.Close() + os.RemoveAll(dirname) + } +} + +var test_values = []string{"", "a", "1251", "\x00123\x00"} + +func TestLDB_PutGet(t *testing.T) { + db, remove := newTestLDB() + defer remove() + testPutGet(db, t) +} + +func TestMemoryDB_PutGet(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + testPutGet(db, t) +} + +func testPutGet(db ethdb.Database, t *testing.T) { + t.Parallel() + + for _, v := range test_values { + err := db.Put([]byte(v), []byte(v)) + if err != nil { + t.Fatalf("put failed: %v", err) + } + } + + for _, v := range test_values { + data, err := db.Get([]byte(v)) + if err != nil { + t.Fatalf("get failed: %v", err) + } + if !bytes.Equal(data, []byte(v)) { + t.Fatalf("get returned wrong result, got %q expected %q", string(data), v) + } + } + + for _, v := range test_values { + err := db.Put([]byte(v), []byte("?")) + if err != nil { + t.Fatalf("put override failed: %v", err) + } + } + + for _, v := range test_values { + data, err := db.Get([]byte(v)) + if err != nil { + t.Fatalf("get failed: %v", err) + } + if !bytes.Equal(data, []byte("?")) { + t.Fatalf("get returned wrong result, got %q expected ?", string(data)) + } } - db, _ := NewLDBDatabase(file, 0, 0) - return db + for _, v := range test_values { + err := db.Delete([]byte(v)) + if err != nil { + t.Fatalf("delete %q failed: %v", v, err) + } + } + + for _, v := range test_values { + _, err := db.Get([]byte(v)) + if err == nil { + t.Fatalf("got deleted value %q", v) + } + } +} + +func TestLDB_ParallelPutGet(t *testing.T) { + db, remove := newTestLDB() + defer remove() + testParallelPutGet(db, t) +} + +func TestMemoryDB_ParallelPutGet(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + testParallelPutGet(db, t) +} + +func testParallelPutGet(db ethdb.Database, t *testing.T) { + const n = 8 + var pending sync.WaitGroup + + pending.Add(n) + for i := 0; i < n; i++ { + go func(key string) { + defer pending.Done() + err := db.Put([]byte(key), []byte("v"+key)) + if err != nil { + panic("put failed: " + err.Error()) + } + }(strconv.Itoa(i)) + } + pending.Wait() + + pending.Add(n) + for i := 0; i < n; i++ { + go func(key string) { + defer pending.Done() + data, err := db.Get([]byte(key)) + if err != nil { + panic("get failed: " + err.Error()) + } + if !bytes.Equal(data, []byte("v"+key)) { + panic(fmt.Sprintf("get failed, got %q expected %q", []byte(data), []byte("v"+key))) + } + }(strconv.Itoa(i)) + } + pending.Wait() + + pending.Add(n) + for i := 0; i < n; i++ { + go func(key string) { + defer pending.Done() + err := db.Delete([]byte(key)) + if err != nil { + panic("delete failed: " + err.Error()) + } + }(strconv.Itoa(i)) + } + pending.Wait() + + pending.Add(n) + for i := 0; i < n; i++ { + go func(key string) { + defer pending.Done() + _, err := db.Get([]byte(key)) + if err == nil { + panic("get succeeded") + } + }(strconv.Itoa(i)) + } + pending.Wait() } diff --git a/rpc/server.go b/rpc/server.go index 62b84af34..30c288349 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -29,11 +29,7 @@ import ( "gopkg.in/fatih/set.v0" ) -const ( - notificationBufferSize = 10000 // max buffered notifications before codec is closed - - MetadataApi = "rpc" -) +const MetadataApi = "rpc" // CodecOption specifies which type of messages this codec supports type CodecOption int @@ -49,10 +45,9 @@ const ( // NewServer will create a new server instance with no registered handlers. func NewServer() *Server { server := &Server{ - services: make(serviceRegistry), - subscriptions: make(subscriptionRegistry), - codecs: set.New(), - run: 1, + services: make(serviceRegistry), + codecs: set.New(), + run: 1, } // register a default service which will provide meta information about the RPC service such as the services and @@ -124,16 +119,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return nil } -// hasOption returns true if option is included in options, otherwise false -func hasOption(option CodecOption, options []CodecOption) bool { - for _, o := range options { - if option == o { - return true - } - } - return false -} - // serveRequest will reads requests from the codec, calls the RPC callback and // writes the response to the given codec. // @@ -148,13 +133,11 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] - log.Error(fmt.Sprint(string(buf))) + log.Error(string(buf)) } s.codecsMu.Lock() s.codecs.Remove(codec) s.codecsMu.Unlock() - - return }() ctx, cancel := context.WithCancel(context.Background()) @@ -246,7 +229,7 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { // close all codecs which will cancel pending requests/subscriptions. func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { - log.Debug(fmt.Sprint("RPC Server shutdown initiatied")) + log.Debug("RPC Server shutdown initiatied") s.codecsMu.Lock() defer s.codecsMu.Unlock() s.codecs.Each(func(c interface{}) bool { diff --git a/rpc/subscription.go b/rpc/subscription.go index 720e4dd06..6ce7befa1 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -53,7 +53,6 @@ type notifierKey struct{} type Notifier struct { codec ServerCodec subMu sync.RWMutex // guards active and inactive maps - stopped bool active map[ID]*Subscription inactive map[ID]*Subscription } diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index 0ed15ddfe..39f759692 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -165,7 +165,7 @@ func TestNotifications(t *testing.T) { } func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse, - failures chan<- jsonErrResponse, notifications chan<- jsonNotification) { + failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) { // read and parse server messages for { @@ -177,12 +177,14 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces var responses []map[string]interface{} if rmsg[0] == '[' { if err := json.Unmarshal(rmsg, &responses); err != nil { - t.Fatalf("Received invalid message: %s", rmsg) + errors <- fmt.Errorf("Received invalid message: %s", rmsg) + return } } else { var msg map[string]interface{} if err := json.Unmarshal(rmsg, &msg); err != nil { - t.Fatalf("Received invalid message: %s", rmsg) + errors <- fmt.Errorf("Received invalid message: %s", rmsg) + return } responses = append(responses, msg) } @@ -216,7 +218,7 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces } continue } - t.Fatalf("Received invalid message: %s", msg) + errors <- fmt.Errorf("Received invalid message: %s", msg) } } } @@ -235,6 +237,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) { successes = make(chan jsonSuccessResponse) failures = make(chan jsonErrResponse) notifications = make(chan jsonNotification) + + errors = make(chan error, 10) ) // setup and start server @@ -248,7 +252,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) { defer server.Stop() // wait for message and write them to the given channels - go waitForMessages(t, in, successes, failures, notifications) + go waitForMessages(t, in, successes, failures, notifications, errors) // create subscriptions one by one n := 3 @@ -297,6 +301,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) { } select { + case err := <-errors: + t.Fatal(err) case suc := <-successes: // subscription created subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string) case failure := <-failures: diff --git a/rpc/types.go b/rpc/types.go index a7b8c9788..f2375604e 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -48,7 +48,6 @@ type callback struct { // service represents a registered object type service struct { name string // name for service - rcvr reflect.Value // receiver of methods for the service typ reflect.Type // receiver type callbacks callbacks // registered handlers subscriptions subscriptions // available subscriptions/notifications @@ -58,23 +57,19 @@ type service struct { type serverRequest struct { id interface{} svcname string - rcvr reflect.Value callb *callback args []reflect.Value isUnsubscribe bool err Error } -type serviceRegistry map[string]*service // collection of services -type callbacks map[string]*callback // collection of RPC callbacks -type subscriptions map[string]*callback // collection of subscription callbacks -type subscriptionRegistry map[string]*callback // collection of subscription callbacks +type serviceRegistry map[string]*service // collection of services +type callbacks map[string]*callback // collection of RPC callbacks +type subscriptions map[string]*callback // collection of subscription callbacks // Server represents a RPC server type Server struct { - services serviceRegistry - muSubcriptions sync.Mutex // protects subscriptions - subscriptions subscriptionRegistry + services serviceRegistry run int32 codecsMu sync.Mutex diff --git a/rpc/utils.go b/rpc/utils.go index 2506c4833..9315cab59 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -119,21 +119,6 @@ func isHexNum(t reflect.Type) bool { return t == bigIntType } -var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem() - -// Indication if the given block is a BlockNumber -func isBlockNumber(t reflect.Type) bool { - if t == nil { - return false - } - - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - - return t == blockNumberType -} - // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server // documentation for a summary of these criteria. @@ -210,18 +195,12 @@ METHODS: } switch mtype.NumOut() { - case 0, 1: - break - case 2: - if h.errPos == -1 { // method must one return value and 1 error + case 0, 1, 2: + if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error continue METHODS } - break - default: - continue METHODS + callbacks[mname] = &h } - - callbacks[mname] = &h } return callbacks, subscriptions |