aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/api.go20
-rw-r--r--eth/downloader/downloader_test.go19
-rw-r--r--eth/filters/api.go1
-rw-r--r--eth/filters/filter.go3
-rw-r--r--eth/filters/filter_system.go1
-rw-r--r--eth/filters/filter_system_test.go7
-rw-r--r--eth/sync.go6
-rw-r--r--ethdb/database_test.go161
-rw-r--r--rpc/server.go29
-rw-r--r--rpc/subscription.go1
-rw-r--r--rpc/subscription_test.go16
-rw-r--r--rpc/types.go13
-rw-r--r--rpc/utils.go27
13 files changed, 193 insertions, 111 deletions
diff --git a/eth/api.go b/eth/api.go
index 0d90759b6..f5214fc37 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -465,26 +465,6 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf
return true, structLogger.StructLogs(), nil
}
-// callmsg is the message type used for call transitions.
-type callmsg struct {
- addr common.Address
- to *common.Address
- gas, gasPrice *big.Int
- value *big.Int
- data []byte
-}
-
-// accessor boilerplate to implement core.Message
-func (m callmsg) From() (common.Address, error) { return m.addr, nil }
-func (m callmsg) FromFrontier() (common.Address, error) { return m.addr, nil }
-func (m callmsg) Nonce() uint64 { return 0 }
-func (m callmsg) CheckNonce() bool { return false }
-func (m callmsg) To() *common.Address { return m.to }
-func (m callmsg) GasPrice() *big.Int { return m.gasPrice }
-func (m callmsg) Gas() *big.Int { return m.gas }
-func (m callmsg) Value() *big.Int { return m.value }
-func (m callmsg) Data() []byte { return m.data }
-
// formatError formats a Go error into either an empty string or the data content
// of the error itself.
func formatError(err error) string {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index b354682a1..36e8c800f 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -403,8 +403,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
dl.lock.Lock()
defer dl.lock.Unlock()
- var err error
- err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl, id, delay})
+ var err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl, id, delay})
if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy)
dl.peerHashes[id] = make([]common.Hash, len(hashes))
@@ -1381,7 +1380,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("peer-half", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1398,7 +1397,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("peer-full", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1454,7 +1453,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("fork A", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1474,7 +1473,7 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("fork B", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1535,7 +1534,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("faulty", nil, mode); err == nil {
- t.Fatalf("succeeded faulty synchronisation")
+ panic("succeeded faulty synchronisation")
}
}()
<-starting
@@ -1552,7 +1551,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("valid", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
@@ -1613,7 +1612,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("attack", nil, mode); err == nil {
- t.Fatalf("succeeded attacker synchronisation")
+ panic("succeeded attacker synchronisation")
}
}()
<-starting
@@ -1630,7 +1629,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
go func() {
defer pending.Done()
if err := tester.sync("valid", nil, mode); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
+ panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
}
}()
<-starting
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 61647a5d0..fff58a268 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -54,7 +54,6 @@ type PublicFilterAPI struct {
backend Backend
useMipMap bool
mux *event.TypeMux
- quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 0a0b81224..f27b76929 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -20,7 +20,6 @@ import (
"context"
"math"
"math/big"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -42,8 +41,6 @@ type Filter struct {
backend Backend
useMipMap bool
- created time.Time
-
db ethdb.Database
begin, end int64
addresses []common.Address
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 7abace1e6..ab0b7473e 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -74,7 +74,6 @@ type subscription struct {
// subscription which match the subscription criteria.
type EventSystem struct {
mux *event.TypeMux
- sub *event.TypeMuxSubscription
backend Backend
lightMode bool
lastHead *types.Header
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 822580b56..23e6d66e1 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -18,6 +18,7 @@ package filters
import (
"context"
+ "fmt"
"math/big"
"reflect"
"testing"
@@ -439,15 +440,15 @@ func TestPendingLogsSubscription(t *testing.T) {
}
if len(fetched) != len(tt.expected) {
- t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
+ panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)))
}
for l := range fetched {
if fetched[l].Removed {
- t.Errorf("expected log not to be removed for log %d in case %d", l, i)
+ panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i))
}
if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
- t.Errorf("invalid log on index %d for case %d", l, i)
+ panic(fmt.Sprintf("invalid log on index %d for case %d", l, i))
}
}
}()
diff --git a/eth/sync.go b/eth/sync.go
index 8784b225d..7442f912c 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -138,7 +138,9 @@ func (pm *ProtocolManager) syncer() {
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
- forceSync := time.Tick(forceSyncCycle)
+ forceSync := time.NewTicker(forceSyncCycle)
+ defer forceSync.Stop()
+
for {
select {
case <-pm.newPeerCh:
@@ -148,7 +150,7 @@ func (pm *ProtocolManager) syncer() {
}
go pm.synchronise(pm.peers.BestPeer())
- case <-forceSync:
+ case <-forceSync.C:
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
diff --git a/ethdb/database_test.go b/ethdb/database_test.go
index 0e69a1218..4740cdaed 100644
--- a/ethdb/database_test.go
+++ b/ethdb/database_test.go
@@ -14,21 +14,164 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-package ethdb
+package ethdb_test
import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
"os"
- "path/filepath"
+ "strconv"
+ "sync"
+ "testing"
- "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
)
-func newDb() *LDBDatabase {
- file := filepath.Join("/", "tmp", "ldbtesttmpfile")
- if common.FileExist(file) {
- os.RemoveAll(file)
+func newTestLDB() (*ethdb.LDBDatabase, func()) {
+ dirname, err := ioutil.TempDir(os.TempDir(), "ethdb_test_")
+ if err != nil {
+ panic("failed to create test file: " + err.Error())
+ }
+ db, err := ethdb.NewLDBDatabase(dirname, 0, 0)
+ if err != nil {
+ panic("failed to create test database: " + err.Error())
+ }
+
+ return db, func() {
+ db.Close()
+ os.RemoveAll(dirname)
+ }
+}
+
+var test_values = []string{"", "a", "1251", "\x00123\x00"}
+
+func TestLDB_PutGet(t *testing.T) {
+ db, remove := newTestLDB()
+ defer remove()
+ testPutGet(db, t)
+}
+
+func TestMemoryDB_PutGet(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+ testPutGet(db, t)
+}
+
+func testPutGet(db ethdb.Database, t *testing.T) {
+ t.Parallel()
+
+ for _, v := range test_values {
+ err := db.Put([]byte(v), []byte(v))
+ if err != nil {
+ t.Fatalf("put failed: %v", err)
+ }
+ }
+
+ for _, v := range test_values {
+ data, err := db.Get([]byte(v))
+ if err != nil {
+ t.Fatalf("get failed: %v", err)
+ }
+ if !bytes.Equal(data, []byte(v)) {
+ t.Fatalf("get returned wrong result, got %q expected %q", string(data), v)
+ }
+ }
+
+ for _, v := range test_values {
+ err := db.Put([]byte(v), []byte("?"))
+ if err != nil {
+ t.Fatalf("put override failed: %v", err)
+ }
+ }
+
+ for _, v := range test_values {
+ data, err := db.Get([]byte(v))
+ if err != nil {
+ t.Fatalf("get failed: %v", err)
+ }
+ if !bytes.Equal(data, []byte("?")) {
+ t.Fatalf("get returned wrong result, got %q expected ?", string(data))
+ }
}
- db, _ := NewLDBDatabase(file, 0, 0)
- return db
+ for _, v := range test_values {
+ err := db.Delete([]byte(v))
+ if err != nil {
+ t.Fatalf("delete %q failed: %v", v, err)
+ }
+ }
+
+ for _, v := range test_values {
+ _, err := db.Get([]byte(v))
+ if err == nil {
+ t.Fatalf("got deleted value %q", v)
+ }
+ }
+}
+
+func TestLDB_ParallelPutGet(t *testing.T) {
+ db, remove := newTestLDB()
+ defer remove()
+ testParallelPutGet(db, t)
+}
+
+func TestMemoryDB_ParallelPutGet(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+ testParallelPutGet(db, t)
+}
+
+func testParallelPutGet(db ethdb.Database, t *testing.T) {
+ const n = 8
+ var pending sync.WaitGroup
+
+ pending.Add(n)
+ for i := 0; i < n; i++ {
+ go func(key string) {
+ defer pending.Done()
+ err := db.Put([]byte(key), []byte("v"+key))
+ if err != nil {
+ panic("put failed: " + err.Error())
+ }
+ }(strconv.Itoa(i))
+ }
+ pending.Wait()
+
+ pending.Add(n)
+ for i := 0; i < n; i++ {
+ go func(key string) {
+ defer pending.Done()
+ data, err := db.Get([]byte(key))
+ if err != nil {
+ panic("get failed: " + err.Error())
+ }
+ if !bytes.Equal(data, []byte("v"+key)) {
+ panic(fmt.Sprintf("get failed, got %q expected %q", []byte(data), []byte("v"+key)))
+ }
+ }(strconv.Itoa(i))
+ }
+ pending.Wait()
+
+ pending.Add(n)
+ for i := 0; i < n; i++ {
+ go func(key string) {
+ defer pending.Done()
+ err := db.Delete([]byte(key))
+ if err != nil {
+ panic("delete failed: " + err.Error())
+ }
+ }(strconv.Itoa(i))
+ }
+ pending.Wait()
+
+ pending.Add(n)
+ for i := 0; i < n; i++ {
+ go func(key string) {
+ defer pending.Done()
+ _, err := db.Get([]byte(key))
+ if err == nil {
+ panic("get succeeded")
+ }
+ }(strconv.Itoa(i))
+ }
+ pending.Wait()
}
diff --git a/rpc/server.go b/rpc/server.go
index 62b84af34..30c288349 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -29,11 +29,7 @@ import (
"gopkg.in/fatih/set.v0"
)
-const (
- notificationBufferSize = 10000 // max buffered notifications before codec is closed
-
- MetadataApi = "rpc"
-)
+const MetadataApi = "rpc"
// CodecOption specifies which type of messages this codec supports
type CodecOption int
@@ -49,10 +45,9 @@ const (
// NewServer will create a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{
- services: make(serviceRegistry),
- subscriptions: make(subscriptionRegistry),
- codecs: set.New(),
- run: 1,
+ services: make(serviceRegistry),
+ codecs: set.New(),
+ run: 1,
}
// register a default service which will provide meta information about the RPC service such as the services and
@@ -124,16 +119,6 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil
}
-// hasOption returns true if option is included in options, otherwise false
-func hasOption(option CodecOption, options []CodecOption) bool {
- for _, o := range options {
- if option == o {
- return true
- }
- }
- return false
-}
-
// serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec.
//
@@ -148,13 +133,11 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
- log.Error(fmt.Sprint(string(buf)))
+ log.Error(string(buf))
}
s.codecsMu.Lock()
s.codecs.Remove(codec)
s.codecsMu.Unlock()
-
- return
}()
ctx, cancel := context.WithCancel(context.Background())
@@ -246,7 +229,7 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
// close all codecs which will cancel pending requests/subscriptions.
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
- log.Debug(fmt.Sprint("RPC Server shutdown initiatied"))
+ log.Debug("RPC Server shutdown initiatied")
s.codecsMu.Lock()
defer s.codecsMu.Unlock()
s.codecs.Each(func(c interface{}) bool {
diff --git a/rpc/subscription.go b/rpc/subscription.go
index 720e4dd06..6ce7befa1 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -53,7 +53,6 @@ type notifierKey struct{}
type Notifier struct {
codec ServerCodec
subMu sync.RWMutex // guards active and inactive maps
- stopped bool
active map[ID]*Subscription
inactive map[ID]*Subscription
}
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go
index 0ed15ddfe..39f759692 100644
--- a/rpc/subscription_test.go
+++ b/rpc/subscription_test.go
@@ -165,7 +165,7 @@ func TestNotifications(t *testing.T) {
}
func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
- failures chan<- jsonErrResponse, notifications chan<- jsonNotification) {
+ failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) {
// read and parse server messages
for {
@@ -177,12 +177,14 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
var responses []map[string]interface{}
if rmsg[0] == '[' {
if err := json.Unmarshal(rmsg, &responses); err != nil {
- t.Fatalf("Received invalid message: %s", rmsg)
+ errors <- fmt.Errorf("Received invalid message: %s", rmsg)
+ return
}
} else {
var msg map[string]interface{}
if err := json.Unmarshal(rmsg, &msg); err != nil {
- t.Fatalf("Received invalid message: %s", rmsg)
+ errors <- fmt.Errorf("Received invalid message: %s", rmsg)
+ return
}
responses = append(responses, msg)
}
@@ -216,7 +218,7 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
}
continue
}
- t.Fatalf("Received invalid message: %s", msg)
+ errors <- fmt.Errorf("Received invalid message: %s", msg)
}
}
}
@@ -235,6 +237,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
successes = make(chan jsonSuccessResponse)
failures = make(chan jsonErrResponse)
notifications = make(chan jsonNotification)
+
+ errors = make(chan error, 10)
)
// setup and start server
@@ -248,7 +252,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
defer server.Stop()
// wait for message and write them to the given channels
- go waitForMessages(t, in, successes, failures, notifications)
+ go waitForMessages(t, in, successes, failures, notifications, errors)
// create subscriptions one by one
n := 3
@@ -297,6 +301,8 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
}
select {
+ case err := <-errors:
+ t.Fatal(err)
case suc := <-successes: // subscription created
subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
case failure := <-failures:
diff --git a/rpc/types.go b/rpc/types.go
index a7b8c9788..f2375604e 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -48,7 +48,6 @@ type callback struct {
// service represents a registered object
type service struct {
name string // name for service
- rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // receiver type
callbacks callbacks // registered handlers
subscriptions subscriptions // available subscriptions/notifications
@@ -58,23 +57,19 @@ type service struct {
type serverRequest struct {
id interface{}
svcname string
- rcvr reflect.Value
callb *callback
args []reflect.Value
isUnsubscribe bool
err Error
}
-type serviceRegistry map[string]*service // collection of services
-type callbacks map[string]*callback // collection of RPC callbacks
-type subscriptions map[string]*callback // collection of subscription callbacks
-type subscriptionRegistry map[string]*callback // collection of subscription callbacks
+type serviceRegistry map[string]*service // collection of services
+type callbacks map[string]*callback // collection of RPC callbacks
+type subscriptions map[string]*callback // collection of subscription callbacks
// Server represents a RPC server
type Server struct {
- services serviceRegistry
- muSubcriptions sync.Mutex // protects subscriptions
- subscriptions subscriptionRegistry
+ services serviceRegistry
run int32
codecsMu sync.Mutex
diff --git a/rpc/utils.go b/rpc/utils.go
index 2506c4833..9315cab59 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -119,21 +119,6 @@ func isHexNum(t reflect.Type) bool {
return t == bigIntType
}
-var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem()
-
-// Indication if the given block is a BlockNumber
-func isBlockNumber(t reflect.Type) bool {
- if t == nil {
- return false
- }
-
- for t.Kind() == reflect.Ptr {
- t = t.Elem()
- }
-
- return t == blockNumberType
-}
-
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
@@ -210,18 +195,12 @@ METHODS:
}
switch mtype.NumOut() {
- case 0, 1:
- break
- case 2:
- if h.errPos == -1 { // method must one return value and 1 error
+ case 0, 1, 2:
+ if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
continue METHODS
}
- break
- default:
- continue METHODS
+ callbacks[mname] = &h
}
-
- callbacks[mname] = &h
}
return callbacks, subscriptions