diff options
-rw-r--r-- | cmd/utils/flags.go | 12 | ||||
-rw-r--r-- | core/bench_test.go | 2 | ||||
-rw-r--r-- | core/block_processor.go | 64 | ||||
-rw-r--r-- | core/block_processor_test.go | 4 | ||||
-rw-r--r-- | core/chain_makers.go | 2 | ||||
-rw-r--r-- | core/chain_makers_test.go | 2 | ||||
-rw-r--r-- | core/chain_manager.go | 33 | ||||
-rw-r--r-- | core/chain_manager_test.go | 12 | ||||
-rw-r--r-- | core/filter.go | 2 | ||||
-rw-r--r-- | core/manager.go | 1 | ||||
-rw-r--r-- | core/transaction_util.go | 51 | ||||
-rw-r--r-- | core/types/common.go | 2 | ||||
-rw-r--r-- | eth/backend.go | 2 | ||||
-rw-r--r-- | eth/protocol_test.go | 2 | ||||
-rw-r--r-- | miner/worker.go | 35 | ||||
-rw-r--r-- | rpc/codec/json.go | 58 | ||||
-rw-r--r-- | rpc/codec/json_test.go | 141 | ||||
-rw-r--r-- | rpc/jeth.go | 18 | ||||
-rw-r--r-- | xeth/xeth.go | 1 |
19 files changed, 307 insertions, 137 deletions
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6f319eb40..b4182ff59 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -412,7 +412,7 @@ func MakeChain(ctx *cli.Context) (chain *core.ChainManager, blockDB, stateDB, ex eventMux := new(event.TypeMux) pow := ethash.New() genesis := core.GenesisBlock(uint64(ctx.GlobalInt(GenesisNonceFlag.Name)), blockDB) - chain, err = core.NewChainManager(genesis, blockDB, stateDB, pow, eventMux) + chain, err = core.NewChainManager(genesis, blockDB, stateDB, extraDB, pow, eventMux) if err != nil { Fatalf("Could not start chainmanager: %v", err) } @@ -432,17 +432,17 @@ func MakeAccountManager(ctx *cli.Context) *accounts.Manager { func IpcSocketPath(ctx *cli.Context) (ipcpath string) { if common.IsWindows() { ipcpath = common.DefaultIpcPath() - if ipcpath != ctx.GlobalString(IPCPathFlag.Name) { + if ctx.GlobalIsSet(IPCPathFlag.Name) { ipcpath = ctx.GlobalString(IPCPathFlag.Name) } } else { ipcpath = common.DefaultIpcPath() - if ctx.GlobalString(IPCPathFlag.Name) != common.DefaultIpcPath() { - ipcpath = ctx.GlobalString(IPCPathFlag.Name) - } else if ctx.GlobalString(DataDirFlag.Name) != "" && - ctx.GlobalString(DataDirFlag.Name) != common.DefaultDataDir() { + if ctx.GlobalIsSet(DataDirFlag.Name) { ipcpath = filepath.Join(ctx.GlobalString(DataDirFlag.Name), "geth.ipc") } + if ctx.GlobalIsSet(IPCPathFlag.Name) { + ipcpath = ctx.GlobalString(IPCPathFlag.Name) + } } return diff --git a/core/bench_test.go b/core/bench_test.go index 6d851febd..8cd8c4299 100644 --- a/core/bench_test.go +++ b/core/bench_test.go @@ -152,7 +152,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) { // Time the insertion of the new chain. // State and blocks are stored in the same DB. evmux := new(event.TypeMux) - chainman, _ := NewChainManager(genesis, db, db, FakePow{}, evmux) + chainman, _ := NewChainManager(genesis, db, db, db, FakePow{}, evmux) chainman.SetProcessor(NewBlockProcessor(db, db, FakePow{}, chainman, evmux)) defer chainman.Stop() b.ReportAllocs() diff --git a/core/block_processor.go b/core/block_processor.go index 9b77d10eb..7171e3b2e 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -151,7 +151,7 @@ func (sm *BlockProcessor) RetryProcess(block *types.Block) (logs state.Logs, err errch := make(chan bool) go func() { errch <- sm.Pow.Verify(block) }() - logs, err = sm.processWithParent(block, parent) + logs, _, err = sm.processWithParent(block, parent) if !<-errch { return nil, ValidationError("Block's nonce is invalid (= %x)", block.Nonce) } @@ -162,23 +162,23 @@ func (sm *BlockProcessor) RetryProcess(block *types.Block) (logs state.Logs, err // Process block will attempt to process the given block's transactions and applies them // on top of the block's parent state (given it exists) and will return wether it was // successful or not. -func (sm *BlockProcessor) Process(block *types.Block) (logs state.Logs, err error) { +func (sm *BlockProcessor) Process(block *types.Block) (logs state.Logs, receipts types.Receipts, err error) { // Processing a blocks may never happen simultaneously sm.mutex.Lock() defer sm.mutex.Unlock() if sm.bc.HasBlock(block.Hash()) { - return nil, &KnownBlockError{block.Number(), block.Hash()} + return nil, nil, &KnownBlockError{block.Number(), block.Hash()} } if !sm.bc.HasBlock(block.ParentHash()) { - return nil, ParentError(block.ParentHash()) + return nil, nil, ParentError(block.ParentHash()) } parent := sm.bc.GetBlock(block.ParentHash()) return sm.processWithParent(block, parent) } -func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs state.Logs, err error) { +func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs state.Logs, receipts types.Receipts, err error) { // Create a new state based on the parent's root (e.g., create copy) state := state.New(parent.Root(), sm.db) header := block.Header() @@ -192,10 +192,10 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st // There can be at most two uncles if len(uncles) > 2 { - return nil, ValidationError("Block can only contain maximum 2 uncles (contained %v)", len(uncles)) + return nil, nil, ValidationError("Block can only contain maximum 2 uncles (contained %v)", len(uncles)) } - receipts, err := sm.TransitionState(state, parent, block, false) + receipts, err = sm.TransitionState(state, parent, block, false) if err != nil { return } @@ -248,15 +248,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st // Sync the current block's state to the database state.Sync() - // This puts transactions in a extra db for rpc - for i, tx := range block.Transactions() { - putTx(sm.extraDb, tx, block, uint64(i)) - } - - // store the receipts - putReceipts(sm.extraDb, block.Hash(), receipts) - - return state.Logs(), nil + return state.Logs(), receipts, nil } var ( @@ -411,43 +403,3 @@ func getBlockReceipts(db common.Database, bhash common.Hash) (receipts types.Rec } return } - -func putTx(db common.Database, tx *types.Transaction, block *types.Block, i uint64) { - rlpEnc, err := rlp.EncodeToBytes(tx) - if err != nil { - glog.V(logger.Debug).Infoln("Failed encoding tx", err) - return - } - db.Put(tx.Hash().Bytes(), rlpEnc) - - var txExtra struct { - BlockHash common.Hash - BlockIndex uint64 - Index uint64 - } - txExtra.BlockHash = block.Hash() - txExtra.BlockIndex = block.NumberU64() - txExtra.Index = i - rlpMeta, err := rlp.EncodeToBytes(txExtra) - if err != nil { - glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err) - return - } - db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) -} - -func putReceipts(db common.Database, hash common.Hash, receipts types.Receipts) error { - storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) - for i, receipt := range receipts { - storageReceipts[i] = (*types.ReceiptForStorage)(receipt) - } - - bytes, err := rlp.EncodeToBytes(storageReceipts) - if err != nil { - return err - } - - db.Put(append(receiptsPre, hash[:]...), bytes) - - return nil -} diff --git a/core/block_processor_test.go b/core/block_processor_test.go index dc328a3ea..99681dabf 100644 --- a/core/block_processor_test.go +++ b/core/block_processor_test.go @@ -18,7 +18,7 @@ func proc() (*BlockProcessor, *ChainManager) { var mux event.TypeMux genesis := GenesisBlock(0, db) - chainMan, err := NewChainManager(genesis, db, db, thePow(), &mux) + chainMan, err := NewChainManager(genesis, db, db, db, thePow(), &mux) if err != nil { fmt.Println(err) } @@ -64,7 +64,7 @@ func TestPutReceipt(t *testing.T) { Index: 0, }}) - putReceipts(db, hash, types.Receipts{receipt}) + PutReceipts(db, hash, types.Receipts{receipt}) receipts, err := getBlockReceipts(db, hash) if err != nil { t.Error("got err:", err) diff --git a/core/chain_makers.go b/core/chain_makers.go index 013251d74..37475e0ae 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -167,7 +167,7 @@ func makeHeader(parent *types.Block, state *state.StateDB) *types.Header { // InsertChain on the result of makeChain. func newCanonical(n int, db common.Database) (*BlockProcessor, error) { evmux := &event.TypeMux{} - chainman, _ := NewChainManager(GenesisBlock(0, db), db, db, FakePow{}, evmux) + chainman, _ := NewChainManager(GenesisBlock(0, db), db, db, db, FakePow{}, evmux) bman := NewBlockProcessor(db, db, FakePow{}, chainman, evmux) bman.bc.SetProcessor(bman) parent := bman.bc.CurrentBlock() diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index d5125e1c3..f4eeef082 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -58,7 +58,7 @@ func ExampleGenerateChain() { // Import the chain. This runs all block validation rules. evmux := &event.TypeMux{} - chainman, _ := NewChainManager(genesis, db, db, FakePow{}, evmux) + chainman, _ := NewChainManager(genesis, db, db, db, FakePow{}, evmux) chainman.SetProcessor(NewBlockProcessor(db, db, FakePow{}, chainman, evmux)) if i, err := chainman.InsertChain(chain); err != nil { fmt.Printf("insert error (block %d): %v\n", i, err) diff --git a/core/chain_manager.go b/core/chain_manager.go index 70a8b11c6..b5381e336 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -42,6 +42,7 @@ type ChainManager struct { //eth EthManager blockDb common.Database stateDb common.Database + extraDb common.Database processor types.BlockProcessor eventMux *event.TypeMux genesisBlock *types.Block @@ -70,11 +71,12 @@ type ChainManager struct { pow pow.PoW } -func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) { +func NewChainManager(genesis *types.Block, blockDb, stateDb, extraDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) { cache, _ := lru.New(blockCacheLimit) bc := &ChainManager{ blockDb: blockDb, stateDb: stateDb, + extraDb: extraDb, genesisBlock: GenesisBlock(42, stateDb), eventMux: mux, quit: make(chan struct{}), @@ -477,10 +479,10 @@ func (self *ChainManager) procFutureBlocks() { type writeStatus byte const ( - nonStatTy writeStatus = iota - canonStatTy - splitStatTy - sideStatTy + NonStatTy writeStatus = iota + CanonStatTy + SplitStatTy + SideStatTy ) // WriteBlock writes the block to the chain (or pending queue) @@ -497,10 +499,10 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr // during split we merge two different chains and create the new canonical chain err := self.merge(cblock, block) if err != nil { - return nonStatTy, err + return NonStatTy, err } - status = splitStatTy + status = SplitStatTy } self.mu.Lock() @@ -511,9 +513,9 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr self.setTransState(state.New(block.Root(), self.stateDb)) self.txState.SetState(state.New(block.Root(), self.stateDb)) - status = canonStatTy + status = CanonStatTy } else { - status = sideStatTy + status = SideStatTy } self.write(block) @@ -581,7 +583,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Call in to the block processor and check for errors. It's likely that if one block fails // all others will fail too (unless a known block is returned). - logs, err := self.processor.Process(block) + logs, receipts, err := self.processor.Process(block) if err != nil { if IsKnownBlockErr(err) { stats.ignored++ @@ -620,19 +622,24 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } switch status { - case canonStatTy: + case CanonStatTy: if glog.V(logger.Debug) { glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } queue[i] = ChainEvent{block, block.Hash(), logs} queueEvent.canonicalCount++ - case sideStatTy: + + // This puts transactions in a extra db for rpc + PutTransactions(self.extraDb, block, block.Transactions()) + // store the receipts + PutReceipts(self.extraDb, block.Hash(), receipts) + case SideStatTy: if glog.V(logger.Detail) { glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart)) } queue[i] = ChainSideEvent{block, logs} queueEvent.sideCount++ - case splitStatTy: + case SplitStatTy: queue[i] = ChainSplitEvent{block, logs} queueEvent.splitCount++ } diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 6869bc746..c013fc729 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -33,7 +33,7 @@ func thePow() pow.PoW { func theChainManager(db common.Database, t *testing.T) *ChainManager { var eventMux event.TypeMux genesis := GenesisBlock(0, db) - chainMan, err := NewChainManager(genesis, db, db, thePow(), &eventMux) + chainMan, err := NewChainManager(genesis, db, db, db, thePow(), &eventMux) if err != nil { t.Error("failed creating chainmanager:", err) t.FailNow() @@ -96,7 +96,7 @@ func printChain(bc *ChainManager) { func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) { td := new(big.Int) for _, block := range chainB { - _, err := bman.bc.processor.Process(block) + _, _, err := bman.bc.processor.Process(block) if err != nil { if IsKnownBlockErr(err) { continue @@ -367,7 +367,7 @@ func TestGetBlocksFromHash(t *testing.T) { type bproc struct{} -func (bproc) Process(*types.Block) (state.Logs, error) { return nil, nil } +func (bproc) Process(*types.Block) (state.Logs, types.Receipts, error) { return nil, nil, nil } func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block { var chain []*types.Block @@ -390,7 +390,7 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block func chm(genesis *types.Block, db common.Database) *ChainManager { var eventMux event.TypeMux - bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}} + bc := &ChainManager{extraDb: db, blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}} bc.cache, _ = lru.New(100) bc.futureBlocks, _ = lru.New(100) bc.processor = bproc{} @@ -479,12 +479,12 @@ func TestGenesisMismatch(t *testing.T) { db, _ := ethdb.NewMemDatabase() var mux event.TypeMux genesis := GenesisBlock(0, db) - _, err := NewChainManager(genesis, db, db, thePow(), &mux) + _, err := NewChainManager(genesis, db, db, db, thePow(), &mux) if err != nil { t.Error(err) } genesis = GenesisBlock(1, db) - _, err = NewChainManager(genesis, db, db, thePow(), &mux) + _, err = NewChainManager(genesis, db, db, db, thePow(), &mux) if err == nil { t.Error("expected genesis mismatch error") } diff --git a/core/filter.go b/core/filter.go index fcdf68dd0..121e4642d 100644 --- a/core/filter.go +++ b/core/filter.go @@ -1,7 +1,6 @@ package core import ( - "fmt" "math" "github.com/ethereum/go-ethereum/common" @@ -80,7 +79,6 @@ func (self *Filter) Find() state.Logs { done: for i := 0; block != nil; i++ { - fmt.Println(block.NumberU64() == 0) // Quit on latest switch { case block.NumberU64() == 0: diff --git a/core/manager.go b/core/manager.go index ba0ecf9d1..576cf55b0 100644 --- a/core/manager.go +++ b/core/manager.go @@ -14,5 +14,6 @@ type Backend interface { TxPool() *TxPool BlockDb() common.Database StateDb() common.Database + ExtraDb() common.Database EventMux() *event.TypeMux } diff --git a/core/transaction_util.go b/core/transaction_util.go new file mode 100644 index 000000000..bbb215d91 --- /dev/null +++ b/core/transaction_util.go @@ -0,0 +1,51 @@ +package core + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/rlp" +) + +func PutTransactions(db common.Database, block *types.Block, txs types.Transactions) { + for i, tx := range block.Transactions() { + rlpEnc, err := rlp.EncodeToBytes(tx) + if err != nil { + glog.V(logger.Debug).Infoln("Failed encoding tx", err) + return + } + db.Put(tx.Hash().Bytes(), rlpEnc) + + var txExtra struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 + } + txExtra.BlockHash = block.Hash() + txExtra.BlockIndex = block.NumberU64() + txExtra.Index = uint64(i) + rlpMeta, err := rlp.EncodeToBytes(txExtra) + if err != nil { + glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err) + return + } + db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta) + } +} + +func PutReceipts(db common.Database, hash common.Hash, receipts types.Receipts) error { + storageReceipts := make([]*types.ReceiptForStorage, len(receipts)) + for i, receipt := range receipts { + storageReceipts[i] = (*types.ReceiptForStorage)(receipt) + } + + bytes, err := rlp.EncodeToBytes(storageReceipts) + if err != nil { + return err + } + + db.Put(append(receiptsPre, hash[:]...), bytes) + + return nil +} diff --git a/core/types/common.go b/core/types/common.go index dbdaaba0c..09d1e2fed 100644 --- a/core/types/common.go +++ b/core/types/common.go @@ -10,7 +10,7 @@ import ( ) type BlockProcessor interface { - Process(*Block) (state.Logs, error) + Process(*Block) (state.Logs, Receipts, error) } const bloomLength = 256 diff --git a/eth/backend.go b/eth/backend.go index d6ad3381d..618eec9fb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -339,7 +339,7 @@ func New(config *Config) (*Ethereum, error) { eth.pow = ethash.New() genesis := core.GenesisBlock(uint64(config.GenesisNonce), stateDb) - eth.chainManager, err = core.NewChainManager(genesis, blockDb, stateDb, eth.pow, eth.EventMux()) + eth.chainManager, err = core.NewChainManager(genesis, blockDb, stateDb, extraDb, eth.pow, eth.EventMux()) if err != nil { return nil, err } diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 4c1579d4e..2cc3d06ab 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -165,7 +165,7 @@ func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *Protocol var ( em = new(event.TypeMux) db, _ = ethdb.NewMemDatabase() - chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em) + chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, db, core.FakePow{}, em) txpool = &fakeTxPool{added: txAdded} pm = NewProtocolManager(0, em, txpool, core.FakePow{}, chain) ) diff --git a/miner/worker.go b/miner/worker.go index 90914ddcb..1c1e8f927 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -79,9 +79,10 @@ type worker struct { quit chan struct{} pow pow.PoW - eth core.Backend - chain *core.ChainManager - proc *core.BlockProcessor + eth core.Backend + chain *core.ChainManager + proc *core.BlockProcessor + extraDb common.Database coinbase common.Address gasPrice *big.Int @@ -105,6 +106,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker { worker := &worker{ eth: eth, mux: eth.EventMux(), + extraDb: eth.ExtraDb(), recv: make(chan *types.Block), gasPrice: new(big.Int), chain: eth.ChainManager(), @@ -233,11 +235,28 @@ func (self *worker) wait() { continue } - _, err := self.chain.WriteBlock(block, false) + parent := self.chain.GetBlock(block.ParentHash()) + if parent == nil { + glog.V(logger.Error).Infoln("Invalid block found during mining") + continue + } + if err := core.ValidateHeader(self.eth.BlockProcessor().Pow, block.Header(), parent, true); err != nil && err != core.BlockFutureErr { + glog.V(logger.Error).Infoln("Invalid header on mined block:", err) + continue + } + + stat, err := self.chain.WriteBlock(block, false) if err != nil { glog.V(logger.Error).Infoln("error writing block to chain", err) continue } + // check if canon block and write transactions + if stat == core.CanonStatTy { + // This puts transactions in a extra db for rpc + core.PutTransactions(self.extraDb, block, block.Transactions()) + // store the receipts + core.PutReceipts(self.extraDb, block.Hash(), self.current.receipts) + } // check staleness and display confirmation var stale, confirm string @@ -252,7 +271,13 @@ func (self *worker) wait() { glog.V(logger.Info).Infof("🔨 Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm) // broadcast before waiting for validation - go self.mux.Post(core.NewMinedBlockEvent{block}) + go func(block *types.Block, logs state.Logs) { + self.mux.Post(core.NewMinedBlockEvent{block}) + self.mux.Post(core.ChainEvent{block, block.Hash(), logs}) + if stat == core.CanonStatTy { + self.mux.Post(core.ChainHeadEvent{block}) + } + }(block, self.current.state.Logs()) self.commitNewWork() } diff --git a/rpc/codec/json.go b/rpc/codec/json.go index 0b1a90562..8aa0e6bbf 100644 --- a/rpc/codec/json.go +++ b/rpc/codec/json.go @@ -10,7 +10,7 @@ import ( ) const ( - READ_TIMEOUT = 15 // read timeout in seconds + READ_TIMEOUT = 60 // in seconds MAX_REQUEST_SIZE = 1024 * 1024 MAX_RESPONSE_SIZE = 1024 * 1024 ) @@ -18,51 +18,43 @@ const ( // Json serialization support type JsonCodec struct { c net.Conn + d *json.Decoder } // Create new JSON coder instance func NewJsonCoder(conn net.Conn) ApiCoder { return &JsonCodec{ c: conn, + d: json.NewDecoder(conn), } } -// Serialize obj to JSON and write it to conn +// Read incoming request and parse it to RPC request func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { - bytesInBuffer := 0 - buf := make([]byte, MAX_REQUEST_SIZE) - deadline := time.Now().Add(READ_TIMEOUT * time.Second) if err := self.c.SetDeadline(deadline); err != nil { return nil, false, err } - for { - n, err := self.c.Read(buf[bytesInBuffer:]) - if err != nil { - self.c.Close() - return nil, false, err - } - - bytesInBuffer += n - - singleRequest := shared.Request{} - err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest) - if err == nil { - requests := make([]*shared.Request, 1) - requests[0] = &singleRequest - return requests, false, nil - } - - requests = make([]*shared.Request, 0) - err = json.Unmarshal(buf[:bytesInBuffer], &requests) - if err == nil { - return requests, true, nil + var incoming json.RawMessage + err = self.d.Decode(&incoming) + if err == nil { + isBatch = incoming[0] == '[' + if isBatch { + requests = make([]*shared.Request, 0) + err = json.Unmarshal(incoming, &requests) + } else { + requests = make([]*shared.Request, 1) + var singleRequest shared.Request + if err = json.Unmarshal(incoming, &singleRequest); err == nil { + requests[0] = &singleRequest + } } + return } - self.c.Close() // timeout - return nil, false, fmt.Errorf("Unable to read response") + self.c.Close() + return nil, false, err } func (self *JsonCodec) ReadResponse() (interface{}, error) { @@ -81,15 +73,15 @@ func (self *JsonCodec) ReadResponse() (interface{}, error) { } bytesInBuffer += n + var failure shared.ErrorResponse + if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { + return failure, fmt.Errorf(failure.Error.Message) + } + var success shared.SuccessResponse if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil { return success, nil } - - var failure shared.ErrorResponse - if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { - return failure, nil - } } self.c.Close() diff --git a/rpc/codec/json_test.go b/rpc/codec/json_test.go new file mode 100644 index 000000000..d5c672cdf --- /dev/null +++ b/rpc/codec/json_test.go @@ -0,0 +1,141 @@ +package codec + +import ( + "bytes" + "io" + "net" + "testing" + "time" +) + +type jsonTestConn struct { + buffer *bytes.Buffer +} + +func newJsonTestConn(data []byte) *jsonTestConn { + return &jsonTestConn{ + buffer: bytes.NewBuffer(data), + } +} + +func (self *jsonTestConn) Read(p []byte) (n int, err error) { + return self.buffer.Read(p) +} + +func (self *jsonTestConn) Write(p []byte) (n int, err error) { + return self.buffer.Write(p) +} + +func (self *jsonTestConn) Close() error { + // not implemented + return nil +} + +func (self *jsonTestConn) LocalAddr() net.Addr { + // not implemented + return nil +} + +func (self *jsonTestConn) RemoteAddr() net.Addr { + // not implemented + return nil +} + +func (self *jsonTestConn) SetDeadline(t time.Time) error { + return nil +} + +func (self *jsonTestConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (self *jsonTestConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func TestJsonDecoderWithValidRequest(t *testing.T) { + reqdata := []byte(`{"jsonrpc":"2.0","method":"modules","params":[],"id":64}`) + decoder := newJsonTestConn(reqdata) + + jsonDecoder := NewJsonCoder(decoder) + requests, batch, err := jsonDecoder.ReadRequest() + + if err != nil { + t.Errorf("Read valid request failed - %v", err) + } + + if len(requests) != 1 { + t.Errorf("Expected to get a single request but got %d", len(requests)) + } + + if batch { + t.Errorf("Got batch indication while expecting single request") + } + + if requests[0].Id != float64(64) { + t.Errorf("Expected req.Id == 64 but got %v", requests[0].Id) + } + + if requests[0].Method != "modules" { + t.Errorf("Expected req.Method == 'modules' got '%s'", requests[0].Method) + } +} + +func TestJsonDecoderWithValidBatchRequest(t *testing.T) { + reqdata := []byte(`[{"jsonrpc":"2.0","method":"modules","params":[],"id":64}, + {"jsonrpc":"2.0","method":"modules","params":[],"id":64}]`) + decoder := newJsonTestConn(reqdata) + + jsonDecoder := NewJsonCoder(decoder) + requests, batch, err := jsonDecoder.ReadRequest() + + if err != nil { + t.Errorf("Read valid batch request failed - %v", err) + } + + if len(requests) != 2 { + t.Errorf("Expected to get two requests but got %d", len(requests)) + } + + if !batch { + t.Errorf("Got no batch indication while expecting batch request") + } + + for i := 0; i < len(requests); i++ { + if requests[i].Id != float64(64) { + t.Errorf("Expected req.Id == 64 but got %v", requests[i].Id) + } + + if requests[i].Method != "modules" { + t.Errorf("Expected req.Method == 'modules' got '%s'", requests[i].Method) + } + } +} + +func TestJsonDecoderWithInvalidIncompleteMessage(t *testing.T) { + reqdata := []byte(`{"jsonrpc":"2.0","method":"modules","pa`) + decoder := newJsonTestConn(reqdata) + + jsonDecoder := NewJsonCoder(decoder) + requests, batch, err := jsonDecoder.ReadRequest() + + if err != io.ErrUnexpectedEOF { + t.Errorf("Expected to read an incomplete request err but got %v", err) + } + + // remaining message + decoder.Write([]byte(`rams":[],"id:64"}`)) + requests, batch, err = jsonDecoder.ReadRequest() + + if err == nil { + t.Errorf("Expected an error but got nil") + } + + if len(requests) != 0 { + t.Errorf("Expected to get no requests but got %d", len(requests)) + } + + if batch { + t.Errorf("Got batch indication while expecting non batch") + } +} diff --git a/rpc/jeth.go b/rpc/jeth.go index 33fcd6efd..78e44c4da 100644 --- a/rpc/jeth.go +++ b/rpc/jeth.go @@ -3,6 +3,8 @@ package rpc import ( "encoding/json" + "fmt" + "github.com/ethereum/go-ethereum/jsre" "github.com/ethereum/go-ethereum/rpc/comms" "github.com/ethereum/go-ethereum/rpc/shared" @@ -20,14 +22,13 @@ func NewJeth(ethApi shared.EthereumApi, re *jsre.JSRE, client comms.EthereumClie } func (self *Jeth) err(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) { - rpcerr := &shared.ErrorObject{code, msg} - call.Otto.Set("ret_jsonrpc", shared.JsonRpcVersion) - call.Otto.Set("ret_id", id) - call.Otto.Set("ret_error", rpcerr) - response, _ = call.Otto.Run(` - ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, error: ret_error }; - `) - return + errObj := fmt.Sprintf("{\"message\": \"%s\", \"code\": %d}", msg, code) + retResponse := fmt.Sprintf("ret_response = JSON.parse('{\"jsonrpc\": \"%s\", \"id\": %v, \"error\": %s}');", shared.JsonRpcVersion, id, errObj) + + call.Otto.Run("ret_error = " + errObj) + res, _ := call.Otto.Run(retResponse) + + return res } func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { @@ -56,6 +57,7 @@ func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { return self.err(call, -32603, err.Error(), req.Id) } respif, err = self.client.Recv() + if err != nil { return self.err(call, -32603, err.Error(), req.Id) } diff --git a/xeth/xeth.go b/xeth/xeth.go index 2a1366fe1..155ff3eea 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -980,6 +980,7 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS } else { glog.V(logger.Info).Infof("Tx(%x) to: %x\n", tx.Hash(), tx.To()) } + return tx.Hash().Hex(), nil } |