diff options
Diffstat (limited to 'les')
-rw-r--r-- | les/handler.go | 65 | ||||
-rw-r--r-- | les/handler_test.go | 47 | ||||
-rw-r--r-- | les/peer.go | 5 | ||||
-rw-r--r-- | les/protocol.go | 7 |
4 files changed, 73 insertions, 51 deletions
diff --git a/les/handler.go b/les/handler.go index de07b7244..613fbb79f 100644 --- a/les/handler.go +++ b/les/handler.go @@ -89,7 +89,8 @@ type BlockChain interface { } type txPool interface { - AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []core.TxStatusData + AddRemotes(txs []*types.Transaction) []error + Status(hashes []common.Hash) []core.TxStatus } type ProtocolManager struct { @@ -983,12 +984,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if reject(uint64(reqCnt), MaxTxSend) { return errResp(ErrRequestRejected, "") } - - txHashes := make([]common.Hash, len(txs)) - for i, tx := range txs { - txHashes[i] = tx.Hash() - } - pm.addOrGetTxStatus(txs, txHashes) + pm.txpool.AddRemotes(txs) _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) @@ -1010,16 +1006,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrRequestRejected, "") } - txHashes := make([]common.Hash, len(req.Txs)) + hashes := make([]common.Hash, len(req.Txs)) for i, tx := range req.Txs { - txHashes[i] = tx.Hash() + hashes[i] = tx.Hash() + } + stats := pm.txStatus(hashes) + for i, stat := range stats { + if stat.Status == core.TxStatusUnknown { + if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil { + stats[i].Error = errs[0] + continue + } + stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0] + } } - - res := pm.addOrGetTxStatus(req.Txs, txHashes) bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendTxStatus(req.ReqID, bv, res) + + return p.SendTxStatus(req.ReqID, bv, stats) case GetTxStatusMsg: if pm.txpool == nil { @@ -1027,22 +1032,20 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Transactions arrived, parse all of them and deliver to the pool var req struct { - ReqID uint64 - TxHashes []common.Hash + ReqID uint64 + Hashes []common.Hash } if err := msg.Decode(&req); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - reqCnt := len(req.TxHashes) + reqCnt := len(req.Hashes) if reject(uint64(reqCnt), MaxTxStatus) { return errResp(ErrRequestRejected, "") } - - res := pm.addOrGetTxStatus(nil, req.TxHashes) - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendTxStatus(req.ReqID, bv, res) + + return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes)) case TxStatusMsg: if pm.odr == nil { @@ -1052,7 +1055,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.Log().Trace("Received tx status response") var resp struct { ReqID, BV uint64 - Status []core.TxStatusData + Status []core.TxStatus } if err := msg.Decode(&resp); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1103,19 +1106,21 @@ func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte { return nil } -func (pm *ProtocolManager) addOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []core.TxStatusData { - status := pm.txpool.AddOrGetTxStatus(txs, txHashes) - for i, _ := range status { - blockHash, blockNum, txIndex := core.GetTxLookupEntry(pm.chainDb, txHashes[i]) - if blockHash != (common.Hash{}) { - enc, err := rlp.EncodeToBytes(core.TxLookupEntry{BlockHash: blockHash, BlockIndex: blockNum, Index: txIndex}) - if err != nil { - panic(err) +func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus { + stats := make([]txStatus, len(hashes)) + for i, stat := range pm.txpool.Status(hashes) { + // Save the status we've got from the transaction pool + stats[i].Status = stat + + // If the transaction is unknown to the pool, try looking it up locally + if stat == core.TxStatusUnknown { + if block, number, index := core.GetTxLookupEntry(pm.chainDb, hashes[i]); block != (common.Hash{}) { + stats[i].Status = core.TxStatusIncluded + stats[i].Lookup = &core.TxLookupEntry{BlockHash: block, BlockIndex: number, Index: index} } - status[i] = core.TxStatusData{Status: core.TxStatusIncluded, Data: enc} } } - return status + return stats } // NodeInfo retrieves some protocol metadata about the running host node. diff --git a/les/handler_test.go b/les/handler_test.go index a094cdc84..9e63e15a6 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -20,8 +20,8 @@ import ( "bytes" "math/big" "math/rand" - "runtime" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -423,7 +423,7 @@ func TestTransactionStatusLes2(t *testing.T) { var reqID uint64 - test := func(tx *types.Transaction, send bool, expStatus core.TxStatusData) { + test := func(tx *types.Transaction, send bool, expStatus txStatus) { reqID++ if send { cost := peer.GetRequestCost(SendTxV2Msg, 1) @@ -432,7 +432,7 @@ func TestTransactionStatusLes2(t *testing.T) { cost := peer.GetRequestCost(GetTxStatusMsg, 1) sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()}) } - if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []core.TxStatusData{expStatus}); err != nil { + if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []txStatus{expStatus}); err != nil { t.Errorf("transaction status mismatch") } } @@ -441,20 +441,20 @@ func TestTransactionStatusLes2(t *testing.T) { // test error status by sending an underpriced transaction tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), bigTxGas, nil, nil), signer, testBankKey) - test(tx0, true, core.TxStatusData{Status: core.TxStatusError, Data: []byte("transaction underpriced")}) + test(tx0, true, txStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced}) tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), bigTxGas, big.NewInt(100000000000), nil), signer, testBankKey) - test(tx1, false, core.TxStatusData{Status: core.TxStatusUnknown}) // query before sending, should be unknown - test(tx1, true, core.TxStatusData{Status: core.TxStatusPending}) // send valid processable tx, should return pending - test(tx1, true, core.TxStatusData{Status: core.TxStatusPending}) // adding it again should not return an error + test(tx1, false, txStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown + test(tx1, true, txStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending + test(tx1, true, txStatus{Status: core.TxStatusPending}) // adding it again should not return an error tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), bigTxGas, big.NewInt(100000000000), nil), signer, testBankKey) tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), bigTxGas, big.NewInt(100000000000), nil), signer, testBankKey) // send transactions in the wrong order, tx3 should be queued - test(tx3, true, core.TxStatusData{Status: core.TxStatusQueued}) - test(tx2, true, core.TxStatusData{Status: core.TxStatusPending}) + test(tx3, true, txStatus{Status: core.TxStatusQueued}) + test(tx2, true, txStatus{Status: core.TxStatusPending}) // query again, now tx3 should be pending too - test(tx3, false, core.TxStatusData{Status: core.TxStatusPending}) + test(tx3, false, txStatus{Status: core.TxStatusPending}) // generate and add a block with tx1 and tx2 included gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), db, 1, func(i int, block *core.BlockGen) { @@ -464,13 +464,21 @@ func TestTransactionStatusLes2(t *testing.T) { if _, err := chain.InsertChain(gchain); err != nil { panic(err) } + // wait until TxPool processes the inserted block + for i := 0; i < 10; i++ { + if pending, _ := txpool.Stats(); pending == 1 { + break + } + time.Sleep(100 * time.Millisecond) + } + if pending, _ := txpool.Stats(); pending != 1 { + t.Fatalf("pending count mismatch: have %d, want 1", pending) + } // check if their status is included now block1hash := core.GetCanonicalHash(db, 1) - tx1pos, _ := rlp.EncodeToBytes(core.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}) - tx2pos, _ := rlp.EncodeToBytes(core.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}) - test(tx1, false, core.TxStatusData{Status: core.TxStatusIncluded, Data: tx1pos}) - test(tx2, false, core.TxStatusData{Status: core.TxStatusIncluded, Data: tx2pos}) + test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &core.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}}) + test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &core.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}}) // create a reorg that rolls them back gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), db, 2, func(i int, block *core.BlockGen) {}) @@ -478,13 +486,16 @@ func TestTransactionStatusLes2(t *testing.T) { panic(err) } // wait until TxPool processes the reorg - for { + for i := 0; i < 10; i++ { if pending, _ := txpool.Stats(); pending == 3 { break } - runtime.Gosched() + time.Sleep(100 * time.Millisecond) + } + if pending, _ := txpool.Stats(); pending != 3 { + t.Fatalf("pending count mismatch: have %d, want 3", pending) } // check if their status is pending again - test(tx1, false, core.TxStatusData{Status: core.TxStatusPending}) - test(tx2, false, core.TxStatusData{Status: core.TxStatusPending}) + test(tx1, false, txStatus{Status: core.TxStatusPending}) + test(tx2, false, txStatus{Status: core.TxStatusPending}) } diff --git a/les/peer.go b/les/peer.go index 104afb6dc..524690e2f 100644 --- a/les/peer.go +++ b/les/peer.go @@ -27,7 +27,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" @@ -233,8 +232,8 @@ func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) erro } // SendTxStatus sends a batch of transaction status records, corresponding to the ones requested. -func (p *peer) SendTxStatus(reqID, bv uint64, status []core.TxStatusData) error { - return sendResponse(p.rw, TxStatusMsg, reqID, bv, status) +func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error { + return sendResponse(p.rw, TxStatusMsg, reqID, bv, stats) } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the diff --git a/les/protocol.go b/les/protocol.go index 146b02030..05e6654d6 100644 --- a/les/protocol.go +++ b/les/protocol.go @@ -27,6 +27,7 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/rlp" @@ -219,3 +220,9 @@ type CodeData []struct { } type proofsData [][]rlp.RawValue + +type txStatus struct { + Status core.TxStatus + Lookup *core.TxLookupEntry + Error error +} |