aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/mist/assets/examples/bomb.html22
-rw-r--r--cmd/mist/assets/qml/main.qml1
-rw-r--r--cmd/mist/gui.go21
-rw-r--r--core/block_processor.go25
-rw-r--r--core/chain_manager.go25
-rw-r--r--core/error.go17
-rw-r--r--core/events.go3
-rw-r--r--core/state_transition.go3
-rw-r--r--core/transaction_pool.go33
-rw-r--r--ethutil/common.go7
-rw-r--r--miner/worker.go22
-rw-r--r--pow/ezp/pow.go2
-rw-r--r--rpc/args.go2
-rw-r--r--rpc/message.go17
-rw-r--r--rpc/packages.go83
-rw-r--r--rpc/packages_test.go37
-rw-r--r--rpc/util.go39
-rw-r--r--state/state_object.go5
-rw-r--r--state/statedb.go45
-rw-r--r--xeth/types.go2
-rw-r--r--xeth/xeth.go29
21 files changed, 298 insertions, 142 deletions
diff --git a/cmd/mist/assets/examples/bomb.html b/cmd/mist/assets/examples/bomb.html
new file mode 100644
index 000000000..62540f9bb
--- /dev/null
+++ b/cmd/mist/assets/examples/bomb.html
@@ -0,0 +1,22 @@
+<html>
+<head>
+<script src="../ext/bignumber.min.js"></script>
+<script src="../ext/ethereum.js/dist/ethereum.js"></script>
+
+<script>
+var web3 = require('web3');
+web3.setProvider(new web3.providers.HttpSyncProvider('http://localhost:8545'));
+var eth = web3.eth;
+
+function bomb() {
+ for (var i = 0; i < 200; i++) {
+ eth.transact({})
+ }
+}
+</script>
+</head>
+
+<body>
+<button onclick="bomb();">BOOM!</button>
+</body>
+</html>
diff --git a/cmd/mist/assets/qml/main.qml b/cmd/mist/assets/qml/main.qml
index 86d0ea911..937670bd2 100644
--- a/cmd/mist/assets/qml/main.qml
+++ b/cmd/mist/assets/qml/main.qml
@@ -250,6 +250,7 @@ ApplicationWindow {
}
}
}
+
}
property var blockModel: ListModel {
diff --git a/cmd/mist/gui.go b/cmd/mist/gui.go
index efab18064..4af0cff43 100644
--- a/cmd/mist/gui.go
+++ b/cmd/mist/gui.go
@@ -131,6 +131,7 @@ func (gui *Gui) Start(assetPath string) {
context.SetVar("gui", gui)
context.SetVar("eth", gui.uiLib)
context.SetVar("shh", gui.whisper)
+ //clipboard.SetQMLClipboard(context)
win, err := gui.showWallet(context)
if err != nil {
@@ -386,14 +387,11 @@ func (gui *Gui) update() {
generalUpdateTicker := time.NewTicker(500 * time.Millisecond)
statsUpdateTicker := time.NewTicker(5 * time.Second)
- state := gui.eth.ChainManager().TransState()
-
- gui.win.Root().Call("setWalletValue", fmt.Sprintf("%v", ethutil.CurrencyToString(state.GetAccount(gui.address()).Balance())))
-
lastBlockLabel := gui.getObjectByName("lastBlockLabel")
miningLabel := gui.getObjectByName("miningLabel")
events := gui.eth.EventMux().Subscribe(
+ core.ChainEvent{},
core.TxPreEvent{},
core.TxPostEvent{},
)
@@ -406,6 +404,8 @@ func (gui *Gui) update() {
return
}
switch ev := ev.(type) {
+ case core.ChainEvent:
+ gui.processBlock(ev.Block, false)
case core.TxPreEvent:
gui.insertTransaction("pre", ev.Tx)
@@ -421,19 +421,6 @@ func (gui *Gui) update() {
lastBlockLabel.Set("text", statusText)
miningLabel.Set("text", "Mining @ "+strconv.FormatInt(gui.uiLib.Miner().HashRate(), 10)+"/Khash")
- /*
- blockLength := gui.eth.BlockPool().BlocksProcessed
- chainLength := gui.eth.BlockPool().ChainLength
-
- var (
- pct float64 = 1.0 / float64(chainLength) * float64(blockLength)
- dlWidget = gui.win.Root().ObjectByName("downloadIndicator")
- dlLabel = gui.win.Root().ObjectByName("downloadLabel")
- )
- dlWidget.Set("value", pct)
- dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength))
- */
-
case <-statsUpdateTicker.C:
gui.setStatsPane()
}
diff --git a/core/block_processor.go b/core/block_processor.go
index b4449100f..a9795385f 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -73,24 +73,27 @@ func (sm *BlockProcessor) TransitionState(statedb *state.StateDB, parent, block
return receipts, nil
}
-func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, state *state.StateDB, block *types.Block, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) {
+func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, statedb *state.StateDB, block *types.Block, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) {
// If we are mining this block and validating we want to set the logs back to 0
- state.EmptyLogs()
+ statedb.EmptyLogs()
txGas := new(big.Int).Set(tx.Gas())
- cb := state.GetStateObject(coinbase.Address())
- st := NewStateTransition(NewEnv(state, self.bc, tx, block), tx, cb)
+ cb := statedb.GetStateObject(coinbase.Address())
+ st := NewStateTransition(NewEnv(statedb, self.bc, tx, block), tx, cb)
_, err := st.TransitionState()
+ if err != nil && (IsNonceErr(err) || state.IsGasLimitErr(err)) {
+ return nil, nil, err
+ }
txGas.Sub(txGas, st.gas)
// Update the state with pending changes
- state.Update(txGas)
+ statedb.Update(txGas)
cumulative := new(big.Int).Set(usedGas.Add(usedGas, txGas))
- receipt := types.NewReceipt(state.Root(), cumulative)
- receipt.SetLogs(state.Logs())
+ receipt := types.NewReceipt(statedb.Root(), cumulative)
+ receipt.SetLogs(statedb.Logs())
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
chainlogger.Debugln(receipt)
@@ -99,12 +102,12 @@ func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, state
go self.eventMux.Post(TxPostEvent{tx})
}
- go self.eventMux.Post(state.Logs())
+ go self.eventMux.Post(statedb.Logs())
return receipt, txGas, err
}
-func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, types.Transactions, types.Transactions, types.Transactions, error) {
+func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, statedb *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, types.Transactions, types.Transactions, types.Transactions, error) {
var (
receipts types.Receipts
handled, unhandled types.Transactions
@@ -115,12 +118,12 @@ func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state
)
for _, tx := range txs {
- receipt, txGas, err := self.ApplyTransaction(coinbase, state, block, tx, totalUsedGas, transientProcess)
+ receipt, txGas, err := self.ApplyTransaction(coinbase, statedb, block, tx, totalUsedGas, transientProcess)
if err != nil {
switch {
case IsNonceErr(err):
return nil, nil, nil, nil, err
- case IsGasLimitErr(err):
+ case state.IsGasLimitErr(err):
return nil, nil, nil, nil, err
default:
statelogger.Infoln(err)
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 286282064..003781791 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -134,14 +134,11 @@ func (self *ChainManager) State() *state.StateDB {
func (self *ChainManager) TransState() *state.StateDB {
self.tsmu.RLock()
defer self.tsmu.RUnlock()
- //tmp := self.transState
return self.transState
}
func (self *ChainManager) setTransState(statedb *state.StateDB) {
- self.tsmu.Lock()
- defer self.tsmu.Unlock()
self.transState = statedb
}
@@ -361,6 +358,9 @@ func (bc *ChainManager) Stop() {
}
func (self *ChainManager) InsertChain(chain types.Blocks) error {
+ self.tsmu.Lock()
+ defer self.tsmu.Unlock()
+
for _, block := range chain {
td, err := self.processor.Process(block)
if err != nil {
@@ -376,6 +376,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
}
block.Td = td
+ var chain, split bool
self.mu.Lock()
{
self.write(block)
@@ -383,16 +384,26 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
if td.Cmp(self.td) > 0 {
if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 {
chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td)
+ split = true
}
self.setTotalDifficulty(td)
self.insert(block)
- self.setTransState(state.New(cblock.Root(), self.db))
- self.eventMux.Post(ChainEvent{block, td})
+ chain = true
}
}
self.mu.Unlock()
+
+ if chain {
+ //self.setTransState(state.New(block.Root(), self.db))
+ self.eventMux.Post(ChainEvent{block, td})
+ }
+
+ if split {
+ self.setTransState(state.New(block.Root(), self.db))
+ self.eventMux.Post(ChainSplitEvent{block})
+ }
}
return nil
@@ -402,3 +413,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
func (self *ChainManager) GetAccount(addr []byte) *state.StateObject {
return self.State().GetAccount(addr)
}
+
+func (self *ChainManager) TransMut() *sync.RWMutex {
+ return &self.tsmu
+}
diff --git a/core/error.go b/core/error.go
index 6af48ac2d..e86bacb2d 100644
--- a/core/error.go
+++ b/core/error.go
@@ -68,23 +68,6 @@ func IsValidationErr(err error) bool {
return ok
}
-type GasLimitErr struct {
- Message string
- Is, Max *big.Int
-}
-
-func IsGasLimitErr(err error) bool {
- _, ok := err.(*GasLimitErr)
-
- return ok
-}
-func (err *GasLimitErr) Error() string {
- return err.Message
-}
-func GasLimitError(is, max *big.Int) *GasLimitErr {
- return &GasLimitErr{Message: fmt.Sprintf("GasLimit error. Max %s, transaction would take it to %s", max, is), Is: is, Max: max}
-}
-
type NonceErr struct {
Message string
Is, Exp uint64
diff --git a/core/events.go b/core/events.go
index fe106da49..4cbbc609c 100644
--- a/core/events.go
+++ b/core/events.go
@@ -13,3 +13,6 @@ type NewBlockEvent struct{ Block *types.Block }
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
+
+// ChainSplit is posted when a new head is detected
+type ChainSplitEvent struct{ Block *types.Block }
diff --git a/core/state_transition.go b/core/state_transition.go
index 33dd45f02..e82be647d 100644
--- a/core/state_transition.go
+++ b/core/state_transition.go
@@ -166,7 +166,8 @@ func (self *StateTransition) TransitionState() (ret []byte, err error) {
defer self.RefundGas()
// Increment the nonce for the next transaction
- sender.Nonce += 1
+ self.state.SetNonce(sender.Address(), sender.Nonce+1)
+ //sender.Nonce += 1
// Transaction gas
if err = self.UseGas(vm.GasTx); err != nil {
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 7a901fcae..894b6c440 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -3,6 +3,7 @@ package core
import (
"errors"
"fmt"
+ "sync"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethutil"
@@ -35,6 +36,7 @@ type TxProcessor interface {
// guarantee a non blocking pool we use a queue channel which can be
// independently read without needing access to the actual pool.
type TxPool struct {
+ mu sync.RWMutex
// Queueing channel for reading and writing incoming
// transactions to
queueChan chan *types.Transaction
@@ -97,7 +99,7 @@ func (self *TxPool) addTx(tx *types.Transaction) {
self.txs[string(tx.Hash())] = tx
}
-func (self *TxPool) Add(tx *types.Transaction) error {
+func (self *TxPool) add(tx *types.Transaction) error {
if self.txs[string(tx.Hash())] != nil {
return fmt.Errorf("Known transaction (%x)", tx.Hash()[0:4])
}
@@ -128,17 +130,28 @@ func (self *TxPool) Size() int {
return len(self.txs)
}
+func (self *TxPool) Add(tx *types.Transaction) error {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+ return self.add(tx)
+}
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
for _, tx := range txs {
- if err := self.Add(tx); err != nil {
- txplogger.Infoln(err)
+ if err := self.add(tx); err != nil {
+ txplogger.Debugln(err)
} else {
- txplogger.Infof("tx %x\n", tx.Hash()[0:4])
+ txplogger.Debugf("tx %x\n", tx.Hash()[0:4])
}
}
}
func (self *TxPool) GetTransactions() (txs types.Transactions) {
+ self.mu.RLock()
+ defer self.mu.RUnlock()
+
txs = make(types.Transactions, self.Size())
i := 0
for _, tx := range self.txs {
@@ -150,30 +163,32 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
}
func (pool *TxPool) RemoveInvalid(query StateQuery) {
+ pool.mu.Lock()
+
var removedTxs types.Transactions
for _, tx := range pool.txs {
sender := query.GetAccount(tx.From())
err := pool.ValidateTransaction(tx)
- fmt.Println(err, sender.Nonce, tx.Nonce())
if err != nil || sender.Nonce >= tx.Nonce() {
removedTxs = append(removedTxs, tx)
}
}
+ pool.mu.Unlock()
pool.RemoveSet(removedTxs)
}
func (self *TxPool) RemoveSet(txs types.Transactions) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
for _, tx := range txs {
delete(self.txs, string(tx.Hash()))
}
}
-func (pool *TxPool) Flush() []*types.Transaction {
- txList := pool.GetTransactions()
+func (pool *TxPool) Flush() {
pool.txs = make(map[string]*types.Transaction)
-
- return txList
}
func (pool *TxPool) Start() {
diff --git a/ethutil/common.go b/ethutil/common.go
index 271c56fd5..2ef2440c7 100644
--- a/ethutil/common.go
+++ b/ethutil/common.go
@@ -4,6 +4,7 @@ import (
"fmt"
"math/big"
"runtime"
+ "time"
)
func IsWindows() bool {
@@ -86,3 +87,9 @@ var (
Big256 = big.NewInt(0xff)
Big257 = big.NewInt(257)
)
+
+func Bench(pre string, cb func()) {
+ start := time.Now()
+ cb()
+ fmt.Println(pre, ": took:", time.Since(start))
+}
diff --git a/miner/worker.go b/miner/worker.go
index 47b462e53..1f3a52ab5 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -109,14 +109,18 @@ func (self *worker) register(agent Agent) {
}
func (self *worker) update() {
- events := self.mux.Subscribe(core.ChainEvent{}, core.TxPreEvent{})
+ events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{})
out:
for {
select {
case event := <-events.Chan():
- switch event.(type) {
- case core.ChainEvent, core.TxPreEvent:
+ switch ev := event.(type) {
+ case core.ChainEvent:
+ if self.current.block != ev.Block {
+ self.commitNewWork()
+ }
+ case core.NewMinedBlockEvent:
self.commitNewWork()
}
case <-self.quit:
@@ -172,17 +176,19 @@ func (self *worker) commitNewWork() {
transactions := self.eth.TxPool().GetTransactions()
sort.Sort(types.TxByNonce{transactions})
+ minerlogger.Infof("committing new work with %d txs\n", len(transactions))
// Keep track of transactions which return errors so they can be removed
var remove types.Transactions
+gasLimit:
for _, tx := range transactions {
err := self.commitTransaction(tx)
switch {
case core.IsNonceErr(err):
// Remove invalid transactions
remove = append(remove, tx)
- case core.IsGasLimitErr(err):
+ case state.IsGasLimitErr(err):
// Break on gas limit
- break
+ break gasLimit
}
if err != nil {
@@ -227,11 +233,9 @@ func (self *worker) commitUncle(uncle *types.Header) error {
}
func (self *worker) commitTransaction(tx *types.Transaction) error {
- snapshot := self.current.state.Copy()
+ //fmt.Printf("proc %x %v\n", tx.Hash()[:3], tx.Nonce())
receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
- if err != nil && (core.IsNonceErr(err) || core.IsGasLimitErr(err)) {
- self.current.state.Set(snapshot)
-
+ if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err)) {
return err
}
diff --git a/pow/ezp/pow.go b/pow/ezp/pow.go
index 540381243..f4a8b80e5 100644
--- a/pow/ezp/pow.go
+++ b/pow/ezp/pow.go
@@ -21,7 +21,7 @@ type EasyPow struct {
}
func New() *EasyPow {
- return &EasyPow{turbo: false}
+ return &EasyPow{turbo: true}
}
func (pow *EasyPow) GetHashrate() int64 {
diff --git a/rpc/args.go b/rpc/args.go
index 429b385d5..f730819fd 100644
--- a/rpc/args.go
+++ b/rpc/args.go
@@ -289,7 +289,7 @@ type WhisperMessageArgs struct {
Payload string
To string
From string
- Topics []string
+ Topic []string
Priority uint32
Ttl uint32
}
diff --git a/rpc/message.go b/rpc/message.go
index b5b852f54..d96c35d7e 100644
--- a/rpc/message.go
+++ b/rpc/message.go
@@ -231,21 +231,6 @@ func (req *RpcRequest) ToFilterStringArgs() (string, error) {
return args, nil
}
-func (req *RpcRequest) ToFilterChangedArgs() (int, error) {
- if len(req.Params) < 1 {
- return 0, NewErrorResponse(ErrorArguments)
- }
-
- var id int
- r := bytes.NewReader(req.Params[0])
- err := json.NewDecoder(r).Decode(&id)
- if err != nil {
- return 0, NewErrorResponse(ErrorDecodeArgs)
- }
- rpclogger.DebugDetailf("%T %v", id, id)
- return id, nil
-}
-
func (req *RpcRequest) ToDbPutArgs() (*DbArgs, error) {
if len(req.Params) < 3 {
return nil, NewErrorResponse(ErrorArguments)
@@ -301,7 +286,7 @@ func (req *RpcRequest) ToWhisperFilterArgs() (*xeth.Options, error) {
return &args, nil
}
-func (req *RpcRequest) ToWhisperIdArgs() (int, error) {
+func (req *RpcRequest) ToIdArgs() (int, error) {
if len(req.Params) < 1 {
return 0, NewErrorResponse(ErrorArguments)
}
diff --git a/rpc/packages.go b/rpc/packages.go
index 8aa604aa5..7411392c2 100644
--- a/rpc/packages.go
+++ b/rpc/packages.go
@@ -13,6 +13,7 @@ import (
"math/big"
"strings"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -31,13 +32,14 @@ const (
type EthereumApi struct {
xeth *xeth.XEth
+ quit chan struct{}
filterManager *filter.FilterManager
logMut sync.RWMutex
- logs map[int]state.Logs
+ logs map[int]*logFilter
messagesMut sync.RWMutex
- messages map[int][]xeth.WhisperMessage
+ messages map[int]*whisperFilter
// Register keeps a list of accounts and transaction data
regmut sync.Mutex
register map[string][]*NewTxArgs
@@ -49,12 +51,14 @@ func NewEthereumApi(eth *xeth.XEth) *EthereumApi {
db, _ := ethdb.NewLDBDatabase("dapps")
api := &EthereumApi{
xeth: eth,
+ quit: make(chan struct{}),
filterManager: filter.NewFilterManager(eth.Backend().EventMux()),
- logs: make(map[int]state.Logs),
- messages: make(map[int][]xeth.WhisperMessage),
+ logs: make(map[int]*logFilter),
+ messages: make(map[int]*whisperFilter),
db: db,
}
go api.filterManager.Start()
+ go api.start()
return api
}
@@ -97,7 +101,11 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
self.logMut.Lock()
defer self.logMut.Unlock()
- self.logs[id] = append(self.logs[id], logs...)
+ if self.logs[id] == nil {
+ self.logs[id] = &logFilter{timeout: time.Now()}
+ }
+
+ self.logs[id].add(logs...)
}
id = self.filterManager.InstallFilter(filter)
*reply = id
@@ -113,7 +121,11 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error
self.logMut.Lock()
defer self.logMut.Unlock()
- self.logs[id] = append(self.logs[id], &state.StateLog{})
+ if self.logs[id] == nil {
+ self.logs[id] = &logFilter{timeout: time.Now()}
+ }
+
+ self.logs[id].add(&state.StateLog{})
}
if args == "pending" {
filter.PendingCallback = callback
@@ -131,9 +143,9 @@ func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
self.logMut.Lock()
defer self.logMut.Unlock()
- *reply = toLogs(self.logs[id])
-
- self.logs[id] = nil // empty the logs
+ if self.logs[id] != nil {
+ *reply = toLogs(self.logs[id].get())
+ }
return nil
}
@@ -180,6 +192,7 @@ func (p *EthereumApi) Transact(args *NewTxArgs, reply *interface{}) error {
result, _ := p.xeth.Transact( /* TODO specify account */ args.To, args.Value, args.Gas, args.GasPrice, args.Data)
*reply = result
}
+
return nil
}
@@ -330,7 +343,10 @@ func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) e
args.Fn = func(msg xeth.WhisperMessage) {
p.messagesMut.Lock()
defer p.messagesMut.Unlock()
- p.messages[id] = append(p.messages[id], msg)
+ if p.messages[id] == nil {
+ p.messages[id] = &whisperFilter{timeout: time.Now()}
+ }
+ p.messages[id].add(msg) // = append(p.messages[id], msg)
}
id = p.xeth.Whisper().Watch(args)
*reply = id
@@ -341,15 +357,15 @@ func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error {
self.messagesMut.Lock()
defer self.messagesMut.Unlock()
- *reply = self.messages[id]
-
- self.messages[id] = nil // empty the messages
+ if self.messages[id] != nil {
+ *reply = self.messages[id].get()
+ }
return nil
}
func (p *EthereumApi) WhisperPost(args *WhisperMessageArgs, reply *interface{}) error {
- err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl)
+ err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topic, args.Priority, args.Ttl)
if err != nil {
return err
}
@@ -445,13 +461,13 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
}
return p.NewFilterString(args, reply)
case "eth_changed":
- args, err := req.ToFilterChangedArgs()
+ args, err := req.ToIdArgs()
if err != nil {
return err
}
return p.FilterChanged(args, reply)
case "eth_filterLogs":
- args, err := req.ToFilterChangedArgs()
+ args, err := req.ToIdArgs()
if err != nil {
return err
}
@@ -504,7 +520,7 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
}
return p.NewWhisperFilter(args, reply)
case "shh_changed":
- args, err := req.ToWhisperIdArgs()
+ args, err := req.ToIdArgs()
if err != nil {
return err
}
@@ -522,7 +538,7 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
}
return p.HasWhisperIdentity(args, reply)
case "shh_getMessages":
- args, err := req.ToWhisperIdArgs()
+ args, err := req.ToIdArgs()
if err != nil {
return err
}
@@ -534,3 +550,34 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
rpclogger.DebugDetailf("Reply: %T %s", reply, reply)
return nil
}
+
+var filterTickerTime = 15 * time.Second
+
+func (self *EthereumApi) start() {
+ timer := time.NewTicker(filterTickerTime)
+done:
+ for {
+ select {
+ case <-timer.C:
+ self.logMut.Lock()
+ self.messagesMut.Lock()
+ for id, filter := range self.logs {
+ if time.Since(filter.timeout) > 20*time.Second {
+ delete(self.logs, id)
+ }
+ }
+
+ for id, filter := range self.messages {
+ if time.Since(filter.timeout) > 20*time.Second {
+ delete(self.messages, id)
+ }
+ }
+ case <-self.quit:
+ break done
+ }
+ }
+}
+
+func (self *EthereumApi) stop() {
+ close(self.quit)
+}
diff --git a/rpc/packages_test.go b/rpc/packages_test.go
new file mode 100644
index 000000000..037fd78b3
--- /dev/null
+++ b/rpc/packages_test.go
@@ -0,0 +1,37 @@
+package rpc
+
+import (
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestFilterClose(t *testing.T) {
+ api := &EthereumApi{
+ logs: make(map[int]*logFilter),
+ messages: make(map[int]*whisperFilter),
+ quit: make(chan struct{}),
+ }
+
+ filterTickerTime = 1
+ api.logs[0] = &logFilter{}
+ api.messages[0] = &whisperFilter{}
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go api.start()
+ go func() {
+ select {
+ case <-time.After(500 * time.Millisecond):
+ api.stop()
+ wg.Done()
+ }
+ }()
+ wg.Wait()
+ if len(api.logs) != 0 {
+ t.Error("expected logs to be empty")
+ }
+
+ if len(api.messages) != 0 {
+ t.Error("expected messages to be empty")
+ }
+}
diff --git a/rpc/util.go b/rpc/util.go
index 679d83754..29824bcdb 100644
--- a/rpc/util.go
+++ b/rpc/util.go
@@ -20,10 +20,12 @@ import (
"encoding/json"
"io"
"net/http"
+ "time"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/state"
+ "github.com/ethereum/go-ethereum/xeth"
)
var rpclogger = logger.NewLogger("RPC")
@@ -80,7 +82,7 @@ type RpcServer interface {
type Log struct {
Address string `json:"address"`
- Topics []string `json:"topics"`
+ Topic []string `json:"topics"`
Data string `json:"data"`
}
@@ -89,14 +91,45 @@ func toLogs(logs state.Logs) (ls []Log) {
for i, log := range logs {
var l Log
- l.Topics = make([]string, len(log.Topics()))
+ l.Topic = make([]string, len(log.Topics()))
l.Address = toHex(log.Address())
l.Data = toHex(log.Data())
for j, topic := range log.Topics() {
- l.Topics[j] = toHex(topic)
+ l.Topic[j] = toHex(topic)
}
ls[i] = l
}
return
}
+
+type whisperFilter struct {
+ messages []xeth.WhisperMessage
+ timeout time.Time
+}
+
+func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) {
+ w.messages = append(w.messages, msgs...)
+}
+func (w *whisperFilter) get() []xeth.WhisperMessage {
+ w.timeout = time.Now()
+ tmp := w.messages
+ w.messages = nil
+ return tmp
+}
+
+type logFilter struct {
+ logs state.Logs
+ timeout time.Time
+}
+
+func (l *logFilter) add(logs ...state.Log) {
+ l.logs = append(l.logs, logs...)
+}
+
+func (l *logFilter) get() state.Logs {
+ l.timeout = time.Now()
+ tmp := l.logs
+ l.logs = nil
+ return tmp
+}
diff --git a/state/state_object.go b/state/state_object.go
index 0c157403c..d50c9fd7a 100644
--- a/state/state_object.go
+++ b/state/state_object.go
@@ -53,6 +53,7 @@ type StateObject struct {
// When an object is marked for deletion it will be delete from the trie
// during the "update" phase of the state transition
remove bool
+ dirty bool
}
func (self *StateObject) Reset() {
@@ -152,6 +153,7 @@ func (self *StateObject) Sync() {
self.setAddr([]byte(key), value)
}
+ self.storage = make(Storage)
}
func (c *StateObject) GetInstr(pc *big.Int) *ethutil.Value {
@@ -210,6 +212,8 @@ func (self *StateObject) BuyGas(gas, price *big.Int) error {
return GasLimitError(self.gasPool, gas)
}
+ self.gasPool.Sub(self.gasPool, gas)
+
rGas := new(big.Int).Set(gas)
rGas.Mul(rGas, price)
@@ -240,6 +244,7 @@ func (self *StateObject) Copy() *StateObject {
stateObject.storage = self.storage.Copy()
stateObject.gasPool.Set(self.gasPool)
stateObject.remove = self.remove
+ stateObject.dirty = self.dirty
return stateObject
}
diff --git a/state/statedb.go b/state/statedb.go
index c83d59ed7..8c8a21db9 100644
--- a/state/statedb.go
+++ b/state/statedb.go
@@ -78,43 +78,45 @@ func (self *StateDB) GetNonce(addr []byte) uint64 {
return 0
}
-func (self *StateDB) SetNonce(addr []byte, nonce uint64) {
+func (self *StateDB) GetCode(addr []byte) []byte {
stateObject := self.GetStateObject(addr)
if stateObject != nil {
- stateObject.Nonce = nonce
+ return stateObject.Code
}
+
+ return nil
}
-func (self *StateDB) GetCode(addr []byte) []byte {
- stateObject := self.GetStateObject(addr)
+func (self *StateDB) GetState(a, b []byte) []byte {
+ stateObject := self.GetStateObject(a)
if stateObject != nil {
- return stateObject.Code
+ return stateObject.GetState(b).Bytes()
}
return nil
}
-func (self *StateDB) SetCode(addr, code []byte) {
+func (self *StateDB) SetNonce(addr []byte, nonce uint64) {
stateObject := self.GetStateObject(addr)
if stateObject != nil {
- stateObject.SetCode(code)
+ stateObject.Nonce = nonce
+ stateObject.dirty = true
}
}
-// TODO vars
-func (self *StateDB) GetState(a, b []byte) []byte {
- stateObject := self.GetStateObject(a)
+func (self *StateDB) SetCode(addr, code []byte) {
+ stateObject := self.GetStateObject(addr)
if stateObject != nil {
- return stateObject.GetState(b).Bytes()
+ stateObject.SetCode(code)
+ stateObject.dirty = true
}
-
- return nil
}
func (self *StateDB) SetState(addr, key []byte, value interface{}) {
stateObject := self.GetStateObject(addr)
if stateObject != nil {
stateObject.SetState(key, ethutil.NewValue(value))
+ stateObject.dirty = true
}
}
@@ -122,6 +124,7 @@ func (self *StateDB) Delete(addr []byte) bool {
stateObject := self.GetStateObject(addr)
if stateObject != nil {
stateObject.MarkForDeletion()
+ stateObject.dirty = true
return true
}
@@ -282,16 +285,18 @@ func (self *StateDB) Refunds() map[string]*big.Int {
}
func (self *StateDB) Update(gasUsed *big.Int) {
-
self.refund = make(map[string]*big.Int)
for _, stateObject := range self.stateObjects {
- if stateObject.remove {
- self.DeleteStateObject(stateObject)
- } else {
- stateObject.Sync()
-
- self.UpdateStateObject(stateObject)
+ if stateObject.dirty {
+ if stateObject.remove {
+ self.DeleteStateObject(stateObject)
+ } else {
+ stateObject.Sync()
+
+ self.UpdateStateObject(stateObject)
+ }
+ stateObject.dirty = false
}
}
}
diff --git a/xeth/types.go b/xeth/types.go
index a903fccbb..5b2d16018 100644
--- a/xeth/types.go
+++ b/xeth/types.go
@@ -150,7 +150,7 @@ type Transaction struct {
func NewTx(tx *types.Transaction) *Transaction {
hash := toHex(tx.Hash())
receiver := toHex(tx.To())
- if receiver == "0000000000000000000000000000000000000000" {
+ if len(receiver) == 0 {
receiver = toHex(core.AddressFromMessage(tx))
}
sender := toHex(tx.From())
diff --git a/xeth/xeth.go b/xeth/xeth.go
index f005105bb..d578c03c9 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -7,6 +7,7 @@ package xeth
import (
"bytes"
"encoding/json"
+ "fmt"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -241,7 +242,6 @@ func (self *XEth) Call(toStr, valueStr, gasStr, gasPriceStr, dataStr string) (st
}
func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string) (string, error) {
-
var (
to []byte
value = ethutil.NewValue(valueStr)
@@ -265,29 +265,32 @@ func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string)
tx = types.NewTransactionMessage(to, value.BigInt(), gas.BigInt(), price.BigInt(), data)
}
- state := self.chainManager.TransState()
+ var err error
+ state := self.eth.ChainManager().TransState()
+ if balance := state.GetBalance(key.Address()); balance.Cmp(tx.Value()) < 0 {
+ return "", fmt.Errorf("insufficient balance. balance=%v tx=%v", balance, tx.Value())
+ }
nonce := state.GetNonce(key.Address())
tx.SetNonce(nonce)
tx.Sign(key.PrivateKey)
- // Do some pre processing for our "pre" events and hooks
- block := self.chainManager.NewBlock(key.Address())
- coinbase := state.GetOrNewStateObject(key.Address())
- coinbase.SetGasPool(block.GasLimit())
- self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true)
+ //fmt.Printf("create tx: %x %v\n", tx.Hash()[:4], tx.Nonce())
- err := self.eth.TxPool().Add(tx)
+ /*
+ // Do some pre processing for our "pre" events and hooks
+ block := self.chainManager.NewBlock(key.Address())
+ coinbase := state.GetOrNewStateObject(key.Address())
+ coinbase.SetGasPool(block.GasLimit())
+ self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true)
+ */
+
+ err = self.eth.TxPool().Add(tx)
if err != nil {
return "", err
}
state.SetNonce(key.Address(), nonce+1)
- if contractCreation {
- addr := core.AddressFromMessage(tx)
- pipelogger.Infof("Contract addr %x\n", addr)
- }
-
if types.IsContractAddr(to) {
return toHex(core.AddressFromMessage(tx)), nil
}