aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.travis.yml4
-rw-r--r--VERSION2
-rw-r--r--accounts/abi/argument.go44
-rw-r--r--accounts/abi/bind/backends/simulated.go2
-rw-r--r--accounts/abi/event_test.go81
-rw-r--r--accounts/abi/reflect.go104
-rw-r--r--appveyor.yml4
-rw-r--r--core/events.go7
-rw-r--r--core/tx_journal.go31
-rw-r--r--core/tx_pool.go30
-rw-r--r--core/tx_pool_test.go32
-rw-r--r--eth/api_backend.go4
-rw-r--r--eth/filters/api.go18
-rw-r--r--eth/filters/filter.go2
-rw-r--r--eth/filters/filter_system.go40
-rw-r--r--eth/filters/filter_system_test.go7
-rw-r--r--eth/handler.go41
-rw-r--r--eth/helper_test.go2
-rw-r--r--eth/protocol.go6
-rw-r--r--eth/protocol_test.go2
-rw-r--r--ethstats/ethstats.go12
-rw-r--r--internal/ethapi/backend.go2
-rw-r--r--les/api_backend.go4
-rw-r--r--light/postprocess.go16
-rw-r--r--light/txpool.go6
-rw-r--r--miner/worker.go45
-rw-r--r--p2p/enr/enr.go159
-rw-r--r--p2p/enr/enr_test.go66
-rw-r--r--p2p/enr/entries.go56
-rw-r--r--p2p/enr/idscheme.go114
-rw-r--r--p2p/enr/idscheme_test.go36
-rw-r--r--params/version.go2
32 files changed, 657 insertions, 324 deletions
diff --git a/.travis.yml b/.travis.yml
index ce6500c46..6fb70f27d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -126,7 +126,7 @@ matrix:
# This builder does the Android Maven and Azure uploads
- os: linux
- dist: precise # Needed for the android tools
+ dist: trusty
addons:
apt:
packages:
@@ -146,7 +146,7 @@ matrix:
git:
submodules: false # avoid cloning ethereum/tests
before_install:
- - curl https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz | tar -xz
+ - curl https://storage.googleapis.com/golang/go1.10.2.linux-amd64.tar.gz | tar -xz
- export PATH=`pwd`/go/bin:$PATH
- export GOROOT=`pwd`/go
- export GOPATH=$HOME/go
diff --git a/VERSION b/VERSION
index 1790d3598..53ed4ba51 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.8.8
+1.8.9
diff --git a/accounts/abi/argument.go b/accounts/abi/argument.go
index 512d8fdfa..93b513c34 100644
--- a/accounts/abi/argument.go
+++ b/accounts/abi/argument.go
@@ -111,9 +111,14 @@ func (arguments Arguments) unpackTuple(v interface{}, marshalledValues []interfa
if err := requireUnpackKind(value, typ, kind, arguments); err != nil {
return err
}
- // If the output interface is a struct, make sure names don't collide
+
+ // If the interface is a struct, get of abi->struct_field mapping
+
+ var abi2struct map[string]string
if kind == reflect.Struct {
- if err := requireUniqueStructFieldNames(arguments); err != nil {
+ var err error
+ abi2struct, err = mapAbiToStructFields(arguments, value)
+ if err != nil {
return err
}
}
@@ -123,9 +128,10 @@ func (arguments Arguments) unpackTuple(v interface{}, marshalledValues []interfa
switch kind {
case reflect.Struct:
- err := unpackStruct(value, reflectValue, arg)
- if err != nil {
- return err
+ if structField, ok := abi2struct[arg.Name]; ok {
+ if err := set(value.FieldByName(structField), reflectValue, arg); err != nil {
+ return err
+ }
}
case reflect.Slice, reflect.Array:
if value.Len() < i {
@@ -151,17 +157,22 @@ func (arguments Arguments) unpackAtomic(v interface{}, marshalledValues []interf
if len(marshalledValues) != 1 {
return fmt.Errorf("abi: wrong length, expected single value, got %d", len(marshalledValues))
}
+
elem := reflect.ValueOf(v).Elem()
kind := elem.Kind()
reflectValue := reflect.ValueOf(marshalledValues[0])
+ var abi2struct map[string]string
if kind == reflect.Struct {
- //make sure names don't collide
- if err := requireUniqueStructFieldNames(arguments); err != nil {
+ var err error
+ if abi2struct, err = mapAbiToStructFields(arguments, elem); err != nil {
return err
}
-
- return unpackStruct(elem, reflectValue, arguments[0])
+ arg := arguments.NonIndexed()[0]
+ if structField, ok := abi2struct[arg.Name]; ok {
+ return set(elem.FieldByName(structField), reflectValue, arg)
+ }
+ return nil
}
return set(elem, reflectValue, arguments.NonIndexed()[0])
@@ -277,18 +288,3 @@ func capitalise(input string) string {
}
return strings.ToUpper(input[:1]) + input[1:]
}
-
-//unpackStruct extracts each argument into its corresponding struct field
-func unpackStruct(value, reflectValue reflect.Value, arg Argument) error {
- name := capitalise(arg.Name)
- typ := value.Type()
- for j := 0; j < typ.NumField(); j++ {
- // TODO read tags: `abi:"fieldName"`
- if typ.Field(j).Name == name {
- if err := set(value.Field(j), reflectValue, arg); err != nil {
- return err
- }
- }
- }
- return nil
-}
diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go
index 7b605e1c1..fd69538d5 100644
--- a/accounts/abi/bind/backends/simulated.go
+++ b/accounts/abi/bind/backends/simulated.go
@@ -454,7 +454,7 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
return logs, nil
}
-func (fb *filterBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
diff --git a/accounts/abi/event_test.go b/accounts/abi/event_test.go
index cca61e433..3bfdd7c0a 100644
--- a/accounts/abi/event_test.go
+++ b/accounts/abi/event_test.go
@@ -58,12 +58,28 @@ var jsonEventPledge = []byte(`{
"type": "event"
}`)
+var jsonEventMixedCase = []byte(`{
+ "anonymous": false,
+ "inputs": [{
+ "indexed": false, "name": "value", "type": "uint256"
+ }, {
+ "indexed": false, "name": "_value", "type": "uint256"
+ }, {
+ "indexed": false, "name": "Value", "type": "uint256"
+ }],
+ "name": "MixedCase",
+ "type": "event"
+ }`)
+
// 1000000
var transferData1 = "00000000000000000000000000000000000000000000000000000000000f4240"
// "0x00Ce0d46d924CC8437c806721496599FC3FFA268", 2218516807680, "usd"
var pledgeData1 = "00000000000000000000000000ce0d46d924cc8437c806721496599fc3ffa2680000000000000000000000000000000000000000000000000000020489e800007573640000000000000000000000000000000000000000000000000000000000"
+// 1000000,2218516807680,1000001
+var mixedCaseData1 = "00000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000020489e8000000000000000000000000000000000000000000000000000000000000000f4241"
+
func TestEventId(t *testing.T) {
var table = []struct {
definition string
@@ -121,6 +137,27 @@ func TestEventTupleUnpack(t *testing.T) {
Value *big.Int
}
+ type EventTransferWithTag struct {
+ // this is valid because `value` is not exportable,
+ // so value is only unmarshalled into `Value1`.
+ value *big.Int
+ Value1 *big.Int `abi:"value"`
+ }
+
+ type BadEventTransferWithSameFieldAndTag struct {
+ Value *big.Int
+ Value1 *big.Int `abi:"value"`
+ }
+
+ type BadEventTransferWithDuplicatedTag struct {
+ Value1 *big.Int `abi:"value"`
+ Value2 *big.Int `abi:"value"`
+ }
+
+ type BadEventTransferWithEmptyTag struct {
+ Value *big.Int `abi:""`
+ }
+
type EventPledge struct {
Who common.Address
Wad *big.Int
@@ -133,9 +170,16 @@ func TestEventTupleUnpack(t *testing.T) {
Currency [3]byte
}
+ type EventMixedCase struct {
+ Value1 *big.Int `abi:"value"`
+ Value2 *big.Int `abi:"_value"`
+ Value3 *big.Int `abi:"Value"`
+ }
+
bigint := new(big.Int)
bigintExpected := big.NewInt(1000000)
bigintExpected2 := big.NewInt(2218516807680)
+ bigintExpected3 := big.NewInt(1000001)
addr := common.HexToAddress("0x00Ce0d46d924CC8437c806721496599FC3FFA268")
var testCases = []struct {
data string
@@ -159,6 +203,34 @@ func TestEventTupleUnpack(t *testing.T) {
"",
"Can unpack ERC20 Transfer event into slice",
}, {
+ transferData1,
+ &EventTransferWithTag{},
+ &EventTransferWithTag{Value1: bigintExpected},
+ jsonEventTransfer,
+ "",
+ "Can unpack ERC20 Transfer event into structure with abi: tag",
+ }, {
+ transferData1,
+ &BadEventTransferWithDuplicatedTag{},
+ &BadEventTransferWithDuplicatedTag{},
+ jsonEventTransfer,
+ "struct: abi tag in 'Value2' already mapped",
+ "Can not unpack ERC20 Transfer event with duplicated abi tag",
+ }, {
+ transferData1,
+ &BadEventTransferWithSameFieldAndTag{},
+ &BadEventTransferWithSameFieldAndTag{},
+ jsonEventTransfer,
+ "abi: multiple variables maps to the same abi field 'value'",
+ "Can not unpack ERC20 Transfer event with a field and a tag mapping to the same abi variable",
+ }, {
+ transferData1,
+ &BadEventTransferWithEmptyTag{},
+ &BadEventTransferWithEmptyTag{},
+ jsonEventTransfer,
+ "struct: abi tag in 'Value' is empty",
+ "Can not unpack ERC20 Transfer event with an empty tag",
+ }, {
pledgeData1,
&EventPledge{},
&EventPledge{
@@ -216,6 +288,13 @@ func TestEventTupleUnpack(t *testing.T) {
jsonEventPledge,
"abi: cannot unmarshal tuple into map[string]interface {}",
"Can not unpack Pledge event into map",
+ }, {
+ mixedCaseData1,
+ &EventMixedCase{},
+ &EventMixedCase{Value1: bigintExpected, Value2: bigintExpected2, Value3: bigintExpected3},
+ jsonEventMixedCase,
+ "",
+ "Can unpack abi variables with mixed case",
}}
for _, tc := range testCases {
@@ -227,7 +306,7 @@ func TestEventTupleUnpack(t *testing.T) {
assert.Nil(err, "Should be able to unpack event data.")
assert.Equal(tc.expected, tc.dest, tc.name)
} else {
- assert.EqualError(err, tc.error)
+ assert.EqualError(err, tc.error, tc.name)
}
})
}
diff --git a/accounts/abi/reflect.go b/accounts/abi/reflect.go
index 5620a7084..0193517a4 100644
--- a/accounts/abi/reflect.go
+++ b/accounts/abi/reflect.go
@@ -19,6 +19,7 @@ package abi
import (
"fmt"
"reflect"
+ "strings"
)
// indirect recursively dereferences the value until it either gets the value
@@ -111,18 +112,101 @@ func requireUnpackKind(v reflect.Value, t reflect.Type, k reflect.Kind,
return nil
}
-// requireUniqueStructFieldNames makes sure field names don't collide
-func requireUniqueStructFieldNames(args Arguments) error {
- exists := make(map[string]bool)
+// mapAbiToStringField maps abi to struct fields.
+// first round: for each Exportable field that contains a `abi:""` tag
+// and this field name exists in the arguments, pair them together.
+// second round: for each argument field that has not been already linked,
+// find what variable is expected to be mapped into, if it exists and has not been
+// used, pair them.
+func mapAbiToStructFields(args Arguments, value reflect.Value) (map[string]string, error) {
+
+ typ := value.Type()
+
+ abi2struct := make(map[string]string)
+ struct2abi := make(map[string]string)
+
+ // first round ~~~
+ for i := 0; i < typ.NumField(); i++ {
+ structFieldName := typ.Field(i).Name
+
+ // skip private struct fields.
+ if structFieldName[:1] != strings.ToUpper(structFieldName[:1]) {
+ continue
+ }
+
+ // skip fields that have no abi:"" tag.
+ var ok bool
+ var tagName string
+ if tagName, ok = typ.Field(i).Tag.Lookup("abi"); !ok {
+ continue
+ }
+
+ // check if tag is empty.
+ if tagName == "" {
+ return nil, fmt.Errorf("struct: abi tag in '%s' is empty", structFieldName)
+ }
+
+ // check which argument field matches with the abi tag.
+ found := false
+ for _, abiField := range args.NonIndexed() {
+ if abiField.Name == tagName {
+ if abi2struct[abiField.Name] != "" {
+ return nil, fmt.Errorf("struct: abi tag in '%s' already mapped", structFieldName)
+ }
+ // pair them
+ abi2struct[abiField.Name] = structFieldName
+ struct2abi[structFieldName] = abiField.Name
+ found = true
+ }
+ }
+
+ // check if this tag has been mapped.
+ if !found {
+ return nil, fmt.Errorf("struct: abi tag '%s' defined but not found in abi", tagName)
+ }
+
+ }
+
+ // second round ~~~
for _, arg := range args {
- field := capitalise(arg.Name)
- if field == "" {
- return fmt.Errorf("abi: purely underscored output cannot unpack to struct")
+
+ abiFieldName := arg.Name
+ structFieldName := capitalise(abiFieldName)
+
+ if structFieldName == "" {
+ return nil, fmt.Errorf("abi: purely underscored output cannot unpack to struct")
}
- if exists[field] {
- return fmt.Errorf("abi: multiple outputs mapping to the same struct field '%s'", field)
+
+ // this abi has already been paired, skip it... unless there exists another, yet unassigned
+ // struct field with the same field name. If so, raise an error:
+ // abi: [ { "name": "value" } ]
+ // struct { Value *big.Int , Value1 *big.Int `abi:"value"`}
+ if abi2struct[abiFieldName] != "" {
+ if abi2struct[abiFieldName] != structFieldName &&
+ struct2abi[structFieldName] == "" &&
+ value.FieldByName(structFieldName).IsValid() {
+ return nil, fmt.Errorf("abi: multiple variables maps to the same abi field '%s'", abiFieldName)
+ }
+ continue
}
- exists[field] = true
+
+ // return an error if this struct field has already been paired.
+ if struct2abi[structFieldName] != "" {
+ return nil, fmt.Errorf("abi: multiple outputs mapping to the same struct field '%s'", structFieldName)
+ }
+
+ if value.FieldByName(structFieldName).IsValid() {
+ // pair them
+ abi2struct[abiFieldName] = structFieldName
+ struct2abi[structFieldName] = abiFieldName
+ } else {
+ // not paired, but annotate as used, to detect cases like
+ // abi : [ { "name": "value" }, { "name": "_value" } ]
+ // struct { Value *big.Int }
+ struct2abi[structFieldName] = abiFieldName
+ }
+
}
- return nil
+
+ return abi2struct, nil
}
diff --git a/appveyor.yml b/appveyor.yml
index 141ef16ff..f10000af8 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -23,8 +23,8 @@ environment:
install:
- git submodule update --init
- rmdir C:\go /s /q
- - appveyor DownloadFile https://storage.googleapis.com/golang/go1.10.1.windows-%GETH_ARCH%.zip
- - 7z x go1.10.1.windows-%GETH_ARCH%.zip -y -oC:\ > NUL
+ - appveyor DownloadFile https://storage.googleapis.com/golang/go1.10.2.windows-%GETH_ARCH%.zip
+ - 7z x go1.10.2.windows-%GETH_ARCH%.zip -y -oC:\ > NUL
- go version
- gcc --version
diff --git a/core/events.go b/core/events.go
index 6f404f612..8d200f2a2 100644
--- a/core/events.go
+++ b/core/events.go
@@ -21,8 +21,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
-// TxPreEvent is posted when a transaction enters the transaction pool.
-type TxPreEvent struct{ Tx *types.Transaction }
+// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
+type NewTxsEvent struct{ Txs []*types.Transaction }
// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
@@ -35,9 +35,6 @@ type PendingStateEvent struct{}
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
-// RemovedTransactionEvent is posted when a reorg happens
-type RemovedTransactionEvent struct{ Txs types.Transactions }
-
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }
diff --git a/core/tx_journal.go b/core/tx_journal.go
index e872d7b53..1397e9fd3 100644
--- a/core/tx_journal.go
+++ b/core/tx_journal.go
@@ -56,7 +56,7 @@ func newTxJournal(path string) *txJournal {
// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
-func (journal *txJournal) load(add func(*types.Transaction) error) error {
+func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
// Skip the parsing if the journal file doens't exist at all
if _, err := os.Stat(journal.path); os.IsNotExist(err) {
return nil
@@ -76,7 +76,21 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0
- var failure error
+ // Create a method to load a limited batch of transactions and bump the
+ // appropriate progress counters. Then use this method to load all the
+ // journalled transactions in small-ish batches.
+ loadBatch := func(txs types.Transactions) {
+ for _, err := range add(txs) {
+ if err != nil {
+ log.Debug("Failed to add journaled transaction", "err", err)
+ dropped++
+ }
+ }
+ }
+ var (
+ failure error
+ batch types.Transactions
+ )
for {
// Parse the next transaction and terminate on error
tx := new(types.Transaction)
@@ -84,14 +98,17 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error {
if err != io.EOF {
failure = err
}
+ if batch.Len() > 0 {
+ loadBatch(batch)
+ }
break
}
- // Import the transaction and bump the appropriate progress counters
+ // New transaction parsed, queue up for later, import if threnshold is reached
total++
- if err = add(tx); err != nil {
- log.Debug("Failed to add journaled transaction", "err", err)
- dropped++
- continue
+
+ if batch = append(batch, tx); batch.Len() > 1024 {
+ loadBatch(batch)
+ batch = batch[:0]
}
}
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 388b40058..f89e11441 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
- if err := pool.journal.load(pool.AddLocal); err != nil {
+ if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
@@ -444,9 +444,9 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}
-// SubscribeTxPreEvent registers a subscription of TxPreEvent and
+// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
-func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
+func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}
@@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// We've directly injected a replacement transaction, notify subsystems
- go pool.txFeed.Send(TxPreEvent{tx})
+ go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
return old != nil, nil
}
@@ -712,10 +712,11 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
}
}
-// promoteTx adds a transaction to the pending (processable) list of transactions.
+// promoteTx adds a transaction to the pending (processable) list of transactions
+// and returns whether it was inserted or an older was better.
//
// Note, this method assumes the pool lock is held!
-func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
+func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
@@ -729,7 +730,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool.priced.Removed()
pendingDiscardCounter.Inc(1)
- return
+ return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
@@ -747,7 +748,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
- go pool.txFeed.Send(TxPreEvent{tx})
+ return true
}
// AddLocal enqueues a single transaction into the pool if it is valid, marking
@@ -907,6 +908,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
+ // Track the promoted transactions to broadcast them at once
+ var promoted []*types.Transaction
+
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
@@ -939,8 +943,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
- log.Trace("Promoting queued transaction", "hash", hash)
- pool.promoteTx(addr, hash, tx)
+ if pool.promoteTx(addr, hash, tx) {
+ log.Trace("Promoting queued transaction", "hash", hash)
+ promoted = append(promoted, tx)
+ }
}
// Drop all transactions over the allowed limit
if !pool.locals.contains(addr) {
@@ -957,6 +963,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
delete(pool.queue, addr)
}
}
+ // Notify subsystem for new promoted transactions.
+ if len(promoted) > 0 {
+ pool.txFeed.Send(NewTxsEvent{promoted})
+ }
// If the pending limit is overflown, start equalizing allowances
pending := uint64(0)
for _, list := range pool.pending {
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index e7f52075e..25993c258 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -118,21 +118,27 @@ func validateTxPoolInternals(pool *TxPool) error {
// validateEvents checks that the correct number of transaction addition events
// were fired on the pool's event feed.
-func validateEvents(events chan TxPreEvent, count int) error {
- for i := 0; i < count; i++ {
+func validateEvents(events chan NewTxsEvent, count int) error {
+ var received []*types.Transaction
+
+ for len(received) < count {
select {
- case <-events:
+ case ev := <-events:
+ received = append(received, ev.Txs...)
case <-time.After(time.Second):
- return fmt.Errorf("event #%d not fired", i)
+ return fmt.Errorf("event #%d not fired", received)
}
}
+ if len(received) > count {
+ return fmt.Errorf("more than %d events fired: %v", count, received[count:])
+ }
select {
- case tx := <-events:
- return fmt.Errorf("more than %d events fired: %v", count, tx.Tx)
+ case ev := <-events:
+ return fmt.Errorf("more than %d events fired: %v", count, ev.Txs)
case <-time.After(50 * time.Millisecond):
// This branch should be "default", but it's a data race between goroutines,
- // reading the event channel and pushng into it, so better wait a bit ensuring
+ // reading the event channel and pushing into it, so better wait a bit ensuring
// really nothing gets injected.
}
return nil
@@ -669,7 +675,7 @@ func TestTransactionGapFilling(t *testing.T) {
pool.currentState.AddBalance(account, big.NewInt(1000000))
// Keep track of transaction events to ensure all executables get announced
- events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
+ events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
@@ -920,7 +926,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
pool.currentState.AddBalance(account, big.NewInt(1000000))
// Keep track of transaction events to ensure all executables get announced
- events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
+ events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
@@ -1140,7 +1146,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
- events := make(chan TxPreEvent, 32)
+ events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
@@ -1327,7 +1333,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
- events := make(chan TxPreEvent, 32)
+ events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
@@ -1433,7 +1439,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
- events := make(chan TxPreEvent, 32)
+ events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
@@ -1495,7 +1501,7 @@ func TestTransactionReplacement(t *testing.T) {
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
- events := make(chan TxPreEvent, 32)
+ events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 4ace9b594..91a7dc7e0 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -188,8 +188,8 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions,
return b.eth.TxPool().Content()
}
-func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
- return b.eth.TxPool().SubscribeTxPreEvent(ch)
+func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
+ return b.eth.TxPool().SubscribeNewTxsEvent(ch)
}
func (b *EthAPIBackend) Downloader() *downloader.Downloader {
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 1297b7478..592ad3b82 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -104,8 +104,8 @@ func (api *PublicFilterAPI) timeoutLoop() {
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
- pendingTxs = make(chan common.Hash)
- pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
+ pendingTxs = make(chan []common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)
api.filtersMu.Lock()
@@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
case ph := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
- f.hashes = append(f.hashes, ph)
+ f.hashes = append(f.hashes, ph...)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
@@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
rpcSub := notifier.CreateSubscription()
go func() {
- txHashes := make(chan common.Hash)
- pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
+ txHashes := make(chan []common.Hash, 128)
+ pendingTxSub := api.events.SubscribePendingTxs(txHashes)
for {
select {
- case h := <-txHashes:
- notifier.Notify(rpcSub.ID, h)
+ case hashes := <-txHashes:
+ // To keep the original behaviour, send a single tx hash in one notification.
+ // TODO(rjl493456442) Send a batch of tx hashes in one notification
+ for _, h := range hashes {
+ notifier.Notify(rpcSub.ID, h)
+ }
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 5dfe60e77..67b4612ae 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -36,7 +36,7 @@ type Backend interface {
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
- SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index bb11734a7..4e999cda8 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -59,7 +59,7 @@ const (
const (
- // txChanSize is the size of channel listening to TxPreEvent.
+ // txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
@@ -80,7 +80,7 @@ type subscription struct {
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
- hashes chan common.Hash
+ hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
@@ -95,7 +95,7 @@ type EventSystem struct {
lastHead *types.Header
// Subscriptions
- txSub event.Subscription // Subscription for new transaction event
+ txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
@@ -104,7 +104,7 @@ type EventSystem struct {
// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
- txCh chan core.TxPreEvent // Channel to receive new transaction event
+ txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
@@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
- txCh: make(chan core.TxPreEvent, txChanSize),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}
// Subscribe events
- m.txSub = m.backend.SubscribeTxPreEvent(m.txCh)
+ m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
@@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
// Make sure none of the subscriptions are empty
- if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
+ if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
m.pendingLogSub.Closed() {
log.Crit("Subscribe for event system failed")
}
@@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit: crit,
created: time.Now(),
logs: logs,
- hashes: make(chan common.Hash),
+ hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
@@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit: crit,
created: time.Now(),
logs: logs,
- hashes: make(chan common.Hash),
+ hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
@@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit: crit,
created: time.Now(),
logs: logs,
- hashes: make(chan common.Hash),
+ hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
@@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
- hashes: make(chan common.Hash),
+ hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
@@ -298,9 +298,9 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return es.subscribe(sub)
}
-// SubscribePendingTxEvents creates a subscription that writes transaction hashes for
+// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
-func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
@@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
}
}
}
- case core.TxPreEvent:
+ case core.NewTxsEvent:
+ hashes := make([]common.Hash, 0, len(e.Txs))
+ for _, tx := range e.Txs {
+ hashes = append(hashes, tx.Hash())
+ }
for _, f := range filters[PendingTransactionsSubscription] {
- f.hashes <- e.Tx.Hash()
+ f.hashes <- hashes
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
@@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.pendingLogSub.Unsubscribe()
- es.txSub.Unsubscribe()
+ es.txsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
@@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() {
for {
select {
// Handle subscribed events
- case ev := <-es.txCh:
+ case ev := <-es.txsCh:
es.broadcast(index, ev)
case ev := <-es.logsCh:
es.broadcast(index, ev)
@@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() {
close(f.err)
// System stopped
- case <-es.txSub.Err():
+ case <-es.txsSub.Err():
return
case <-es.logsSub.Err():
return
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index b4df24b47..ff1af85a8 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types
return logs, nil
}
-func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
}
@@ -232,10 +232,7 @@ func TestPendingTxFilter(t *testing.T) {
fid0 := api.NewPendingTransactionFilter()
time.Sleep(1 * time.Second)
- for _, tx := range transactions {
- ev := core.TxPreEvent{Tx: tx}
- txFeed.Send(ev)
- }
+ txFeed.Send(core.NewTxsEvent{Txs: transactions})
timeout := time.Now().Add(1 * time.Second)
for {
diff --git a/eth/handler.go b/eth/handler.go
index c8f7e13f1..8993afe15 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -46,7 +46,7 @@ const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
- // txChanSize is the size of channel listening to TxPreEvent.
+ // txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
)
@@ -81,8 +81,8 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
- txCh chan core.TxPreEvent
- txSub event.Subscription
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@@ -204,8 +204,8 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
- pm.txCh = make(chan core.TxPreEvent, txChanSize)
- pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
+ pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
+ pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
@@ -220,7 +220,7 @@ func (pm *ProtocolManager) Start(maxPeers int) {
func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
- pm.txSub.Unsubscribe() // quits txBroadcastLoop
+ pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
// Quit the sync loop.
@@ -712,16 +712,23 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
}
}
-// BroadcastTx will propagate a transaction to all peers which are not known to
+// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
-func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
- // Broadcast transaction to a batch of peers not knowing about it
- peers := pm.peers.PeersWithoutTx(hash)
- //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
- for _, peer := range peers {
- peer.SendTransactions(types.Transactions{tx})
+func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
+ var txset = make(map[*peer]types.Transactions)
+
+ // Broadcast transactions to a batch of peers not knowing about it
+ for _, tx := range txs {
+ peers := pm.peers.PeersWithoutTx(tx.Hash())
+ for _, peer := range peers {
+ txset[peer] = append(txset[peer], tx)
+ }
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ }
+ // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for peer, txs := range txset {
+ peer.SendTransactions(txs)
}
- log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}
// Mined broadcast loop
@@ -739,11 +746,11 @@ func (pm *ProtocolManager) minedBroadcastLoop() {
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
- case event := <-pm.txCh:
- pm.BroadcastTx(event.Tx.Hash(), event.Tx)
+ case event := <-pm.txsCh:
+ pm.BroadcastTxs(event.Txs)
// Err() channel will be closed when unsubscribing.
- case <-pm.txSub.Err():
+ case <-pm.txsSub.Err():
return
}
}
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 8a0260fc9..3d2ab0aba 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -124,7 +124,7 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
return batches, nil
}
-func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
}
diff --git a/eth/protocol.go b/eth/protocol.go
index 328d5b993..0e90e6a2e 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -103,9 +103,9 @@ type txPool interface {
// The slice should be modifiable by the caller.
Pending() (map[common.Address]types.Transactions, error)
- // SubscribeTxPreEvent should return an event subscription of
- // TxPreEvent and send events to the given channel.
- SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ // SubscribeNewTxsEvent should return an event subscription of
+ // NewTxsEvent and send events to the given channel.
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
}
// statusData is the network packet for the status message.
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
index b2f93d8dd..aa43dfa92 100644
--- a/eth/protocol_test.go
+++ b/eth/protocol_test.go
@@ -116,7 +116,7 @@ func testRecvTransactions(t *testing.T, protocol int) {
t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash())
}
case <-time.After(2 * time.Second):
- t.Errorf("no TxPreEvent received within 2 seconds")
+ t.Errorf("no NewTxsEvent received within 2 seconds")
}
}
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index a15d84615..c11601435 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -49,7 +49,7 @@ const (
// history request.
historyUpdateRange = 50
- // txChanSize is the size of channel listening to TxPreEvent.
+ // txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
@@ -57,9 +57,9 @@ const (
)
type txPool interface {
- // SubscribeTxPreEvent should return an event subscription of
- // TxPreEvent and send events to the given channel.
- SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ // SubscribeNewTxsEvent should return an event subscription of
+ // NewTxsEvent and send events to the given channel.
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
}
type blockChain interface {
@@ -150,8 +150,8 @@ func (s *Service) loop() {
headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh)
defer headSub.Unsubscribe()
- txEventCh := make(chan core.TxPreEvent, txChanSize)
- txSub := txpool.SubscribeTxPreEvent(txEventCh)
+ txEventCh := make(chan core.NewTxsEvent, txChanSize)
+ txSub := txpool.SubscribeNewTxsEvent(txEventCh)
defer txSub.Unsubscribe()
// Start a goroutine that exhausts the subsciptions to avoid events piling up
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index af95d7906..c9ffe230c 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -65,7 +65,7 @@ type Backend interface {
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
Stats() (pending int, queued int)
TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
- SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
ChainConfig() *params.ChainConfig
CurrentBlock() *types.Block
diff --git a/les/api_backend.go b/les/api_backend.go
index 1d3c99513..dea33c470 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -136,8 +136,8 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
return b.eth.txPool.Content()
}
-func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
- return b.eth.txPool.SubscribeTxPreEvent(ch)
+func (b *LesApiBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
+ return b.eth.txPool.SubscribeNewTxsEvent(ch)
}
func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
diff --git a/light/postprocess.go b/light/postprocess.go
index 8b9a02911..c06c18027 100644
--- a/light/postprocess.go
+++ b/light/postprocess.go
@@ -59,18 +59,18 @@ type trustedCheckpoint struct {
var (
mainnetCheckpoint = trustedCheckpoint{
name: "mainnet",
- sectionIdx: 165,
- sectionHead: common.HexToHash("21028acf9cd9ce80257221adc437c3c58ce046c4d43c21c3e9b1d1349059ec73"),
- chtRoot: common.HexToHash("26b2458cb7d0080d3a39311c914be92c368777a65ec074e1893b8bdc79e3910a"),
- bloomTrieRoot: common.HexToHash("5d06908769179186165a72db7fc3473b25c28ed27efe78a392a9ff2c3fa67f84"),
+ sectionIdx: 170,
+ sectionHead: common.HexToHash("3bb2c28bcce463d57968f14f56cdb3fbf35349ab7a701f44c1afb57349c9a356"),
+ chtRoot: common.HexToHash("d92b6d0853455f8439086292338e87f69781921680dd7aa072fb71547b87415e"),
+ bloomTrieRoot: common.HexToHash("e4e8250a2fefddead7ae42daecd848cbf9b66d748a8270f8bbd4370b764bb9e9"),
}
ropstenCheckpoint = trustedCheckpoint{
name: "ropsten",
- sectionIdx: 92,
- sectionHead: common.HexToHash("21a158f9cc643da13a237cafceb37381072649f7278cf98c5820bfbced7cfcec"),
- chtRoot: common.HexToHash("1a8ddb8b086d7a33ca90eea90730225948fa504ae0283b15aff3c15c0e089bf9"),
- bloomTrieRoot: common.HexToHash("fd192f92afbcdd0020c81ca0625116b5995509659653b10123bd986fe5129cc1"),
+ sectionIdx: 97,
+ sectionHead: common.HexToHash("719448c67c01eb5b9f27833a36a4e34612f66801316d7ff37daf9e77fb4cd095"),
+ chtRoot: common.HexToHash("a7857afc15930ca6e583b6c3d563a025144011655843d52d28e2fdaadd417bea"),
+ bloomTrieRoot: common.HexToHash("9c71d4b50cbec86dfeaa8e08992de8a4667b81d13c54d6522b17ce2fc5d36416"),
}
)
diff --git a/light/txpool.go b/light/txpool.go
index 94c8139cb..1fabc3dc5 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -321,9 +321,9 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}
-// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and
+// SubscribeNewTxsEvent registers a subscription of core.NewTxsEvent and
// starts sending event to the given channel.
-func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}
@@ -412,7 +412,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
- go self.txFeed.Send(core.TxPreEvent{Tx: tx})
+ go self.txFeed.Send(core.NewTxsEvent{Txs: types.Transactions{tx}})
}
// Print a log message if low enough level is set
diff --git a/miner/worker.go b/miner/worker.go
index 48b0b2765..640e9032e 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -42,7 +42,7 @@ const (
resultQueueSize = 10
miningLogAtDepth = 5
- // txChanSize is the size of channel listening to TxPreEvent.
+ // txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
@@ -71,6 +71,7 @@ type Work struct {
family *set.Set // family set (used for checking uncle invalidity)
uncles *set.Set // uncle set
tcount int // tx count in cycle
+ gasPool *core.GasPool // available gas used to pack transactions
Block *types.Block // the new block
@@ -95,8 +96,8 @@ type worker struct {
// update loop
mux *event.TypeMux
- txCh chan core.TxPreEvent
- txSub event.Subscription
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
@@ -137,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
engine: engine,
eth: eth,
mux: mux,
- txCh: make(chan core.TxPreEvent, txChanSize),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
@@ -149,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
}
- // Subscribe TxPreEvent for tx pool
- worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
+ // Subscribe NewTxsEvent for tx pool
+ worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
@@ -241,7 +242,7 @@ func (self *worker) unregister(agent Agent) {
}
func (self *worker) update() {
- defer self.txSub.Unsubscribe()
+ defer self.txsSub.Unsubscribe()
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()
@@ -258,15 +259,21 @@ func (self *worker) update() {
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
- // Handle TxPreEvent
- case ev := <-self.txCh:
- // Apply transaction to the pending state if we're not mining
+ // Handle NewTxsEvent
+ case ev := <-self.txsCh:
+ // Apply transactions to the pending state if we're not mining.
+ //
+ // Note all transactions received may not be continuous with transactions
+ // already included in the current mining block. These transactions will
+ // be automatically eliminated.
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
- acc, _ := types.Sender(self.current.signer, ev.Tx)
- txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
+ txs := make(map[common.Address]types.Transactions)
+ for _, tx := range ev.Txs {
+ acc, _ := types.Sender(self.current.signer, tx)
+ txs[acc] = append(txs[acc], tx)
+ }
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
-
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.updateSnapshot()
self.currentMu.Unlock()
@@ -278,7 +285,7 @@ func (self *worker) update() {
}
// System stopped
- case <-self.txSub.Err():
+ case <-self.txsSub.Err():
return
case <-self.chainHeadSub.Err():
return
@@ -522,14 +529,16 @@ func (self *worker) updateSnapshot() {
}
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
- gp := new(core.GasPool).AddGas(env.header.GasLimit)
+ if env.gasPool == nil {
+ env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
+ }
var coalescedLogs []*types.Log
for {
// If we don't have enough gas for any further transactions then we're done
- if gp.Gas() < params.TxGas {
- log.Trace("Not enough gas for further transactions", "gp", gp)
+ if env.gasPool.Gas() < params.TxGas {
+ log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// Retrieve the next transaction and abort if all done
@@ -553,7 +562,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
// Start executing the transaction
env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
- err, logs := env.commitTransaction(tx, bc, coinbase, gp)
+ err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
diff --git a/p2p/enr/enr.go b/p2p/enr/enr.go
index c018895cc..48683471d 100644
--- a/p2p/enr/enr.go
+++ b/p2p/enr/enr.go
@@ -29,21 +29,16 @@ package enr
import (
"bytes"
- "crypto/ecdsa"
"errors"
"fmt"
"io"
"sort"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/rlp"
)
const SizeLimit = 300 // maximum encoded size of a node record in bytes
-const ID_SECP256k1_KECCAK = ID("secp256k1-keccak") // the default identity scheme
-
var (
errNoID = errors.New("unknown or unspecified identity scheme")
errInvalidSig = errors.New("invalid signature")
@@ -80,8 +75,8 @@ func (r *Record) Seq() uint64 {
}
// SetSeq updates the record sequence number. This invalidates any signature on the record.
-// Calling SetSeq is usually not required because signing the redord increments the
-// sequence number.
+// Calling SetSeq is usually not required because setting any key in a signed record
+// increments the sequence number.
func (r *Record) SetSeq(s uint64) {
r.signature = nil
r.raw = nil
@@ -104,33 +99,42 @@ func (r *Record) Load(e Entry) error {
return &KeyError{Key: e.ENRKey(), Err: errNotFound}
}
-// Set adds or updates the given entry in the record.
-// It panics if the value can't be encoded.
+// Set adds or updates the given entry in the record. It panics if the value can't be
+// encoded. If the record is signed, Set increments the sequence number and invalidates
+// the sequence number.
func (r *Record) Set(e Entry) {
- r.signature = nil
- r.raw = nil
blob, err := rlp.EncodeToBytes(e)
if err != nil {
panic(fmt.Errorf("enr: can't encode %s: %v", e.ENRKey(), err))
}
+ r.invalidate()
- i := sort.Search(len(r.pairs), func(i int) bool { return r.pairs[i].k >= e.ENRKey() })
-
- if i < len(r.pairs) && r.pairs[i].k == e.ENRKey() {
+ pairs := make([]pair, len(r.pairs))
+ copy(pairs, r.pairs)
+ i := sort.Search(len(pairs), func(i int) bool { return pairs[i].k >= e.ENRKey() })
+ switch {
+ case i < len(pairs) && pairs[i].k == e.ENRKey():
// element is present at r.pairs[i]
- r.pairs[i].v = blob
- return
- } else if i < len(r.pairs) {
+ pairs[i].v = blob
+ case i < len(r.pairs):
// insert pair before i-th elem
el := pair{e.ENRKey(), blob}
- r.pairs = append(r.pairs, pair{})
- copy(r.pairs[i+1:], r.pairs[i:])
- r.pairs[i] = el
- return
+ pairs = append(pairs, pair{})
+ copy(pairs[i+1:], pairs[i:])
+ pairs[i] = el
+ default:
+ // element should be placed at the end of r.pairs
+ pairs = append(pairs, pair{e.ENRKey(), blob})
}
+ r.pairs = pairs
+}
- // element should be placed at the end of r.pairs
- r.pairs = append(r.pairs, pair{e.ENRKey(), blob})
+func (r *Record) invalidate() {
+ if r.signature == nil {
+ r.seq++
+ }
+ r.signature = nil
+ r.raw = nil
}
// EncodeRLP implements rlp.Encoder. Encoding fails if
@@ -196,39 +200,55 @@ func (r *Record) DecodeRLP(s *rlp.Stream) error {
return err
}
- // Verify signature.
- if err = dec.verifySignature(); err != nil {
+ _, scheme := dec.idScheme()
+ if scheme == nil {
+ return errNoID
+ }
+ if err := scheme.Verify(&dec, dec.signature); err != nil {
return err
}
*r = dec
return nil
}
-type s256raw []byte
-
-func (s256raw) ENRKey() string { return "secp256k1" }
-
// NodeAddr returns the node address. The return value will be nil if the record is
-// unsigned.
+// unsigned or uses an unknown identity scheme.
func (r *Record) NodeAddr() []byte {
- var entry s256raw
- if r.Load(&entry) != nil {
+ _, scheme := r.idScheme()
+ if scheme == nil {
return nil
}
- return crypto.Keccak256(entry)
+ return scheme.NodeAddr(r)
}
-// Sign signs the record with the given private key. It updates the record's identity
-// scheme, public key and increments the sequence number. Sign returns an error if the
-// encoded record is larger than the size limit.
-func (r *Record) Sign(privkey *ecdsa.PrivateKey) error {
- r.seq = r.seq + 1
- r.Set(ID_SECP256k1_KECCAK)
- r.Set(Secp256k1(privkey.PublicKey))
- return r.signAndEncode(privkey)
+// SetSig sets the record signature. It returns an error if the encoded record is larger
+// than the size limit or if the signature is invalid according to the passed scheme.
+func (r *Record) SetSig(idscheme string, sig []byte) error {
+ // Check that "id" is set and matches the given scheme. This panics because
+ // inconsitencies here are always implementation bugs in the signing function calling
+ // this method.
+ id, s := r.idScheme()
+ if s == nil {
+ panic(errNoID)
+ }
+ if id != idscheme {
+ panic(fmt.Errorf("identity scheme mismatch in Sign: record has %s, want %s", id, idscheme))
+ }
+
+ // Verify against the scheme.
+ if err := s.Verify(r, sig); err != nil {
+ return err
+ }
+ raw, err := r.encode(sig)
+ if err != nil {
+ return err
+ }
+ r.signature, r.raw = sig, raw
+ return nil
}
-func (r *Record) appendPairs(list []interface{}) []interface{} {
+// AppendElements appends the sequence number and entries to the given slice.
+func (r *Record) AppendElements(list []interface{}) []interface{} {
list = append(list, r.seq)
for _, p := range r.pairs {
list = append(list, p.k, p.v)
@@ -236,54 +256,23 @@ func (r *Record) appendPairs(list []interface{}) []interface{} {
return list
}
-func (r *Record) signAndEncode(privkey *ecdsa.PrivateKey) error {
- // Put record elements into a flat list. Leave room for the signature.
- list := make([]interface{}, 1, len(r.pairs)*2+2)
- list = r.appendPairs(list)
-
- // Sign the tail of the list.
- h := sha3.NewKeccak256()
- rlp.Encode(h, list[1:])
- sig, err := crypto.Sign(h.Sum(nil), privkey)
- if err != nil {
- return err
- }
- sig = sig[:len(sig)-1] // remove v
-
- // Put signature in front.
- r.signature, list[0] = sig, sig
- r.raw, err = rlp.EncodeToBytes(list)
- if err != nil {
- return err
+func (r *Record) encode(sig []byte) (raw []byte, err error) {
+ list := make([]interface{}, 1, 2*len(r.pairs)+1)
+ list[0] = sig
+ list = r.AppendElements(list)
+ if raw, err = rlp.EncodeToBytes(list); err != nil {
+ return nil, err
}
- if len(r.raw) > SizeLimit {
- return errTooBig
+ if len(raw) > SizeLimit {
+ return nil, errTooBig
}
- return nil
+ return raw, nil
}
-func (r *Record) verifySignature() error {
- // Get identity scheme, public key, signature.
+func (r *Record) idScheme() (string, IdentityScheme) {
var id ID
- var entry s256raw
if err := r.Load(&id); err != nil {
- return err
- } else if id != ID_SECP256k1_KECCAK {
- return errNoID
+ return "", nil
}
- if err := r.Load(&entry); err != nil {
- return err
- } else if len(entry) != 33 {
- return fmt.Errorf("invalid public key")
- }
-
- // Verify the signature.
- list := make([]interface{}, 0, len(r.pairs)*2+1)
- list = r.appendPairs(list)
- h := sha3.NewKeccak256()
- rlp.Encode(h, list)
- if !crypto.VerifySignature(entry, h.Sum(nil), r.signature) {
- return errInvalidSig
- }
- return nil
+ return string(id), FindIdentityScheme(string(id))
}
diff --git a/p2p/enr/enr_test.go b/p2p/enr/enr_test.go
index ce7767d10..d1d088756 100644
--- a/p2p/enr/enr_test.go
+++ b/p2p/enr/enr_test.go
@@ -54,35 +54,35 @@ func TestGetSetID(t *testing.T) {
assert.Equal(t, id, id2)
}
-// TestGetSetIP4 tests encoding/decoding and setting/getting of the IP4 key.
+// TestGetSetIP4 tests encoding/decoding and setting/getting of the IP key.
func TestGetSetIP4(t *testing.T) {
- ip := IP4{192, 168, 0, 3}
+ ip := IP{192, 168, 0, 3}
var r Record
r.Set(ip)
- var ip2 IP4
+ var ip2 IP
require.NoError(t, r.Load(&ip2))
assert.Equal(t, ip, ip2)
}
-// TestGetSetIP6 tests encoding/decoding and setting/getting of the IP6 key.
+// TestGetSetIP6 tests encoding/decoding and setting/getting of the IP key.
func TestGetSetIP6(t *testing.T) {
- ip := IP6{0x20, 0x01, 0x48, 0x60, 0, 0, 0x20, 0x01, 0, 0, 0, 0, 0, 0, 0x00, 0x68}
+ ip := IP{0x20, 0x01, 0x48, 0x60, 0, 0, 0x20, 0x01, 0, 0, 0, 0, 0, 0, 0x00, 0x68}
var r Record
r.Set(ip)
- var ip2 IP6
+ var ip2 IP
require.NoError(t, r.Load(&ip2))
assert.Equal(t, ip, ip2)
}
// TestGetSetDiscPort tests encoding/decoding and setting/getting of the DiscPort key.
-func TestGetSetDiscPort(t *testing.T) {
- port := DiscPort(30309)
+func TestGetSetUDP(t *testing.T) {
+ port := UDP(30309)
var r Record
r.Set(port)
- var port2 DiscPort
+ var port2 UDP
require.NoError(t, r.Load(&port2))
assert.Equal(t, port, port2)
}
@@ -90,7 +90,7 @@ func TestGetSetDiscPort(t *testing.T) {
// TestGetSetSecp256k1 tests encoding/decoding and setting/getting of the Secp256k1 key.
func TestGetSetSecp256k1(t *testing.T) {
var r Record
- if err := r.Sign(privkey); err != nil {
+ if err := SignV4(&r, privkey); err != nil {
t.Fatal(err)
}
@@ -101,16 +101,16 @@ func TestGetSetSecp256k1(t *testing.T) {
func TestLoadErrors(t *testing.T) {
var r Record
- ip4 := IP4{127, 0, 0, 1}
+ ip4 := IP{127, 0, 0, 1}
r.Set(ip4)
// Check error for missing keys.
- var ip6 IP6
- err := r.Load(&ip6)
+ var udp UDP
+ err := r.Load(&udp)
if !IsNotFound(err) {
t.Error("IsNotFound should return true for missing key")
}
- assert.Equal(t, &KeyError{Key: ip6.ENRKey(), Err: errNotFound}, err)
+ assert.Equal(t, &KeyError{Key: udp.ENRKey(), Err: errNotFound}, err)
// Check error for invalid keys.
var list []uint
@@ -174,7 +174,7 @@ func TestDirty(t *testing.T) {
t.Errorf("expected errEncodeUnsigned, got %#v", err)
}
- require.NoError(t, r.Sign(privkey))
+ require.NoError(t, SignV4(&r, privkey))
if !r.Signed() {
t.Error("Signed return false for signed record")
}
@@ -194,13 +194,13 @@ func TestDirty(t *testing.T) {
func TestGetSetOverwrite(t *testing.T) {
var r Record
- ip := IP4{192, 168, 0, 3}
+ ip := IP{192, 168, 0, 3}
r.Set(ip)
- ip2 := IP4{192, 168, 0, 4}
+ ip2 := IP{192, 168, 0, 4}
r.Set(ip2)
- var ip3 IP4
+ var ip3 IP
require.NoError(t, r.Load(&ip3))
assert.Equal(t, ip2, ip3)
}
@@ -208,9 +208,9 @@ func TestGetSetOverwrite(t *testing.T) {
// TestSignEncodeAndDecode tests signing, RLP encoding and RLP decoding of a record.
func TestSignEncodeAndDecode(t *testing.T) {
var r Record
- r.Set(DiscPort(30303))
- r.Set(IP4{127, 0, 0, 1})
- require.NoError(t, r.Sign(privkey))
+ r.Set(UDP(30303))
+ r.Set(IP{127, 0, 0, 1})
+ require.NoError(t, SignV4(&r, privkey))
blob, err := rlp.EncodeToBytes(r)
require.NoError(t, err)
@@ -230,12 +230,12 @@ func TestNodeAddr(t *testing.T) {
t.Errorf("wrong address on empty record: got %v, want %v", addr, nil)
}
- require.NoError(t, r.Sign(privkey))
- expected := "caaa1485d83b18b32ed9ad666026151bf0cae8a0a88c857ae2d4c5be2daa6726"
+ require.NoError(t, SignV4(&r, privkey))
+ expected := "a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7"
assert.Equal(t, expected, hex.EncodeToString(r.NodeAddr()))
}
-var pyRecord, _ = hex.DecodeString("f896b840954dc36583c1f4b69ab59b1375f362f06ee99f3723cd77e64b6de6d211c27d7870642a79d4516997f94091325d2a7ca6215376971455fb221d34f35b277149a1018664697363763582765f82696490736563703235366b312d6b656363616b83697034847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138")
+var pyRecord, _ = hex.DecodeString("f884b8407098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c01826964827634826970847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31388375647082765f")
// TestPythonInterop checks that we can decode and verify a record produced by the Python
// implementation.
@@ -246,10 +246,10 @@ func TestPythonInterop(t *testing.T) {
}
var (
- wantAddr, _ = hex.DecodeString("caaa1485d83b18b32ed9ad666026151bf0cae8a0a88c857ae2d4c5be2daa6726")
- wantSeq = uint64(1)
- wantIP = IP4{127, 0, 0, 1}
- wantDiscport = DiscPort(30303)
+ wantAddr, _ = hex.DecodeString("a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7")
+ wantSeq = uint64(1)
+ wantIP = IP{127, 0, 0, 1}
+ wantUDP = UDP(30303)
)
if r.Seq() != wantSeq {
t.Errorf("wrong seq: got %d, want %d", r.Seq(), wantSeq)
@@ -257,7 +257,7 @@ func TestPythonInterop(t *testing.T) {
if addr := r.NodeAddr(); !bytes.Equal(addr, wantAddr) {
t.Errorf("wrong addr: got %x, want %x", addr, wantAddr)
}
- want := map[Entry]interface{}{new(IP4): &wantIP, new(DiscPort): &wantDiscport}
+ want := map[Entry]interface{}{new(IP): &wantIP, new(UDP): &wantUDP}
for k, v := range want {
desc := fmt.Sprintf("loading key %q", k.ENRKey())
if assert.NoError(t, r.Load(k), desc) {
@@ -272,14 +272,14 @@ func TestRecordTooBig(t *testing.T) {
key := randomString(10)
// set a big value for random key, expect error
- r.Set(WithEntry(key, randomString(300)))
- if err := r.Sign(privkey); err != errTooBig {
+ r.Set(WithEntry(key, randomString(SizeLimit)))
+ if err := SignV4(&r, privkey); err != errTooBig {
t.Fatalf("expected to get errTooBig, got %#v", err)
}
// set an acceptable value for random key, expect no error
r.Set(WithEntry(key, randomString(100)))
- require.NoError(t, r.Sign(privkey))
+ require.NoError(t, SignV4(&r, privkey))
}
// TestSignEncodeAndDecodeRandom tests encoding/decoding of records containing random key/value pairs.
@@ -295,7 +295,7 @@ func TestSignEncodeAndDecodeRandom(t *testing.T) {
r.Set(WithEntry(key, &value))
}
- require.NoError(t, r.Sign(privkey))
+ require.NoError(t, SignV4(&r, privkey))
_, err := rlp.EncodeToBytes(r)
require.NoError(t, err)
diff --git a/p2p/enr/entries.go b/p2p/enr/entries.go
index 7591e6eff..71c7653a2 100644
--- a/p2p/enr/entries.go
+++ b/p2p/enr/entries.go
@@ -57,59 +57,43 @@ func WithEntry(k string, v interface{}) Entry {
return &generic{key: k, value: v}
}
-// DiscPort is the "discv5" key, which holds the UDP port for discovery v5.
-type DiscPort uint16
+// TCP is the "tcp" key, which holds the TCP port of the node.
+type TCP uint16
-func (v DiscPort) ENRKey() string { return "discv5" }
+func (v TCP) ENRKey() string { return "tcp" }
+
+// UDP is the "udp" key, which holds the UDP port of the node.
+type UDP uint16
+
+func (v UDP) ENRKey() string { return "udp" }
// ID is the "id" key, which holds the name of the identity scheme.
type ID string
+const IDv4 = ID("v4") // the default identity scheme
+
func (v ID) ENRKey() string { return "id" }
-// IP4 is the "ip4" key, which holds a 4-byte IPv4 address.
-type IP4 net.IP
+// IP is the "ip" key, which holds the IP address of the node.
+type IP net.IP
-func (v IP4) ENRKey() string { return "ip4" }
+func (v IP) ENRKey() string { return "ip" }
// EncodeRLP implements rlp.Encoder.
-func (v IP4) EncodeRLP(w io.Writer) error {
- ip4 := net.IP(v).To4()
- if ip4 == nil {
- return fmt.Errorf("invalid IPv4 address: %v", v)
- }
- return rlp.Encode(w, ip4)
-}
-
-// DecodeRLP implements rlp.Decoder.
-func (v *IP4) DecodeRLP(s *rlp.Stream) error {
- if err := s.Decode((*net.IP)(v)); err != nil {
- return err
- }
- if len(*v) != 4 {
- return fmt.Errorf("invalid IPv4 address, want 4 bytes: %v", *v)
+func (v IP) EncodeRLP(w io.Writer) error {
+ if ip4 := net.IP(v).To4(); ip4 != nil {
+ return rlp.Encode(w, ip4)
}
- return nil
-}
-
-// IP6 is the "ip6" key, which holds a 16-byte IPv6 address.
-type IP6 net.IP
-
-func (v IP6) ENRKey() string { return "ip6" }
-
-// EncodeRLP implements rlp.Encoder.
-func (v IP6) EncodeRLP(w io.Writer) error {
- ip6 := net.IP(v)
- return rlp.Encode(w, ip6)
+ return rlp.Encode(w, net.IP(v))
}
// DecodeRLP implements rlp.Decoder.
-func (v *IP6) DecodeRLP(s *rlp.Stream) error {
+func (v *IP) DecodeRLP(s *rlp.Stream) error {
if err := s.Decode((*net.IP)(v)); err != nil {
return err
}
- if len(*v) != 16 {
- return fmt.Errorf("invalid IPv6 address, want 16 bytes: %v", *v)
+ if len(*v) != 4 && len(*v) != 16 {
+ return fmt.Errorf("invalid IP address, want 4 or 16 bytes: %v", *v)
}
return nil
}
diff --git a/p2p/enr/idscheme.go b/p2p/enr/idscheme.go
new file mode 100644
index 000000000..efaf68041
--- /dev/null
+++ b/p2p/enr/idscheme.go
@@ -0,0 +1,114 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package enr
+
+import (
+ "crypto/ecdsa"
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common/math"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// Registry of known identity schemes.
+var schemes sync.Map
+
+// An IdentityScheme is capable of verifying record signatures and
+// deriving node addresses.
+type IdentityScheme interface {
+ Verify(r *Record, sig []byte) error
+ NodeAddr(r *Record) []byte
+}
+
+// RegisterIdentityScheme adds an identity scheme to the global registry.
+func RegisterIdentityScheme(name string, scheme IdentityScheme) {
+ if _, loaded := schemes.LoadOrStore(name, scheme); loaded {
+ panic("identity scheme " + name + " already registered")
+ }
+}
+
+// FindIdentityScheme resolves name to an identity scheme in the global registry.
+func FindIdentityScheme(name string) IdentityScheme {
+ s, ok := schemes.Load(name)
+ if !ok {
+ return nil
+ }
+ return s.(IdentityScheme)
+}
+
+// v4ID is the "v4" identity scheme.
+type v4ID struct{}
+
+func init() {
+ RegisterIdentityScheme("v4", v4ID{})
+}
+
+// SignV4 signs a record using the v4 scheme.
+func SignV4(r *Record, privkey *ecdsa.PrivateKey) error {
+ // Copy r to avoid modifying it if signing fails.
+ cpy := *r
+ cpy.Set(ID("v4"))
+ cpy.Set(Secp256k1(privkey.PublicKey))
+
+ h := sha3.NewKeccak256()
+ rlp.Encode(h, cpy.AppendElements(nil))
+ sig, err := crypto.Sign(h.Sum(nil), privkey)
+ if err != nil {
+ return err
+ }
+ sig = sig[:len(sig)-1] // remove v
+ if err = cpy.SetSig("v4", sig); err == nil {
+ *r = cpy
+ }
+ return err
+}
+
+// s256raw is an unparsed secp256k1 public key entry.
+type s256raw []byte
+
+func (s256raw) ENRKey() string { return "secp256k1" }
+
+func (v4ID) Verify(r *Record, sig []byte) error {
+ var entry s256raw
+ if err := r.Load(&entry); err != nil {
+ return err
+ } else if len(entry) != 33 {
+ return fmt.Errorf("invalid public key")
+ }
+
+ h := sha3.NewKeccak256()
+ rlp.Encode(h, r.AppendElements(nil))
+ if !crypto.VerifySignature(entry, h.Sum(nil), sig) {
+ return errInvalidSig
+ }
+ return nil
+}
+
+func (v4ID) NodeAddr(r *Record) []byte {
+ var pubkey Secp256k1
+ err := r.Load(&pubkey)
+ if err != nil {
+ return nil
+ }
+ buf := make([]byte, 64)
+ math.ReadBits(pubkey.X, buf[:32])
+ math.ReadBits(pubkey.Y, buf[32:])
+ return crypto.Keccak256(buf)
+}
diff --git a/p2p/enr/idscheme_test.go b/p2p/enr/idscheme_test.go
new file mode 100644
index 000000000..d790e12f1
--- /dev/null
+++ b/p2p/enr/idscheme_test.go
@@ -0,0 +1,36 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package enr
+
+import (
+ "crypto/ecdsa"
+ "math/big"
+ "testing"
+)
+
+// Checks that failure to sign leaves the record unmodified.
+func TestSignError(t *testing.T) {
+ invalidKey := &ecdsa.PrivateKey{D: new(big.Int), PublicKey: *pubkey}
+
+ var r Record
+ if err := SignV4(&r, invalidKey); err == nil {
+ t.Fatal("expected error from SignV4")
+ }
+ if len(r.pairs) > 0 {
+ t.Fatal("expected empty record, have", r.pairs)
+ }
+}
diff --git a/params/version.go b/params/version.go
index 8b96f2516..1e8c43bf8 100644
--- a/params/version.go
+++ b/params/version.go
@@ -23,7 +23,7 @@ import (
const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 8 // Minor version component of the current release
- VersionPatch = 8 // Patch version component of the current release
+ VersionPatch = 9 // Patch version component of the current release
VersionMeta = "unstable" // Version metadata to append to the version string
)