diff options
Diffstat (limited to 'xeth/xeth.go')
-rw-r--r-- | xeth/xeth.go | 340 |
1 files changed, 265 insertions, 75 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go index ac59069d5..ad8596803 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/compiler" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -29,6 +30,14 @@ var ( defaultGas = big.NewInt(90000) //500000 ) +// byte will be inferred +const ( + UnknownFilterTy = iota + BlockFilterTy + TransactionFilterTy + LogFilterTy +) + func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) } func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) } @@ -42,15 +51,24 @@ type XEth struct { quit chan struct{} filterManager *filter.FilterManager - logMut sync.RWMutex - logs map[int]*logFilter + logMu sync.RWMutex + logQueue map[int]*logQueue + + blockMu sync.RWMutex + blockQueue map[int]*hashQueue - messagesMut sync.RWMutex - messages map[int]*whisperFilter + transactionMu sync.RWMutex + transactionQueue map[int]*hashQueue + + messagesMu sync.RWMutex + messages map[int]*whisperFilter // regmut sync.Mutex // register map[string][]*interface{} // TODO improve return type + solcPath string + solc *compiler.Solidity + agent *miner.RemoteAgent } @@ -59,17 +77,18 @@ type XEth struct { // confirms all transactions will be used. func New(eth *eth.Ethereum, frontend Frontend) *XEth { xeth := &XEth{ - backend: eth, - frontend: frontend, - whisper: NewWhisper(eth.Whisper()), - quit: make(chan struct{}), - filterManager: filter.NewFilterManager(eth.EventMux()), - logs: make(map[int]*logFilter), - messages: make(map[int]*whisperFilter), - agent: miner.NewRemoteAgent(), + backend: eth, + frontend: frontend, + whisper: NewWhisper(eth.Whisper()), + quit: make(chan struct{}), + filterManager: filter.NewFilterManager(eth.EventMux()), + logQueue: make(map[int]*logQueue), + blockQueue: make(map[int]*hashQueue), + transactionQueue: make(map[int]*hashQueue), + messages: make(map[int]*whisperFilter), + agent: miner.NewRemoteAgent(), } eth.Miner().Register(xeth.agent) - if frontend == nil { xeth.frontend = dummyFrontend{} } @@ -87,23 +106,41 @@ done: for { select { case <-timer.C: - self.logMut.Lock() - self.messagesMut.Lock() - for id, filter := range self.logs { + self.logMu.Lock() + for id, filter := range self.logQueue { if time.Since(filter.timeout) > filterTickerTime { self.filterManager.UninstallFilter(id) - delete(self.logs, id) + delete(self.logQueue, id) } } + self.logMu.Unlock() + self.blockMu.Lock() + for id, filter := range self.blockQueue { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.blockQueue, id) + } + } + self.blockMu.Unlock() + + self.transactionMu.Lock() + for id, filter := range self.transactionQueue { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.transactionQueue, id) + } + } + self.transactionMu.Unlock() + + self.messagesMu.Lock() for id, filter := range self.messages { if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } } - self.messagesMut.Unlock() - self.logMut.Unlock() + self.messagesMu.Unlock() case <-self.quit: break done } @@ -151,9 +188,38 @@ func (self *XEth) AtStateNum(num int64) *XEth { return self.WithState(st) } +// applies queued transactions originating from address onto the latest state +// and creates a block +// only used in tests +// - could be removed in favour of mining on testdag (natspec e2e + networking) +// + filters +func (self *XEth) ApplyTestTxs(statedb *state.StateDB, address common.Address, txc uint64) (uint64, *XEth) { + + block := self.backend.ChainManager().NewBlock(address) + coinbase := statedb.GetStateObject(address) + coinbase.SetGasPool(big.NewInt(10000000)) + txs := self.backend.TxPool().GetQueuedTransactions() + + for i := 0; i < len(txs); i++ { + for _, tx := range txs { + if tx.Nonce() == txc { + _, _, err := core.ApplyMessage(core.NewEnv(statedb, self.backend.ChainManager(), tx, block), tx, coinbase) + if err != nil { + panic(err) + } + txc++ + } + } + } + + xeth := self.WithState(statedb) + return txc, xeth +} + func (self *XEth) WithState(statedb *state.StateDB) *XEth { xeth := &XEth{ - backend: self.backend, + backend: self.backend, + frontend: self.frontend, } xeth.state = NewState(xeth, statedb) @@ -162,6 +228,44 @@ func (self *XEth) WithState(statedb *state.StateDB) *XEth { func (self *XEth) State() *State { return self.state } +// subscribes to new head block events and +// waits until blockchain height is greater n at any time +// given the current head, waits for the next chain event +// sets the state to the current head +// loop is async and quit by closing the channel +// used in tests and JS console debug module to control advancing private chain manually +// Note: this is not threadsafe, only called in JS single process and tests +func (self *XEth) UpdateState() (wait chan *big.Int) { + wait = make(chan *big.Int) + go func() { + sub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{}) + var m, n *big.Int + var ok bool + out: + for { + select { + case event := <-sub.Chan(): + ev, ok := event.(core.ChainHeadEvent) + if ok { + m = ev.Block.Number() + if n != nil && n.Cmp(m) < 0 { + wait <- n + n = nil + } + statedb := state.New(ev.Block.Root(), self.backend.StateDb()) + self.state = NewState(self, statedb) + } + case n, ok = <-wait: + if !ok { + break out + } + } + } + sub.Unsubscribe() + }() + return +} + func (self *XEth) Whisper() *Whisper { return self.whisper } func (self *XEth) getBlockByHeight(height int64) *types.Block { @@ -201,6 +305,8 @@ func (self *XEth) EthTransactionByHash(hash string) (tx *types.Transaction, blha data, _ := self.backend.ExtraDb().Get(common.FromHex(hash)) if len(data) != 0 { tx = types.NewTransactionFromBytes(data) + } else { // check pending transactions + tx = self.backend.TxPool().GetTransaction(common.HexToHash(hash)) } // meta @@ -262,6 +368,23 @@ func (self *XEth) Accounts() []string { return accountAddresses } +// accessor for solidity compiler. +// memoized if available, retried on-demand if not +func (self *XEth) Solc() (*compiler.Solidity, error) { + var err error + if self.solc == nil { + self.solc, err = compiler.New(self.solcPath) + } + return self.solc, err +} + +// set in js console via admin interface or wrapper from cli flags +func (self *XEth) SetSolc(solcPath string) (*compiler.Solidity, error) { + self.solcPath = solcPath + self.solc = nil + return self.Solc() +} + func (self *XEth) DbPut(key, val []byte) bool { self.backend.ExtraDb().Put(key, val) return true @@ -360,7 +483,32 @@ func (self *XEth) SecretToAddress(key string) string { return common.ToHex(pair.Address()) } -func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { +func (self *XEth) UninstallFilter(id int) bool { + defer self.filterManager.UninstallFilter(id) + + if _, ok := self.logQueue[id]; ok { + self.logMu.Lock() + defer self.logMu.Unlock() + delete(self.logQueue, id) + return true + } + if _, ok := self.blockQueue[id]; ok { + self.blockMu.Lock() + defer self.blockMu.Unlock() + delete(self.blockQueue, id) + return true + } + if _, ok := self.transactionQueue[id]; ok { + self.transactionMu.Lock() + defer self.transactionMu.Unlock() + delete(self.transactionQueue, id) + return true + } + + return false +} + +func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { var id int filter := core.NewFilter(self.backend) filter.SetEarliestBlock(earliest) @@ -370,71 +518,90 @@ func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address filter.SetAddress(cAddress(address)) filter.SetTopics(cTopics(topics)) filter.LogsCallback = func(logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() + self.logMu.Lock() + defer self.logMu.Unlock() - self.logs[id].add(logs...) + self.logQueue[id].add(logs...) } id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + self.logQueue[id] = &logQueue{timeout: time.Now()} return id } -func (self *XEth) UninstallFilter(id int) bool { - if _, ok := self.logs[id]; ok { - delete(self.logs, id) - self.filterManager.UninstallFilter(id) - return true - } +func (self *XEth) NewTransactionFilter() int { + var id int + filter := core.NewFilter(self.backend) + filter.TransactionCallback = func(tx *types.Transaction) { + self.transactionMu.Lock() + defer self.transactionMu.Unlock() - return false + self.transactionQueue[id].add(tx.Hash()) + } + id = self.filterManager.InstallFilter(filter) + self.transactionQueue[id] = &hashQueue{timeout: time.Now()} + return id } -func (self *XEth) NewFilterString(word string) int { +func (self *XEth) NewBlockFilter() int { var id int filter := core.NewFilter(self.backend) + filter.BlockCallback = func(block *types.Block, logs state.Logs) { + self.blockMu.Lock() + defer self.blockMu.Unlock() - switch word { - case "pending": - filter.PendingCallback = func(tx *types.Transaction) { - self.logMut.Lock() - defer self.logMut.Unlock() - - self.logs[id].add(&state.Log{}) - } - case "latest": - filter.BlockCallback = func(block *types.Block, logs state.Logs) { - self.logMut.Lock() - defer self.logMut.Unlock() + self.blockQueue[id].add(block.Hash()) + } + id = self.filterManager.InstallFilter(filter) + self.blockQueue[id] = &hashQueue{timeout: time.Now()} + return id +} - for _, log := range logs { - self.logs[id].add(log) - } - self.logs[id].add(&state.Log{}) - } +func (self *XEth) GetFilterType(id int) byte { + if _, ok := self.blockQueue[id]; ok { + return BlockFilterTy + } else if _, ok := self.transactionQueue[id]; ok { + return TransactionFilterTy + } else if _, ok := self.logQueue[id]; ok { + return LogFilterTy } - id = self.filterManager.InstallFilter(filter) - self.logs[id] = &logFilter{timeout: time.Now()} + return UnknownFilterTy +} - return id +func (self *XEth) LogFilterChanged(id int) state.Logs { + self.logMu.Lock() + defer self.logMu.Unlock() + + if self.logQueue[id] != nil { + return self.logQueue[id].get() + } + return nil } -func (self *XEth) FilterChanged(id int) state.Logs { - self.logMut.Lock() - defer self.logMut.Unlock() +func (self *XEth) BlockFilterChanged(id int) []common.Hash { + self.blockMu.Lock() + defer self.blockMu.Unlock() - if self.logs[id] != nil { - return self.logs[id].get() + if self.blockQueue[id] != nil { + return self.blockQueue[id].get() } + return nil +} + +func (self *XEth) TransactionFilterChanged(id int) []common.Hash { + self.blockMu.Lock() + defer self.blockMu.Unlock() + if self.transactionQueue[id] != nil { + return self.transactionQueue[id].get() + } return nil } func (self *XEth) Logs(id int) state.Logs { - self.logMut.Lock() - defer self.logMut.Unlock() + self.logMu.Lock() + defer self.logMu.Unlock() filter := self.filterManager.GetFilter(id) if filter != nil { @@ -465,24 +632,24 @@ func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { // Callback to delegate core whisper messages to this xeth filter callback := func(msg WhisperMessage) { - p.messagesMut.RLock() // Only read lock to the filter pool - defer p.messagesMut.RUnlock() + p.messagesMu.RLock() // Only read lock to the filter pool + defer p.messagesMu.RUnlock() p.messages[id].insert(msg) } // Initialize the core whisper filter and wrap into xeth id = p.Whisper().Watch(to, from, topics, callback) - p.messagesMut.Lock() + p.messagesMu.Lock() p.messages[id] = newWhisperFilter(id, p.Whisper()) - p.messagesMut.Unlock() + p.messagesMu.Unlock() return id } // UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() + p.messagesMu.Lock() + defer p.messagesMu.Unlock() if _, ok := p.messages[id]; ok { delete(p.messages, id) @@ -493,8 +660,8 @@ func (p *XEth) UninstallWhisperFilter(id int) bool { // WhisperMessages retrieves all the known messages that match a specific filter. func (self *XEth) WhisperMessages(id int) []WhisperMessage { - self.messagesMut.RLock() - defer self.messagesMut.RUnlock() + self.messagesMu.RLock() + defer self.messagesMu.RUnlock() if self.messages[id] != nil { return self.messages[id].messages() @@ -505,8 +672,8 @@ func (self *XEth) WhisperMessages(id int) []WhisperMessage { // WhisperMessagesChanged retrieves all the new messages matched by a filter // since the last retrieval func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { - self.messagesMut.RLock() - defer self.messagesMut.RUnlock() + self.messagesMu.RLock() + defer self.messagesMu.RUnlock() if self.messages[id] != nil { return self.messages[id].retrieve() @@ -643,12 +810,18 @@ func (self *XEth) Call(fromStr, toStr, valueStr, gasStr, gasPriceStr, dataStr st } func (self *XEth) ConfirmTransaction(tx string) bool { - return self.frontend.ConfirmTransaction(tx) - } func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceStr, codeStr string) (string, error) { + + // this minimalistic recoding is enough (works for natspec.js) + var jsontx = fmt.Sprintf(`{"params":[{"to":"%s","data": "%s"}]}`, toStr, codeStr) + if !self.ConfirmTransaction(jsontx) { + err := fmt.Errorf("Transaction not confirmed") + return "", err + } + var ( from = common.HexToAddress(fromStr) to = common.HexToAddress(toStr) @@ -767,19 +940,36 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type logFilter struct { +type logQueue struct { logs state.Logs timeout time.Time id int } -func (l *logFilter) add(logs ...*state.Log) { +func (l *logQueue) add(logs ...*state.Log) { l.logs = append(l.logs, logs...) } -func (l *logFilter) get() state.Logs { +func (l *logQueue) get() state.Logs { l.timeout = time.Now() tmp := l.logs l.logs = nil return tmp } + +type hashQueue struct { + hashes []common.Hash + timeout time.Time + id int +} + +func (l *hashQueue) add(hashes ...common.Hash) { + l.hashes = append(l.hashes, hashes...) +} + +func (l *hashQueue) get() []common.Hash { + l.timeout = time.Now() + tmp := l.hashes + l.hashes = nil + return tmp +} |