aboutsummaryrefslogtreecommitdiffstats
path: root/xeth/xeth.go
diff options
context:
space:
mode:
Diffstat (limited to 'xeth/xeth.go')
-rw-r--r--xeth/xeth.go108
1 files changed, 65 insertions, 43 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 1be42734d..701932f97 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -33,9 +33,10 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
- "github.com/ethereum/go-ethereum/event/filter"
+ "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/miner"
@@ -75,7 +76,7 @@ type XEth struct {
whisper *Whisper
quit chan struct{}
- filterManager *filter.FilterManager
+ filterManager *filters.FilterSystem
logMu sync.RWMutex
logQueue map[int]*logQueue
@@ -111,7 +112,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
backend: ethereum,
frontend: frontend,
quit: make(chan struct{}),
- filterManager: filter.NewFilterManager(ethereum.EventMux()),
+ filterManager: filters.NewFilterSystem(ethereum.EventMux()),
logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue),
@@ -125,10 +126,13 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
if frontend == nil {
xeth.frontend = dummyFrontend{}
}
- xeth.state = NewState(xeth, xeth.backend.ChainManager().State())
+ state, err := xeth.backend.BlockChain().State()
+ if err != nil {
+ return nil
+ }
+ xeth.state = NewState(xeth, state)
go xeth.start()
- go xeth.filterManager.Start()
return xeth
}
@@ -142,7 +146,7 @@ done:
self.logMu.Lock()
for id, filter := range self.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
- self.filterManager.UninstallFilter(id)
+ self.filterManager.Remove(id)
delete(self.logQueue, id)
}
}
@@ -151,7 +155,7 @@ done:
self.blockMu.Lock()
for id, filter := range self.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
- self.filterManager.UninstallFilter(id)
+ self.filterManager.Remove(id)
delete(self.blockQueue, id)
}
}
@@ -160,7 +164,7 @@ done:
self.transactionMu.Lock()
for id, filter := range self.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
- self.filterManager.UninstallFilter(id)
+ self.filterManager.Remove(id)
delete(self.transactionQueue, id)
}
}
@@ -207,14 +211,21 @@ func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent }
func (self *XEth) AtStateNum(num int64) *XEth {
var st *state.StateDB
+ var err error
switch num {
case -2:
st = self.backend.Miner().PendingState().Copy()
default:
if block := self.getBlockByHeight(num); block != nil {
- st = state.New(block.Root(), self.backend.ChainDb())
+ st, err = state.New(block.Root(), self.backend.ChainDb())
+ if err != nil {
+ return nil
+ }
} else {
- st = state.New(self.backend.ChainManager().GetBlockByNumber(0).Root(), self.backend.ChainDb())
+ st, err = state.New(self.backend.BlockChain().GetBlockByNumber(0).Root(), self.backend.ChainDb())
+ if err != nil {
+ return nil
+ }
}
}
@@ -244,30 +255,41 @@ func (self *XEth) State() *State { return self.state }
func (self *XEth) UpdateState() (wait chan *big.Int) {
wait = make(chan *big.Int)
go func() {
- sub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{})
+ eventSub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{})
+ defer eventSub.Unsubscribe()
+
var m, n *big.Int
var ok bool
- out:
+
+ eventCh := eventSub.Chan()
for {
select {
- case event := <-sub.Chan():
- ev, ok := event.(core.ChainHeadEvent)
- if ok {
- m = ev.Block.Number()
+ case event, ok := <-eventCh:
+ if !ok {
+ // Event subscription closed, set the channel to nil to stop spinning
+ eventCh = nil
+ continue
+ }
+ // A real event arrived, process if new head block assignment
+ if event, ok := event.Data.(core.ChainHeadEvent); ok {
+ m = event.Block.Number()
if n != nil && n.Cmp(m) < 0 {
wait <- n
n = nil
}
- statedb := state.New(ev.Block.Root(), self.backend.ChainDb())
+ statedb, err := state.New(event.Block.Root(), self.backend.ChainDb())
+ if err != nil {
+ glog.V(logger.Error).Infoln("Could not create new state: %v", err)
+ return
+ }
self.state = NewState(self, statedb)
}
case n, ok = <-wait:
if !ok {
- break out
+ return
}
}
}
- sub.Unsubscribe()
}()
return
}
@@ -290,19 +312,19 @@ func (self *XEth) getBlockByHeight(height int64) *types.Block {
num = uint64(height)
}
- return self.backend.ChainManager().GetBlockByNumber(num)
+ return self.backend.BlockChain().GetBlockByNumber(num)
}
func (self *XEth) BlockByHash(strHash string) *Block {
hash := common.HexToHash(strHash)
- block := self.backend.ChainManager().GetBlock(hash)
+ block := self.backend.BlockChain().GetBlock(hash)
return NewBlock(block)
}
func (self *XEth) EthBlockByHash(strHash string) *types.Block {
hash := common.HexToHash(strHash)
- block := self.backend.ChainManager().GetBlock(hash)
+ block := self.backend.BlockChain().GetBlock(hash)
return block
}
@@ -356,11 +378,11 @@ func (self *XEth) EthBlockByNumber(num int64) *types.Block {
}
func (self *XEth) Td(hash common.Hash) *big.Int {
- return self.backend.ChainManager().GetTd(hash)
+ return self.backend.BlockChain().GetTd(hash)
}
func (self *XEth) CurrentBlock() *types.Block {
- return self.backend.ChainManager().CurrentBlock()
+ return self.backend.BlockChain().CurrentBlock()
}
func (self *XEth) GetBlockReceipts(bhash common.Hash) types.Receipts {
@@ -372,7 +394,7 @@ func (self *XEth) GetTxReceipt(txhash common.Hash) *types.Receipt {
}
func (self *XEth) GasLimit() *big.Int {
- return self.backend.ChainManager().GasLimit()
+ return self.backend.BlockChain().GasLimit()
}
func (self *XEth) Block(v interface{}) *Block {
@@ -504,7 +526,7 @@ func (self *XEth) IsContract(address string) bool {
}
func (self *XEth) UninstallFilter(id int) bool {
- defer self.filterManager.UninstallFilter(id)
+ defer self.filterManager.Remove(id)
if _, ok := self.logQueue[id]; ok {
self.logMu.Lock()
@@ -532,8 +554,8 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
self.logMu.Lock()
defer self.logMu.Unlock()
- filter := core.NewFilter(self.backend)
- id := self.filterManager.InstallFilter(filter)
+ filter := filters.New(self.backend.ChainDb())
+ id := self.filterManager.Add(filter)
self.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetEarliestBlock(earliest)
@@ -542,7 +564,7 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
filter.SetMax(max)
filter.SetAddress(cAddress(address))
filter.SetTopics(cTopics(topics))
- filter.LogsCallback = func(logs state.Logs) {
+ filter.LogsCallback = func(logs vm.Logs) {
self.logMu.Lock()
defer self.logMu.Unlock()
@@ -558,8 +580,8 @@ func (self *XEth) NewTransactionFilter() int {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
- filter := core.NewFilter(self.backend)
- id := self.filterManager.InstallFilter(filter)
+ filter := filters.New(self.backend.ChainDb())
+ id := self.filterManager.Add(filter)
self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
@@ -577,11 +599,11 @@ func (self *XEth) NewBlockFilter() int {
self.blockMu.Lock()
defer self.blockMu.Unlock()
- filter := core.NewFilter(self.backend)
- id := self.filterManager.InstallFilter(filter)
+ filter := filters.New(self.backend.ChainDb())
+ id := self.filterManager.Add(filter)
self.blockQueue[id] = &hashQueue{timeout: time.Now()}
- filter.BlockCallback = func(block *types.Block, logs state.Logs) {
+ filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
self.blockMu.Lock()
defer self.blockMu.Unlock()
@@ -604,7 +626,7 @@ func (self *XEth) GetFilterType(id int) byte {
return UnknownFilterTy
}
-func (self *XEth) LogFilterChanged(id int) state.Logs {
+func (self *XEth) LogFilterChanged(id int) vm.Logs {
self.logMu.Lock()
defer self.logMu.Unlock()
@@ -634,8 +656,8 @@ func (self *XEth) TransactionFilterChanged(id int) []common.Hash {
return nil
}
-func (self *XEth) Logs(id int) state.Logs {
- filter := self.filterManager.GetFilter(id)
+func (self *XEth) Logs(id int) vm.Logs {
+ filter := self.filterManager.Get(id)
if filter != nil {
return filter.Find()
}
@@ -643,8 +665,8 @@ func (self *XEth) Logs(id int) state.Logs {
return nil
}
-func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) state.Logs {
- filter := core.NewFilter(self.backend)
+func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) vm.Logs {
+ filter := filters.New(self.backend.ChainDb())
filter.SetEarliestBlock(earliest)
filter.SetLatestBlock(latest)
filter.SetSkip(skip)
@@ -855,7 +877,7 @@ func (self *XEth) Call(fromStr, toStr, valueStr, gasStr, gasPriceStr, dataStr st
}
header := self.CurrentBlock().Header()
- vmenv := core.NewEnv(statedb, self.backend.ChainManager(), msg, header)
+ vmenv := core.NewEnv(statedb, self.backend.BlockChain(), msg, header)
res, gas, err := core.ApplyMessage(vmenv, msg, from)
return common.ToHex(res), gas.String(), err
@@ -1030,19 +1052,19 @@ func (m callmsg) Data() []byte { return m.data }
type logQueue struct {
mu sync.Mutex
- logs state.Logs
+ logs vm.Logs
timeout time.Time
id int
}
-func (l *logQueue) add(logs ...*state.Log) {
+func (l *logQueue) add(logs ...*vm.Log) {
l.mu.Lock()
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
}
-func (l *logQueue) get() state.Logs {
+func (l *logQueue) get() vm.Logs {
l.mu.Lock()
defer l.mu.Unlock()