aboutsummaryrefslogtreecommitdiffstats
path: root/les
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2019-05-13 19:41:10 +0800
committerPéter Szilágyi <peterke@gmail.com>2019-05-13 19:41:10 +0800
commit40cdcf8c47ff094775aca08fd5d94051f9cf1dbb (patch)
tree3a9fc715ec501ab0fec8c81004e17477bd136f9f /les
parentf4fb1a18015d88aa7b424709aac346da59edf410 (diff)
downloadgo-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.tar
go-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.tar.gz
go-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.tar.bz2
go-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.tar.lz
go-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.tar.xz
go-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.tar.zst
go-tangerine-40cdcf8c47ff094775aca08fd5d94051f9cf1dbb.zip
les, light: implement ODR transaction lookup by hash (#19069)
* les, light: implement ODR transaction lookup by hash * les: delete useless file * internal/ethapi: always use backend to find transaction * les, eth, internal/ethapi: renamed GetCanonicalTransaction to GetTransaction * light: add canonical header verification to GetTransaction
Diffstat (limited to 'les')
-rw-r--r--les/api_backend.go4
-rw-r--r--les/backend.go3
-rw-r--r--les/handler.go17
-rw-r--r--les/handler_test.go26
-rw-r--r--les/helper_test.go4
-rw-r--r--les/odr.go1
-rw-r--r--les/odr_requests.go40
-rw-r--r--les/odr_test.go45
-rw-r--r--les/peer.go2
-rw-r--r--les/protocol.go8
-rw-r--r--les/txrelay.go15
11 files changed, 122 insertions, 43 deletions
diff --git a/les/api_backend.go b/les/api_backend.go
index 589cf572d..6de15e7bd 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -132,6 +132,10 @@ func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transactio
return b.eth.txPool.GetTransaction(txHash)
}
+func (b *LesApiBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
+ return light.GetTransaction(ctx, b.eth.odr, txHash)
+}
+
func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
return b.eth.txPool.GetNonce(ctx, addr)
}
diff --git a/les/backend.go b/les/backend.go
index 80a912816..887f88210 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -114,9 +114,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if leth.config.ULC != nil {
trustedNodes = leth.config.ULC.TrustedServers
}
- leth.relay = NewLesTxRelay(peers, leth.reqDist)
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg, trustedNodes)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
+ leth.relay = NewLesTxRelay(peers, leth.retriever)
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations)
@@ -271,6 +271,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
s.odr.Stop()
+ s.relay.Stop()
s.bloomIndexer.Close()
s.chtIndexer.Close()
s.blockchain.Stop()
diff --git a/les/handler.go b/les/handler.go
index d46eeb03a..c6baeece4 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -979,7 +979,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "")
}
go func() {
- stats := make([]txStatus, len(req.Txs))
+ stats := make([]light.TxStatus, len(req.Txs))
for i, tx := range req.Txs {
if i != 0 && !task.waitOrStop() {
return
@@ -1014,7 +1014,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrRequestRejected, "")
}
go func() {
- stats := make([]txStatus, len(req.Hashes))
+ stats := make([]light.TxStatus, len(req.Hashes))
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
return
@@ -1032,7 +1032,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.Log().Trace("Received tx status response")
var resp struct {
ReqID, BV uint64
- Status []txStatus
+ Status []light.TxStatus
}
if err := msg.Decode(&resp); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -1040,6 +1040,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
+ p.Log().Trace("Received helper trie proof response")
+ deliverMsg = &Msg{
+ MsgType: MsgTxStatus,
+ ReqID: resp.ReqID,
+ Obj: resp.Status,
+ }
+
default:
p.Log().Trace("Received unknown message", "code", msg.Code)
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@@ -1097,8 +1104,8 @@ func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte {
return nil
}
-func (pm *ProtocolManager) txStatus(hash common.Hash) txStatus {
- var stat txStatus
+func (pm *ProtocolManager) txStatus(hash common.Hash) light.TxStatus {
+ var stat light.TxStatus
stat.Status = pm.txpool.Status([]common.Hash{hash})[0]
// If the transaction is unknown to the pool, try looking it up locally
if stat.Status == core.TxStatusUnknown {
diff --git a/les/handler_test.go b/les/handler_test.go
index c659f8088..d8051c145 100644
--- a/les/handler_test.go
+++ b/les/handler_test.go
@@ -443,7 +443,7 @@ func TestTransactionStatusLes2(t *testing.T) {
var reqID uint64
- test := func(tx *types.Transaction, send bool, expStatus txStatus) {
+ test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) {
reqID++
if send {
cost := peer.GetRequestCost(SendTxV2Msg, 1)
@@ -452,7 +452,7 @@ func TestTransactionStatusLes2(t *testing.T) {
cost := peer.GetRequestCost(GetTxStatusMsg, 1)
sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})
}
- if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []txStatus{expStatus}); err != nil {
+ if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil {
t.Errorf("transaction status mismatch")
}
}
@@ -461,20 +461,20 @@ func TestTransactionStatusLes2(t *testing.T) {
// test error status by sending an underpriced transaction
tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey)
- test(tx0, true, txStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()})
+ test(tx0, true, light.TxStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()})
tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
- test(tx1, false, txStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown
- test(tx1, true, txStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending
- test(tx1, true, txStatus{Status: core.TxStatusPending}) // adding it again should not return an error
+ test(tx1, false, light.TxStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown
+ test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending
+ test(tx1, true, light.TxStatus{Status: core.TxStatusPending}) // adding it again should not return an error
tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey)
// send transactions in the wrong order, tx3 should be queued
- test(tx3, true, txStatus{Status: core.TxStatusQueued})
- test(tx2, true, txStatus{Status: core.TxStatusPending})
+ test(tx3, true, light.TxStatus{Status: core.TxStatusQueued})
+ test(tx2, true, light.TxStatus{Status: core.TxStatusPending})
// query again, now tx3 should be pending too
- test(tx3, false, txStatus{Status: core.TxStatusPending})
+ test(tx3, false, light.TxStatus{Status: core.TxStatusPending})
// generate and add a block with tx1 and tx2 included
gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) {
@@ -497,8 +497,8 @@ func TestTransactionStatusLes2(t *testing.T) {
// check if their status is included now
block1hash := rawdb.ReadCanonicalHash(db, 1)
- test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
- test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})
+ test(tx1, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
+ test(tx2, false, light.TxStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.LegacyTxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})
// create a reorg that rolls them back
gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {})
@@ -516,6 +516,6 @@ func TestTransactionStatusLes2(t *testing.T) {
t.Fatalf("pending count mismatch: have %d, want 3", pending)
}
// check if their status is pending again
- test(tx1, false, txStatus{Status: core.TxStatusPending})
- test(tx2, false, txStatus{Status: core.TxStatusPending})
+ test(tx1, false, light.TxStatus{Status: core.TxStatusPending})
+ test(tx2, false, light.TxStatus{Status: core.TxStatusPending})
}
diff --git a/les/helper_test.go b/les/helper_test.go
index 020c69e17..9a302f837 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -148,6 +148,7 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
}
genesis = gspec.MustCommit(db)
chain BlockChain
+ pool txPool
)
if peers == nil {
peers = newPeerSet()
@@ -162,13 +163,14 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
panic(err)
}
chain = blockchain
+ pool = core.NewTxPool(core.DefaultTxPoolConfig, gspec.Config, blockchain)
}
indexConfig := light.TestServerIndexerConfig
if lightSync {
indexConfig = light.TestClientIndexerConfig
}
- pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig)
+ pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, pool, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup), ulcConfig)
if err != nil {
return nil, err
}
diff --git a/les/odr.go b/les/odr.go
index daf2ea19e..9176924cb 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -86,6 +86,7 @@ const (
MsgReceipts
MsgProofsV2
MsgHelperTrieProofs
+ MsgTxStatus
)
// Msg encodes a LES message that delivers reply data for a request
diff --git a/les/odr_requests.go b/les/odr_requests.go
index 66d6175b8..328750d7a 100644
--- a/les/odr_requests.go
+++ b/les/odr_requests.go
@@ -66,6 +66,8 @@ func LesRequest(req light.OdrRequest) LesOdrRequest {
return (*ChtRequest)(r)
case *light.BloomRequest:
return (*BloomRequest)(r)
+ case *light.TxStatusRequest:
+ return (*TxStatusRequest)(r)
default:
return nil
}
@@ -471,6 +473,44 @@ func (r *BloomRequest) Validate(db ethdb.Database, msg *Msg) error {
return nil
}
+// TxStatusRequest is the ODR request type for transaction status
+type TxStatusRequest light.TxStatusRequest
+
+// GetCost returns the cost of the given ODR request according to the serving
+// peer's cost table (implementation of LesOdrRequest)
+func (r *TxStatusRequest) GetCost(peer *peer) uint64 {
+ return peer.GetRequestCost(GetTxStatusMsg, len(r.Hashes))
+}
+
+// CanSend tells if a certain peer is suitable for serving the given request
+func (r *TxStatusRequest) CanSend(peer *peer) bool {
+ return peer.version >= lpv2
+}
+
+// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
+func (r *TxStatusRequest) Request(reqID uint64, peer *peer) error {
+ peer.Log().Debug("Requesting transaction status", "count", len(r.Hashes))
+ return peer.RequestTxStatus(reqID, r.GetCost(peer), r.Hashes)
+}
+
+// Valid processes an ODR request reply message from the LES network
+// returns true and stores results in memory if the message was a valid reply
+// to the request (implementation of LesOdrRequest)
+func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error {
+ log.Debug("Validating transaction status", "count", len(r.Hashes))
+
+ // Ensure we have a correct message with a single block body
+ if msg.MsgType != MsgTxStatus {
+ return errInvalidMessageType
+ }
+ status := msg.Obj.([]light.TxStatus)
+ if len(status) != len(r.Hashes) {
+ return errInvalidEntryCount
+ }
+ r.Status = status
+ return nil
+}
+
// readTraceDB stores the keys of database reads. We use this to check that received node
// sets contain only the trie nodes necessary to make proofs pass.
type readTraceDB struct {
diff --git a/les/odr_test.go b/les/odr_test.go
index 2cc28e384..a1d547956 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -38,7 +38,7 @@ import (
type odrTestFn func(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte
-func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, odrGetBlock) }
+func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetBlock) }
func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
var block *types.Block
@@ -54,7 +54,7 @@ func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainCon
return rlp
}
-func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, odrGetReceipts) }
+func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrGetReceipts) }
func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
var receipts types.Receipts
@@ -74,7 +74,7 @@ func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.Chain
return rlp
}
-func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, odrAccounts) }
+func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, true, odrAccounts) }
func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
dummyAddr := common.HexToAddress("1234567812345678123456781234567812345678")
@@ -102,7 +102,7 @@ func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainCon
return res
}
-func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, odrContractCall) }
+func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, true, odrContractCall) }
type callmsg struct {
types.Message
@@ -151,8 +151,32 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai
return res
}
+func TestOdrTxStatusLes2(t *testing.T) { testOdr(t, 2, 1, false, odrTxStatus) }
+
+func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte {
+ var txs types.Transactions
+ if bc != nil {
+ block := bc.GetBlockByHash(bhash)
+ txs = block.Transactions()
+ } else {
+ if block, _ := lc.GetBlockByHash(ctx, bhash); block != nil {
+ btxs := block.Transactions()
+ txs = make(types.Transactions, len(btxs))
+ for i, tx := range btxs {
+ var err error
+ txs[i], _, _, _, err = light.GetTransaction(ctx, lc.Odr(), tx.Hash())
+ if err != nil {
+ return nil
+ }
+ }
+ }
+ }
+ rlp, _ := rlp.EncodeToBytes(txs)
+ return rlp
+}
+
// testOdr tests odr requests whose validation guaranteed by block headers.
-func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
+func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) {
// Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
defer tearDown()
@@ -193,9 +217,10 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true }
client.peers.lock.Unlock()
test(5)
-
- // still expect all retrievals to pass, now data should be cached locally
- client.peers.Unregister(client.rPeer.id)
- time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
- test(5)
+ if checkCached {
+ // still expect all retrievals to pass, now data should be cached locally
+ client.peers.Unregister(client.rPeer.id)
+ time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
+ test(5)
+ }
}
diff --git a/les/peer.go b/les/peer.go
index 7a163cd1d..42c13ab7d 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -317,7 +317,7 @@ func (p *peer) ReplyHelperTrieProofs(reqID uint64, resp HelperTrieResps) *reply
}
// ReplyTxStatus creates a reply with a batch of transaction status records, corresponding to the ones requested.
-func (p *peer) ReplyTxStatus(reqID uint64, stats []txStatus) *reply {
+func (p *peer) ReplyTxStatus(reqID uint64, stats []light.TxStatus) *reply {
data, _ := rlp.EncodeToBytes(stats)
return &reply{p.rw, TxStatusMsg, reqID, data}
}
diff --git a/les/protocol.go b/les/protocol.go
index 86e450d01..24dc9bdea 100644
--- a/les/protocol.go
+++ b/les/protocol.go
@@ -24,8 +24,6 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
@@ -227,9 +225,3 @@ type CodeData []struct {
}
type proofsData [][]rlp.RawValue
-
-type txStatus struct {
- Status core.TxStatus
- Lookup *rawdb.LegacyTxLookupEntry `rlp:"nil"`
- Error string
-}
diff --git a/les/txrelay.go b/les/txrelay.go
index a790bbec9..5ebef1c22 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -17,6 +17,7 @@
package les
import (
+ "context"
"sync"
"github.com/ethereum/go-ethereum/common"
@@ -36,21 +37,27 @@ type LesTxRelay struct {
peerList []*peer
peerStartPos int
lock sync.RWMutex
+ stop chan struct{}
- reqDist *requestDistributor
+ retriever *retrieveManager
}
-func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay {
+func NewLesTxRelay(ps *peerSet, retriever *retrieveManager) *LesTxRelay {
r := &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}),
ps: ps,
- reqDist: reqDist,
+ retriever: retriever,
+ stop: make(chan struct{}),
}
ps.notify(r)
return r
}
+func (self *LesTxRelay) Stop() {
+ close(self.stop)
+}
+
func (self *LesTxRelay) registerPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
@@ -132,7 +139,7 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
return func() { peer.SendTxs(reqID, cost, enc) }
},
}
- self.reqDist.queue(rq)
+ go self.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, self.stop)
}
}