diff options
32 files changed, 657 insertions, 324 deletions
diff --git a/.travis.yml b/.travis.yml index ce6500c46..6fb70f27d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -126,7 +126,7 @@ matrix: # This builder does the Android Maven and Azure uploads - os: linux - dist: precise # Needed for the android tools + dist: trusty addons: apt: packages: @@ -146,7 +146,7 @@ matrix: git: submodules: false # avoid cloning ethereum/tests before_install: - - curl https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz | tar -xz + - curl https://storage.googleapis.com/golang/go1.10.2.linux-amd64.tar.gz | tar -xz - export PATH=`pwd`/go/bin:$PATH - export GOROOT=`pwd`/go - export GOPATH=$HOME/go @@ -1 +1 @@ -1.8.8 +1.8.9 diff --git a/accounts/abi/argument.go b/accounts/abi/argument.go index 512d8fdfa..93b513c34 100644 --- a/accounts/abi/argument.go +++ b/accounts/abi/argument.go @@ -111,9 +111,14 @@ func (arguments Arguments) unpackTuple(v interface{}, marshalledValues []interfa if err := requireUnpackKind(value, typ, kind, arguments); err != nil { return err } - // If the output interface is a struct, make sure names don't collide + + // If the interface is a struct, get of abi->struct_field mapping + + var abi2struct map[string]string if kind == reflect.Struct { - if err := requireUniqueStructFieldNames(arguments); err != nil { + var err error + abi2struct, err = mapAbiToStructFields(arguments, value) + if err != nil { return err } } @@ -123,9 +128,10 @@ func (arguments Arguments) unpackTuple(v interface{}, marshalledValues []interfa switch kind { case reflect.Struct: - err := unpackStruct(value, reflectValue, arg) - if err != nil { - return err + if structField, ok := abi2struct[arg.Name]; ok { + if err := set(value.FieldByName(structField), reflectValue, arg); err != nil { + return err + } } case reflect.Slice, reflect.Array: if value.Len() < i { @@ -151,17 +157,22 @@ func (arguments Arguments) unpackAtomic(v interface{}, marshalledValues []interf if len(marshalledValues) != 1 { return fmt.Errorf("abi: wrong length, expected single value, got %d", len(marshalledValues)) } + elem := reflect.ValueOf(v).Elem() kind := elem.Kind() reflectValue := reflect.ValueOf(marshalledValues[0]) + var abi2struct map[string]string if kind == reflect.Struct { - //make sure names don't collide - if err := requireUniqueStructFieldNames(arguments); err != nil { + var err error + if abi2struct, err = mapAbiToStructFields(arguments, elem); err != nil { return err } - - return unpackStruct(elem, reflectValue, arguments[0]) + arg := arguments.NonIndexed()[0] + if structField, ok := abi2struct[arg.Name]; ok { + return set(elem.FieldByName(structField), reflectValue, arg) + } + return nil } return set(elem, reflectValue, arguments.NonIndexed()[0]) @@ -277,18 +288,3 @@ func capitalise(input string) string { } return strings.ToUpper(input[:1]) + input[1:] } - -//unpackStruct extracts each argument into its corresponding struct field -func unpackStruct(value, reflectValue reflect.Value, arg Argument) error { - name := capitalise(arg.Name) - typ := value.Type() - for j := 0; j < typ.NumField(); j++ { - // TODO read tags: `abi:"fieldName"` - if typ.Field(j).Name == name { - if err := set(value.Field(j), reflectValue, arg); err != nil { - return err - } - } - } - return nil -} diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 7b605e1c1..fd69538d5 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -454,7 +454,7 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty return logs, nil } -func (fb *filterBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { <-quit return nil diff --git a/accounts/abi/event_test.go b/accounts/abi/event_test.go index cca61e433..3bfdd7c0a 100644 --- a/accounts/abi/event_test.go +++ b/accounts/abi/event_test.go @@ -58,12 +58,28 @@ var jsonEventPledge = []byte(`{ "type": "event" }`) +var jsonEventMixedCase = []byte(`{ + "anonymous": false, + "inputs": [{ + "indexed": false, "name": "value", "type": "uint256" + }, { + "indexed": false, "name": "_value", "type": "uint256" + }, { + "indexed": false, "name": "Value", "type": "uint256" + }], + "name": "MixedCase", + "type": "event" + }`) + // 1000000 var transferData1 = "00000000000000000000000000000000000000000000000000000000000f4240" // "0x00Ce0d46d924CC8437c806721496599FC3FFA268", 2218516807680, "usd" var pledgeData1 = "00000000000000000000000000ce0d46d924cc8437c806721496599fc3ffa2680000000000000000000000000000000000000000000000000000020489e800007573640000000000000000000000000000000000000000000000000000000000" +// 1000000,2218516807680,1000001 +var mixedCaseData1 = "00000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000020489e8000000000000000000000000000000000000000000000000000000000000000f4241" + func TestEventId(t *testing.T) { var table = []struct { definition string @@ -121,6 +137,27 @@ func TestEventTupleUnpack(t *testing.T) { Value *big.Int } + type EventTransferWithTag struct { + // this is valid because `value` is not exportable, + // so value is only unmarshalled into `Value1`. + value *big.Int + Value1 *big.Int `abi:"value"` + } + + type BadEventTransferWithSameFieldAndTag struct { + Value *big.Int + Value1 *big.Int `abi:"value"` + } + + type BadEventTransferWithDuplicatedTag struct { + Value1 *big.Int `abi:"value"` + Value2 *big.Int `abi:"value"` + } + + type BadEventTransferWithEmptyTag struct { + Value *big.Int `abi:""` + } + type EventPledge struct { Who common.Address Wad *big.Int @@ -133,9 +170,16 @@ func TestEventTupleUnpack(t *testing.T) { Currency [3]byte } + type EventMixedCase struct { + Value1 *big.Int `abi:"value"` + Value2 *big.Int `abi:"_value"` + Value3 *big.Int `abi:"Value"` + } + bigint := new(big.Int) bigintExpected := big.NewInt(1000000) bigintExpected2 := big.NewInt(2218516807680) + bigintExpected3 := big.NewInt(1000001) addr := common.HexToAddress("0x00Ce0d46d924CC8437c806721496599FC3FFA268") var testCases = []struct { data string @@ -159,6 +203,34 @@ func TestEventTupleUnpack(t *testing.T) { "", "Can unpack ERC20 Transfer event into slice", }, { + transferData1, + &EventTransferWithTag{}, + &EventTransferWithTag{Value1: bigintExpected}, + jsonEventTransfer, + "", + "Can unpack ERC20 Transfer event into structure with abi: tag", + }, { + transferData1, + &BadEventTransferWithDuplicatedTag{}, + &BadEventTransferWithDuplicatedTag{}, + jsonEventTransfer, + "struct: abi tag in 'Value2' already mapped", + "Can not unpack ERC20 Transfer event with duplicated abi tag", + }, { + transferData1, + &BadEventTransferWithSameFieldAndTag{}, + &BadEventTransferWithSameFieldAndTag{}, + jsonEventTransfer, + "abi: multiple variables maps to the same abi field 'value'", + "Can not unpack ERC20 Transfer event with a field and a tag mapping to the same abi variable", + }, { + transferData1, + &BadEventTransferWithEmptyTag{}, + &BadEventTransferWithEmptyTag{}, + jsonEventTransfer, + "struct: abi tag in 'Value' is empty", + "Can not unpack ERC20 Transfer event with an empty tag", + }, { pledgeData1, &EventPledge{}, &EventPledge{ @@ -216,6 +288,13 @@ func TestEventTupleUnpack(t *testing.T) { jsonEventPledge, "abi: cannot unmarshal tuple into map[string]interface {}", "Can not unpack Pledge event into map", + }, { + mixedCaseData1, + &EventMixedCase{}, + &EventMixedCase{Value1: bigintExpected, Value2: bigintExpected2, Value3: bigintExpected3}, + jsonEventMixedCase, + "", + "Can unpack abi variables with mixed case", }} for _, tc := range testCases { @@ -227,7 +306,7 @@ func TestEventTupleUnpack(t *testing.T) { assert.Nil(err, "Should be able to unpack event data.") assert.Equal(tc.expected, tc.dest, tc.name) } else { - assert.EqualError(err, tc.error) + assert.EqualError(err, tc.error, tc.name) } }) } diff --git a/accounts/abi/reflect.go b/accounts/abi/reflect.go index 5620a7084..0193517a4 100644 --- a/accounts/abi/reflect.go +++ b/accounts/abi/reflect.go @@ -19,6 +19,7 @@ package abi import ( "fmt" "reflect" + "strings" ) // indirect recursively dereferences the value until it either gets the value @@ -111,18 +112,101 @@ func requireUnpackKind(v reflect.Value, t reflect.Type, k reflect.Kind, return nil } -// requireUniqueStructFieldNames makes sure field names don't collide -func requireUniqueStructFieldNames(args Arguments) error { - exists := make(map[string]bool) +// mapAbiToStringField maps abi to struct fields. +// first round: for each Exportable field that contains a `abi:""` tag +// and this field name exists in the arguments, pair them together. +// second round: for each argument field that has not been already linked, +// find what variable is expected to be mapped into, if it exists and has not been +// used, pair them. +func mapAbiToStructFields(args Arguments, value reflect.Value) (map[string]string, error) { + + typ := value.Type() + + abi2struct := make(map[string]string) + struct2abi := make(map[string]string) + + // first round ~~~ + for i := 0; i < typ.NumField(); i++ { + structFieldName := typ.Field(i).Name + + // skip private struct fields. + if structFieldName[:1] != strings.ToUpper(structFieldName[:1]) { + continue + } + + // skip fields that have no abi:"" tag. + var ok bool + var tagName string + if tagName, ok = typ.Field(i).Tag.Lookup("abi"); !ok { + continue + } + + // check if tag is empty. + if tagName == "" { + return nil, fmt.Errorf("struct: abi tag in '%s' is empty", structFieldName) + } + + // check which argument field matches with the abi tag. + found := false + for _, abiField := range args.NonIndexed() { + if abiField.Name == tagName { + if abi2struct[abiField.Name] != "" { + return nil, fmt.Errorf("struct: abi tag in '%s' already mapped", structFieldName) + } + // pair them + abi2struct[abiField.Name] = structFieldName + struct2abi[structFieldName] = abiField.Name + found = true + } + } + + // check if this tag has been mapped. + if !found { + return nil, fmt.Errorf("struct: abi tag '%s' defined but not found in abi", tagName) + } + + } + + // second round ~~~ for _, arg := range args { - field := capitalise(arg.Name) - if field == "" { - return fmt.Errorf("abi: purely underscored output cannot unpack to struct") + + abiFieldName := arg.Name + structFieldName := capitalise(abiFieldName) + + if structFieldName == "" { + return nil, fmt.Errorf("abi: purely underscored output cannot unpack to struct") } - if exists[field] { - return fmt.Errorf("abi: multiple outputs mapping to the same struct field '%s'", field) + + // this abi has already been paired, skip it... unless there exists another, yet unassigned + // struct field with the same field name. If so, raise an error: + // abi: [ { "name": "value" } ] + // struct { Value *big.Int , Value1 *big.Int `abi:"value"`} + if abi2struct[abiFieldName] != "" { + if abi2struct[abiFieldName] != structFieldName && + struct2abi[structFieldName] == "" && + value.FieldByName(structFieldName).IsValid() { + return nil, fmt.Errorf("abi: multiple variables maps to the same abi field '%s'", abiFieldName) + } + continue } - exists[field] = true + + // return an error if this struct field has already been paired. + if struct2abi[structFieldName] != "" { + return nil, fmt.Errorf("abi: multiple outputs mapping to the same struct field '%s'", structFieldName) + } + + if value.FieldByName(structFieldName).IsValid() { + // pair them + abi2struct[abiFieldName] = structFieldName + struct2abi[structFieldName] = abiFieldName + } else { + // not paired, but annotate as used, to detect cases like + // abi : [ { "name": "value" }, { "name": "_value" } ] + // struct { Value *big.Int } + struct2abi[structFieldName] = abiFieldName + } + } - return nil + + return abi2struct, nil } diff --git a/appveyor.yml b/appveyor.yml index 141ef16ff..f10000af8 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -23,8 +23,8 @@ environment: install: - git submodule update --init - rmdir C:\go /s /q - - appveyor DownloadFile https://storage.googleapis.com/golang/go1.10.1.windows-%GETH_ARCH%.zip - - 7z x go1.10.1.windows-%GETH_ARCH%.zip -y -oC:\ > NUL + - appveyor DownloadFile https://storage.googleapis.com/golang/go1.10.2.windows-%GETH_ARCH%.zip + - 7z x go1.10.2.windows-%GETH_ARCH%.zip -y -oC:\ > NUL - go version - gcc --version diff --git a/core/events.go b/core/events.go index 6f404f612..8d200f2a2 100644 --- a/core/events.go +++ b/core/events.go @@ -21,8 +21,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// TxPreEvent is posted when a transaction enters the transaction pool. -type TxPreEvent struct{ Tx *types.Transaction } +// NewTxsEvent is posted when a batch of transactions enter the transaction pool. +type NewTxsEvent struct{ Txs []*types.Transaction } // PendingLogsEvent is posted pre mining and notifies of pending logs. type PendingLogsEvent struct { @@ -35,9 +35,6 @@ type PendingStateEvent struct{} // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } -// RemovedTransactionEvent is posted when a reorg happens -type RemovedTransactionEvent struct{ Txs types.Transactions } - // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } diff --git a/core/tx_journal.go b/core/tx_journal.go index e872d7b53..1397e9fd3 100644 --- a/core/tx_journal.go +++ b/core/tx_journal.go @@ -56,7 +56,7 @@ func newTxJournal(path string) *txJournal { // load parses a transaction journal dump from disk, loading its contents into // the specified pool. -func (journal *txJournal) load(add func(*types.Transaction) error) error { +func (journal *txJournal) load(add func([]*types.Transaction) []error) error { // Skip the parsing if the journal file doens't exist at all if _, err := os.Stat(journal.path); os.IsNotExist(err) { return nil @@ -76,7 +76,21 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { stream := rlp.NewStream(input, 0) total, dropped := 0, 0 - var failure error + // Create a method to load a limited batch of transactions and bump the + // appropriate progress counters. Then use this method to load all the + // journalled transactions in small-ish batches. + loadBatch := func(txs types.Transactions) { + for _, err := range add(txs) { + if err != nil { + log.Debug("Failed to add journaled transaction", "err", err) + dropped++ + } + } + } + var ( + failure error + batch types.Transactions + ) for { // Parse the next transaction and terminate on error tx := new(types.Transaction) @@ -84,14 +98,17 @@ func (journal *txJournal) load(add func(*types.Transaction) error) error { if err != io.EOF { failure = err } + if batch.Len() > 0 { + loadBatch(batch) + } break } - // Import the transaction and bump the appropriate progress counters + // New transaction parsed, queue up for later, import if threnshold is reached total++ - if err = add(tx); err != nil { - log.Debug("Failed to add journaled transaction", "err", err) - dropped++ - continue + + if batch = append(batch, tx); batch.Len() > 1024 { + loadBatch(batch) + batch = batch[:0] } } log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) diff --git a/core/tx_pool.go b/core/tx_pool.go index 388b40058..f89e11441 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -238,7 +238,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) - if err := pool.journal.load(pool.AddLocal); err != nil { + if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { @@ -444,9 +444,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of TxPreEvent and +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription { +func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -653,7 +653,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems - go pool.txFeed.Send(TxPreEvent{tx}) + go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) return old != nil, nil } @@ -712,10 +712,11 @@ func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { } } -// promoteTx adds a transaction to the pending (processable) list of transactions. +// promoteTx adds a transaction to the pending (processable) list of transactions +// and returns whether it was inserted or an older was better. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { pool.pending[addr] = newTxList(true) @@ -729,7 +730,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.priced.Removed() pendingDiscardCounter.Inc(1) - return + return false } // Otherwise discard any previous transaction and mark this if old != nil { @@ -747,7 +748,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) - go pool.txFeed.Send(TxPreEvent{tx}) + return true } // AddLocal enqueues a single transaction into the pool if it is valid, marking @@ -907,6 +908,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. func (pool *TxPool) promoteExecutables(accounts []common.Address) { + // Track the promoted transactions to broadcast them at once + var promoted []*types.Transaction + // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) @@ -939,8 +943,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Gather all executable transactions and promote them for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() - log.Trace("Promoting queued transaction", "hash", hash) - pool.promoteTx(addr, hash, tx) + if pool.promoteTx(addr, hash, tx) { + log.Trace("Promoting queued transaction", "hash", hash) + promoted = append(promoted, tx) + } } // Drop all transactions over the allowed limit if !pool.locals.contains(addr) { @@ -957,6 +963,10 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { delete(pool.queue, addr) } } + // Notify subsystem for new promoted transactions. + if len(promoted) > 0 { + pool.txFeed.Send(NewTxsEvent{promoted}) + } // If the pending limit is overflown, start equalizing allowances pending := uint64(0) for _, list := range pool.pending { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index e7f52075e..25993c258 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -118,21 +118,27 @@ func validateTxPoolInternals(pool *TxPool) error { // validateEvents checks that the correct number of transaction addition events // were fired on the pool's event feed. -func validateEvents(events chan TxPreEvent, count int) error { - for i := 0; i < count; i++ { +func validateEvents(events chan NewTxsEvent, count int) error { + var received []*types.Transaction + + for len(received) < count { select { - case <-events: + case ev := <-events: + received = append(received, ev.Txs...) case <-time.After(time.Second): - return fmt.Errorf("event #%d not fired", i) + return fmt.Errorf("event #%d not fired", received) } } + if len(received) > count { + return fmt.Errorf("more than %d events fired: %v", count, received[count:]) + } select { - case tx := <-events: - return fmt.Errorf("more than %d events fired: %v", count, tx.Tx) + case ev := <-events: + return fmt.Errorf("more than %d events fired: %v", count, ev.Txs) case <-time.After(50 * time.Millisecond): // This branch should be "default", but it's a data race between goroutines, - // reading the event channel and pushng into it, so better wait a bit ensuring + // reading the event channel and pushing into it, so better wait a bit ensuring // really nothing gets injected. } return nil @@ -669,7 +675,7 @@ func TestTransactionGapFilling(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -920,7 +926,7 @@ func TestTransactionPendingLimiting(t *testing.T) { pool.currentState.AddBalance(account, big.NewInt(1000000)) // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5) + events := make(chan NewTxsEvent, testTxPoolConfig.AccountQueue+5) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1140,7 +1146,7 @@ func TestTransactionPoolRepricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1327,7 +1333,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1433,7 +1439,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() @@ -1495,7 +1501,7 @@ func TestTransactionReplacement(t *testing.T) { defer pool.Stop() // Keep track of transaction events to ensure all executables get announced - events := make(chan TxPreEvent, 32) + events := make(chan NewTxsEvent, 32) sub := pool.txFeed.Subscribe(events) defer sub.Unsubscribe() diff --git a/eth/api_backend.go b/eth/api_backend.go index 4ace9b594..91a7dc7e0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -188,8 +188,8 @@ func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.TxPool().Content() } -func (b *EthAPIBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { - return b.eth.TxPool().SubscribeTxPreEvent(ch) +func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return b.eth.TxPool().SubscribeNewTxsEvent(ch) } func (b *EthAPIBackend) Downloader() *downloader.Downloader { diff --git a/eth/filters/api.go b/eth/filters/api.go index 1297b7478..592ad3b82 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -104,8 +104,8 @@ func (api *PublicFilterAPI) timeoutLoop() { // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( - pendingTxs = make(chan common.Hash) - pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) + pendingTxs = make(chan []common.Hash) + pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() @@ -118,7 +118,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { case ph := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { - f.hashes = append(f.hashes, ph) + f.hashes = append(f.hashes, ph...) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -144,13 +144,17 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su rpcSub := notifier.CreateSubscription() go func() { - txHashes := make(chan common.Hash) - pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) + txHashes := make(chan []common.Hash, 128) + pendingTxSub := api.events.SubscribePendingTxs(txHashes) for { select { - case h := <-txHashes: - notifier.Notify(rpcSub.ID, h) + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 5dfe60e77..67b4612ae 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,7 +36,7 @@ type Backend interface { GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index bb11734a7..4e999cda8 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -59,7 +59,7 @@ const ( const ( - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. @@ -80,7 +80,7 @@ type subscription struct { created time.Time logsCrit ethereum.FilterQuery logs chan []*types.Log - hashes chan common.Hash + hashes chan []common.Hash headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled @@ -95,7 +95,7 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txSub event.Subscription // Subscription for new transaction event + txsSub event.Subscription // Subscription for new transaction event logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event @@ -104,7 +104,7 @@ type EventSystem struct { // Channels install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification - txCh chan core.TxPreEvent // Channel to receive new transaction event + txsCh chan core.NewTxsEvent // Channel to receive new transactions event logsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event @@ -123,14 +123,14 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.NewTxsEvent, txChanSize), logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events - m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh) m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) @@ -138,7 +138,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty - if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -240,7 +240,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -257,7 +257,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -274,7 +274,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan logsCrit: crit, created: time.Now(), logs: logs, - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: make(chan *types.Header), installed: make(chan struct{}), err: make(chan error), @@ -290,7 +290,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti typ: BlocksSubscription, created: time.Now(), logs: make(chan []*types.Log), - hashes: make(chan common.Hash), + hashes: make(chan []common.Hash), headers: headers, installed: make(chan struct{}), err: make(chan error), @@ -298,9 +298,9 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } -// SubscribePendingTxEvents creates a subscription that writes transaction hashes for +// SubscribePendingTxs creates a subscription that writes transaction hashes for // transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { +func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingTransactionsSubscription, @@ -348,9 +348,13 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } } } - case core.TxPreEvent: + case core.NewTxsEvent: + hashes := make([]common.Hash, 0, len(e.Txs)) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- e.Tx.Hash() + f.hashes <- hashes } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { @@ -446,7 +450,7 @@ func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { es.pendingLogSub.Unsubscribe() - es.txSub.Unsubscribe() + es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() es.chainSub.Unsubscribe() @@ -460,7 +464,7 @@ func (es *EventSystem) eventLoop() { for { select { // Handle subscribed events - case ev := <-es.txCh: + case ev := <-es.txsCh: es.broadcast(index, ev) case ev := <-es.logsCh: es.broadcast(index, ev) @@ -495,7 +499,7 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-es.txSub.Err(): + case <-es.txsSub.Err(): return case <-es.logsSub.Err(): return diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index b4df24b47..ff1af85a8 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -96,7 +96,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types return logs, nil } -func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -232,10 +232,7 @@ func TestPendingTxFilter(t *testing.T) { fid0 := api.NewPendingTransactionFilter() time.Sleep(1 * time.Second) - for _, tx := range transactions { - ev := core.TxPreEvent{Tx: tx} - txFeed.Send(ev) - } + txFeed.Send(core.NewTxsEvent{Txs: transactions}) timeout := time.Now().Add(1 * time.Second) for { diff --git a/eth/handler.go b/eth/handler.go index c8f7e13f1..8993afe15 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -46,7 +46,7 @@ const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 ) @@ -81,8 +81,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txCh chan core.TxPreEvent - txSub event.Subscription + txsCh chan core.NewTxsEvent + txsSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -204,8 +204,8 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions - pm.txCh = make(chan core.TxPreEvent, txChanSize) - pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) + pm.txsCh = make(chan core.NewTxsEvent, txChanSize) + pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks @@ -220,7 +220,7 @@ func (pm *ProtocolManager) Start(maxPeers int) { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop // Quit the sync loop. @@ -712,16 +712,23 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { } } -// BroadcastTx will propagate a transaction to all peers which are not known to +// BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. -func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { - // Broadcast transaction to a batch of peers not knowing about it - peers := pm.peers.PeersWithoutTx(hash) - //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range peers { - peer.SendTransactions(types.Transactions{tx}) +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { + var txset = make(map[*peer]types.Transactions) + + // Broadcast transactions to a batch of peers not knowing about it + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + for _, peer := range peers { + txset[peer] = append(txset[peer], tx) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { + peer.SendTransactions(txs) } - log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } // Mined broadcast loop @@ -739,11 +746,11 @@ func (pm *ProtocolManager) minedBroadcastLoop() { func (pm *ProtocolManager) txBroadcastLoop() { for { select { - case event := <-pm.txCh: - pm.BroadcastTx(event.Tx.Hash(), event.Tx) + case event := <-pm.txsCh: + pm.BroadcastTxs(event.Txs) // Err() channel will be closed when unsubscribing. - case <-pm.txSub.Err(): + case <-pm.txsSub.Err(): return } } diff --git a/eth/helper_test.go b/eth/helper_test.go index 8a0260fc9..3d2ab0aba 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -124,7 +124,7 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { return batches, nil } -func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return p.txFeed.Subscribe(ch) } diff --git a/eth/protocol.go b/eth/protocol.go index 328d5b993..0e90e6a2e 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -103,9 +103,9 @@ type txPool interface { // The slice should be modifiable by the caller. Pending() (map[common.Address]types.Transactions, error) - // SubscribeTxPreEvent should return an event subscription of - // TxPreEvent and send events to the given channel. - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + // SubscribeNewTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index b2f93d8dd..aa43dfa92 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -116,7 +116,7 @@ func testRecvTransactions(t *testing.T, protocol int) { t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash()) } case <-time.After(2 * time.Second): - t.Errorf("no TxPreEvent received within 2 seconds") + t.Errorf("no NewTxsEvent received within 2 seconds") } } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index a15d84615..c11601435 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -49,7 +49,7 @@ const ( // history request. historyUpdateRange = 50 - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. @@ -57,9 +57,9 @@ const ( ) type txPool interface { - // SubscribeTxPreEvent should return an event subscription of - // TxPreEvent and send events to the given channel. - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + // SubscribeNewTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription } type blockChain interface { @@ -150,8 +150,8 @@ func (s *Service) loop() { headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh) defer headSub.Unsubscribe() - txEventCh := make(chan core.TxPreEvent, txChanSize) - txSub := txpool.SubscribeTxPreEvent(txEventCh) + txEventCh := make(chan core.NewTxsEvent, txChanSize) + txSub := txpool.SubscribeNewTxsEvent(txEventCh) defer txSub.Unsubscribe() // Start a goroutine that exhausts the subsciptions to avoid events piling up diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index af95d7906..c9ffe230c 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -65,7 +65,7 @@ type Backend interface { GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) - SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 1d3c99513..dea33c470 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -136,8 +136,8 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions, return b.eth.txPool.Content() } -func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { - return b.eth.txPool.SubscribeTxPreEvent(ch) +func (b *LesApiBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return b.eth.txPool.SubscribeNewTxsEvent(ch) } func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { diff --git a/light/postprocess.go b/light/postprocess.go index 8b9a02911..c06c18027 100644 --- a/light/postprocess.go +++ b/light/postprocess.go @@ -59,18 +59,18 @@ type trustedCheckpoint struct { var ( mainnetCheckpoint = trustedCheckpoint{ name: "mainnet", - sectionIdx: 165, - sectionHead: common.HexToHash("21028acf9cd9ce80257221adc437c3c58ce046c4d43c21c3e9b1d1349059ec73"), - chtRoot: common.HexToHash("26b2458cb7d0080d3a39311c914be92c368777a65ec074e1893b8bdc79e3910a"), - bloomTrieRoot: common.HexToHash("5d06908769179186165a72db7fc3473b25c28ed27efe78a392a9ff2c3fa67f84"), + sectionIdx: 170, + sectionHead: common.HexToHash("3bb2c28bcce463d57968f14f56cdb3fbf35349ab7a701f44c1afb57349c9a356"), + chtRoot: common.HexToHash("d92b6d0853455f8439086292338e87f69781921680dd7aa072fb71547b87415e"), + bloomTrieRoot: common.HexToHash("e4e8250a2fefddead7ae42daecd848cbf9b66d748a8270f8bbd4370b764bb9e9"), } ropstenCheckpoint = trustedCheckpoint{ name: "ropsten", - sectionIdx: 92, - sectionHead: common.HexToHash("21a158f9cc643da13a237cafceb37381072649f7278cf98c5820bfbced7cfcec"), - chtRoot: common.HexToHash("1a8ddb8b086d7a33ca90eea90730225948fa504ae0283b15aff3c15c0e089bf9"), - bloomTrieRoot: common.HexToHash("fd192f92afbcdd0020c81ca0625116b5995509659653b10123bd986fe5129cc1"), + sectionIdx: 97, + sectionHead: common.HexToHash("719448c67c01eb5b9f27833a36a4e34612f66801316d7ff37daf9e77fb4cd095"), + chtRoot: common.HexToHash("a7857afc15930ca6e583b6c3d563a025144011655843d52d28e2fdaadd417bea"), + bloomTrieRoot: common.HexToHash("9c71d4b50cbec86dfeaa8e08992de8a4667b81d13c54d6522b17ce2fc5d36416"), } ) diff --git a/light/txpool.go b/light/txpool.go index 94c8139cb..1fabc3dc5 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -321,9 +321,9 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and +// SubscribeNewTxsEvent registers a subscription of core.NewTxsEvent and // starts sending event to the given channel. -func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { +func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return pool.scope.Track(pool.txFeed.Subscribe(ch)) } @@ -412,7 +412,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error { // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. - go self.txFeed.Send(core.TxPreEvent{Tx: tx}) + go self.txFeed.Send(core.NewTxsEvent{Txs: types.Transactions{tx}}) } // Print a log message if low enough level is set diff --git a/miner/worker.go b/miner/worker.go index 48b0b2765..640e9032e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -42,7 +42,7 @@ const ( resultQueueSize = 10 miningLogAtDepth = 5 - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. @@ -71,6 +71,7 @@ type Work struct { family *set.Set // family set (used for checking uncle invalidity) uncles *set.Set // uncle set tcount int // tx count in cycle + gasPool *core.GasPool // available gas used to pack transactions Block *types.Block // the new block @@ -95,8 +96,8 @@ type worker struct { // update loop mux *event.TypeMux - txCh chan core.TxPreEvent - txSub event.Subscription + txsCh chan core.NewTxsEvent + txsSub event.Subscription chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent @@ -137,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), @@ -149,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } - // Subscribe TxPreEvent for tx pool - worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe NewTxsEvent for tx pool + worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) @@ -241,7 +242,7 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - defer self.txSub.Unsubscribe() + defer self.txsSub.Unsubscribe() defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() @@ -258,15 +259,21 @@ func (self *worker) update() { self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - // Handle TxPreEvent - case ev := <-self.txCh: - // Apply transaction to the pending state if we're not mining + // Handle NewTxsEvent + case ev := <-self.txsCh: + // Apply transactions to the pending state if we're not mining. + // + // Note all transactions received may not be continuous with transactions + // already included in the current mining block. These transactions will + // be automatically eliminated. if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) - txs := map[common.Address]types.Transactions{acc: {ev.Tx}} + txs := make(map[common.Address]types.Transactions) + for _, tx := range ev.Txs { + acc, _ := types.Sender(self.current.signer, tx) + txs[acc] = append(txs[acc], tx) + } txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) - self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.updateSnapshot() self.currentMu.Unlock() @@ -278,7 +285,7 @@ func (self *worker) update() { } // System stopped - case <-self.txSub.Err(): + case <-self.txsSub.Err(): return case <-self.chainHeadSub.Err(): return @@ -522,14 +529,16 @@ func (self *worker) updateSnapshot() { } func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { - gp := new(core.GasPool).AddGas(env.header.GasLimit) + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + } var coalescedLogs []*types.Log for { // If we don't have enough gas for any further transactions then we're done - if gp.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "gp", gp) + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) break } // Retrieve the next transaction and abort if all done @@ -553,7 +562,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB // Start executing the transaction env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) - err, logs := env.commitTransaction(tx, bc, coinbase, gp) + err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) switch err { case core.ErrGasLimitReached: // Pop the current out-of-gas transaction without shifting in the next from the account diff --git a/p2p/enr/enr.go b/p2p/enr/enr.go index c018895cc..48683471d 100644 --- a/p2p/enr/enr.go +++ b/p2p/enr/enr.go @@ -29,21 +29,16 @@ package enr import ( "bytes" - "crypto/ecdsa" "errors" "fmt" "io" "sort" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/rlp" ) const SizeLimit = 300 // maximum encoded size of a node record in bytes -const ID_SECP256k1_KECCAK = ID("secp256k1-keccak") // the default identity scheme - var ( errNoID = errors.New("unknown or unspecified identity scheme") errInvalidSig = errors.New("invalid signature") @@ -80,8 +75,8 @@ func (r *Record) Seq() uint64 { } // SetSeq updates the record sequence number. This invalidates any signature on the record. -// Calling SetSeq is usually not required because signing the redord increments the -// sequence number. +// Calling SetSeq is usually not required because setting any key in a signed record +// increments the sequence number. func (r *Record) SetSeq(s uint64) { r.signature = nil r.raw = nil @@ -104,33 +99,42 @@ func (r *Record) Load(e Entry) error { return &KeyError{Key: e.ENRKey(), Err: errNotFound} } -// Set adds or updates the given entry in the record. -// It panics if the value can't be encoded. +// Set adds or updates the given entry in the record. It panics if the value can't be +// encoded. If the record is signed, Set increments the sequence number and invalidates +// the sequence number. func (r *Record) Set(e Entry) { - r.signature = nil - r.raw = nil blob, err := rlp.EncodeToBytes(e) if err != nil { panic(fmt.Errorf("enr: can't encode %s: %v", e.ENRKey(), err)) } + r.invalidate() - i := sort.Search(len(r.pairs), func(i int) bool { return r.pairs[i].k >= e.ENRKey() }) - - if i < len(r.pairs) && r.pairs[i].k == e.ENRKey() { + pairs := make([]pair, len(r.pairs)) + copy(pairs, r.pairs) + i := sort.Search(len(pairs), func(i int) bool { return pairs[i].k >= e.ENRKey() }) + switch { + case i < len(pairs) && pairs[i].k == e.ENRKey(): // element is present at r.pairs[i] - r.pairs[i].v = blob - return - } else if i < len(r.pairs) { + pairs[i].v = blob + case i < len(r.pairs): // insert pair before i-th elem el := pair{e.ENRKey(), blob} - r.pairs = append(r.pairs, pair{}) - copy(r.pairs[i+1:], r.pairs[i:]) - r.pairs[i] = el - return + pairs = append(pairs, pair{}) + copy(pairs[i+1:], pairs[i:]) + pairs[i] = el + default: + // element should be placed at the end of r.pairs + pairs = append(pairs, pair{e.ENRKey(), blob}) } + r.pairs = pairs +} - // element should be placed at the end of r.pairs - r.pairs = append(r.pairs, pair{e.ENRKey(), blob}) +func (r *Record) invalidate() { + if r.signature == nil { + r.seq++ + } + r.signature = nil + r.raw = nil } // EncodeRLP implements rlp.Encoder. Encoding fails if @@ -196,39 +200,55 @@ func (r *Record) DecodeRLP(s *rlp.Stream) error { return err } - // Verify signature. - if err = dec.verifySignature(); err != nil { + _, scheme := dec.idScheme() + if scheme == nil { + return errNoID + } + if err := scheme.Verify(&dec, dec.signature); err != nil { return err } *r = dec return nil } -type s256raw []byte - -func (s256raw) ENRKey() string { return "secp256k1" } - // NodeAddr returns the node address. The return value will be nil if the record is -// unsigned. +// unsigned or uses an unknown identity scheme. func (r *Record) NodeAddr() []byte { - var entry s256raw - if r.Load(&entry) != nil { + _, scheme := r.idScheme() + if scheme == nil { return nil } - return crypto.Keccak256(entry) + return scheme.NodeAddr(r) } -// Sign signs the record with the given private key. It updates the record's identity -// scheme, public key and increments the sequence number. Sign returns an error if the -// encoded record is larger than the size limit. -func (r *Record) Sign(privkey *ecdsa.PrivateKey) error { - r.seq = r.seq + 1 - r.Set(ID_SECP256k1_KECCAK) - r.Set(Secp256k1(privkey.PublicKey)) - return r.signAndEncode(privkey) +// SetSig sets the record signature. It returns an error if the encoded record is larger +// than the size limit or if the signature is invalid according to the passed scheme. +func (r *Record) SetSig(idscheme string, sig []byte) error { + // Check that "id" is set and matches the given scheme. This panics because + // inconsitencies here are always implementation bugs in the signing function calling + // this method. + id, s := r.idScheme() + if s == nil { + panic(errNoID) + } + if id != idscheme { + panic(fmt.Errorf("identity scheme mismatch in Sign: record has %s, want %s", id, idscheme)) + } + + // Verify against the scheme. + if err := s.Verify(r, sig); err != nil { + return err + } + raw, err := r.encode(sig) + if err != nil { + return err + } + r.signature, r.raw = sig, raw + return nil } -func (r *Record) appendPairs(list []interface{}) []interface{} { +// AppendElements appends the sequence number and entries to the given slice. +func (r *Record) AppendElements(list []interface{}) []interface{} { list = append(list, r.seq) for _, p := range r.pairs { list = append(list, p.k, p.v) @@ -236,54 +256,23 @@ func (r *Record) appendPairs(list []interface{}) []interface{} { return list } -func (r *Record) signAndEncode(privkey *ecdsa.PrivateKey) error { - // Put record elements into a flat list. Leave room for the signature. - list := make([]interface{}, 1, len(r.pairs)*2+2) - list = r.appendPairs(list) - - // Sign the tail of the list. - h := sha3.NewKeccak256() - rlp.Encode(h, list[1:]) - sig, err := crypto.Sign(h.Sum(nil), privkey) - if err != nil { - return err - } - sig = sig[:len(sig)-1] // remove v - - // Put signature in front. - r.signature, list[0] = sig, sig - r.raw, err = rlp.EncodeToBytes(list) - if err != nil { - return err +func (r *Record) encode(sig []byte) (raw []byte, err error) { + list := make([]interface{}, 1, 2*len(r.pairs)+1) + list[0] = sig + list = r.AppendElements(list) + if raw, err = rlp.EncodeToBytes(list); err != nil { + return nil, err } - if len(r.raw) > SizeLimit { - return errTooBig + if len(raw) > SizeLimit { + return nil, errTooBig } - return nil + return raw, nil } -func (r *Record) verifySignature() error { - // Get identity scheme, public key, signature. +func (r *Record) idScheme() (string, IdentityScheme) { var id ID - var entry s256raw if err := r.Load(&id); err != nil { - return err - } else if id != ID_SECP256k1_KECCAK { - return errNoID + return "", nil } - if err := r.Load(&entry); err != nil { - return err - } else if len(entry) != 33 { - return fmt.Errorf("invalid public key") - } - - // Verify the signature. - list := make([]interface{}, 0, len(r.pairs)*2+1) - list = r.appendPairs(list) - h := sha3.NewKeccak256() - rlp.Encode(h, list) - if !crypto.VerifySignature(entry, h.Sum(nil), r.signature) { - return errInvalidSig - } - return nil + return string(id), FindIdentityScheme(string(id)) } diff --git a/p2p/enr/enr_test.go b/p2p/enr/enr_test.go index ce7767d10..d1d088756 100644 --- a/p2p/enr/enr_test.go +++ b/p2p/enr/enr_test.go @@ -54,35 +54,35 @@ func TestGetSetID(t *testing.T) { assert.Equal(t, id, id2) } -// TestGetSetIP4 tests encoding/decoding and setting/getting of the IP4 key. +// TestGetSetIP4 tests encoding/decoding and setting/getting of the IP key. func TestGetSetIP4(t *testing.T) { - ip := IP4{192, 168, 0, 3} + ip := IP{192, 168, 0, 3} var r Record r.Set(ip) - var ip2 IP4 + var ip2 IP require.NoError(t, r.Load(&ip2)) assert.Equal(t, ip, ip2) } -// TestGetSetIP6 tests encoding/decoding and setting/getting of the IP6 key. +// TestGetSetIP6 tests encoding/decoding and setting/getting of the IP key. func TestGetSetIP6(t *testing.T) { - ip := IP6{0x20, 0x01, 0x48, 0x60, 0, 0, 0x20, 0x01, 0, 0, 0, 0, 0, 0, 0x00, 0x68} + ip := IP{0x20, 0x01, 0x48, 0x60, 0, 0, 0x20, 0x01, 0, 0, 0, 0, 0, 0, 0x00, 0x68} var r Record r.Set(ip) - var ip2 IP6 + var ip2 IP require.NoError(t, r.Load(&ip2)) assert.Equal(t, ip, ip2) } // TestGetSetDiscPort tests encoding/decoding and setting/getting of the DiscPort key. -func TestGetSetDiscPort(t *testing.T) { - port := DiscPort(30309) +func TestGetSetUDP(t *testing.T) { + port := UDP(30309) var r Record r.Set(port) - var port2 DiscPort + var port2 UDP require.NoError(t, r.Load(&port2)) assert.Equal(t, port, port2) } @@ -90,7 +90,7 @@ func TestGetSetDiscPort(t *testing.T) { // TestGetSetSecp256k1 tests encoding/decoding and setting/getting of the Secp256k1 key. func TestGetSetSecp256k1(t *testing.T) { var r Record - if err := r.Sign(privkey); err != nil { + if err := SignV4(&r, privkey); err != nil { t.Fatal(err) } @@ -101,16 +101,16 @@ func TestGetSetSecp256k1(t *testing.T) { func TestLoadErrors(t *testing.T) { var r Record - ip4 := IP4{127, 0, 0, 1} + ip4 := IP{127, 0, 0, 1} r.Set(ip4) // Check error for missing keys. - var ip6 IP6 - err := r.Load(&ip6) + var udp UDP + err := r.Load(&udp) if !IsNotFound(err) { t.Error("IsNotFound should return true for missing key") } - assert.Equal(t, &KeyError{Key: ip6.ENRKey(), Err: errNotFound}, err) + assert.Equal(t, &KeyError{Key: udp.ENRKey(), Err: errNotFound}, err) // Check error for invalid keys. var list []uint @@ -174,7 +174,7 @@ func TestDirty(t *testing.T) { t.Errorf("expected errEncodeUnsigned, got %#v", err) } - require.NoError(t, r.Sign(privkey)) + require.NoError(t, SignV4(&r, privkey)) if !r.Signed() { t.Error("Signed return false for signed record") } @@ -194,13 +194,13 @@ func TestDirty(t *testing.T) { func TestGetSetOverwrite(t *testing.T) { var r Record - ip := IP4{192, 168, 0, 3} + ip := IP{192, 168, 0, 3} r.Set(ip) - ip2 := IP4{192, 168, 0, 4} + ip2 := IP{192, 168, 0, 4} r.Set(ip2) - var ip3 IP4 + var ip3 IP require.NoError(t, r.Load(&ip3)) assert.Equal(t, ip2, ip3) } @@ -208,9 +208,9 @@ func TestGetSetOverwrite(t *testing.T) { // TestSignEncodeAndDecode tests signing, RLP encoding and RLP decoding of a record. func TestSignEncodeAndDecode(t *testing.T) { var r Record - r.Set(DiscPort(30303)) - r.Set(IP4{127, 0, 0, 1}) - require.NoError(t, r.Sign(privkey)) + r.Set(UDP(30303)) + r.Set(IP{127, 0, 0, 1}) + require.NoError(t, SignV4(&r, privkey)) blob, err := rlp.EncodeToBytes(r) require.NoError(t, err) @@ -230,12 +230,12 @@ func TestNodeAddr(t *testing.T) { t.Errorf("wrong address on empty record: got %v, want %v", addr, nil) } - require.NoError(t, r.Sign(privkey)) - expected := "caaa1485d83b18b32ed9ad666026151bf0cae8a0a88c857ae2d4c5be2daa6726" + require.NoError(t, SignV4(&r, privkey)) + expected := "a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7" assert.Equal(t, expected, hex.EncodeToString(r.NodeAddr())) } -var pyRecord, _ = hex.DecodeString("f896b840954dc36583c1f4b69ab59b1375f362f06ee99f3723cd77e64b6de6d211c27d7870642a79d4516997f94091325d2a7ca6215376971455fb221d34f35b277149a1018664697363763582765f82696490736563703235366b312d6b656363616b83697034847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138") +var pyRecord, _ = hex.DecodeString("f884b8407098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b76f2635f4e234738f30813a89eb9137e3e3df5266e3a1f11df72ecf1145ccb9c01826964827634826970847f00000189736563703235366b31a103ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31388375647082765f") // TestPythonInterop checks that we can decode and verify a record produced by the Python // implementation. @@ -246,10 +246,10 @@ func TestPythonInterop(t *testing.T) { } var ( - wantAddr, _ = hex.DecodeString("caaa1485d83b18b32ed9ad666026151bf0cae8a0a88c857ae2d4c5be2daa6726") - wantSeq = uint64(1) - wantIP = IP4{127, 0, 0, 1} - wantDiscport = DiscPort(30303) + wantAddr, _ = hex.DecodeString("a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7") + wantSeq = uint64(1) + wantIP = IP{127, 0, 0, 1} + wantUDP = UDP(30303) ) if r.Seq() != wantSeq { t.Errorf("wrong seq: got %d, want %d", r.Seq(), wantSeq) @@ -257,7 +257,7 @@ func TestPythonInterop(t *testing.T) { if addr := r.NodeAddr(); !bytes.Equal(addr, wantAddr) { t.Errorf("wrong addr: got %x, want %x", addr, wantAddr) } - want := map[Entry]interface{}{new(IP4): &wantIP, new(DiscPort): &wantDiscport} + want := map[Entry]interface{}{new(IP): &wantIP, new(UDP): &wantUDP} for k, v := range want { desc := fmt.Sprintf("loading key %q", k.ENRKey()) if assert.NoError(t, r.Load(k), desc) { @@ -272,14 +272,14 @@ func TestRecordTooBig(t *testing.T) { key := randomString(10) // set a big value for random key, expect error - r.Set(WithEntry(key, randomString(300))) - if err := r.Sign(privkey); err != errTooBig { + r.Set(WithEntry(key, randomString(SizeLimit))) + if err := SignV4(&r, privkey); err != errTooBig { t.Fatalf("expected to get errTooBig, got %#v", err) } // set an acceptable value for random key, expect no error r.Set(WithEntry(key, randomString(100))) - require.NoError(t, r.Sign(privkey)) + require.NoError(t, SignV4(&r, privkey)) } // TestSignEncodeAndDecodeRandom tests encoding/decoding of records containing random key/value pairs. @@ -295,7 +295,7 @@ func TestSignEncodeAndDecodeRandom(t *testing.T) { r.Set(WithEntry(key, &value)) } - require.NoError(t, r.Sign(privkey)) + require.NoError(t, SignV4(&r, privkey)) _, err := rlp.EncodeToBytes(r) require.NoError(t, err) diff --git a/p2p/enr/entries.go b/p2p/enr/entries.go index 7591e6eff..71c7653a2 100644 --- a/p2p/enr/entries.go +++ b/p2p/enr/entries.go @@ -57,59 +57,43 @@ func WithEntry(k string, v interface{}) Entry { return &generic{key: k, value: v} } -// DiscPort is the "discv5" key, which holds the UDP port for discovery v5. -type DiscPort uint16 +// TCP is the "tcp" key, which holds the TCP port of the node. +type TCP uint16 -func (v DiscPort) ENRKey() string { return "discv5" } +func (v TCP) ENRKey() string { return "tcp" } + +// UDP is the "udp" key, which holds the UDP port of the node. +type UDP uint16 + +func (v UDP) ENRKey() string { return "udp" } // ID is the "id" key, which holds the name of the identity scheme. type ID string +const IDv4 = ID("v4") // the default identity scheme + func (v ID) ENRKey() string { return "id" } -// IP4 is the "ip4" key, which holds a 4-byte IPv4 address. -type IP4 net.IP +// IP is the "ip" key, which holds the IP address of the node. +type IP net.IP -func (v IP4) ENRKey() string { return "ip4" } +func (v IP) ENRKey() string { return "ip" } // EncodeRLP implements rlp.Encoder. -func (v IP4) EncodeRLP(w io.Writer) error { - ip4 := net.IP(v).To4() - if ip4 == nil { - return fmt.Errorf("invalid IPv4 address: %v", v) - } - return rlp.Encode(w, ip4) -} - -// DecodeRLP implements rlp.Decoder. -func (v *IP4) DecodeRLP(s *rlp.Stream) error { - if err := s.Decode((*net.IP)(v)); err != nil { - return err - } - if len(*v) != 4 { - return fmt.Errorf("invalid IPv4 address, want 4 bytes: %v", *v) +func (v IP) EncodeRLP(w io.Writer) error { + if ip4 := net.IP(v).To4(); ip4 != nil { + return rlp.Encode(w, ip4) } - return nil -} - -// IP6 is the "ip6" key, which holds a 16-byte IPv6 address. -type IP6 net.IP - -func (v IP6) ENRKey() string { return "ip6" } - -// EncodeRLP implements rlp.Encoder. -func (v IP6) EncodeRLP(w io.Writer) error { - ip6 := net.IP(v) - return rlp.Encode(w, ip6) + return rlp.Encode(w, net.IP(v)) } // DecodeRLP implements rlp.Decoder. -func (v *IP6) DecodeRLP(s *rlp.Stream) error { +func (v *IP) DecodeRLP(s *rlp.Stream) error { if err := s.Decode((*net.IP)(v)); err != nil { return err } - if len(*v) != 16 { - return fmt.Errorf("invalid IPv6 address, want 16 bytes: %v", *v) + if len(*v) != 4 && len(*v) != 16 { + return fmt.Errorf("invalid IP address, want 4 or 16 bytes: %v", *v) } return nil } diff --git a/p2p/enr/idscheme.go b/p2p/enr/idscheme.go new file mode 100644 index 000000000..efaf68041 --- /dev/null +++ b/p2p/enr/idscheme.go @@ -0,0 +1,114 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package enr + +import ( + "crypto/ecdsa" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/rlp" +) + +// Registry of known identity schemes. +var schemes sync.Map + +// An IdentityScheme is capable of verifying record signatures and +// deriving node addresses. +type IdentityScheme interface { + Verify(r *Record, sig []byte) error + NodeAddr(r *Record) []byte +} + +// RegisterIdentityScheme adds an identity scheme to the global registry. +func RegisterIdentityScheme(name string, scheme IdentityScheme) { + if _, loaded := schemes.LoadOrStore(name, scheme); loaded { + panic("identity scheme " + name + " already registered") + } +} + +// FindIdentityScheme resolves name to an identity scheme in the global registry. +func FindIdentityScheme(name string) IdentityScheme { + s, ok := schemes.Load(name) + if !ok { + return nil + } + return s.(IdentityScheme) +} + +// v4ID is the "v4" identity scheme. +type v4ID struct{} + +func init() { + RegisterIdentityScheme("v4", v4ID{}) +} + +// SignV4 signs a record using the v4 scheme. +func SignV4(r *Record, privkey *ecdsa.PrivateKey) error { + // Copy r to avoid modifying it if signing fails. + cpy := *r + cpy.Set(ID("v4")) + cpy.Set(Secp256k1(privkey.PublicKey)) + + h := sha3.NewKeccak256() + rlp.Encode(h, cpy.AppendElements(nil)) + sig, err := crypto.Sign(h.Sum(nil), privkey) + if err != nil { + return err + } + sig = sig[:len(sig)-1] // remove v + if err = cpy.SetSig("v4", sig); err == nil { + *r = cpy + } + return err +} + +// s256raw is an unparsed secp256k1 public key entry. +type s256raw []byte + +func (s256raw) ENRKey() string { return "secp256k1" } + +func (v4ID) Verify(r *Record, sig []byte) error { + var entry s256raw + if err := r.Load(&entry); err != nil { + return err + } else if len(entry) != 33 { + return fmt.Errorf("invalid public key") + } + + h := sha3.NewKeccak256() + rlp.Encode(h, r.AppendElements(nil)) + if !crypto.VerifySignature(entry, h.Sum(nil), sig) { + return errInvalidSig + } + return nil +} + +func (v4ID) NodeAddr(r *Record) []byte { + var pubkey Secp256k1 + err := r.Load(&pubkey) + if err != nil { + return nil + } + buf := make([]byte, 64) + math.ReadBits(pubkey.X, buf[:32]) + math.ReadBits(pubkey.Y, buf[32:]) + return crypto.Keccak256(buf) +} diff --git a/p2p/enr/idscheme_test.go b/p2p/enr/idscheme_test.go new file mode 100644 index 000000000..d790e12f1 --- /dev/null +++ b/p2p/enr/idscheme_test.go @@ -0,0 +1,36 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package enr + +import ( + "crypto/ecdsa" + "math/big" + "testing" +) + +// Checks that failure to sign leaves the record unmodified. +func TestSignError(t *testing.T) { + invalidKey := &ecdsa.PrivateKey{D: new(big.Int), PublicKey: *pubkey} + + var r Record + if err := SignV4(&r, invalidKey); err == nil { + t.Fatal("expected error from SignV4") + } + if len(r.pairs) > 0 { + t.Fatal("expected empty record, have", r.pairs) + } +} diff --git a/params/version.go b/params/version.go index 8b96f2516..1e8c43bf8 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 8 // Minor version component of the current release - VersionPatch = 8 // Patch version component of the current release + VersionPatch = 9 // Patch version component of the current release VersionMeta = "unstable" // Version metadata to append to the version string ) |