aboutsummaryrefslogtreecommitdiffstats
path: root/xeth/xeth.go
diff options
context:
space:
mode:
Diffstat (limited to 'xeth/xeth.go')
-rw-r--r--xeth/xeth.go340
1 files changed, 265 insertions, 75 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go
index ac59069d5..ad8596803 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/compiler"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -29,6 +30,14 @@ var (
defaultGas = big.NewInt(90000) //500000
)
+// byte will be inferred
+const (
+ UnknownFilterTy = iota
+ BlockFilterTy
+ TransactionFilterTy
+ LogFilterTy
+)
+
func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) }
func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) }
@@ -42,15 +51,24 @@ type XEth struct {
quit chan struct{}
filterManager *filter.FilterManager
- logMut sync.RWMutex
- logs map[int]*logFilter
+ logMu sync.RWMutex
+ logQueue map[int]*logQueue
+
+ blockMu sync.RWMutex
+ blockQueue map[int]*hashQueue
- messagesMut sync.RWMutex
- messages map[int]*whisperFilter
+ transactionMu sync.RWMutex
+ transactionQueue map[int]*hashQueue
+
+ messagesMu sync.RWMutex
+ messages map[int]*whisperFilter
// regmut sync.Mutex
// register map[string][]*interface{} // TODO improve return type
+ solcPath string
+ solc *compiler.Solidity
+
agent *miner.RemoteAgent
}
@@ -59,17 +77,18 @@ type XEth struct {
// confirms all transactions will be used.
func New(eth *eth.Ethereum, frontend Frontend) *XEth {
xeth := &XEth{
- backend: eth,
- frontend: frontend,
- whisper: NewWhisper(eth.Whisper()),
- quit: make(chan struct{}),
- filterManager: filter.NewFilterManager(eth.EventMux()),
- logs: make(map[int]*logFilter),
- messages: make(map[int]*whisperFilter),
- agent: miner.NewRemoteAgent(),
+ backend: eth,
+ frontend: frontend,
+ whisper: NewWhisper(eth.Whisper()),
+ quit: make(chan struct{}),
+ filterManager: filter.NewFilterManager(eth.EventMux()),
+ logQueue: make(map[int]*logQueue),
+ blockQueue: make(map[int]*hashQueue),
+ transactionQueue: make(map[int]*hashQueue),
+ messages: make(map[int]*whisperFilter),
+ agent: miner.NewRemoteAgent(),
}
eth.Miner().Register(xeth.agent)
-
if frontend == nil {
xeth.frontend = dummyFrontend{}
}
@@ -87,23 +106,41 @@ done:
for {
select {
case <-timer.C:
- self.logMut.Lock()
- self.messagesMut.Lock()
- for id, filter := range self.logs {
+ self.logMu.Lock()
+ for id, filter := range self.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id)
- delete(self.logs, id)
+ delete(self.logQueue, id)
}
}
+ self.logMu.Unlock()
+ self.blockMu.Lock()
+ for id, filter := range self.blockQueue {
+ if time.Since(filter.timeout) > filterTickerTime {
+ self.filterManager.UninstallFilter(id)
+ delete(self.blockQueue, id)
+ }
+ }
+ self.blockMu.Unlock()
+
+ self.transactionMu.Lock()
+ for id, filter := range self.transactionQueue {
+ if time.Since(filter.timeout) > filterTickerTime {
+ self.filterManager.UninstallFilter(id)
+ delete(self.transactionQueue, id)
+ }
+ }
+ self.transactionMu.Unlock()
+
+ self.messagesMu.Lock()
for id, filter := range self.messages {
if time.Since(filter.activity()) > filterTickerTime {
self.Whisper().Unwatch(id)
delete(self.messages, id)
}
}
- self.messagesMut.Unlock()
- self.logMut.Unlock()
+ self.messagesMu.Unlock()
case <-self.quit:
break done
}
@@ -151,9 +188,38 @@ func (self *XEth) AtStateNum(num int64) *XEth {
return self.WithState(st)
}
+// applies queued transactions originating from address onto the latest state
+// and creates a block
+// only used in tests
+// - could be removed in favour of mining on testdag (natspec e2e + networking)
+// + filters
+func (self *XEth) ApplyTestTxs(statedb *state.StateDB, address common.Address, txc uint64) (uint64, *XEth) {
+
+ block := self.backend.ChainManager().NewBlock(address)
+ coinbase := statedb.GetStateObject(address)
+ coinbase.SetGasPool(big.NewInt(10000000))
+ txs := self.backend.TxPool().GetQueuedTransactions()
+
+ for i := 0; i < len(txs); i++ {
+ for _, tx := range txs {
+ if tx.Nonce() == txc {
+ _, _, err := core.ApplyMessage(core.NewEnv(statedb, self.backend.ChainManager(), tx, block), tx, coinbase)
+ if err != nil {
+ panic(err)
+ }
+ txc++
+ }
+ }
+ }
+
+ xeth := self.WithState(statedb)
+ return txc, xeth
+}
+
func (self *XEth) WithState(statedb *state.StateDB) *XEth {
xeth := &XEth{
- backend: self.backend,
+ backend: self.backend,
+ frontend: self.frontend,
}
xeth.state = NewState(xeth, statedb)
@@ -162,6 +228,44 @@ func (self *XEth) WithState(statedb *state.StateDB) *XEth {
func (self *XEth) State() *State { return self.state }
+// subscribes to new head block events and
+// waits until blockchain height is greater n at any time
+// given the current head, waits for the next chain event
+// sets the state to the current head
+// loop is async and quit by closing the channel
+// used in tests and JS console debug module to control advancing private chain manually
+// Note: this is not threadsafe, only called in JS single process and tests
+func (self *XEth) UpdateState() (wait chan *big.Int) {
+ wait = make(chan *big.Int)
+ go func() {
+ sub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{})
+ var m, n *big.Int
+ var ok bool
+ out:
+ for {
+ select {
+ case event := <-sub.Chan():
+ ev, ok := event.(core.ChainHeadEvent)
+ if ok {
+ m = ev.Block.Number()
+ if n != nil && n.Cmp(m) < 0 {
+ wait <- n
+ n = nil
+ }
+ statedb := state.New(ev.Block.Root(), self.backend.StateDb())
+ self.state = NewState(self, statedb)
+ }
+ case n, ok = <-wait:
+ if !ok {
+ break out
+ }
+ }
+ }
+ sub.Unsubscribe()
+ }()
+ return
+}
+
func (self *XEth) Whisper() *Whisper { return self.whisper }
func (self *XEth) getBlockByHeight(height int64) *types.Block {
@@ -201,6 +305,8 @@ func (self *XEth) EthTransactionByHash(hash string) (tx *types.Transaction, blha
data, _ := self.backend.ExtraDb().Get(common.FromHex(hash))
if len(data) != 0 {
tx = types.NewTransactionFromBytes(data)
+ } else { // check pending transactions
+ tx = self.backend.TxPool().GetTransaction(common.HexToHash(hash))
}
// meta
@@ -262,6 +368,23 @@ func (self *XEth) Accounts() []string {
return accountAddresses
}
+// accessor for solidity compiler.
+// memoized if available, retried on-demand if not
+func (self *XEth) Solc() (*compiler.Solidity, error) {
+ var err error
+ if self.solc == nil {
+ self.solc, err = compiler.New(self.solcPath)
+ }
+ return self.solc, err
+}
+
+// set in js console via admin interface or wrapper from cli flags
+func (self *XEth) SetSolc(solcPath string) (*compiler.Solidity, error) {
+ self.solcPath = solcPath
+ self.solc = nil
+ return self.Solc()
+}
+
func (self *XEth) DbPut(key, val []byte) bool {
self.backend.ExtraDb().Put(key, val)
return true
@@ -360,7 +483,32 @@ func (self *XEth) SecretToAddress(key string) string {
return common.ToHex(pair.Address())
}
-func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int {
+func (self *XEth) UninstallFilter(id int) bool {
+ defer self.filterManager.UninstallFilter(id)
+
+ if _, ok := self.logQueue[id]; ok {
+ self.logMu.Lock()
+ defer self.logMu.Unlock()
+ delete(self.logQueue, id)
+ return true
+ }
+ if _, ok := self.blockQueue[id]; ok {
+ self.blockMu.Lock()
+ defer self.blockMu.Unlock()
+ delete(self.blockQueue, id)
+ return true
+ }
+ if _, ok := self.transactionQueue[id]; ok {
+ self.transactionMu.Lock()
+ defer self.transactionMu.Unlock()
+ delete(self.transactionQueue, id)
+ return true
+ }
+
+ return false
+}
+
+func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int {
var id int
filter := core.NewFilter(self.backend)
filter.SetEarliestBlock(earliest)
@@ -370,71 +518,90 @@ func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address
filter.SetAddress(cAddress(address))
filter.SetTopics(cTopics(topics))
filter.LogsCallback = func(logs state.Logs) {
- self.logMut.Lock()
- defer self.logMut.Unlock()
+ self.logMu.Lock()
+ defer self.logMu.Unlock()
- self.logs[id].add(logs...)
+ self.logQueue[id].add(logs...)
}
id = self.filterManager.InstallFilter(filter)
- self.logs[id] = &logFilter{timeout: time.Now()}
+ self.logQueue[id] = &logQueue{timeout: time.Now()}
return id
}
-func (self *XEth) UninstallFilter(id int) bool {
- if _, ok := self.logs[id]; ok {
- delete(self.logs, id)
- self.filterManager.UninstallFilter(id)
- return true
- }
+func (self *XEth) NewTransactionFilter() int {
+ var id int
+ filter := core.NewFilter(self.backend)
+ filter.TransactionCallback = func(tx *types.Transaction) {
+ self.transactionMu.Lock()
+ defer self.transactionMu.Unlock()
- return false
+ self.transactionQueue[id].add(tx.Hash())
+ }
+ id = self.filterManager.InstallFilter(filter)
+ self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
+ return id
}
-func (self *XEth) NewFilterString(word string) int {
+func (self *XEth) NewBlockFilter() int {
var id int
filter := core.NewFilter(self.backend)
+ filter.BlockCallback = func(block *types.Block, logs state.Logs) {
+ self.blockMu.Lock()
+ defer self.blockMu.Unlock()
- switch word {
- case "pending":
- filter.PendingCallback = func(tx *types.Transaction) {
- self.logMut.Lock()
- defer self.logMut.Unlock()
-
- self.logs[id].add(&state.Log{})
- }
- case "latest":
- filter.BlockCallback = func(block *types.Block, logs state.Logs) {
- self.logMut.Lock()
- defer self.logMut.Unlock()
+ self.blockQueue[id].add(block.Hash())
+ }
+ id = self.filterManager.InstallFilter(filter)
+ self.blockQueue[id] = &hashQueue{timeout: time.Now()}
+ return id
+}
- for _, log := range logs {
- self.logs[id].add(log)
- }
- self.logs[id].add(&state.Log{})
- }
+func (self *XEth) GetFilterType(id int) byte {
+ if _, ok := self.blockQueue[id]; ok {
+ return BlockFilterTy
+ } else if _, ok := self.transactionQueue[id]; ok {
+ return TransactionFilterTy
+ } else if _, ok := self.logQueue[id]; ok {
+ return LogFilterTy
}
- id = self.filterManager.InstallFilter(filter)
- self.logs[id] = &logFilter{timeout: time.Now()}
+ return UnknownFilterTy
+}
- return id
+func (self *XEth) LogFilterChanged(id int) state.Logs {
+ self.logMu.Lock()
+ defer self.logMu.Unlock()
+
+ if self.logQueue[id] != nil {
+ return self.logQueue[id].get()
+ }
+ return nil
}
-func (self *XEth) FilterChanged(id int) state.Logs {
- self.logMut.Lock()
- defer self.logMut.Unlock()
+func (self *XEth) BlockFilterChanged(id int) []common.Hash {
+ self.blockMu.Lock()
+ defer self.blockMu.Unlock()
- if self.logs[id] != nil {
- return self.logs[id].get()
+ if self.blockQueue[id] != nil {
+ return self.blockQueue[id].get()
}
+ return nil
+}
+
+func (self *XEth) TransactionFilterChanged(id int) []common.Hash {
+ self.blockMu.Lock()
+ defer self.blockMu.Unlock()
+ if self.transactionQueue[id] != nil {
+ return self.transactionQueue[id].get()
+ }
return nil
}
func (self *XEth) Logs(id int) state.Logs {
- self.logMut.Lock()
- defer self.logMut.Unlock()
+ self.logMu.Lock()
+ defer self.logMu.Unlock()
filter := self.filterManager.GetFilter(id)
if filter != nil {
@@ -465,24 +632,24 @@ func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int {
// Callback to delegate core whisper messages to this xeth filter
callback := func(msg WhisperMessage) {
- p.messagesMut.RLock() // Only read lock to the filter pool
- defer p.messagesMut.RUnlock()
+ p.messagesMu.RLock() // Only read lock to the filter pool
+ defer p.messagesMu.RUnlock()
p.messages[id].insert(msg)
}
// Initialize the core whisper filter and wrap into xeth
id = p.Whisper().Watch(to, from, topics, callback)
- p.messagesMut.Lock()
+ p.messagesMu.Lock()
p.messages[id] = newWhisperFilter(id, p.Whisper())
- p.messagesMut.Unlock()
+ p.messagesMu.Unlock()
return id
}
// UninstallWhisperFilter disables and removes an existing filter.
func (p *XEth) UninstallWhisperFilter(id int) bool {
- p.messagesMut.Lock()
- defer p.messagesMut.Unlock()
+ p.messagesMu.Lock()
+ defer p.messagesMu.Unlock()
if _, ok := p.messages[id]; ok {
delete(p.messages, id)
@@ -493,8 +660,8 @@ func (p *XEth) UninstallWhisperFilter(id int) bool {
// WhisperMessages retrieves all the known messages that match a specific filter.
func (self *XEth) WhisperMessages(id int) []WhisperMessage {
- self.messagesMut.RLock()
- defer self.messagesMut.RUnlock()
+ self.messagesMu.RLock()
+ defer self.messagesMu.RUnlock()
if self.messages[id] != nil {
return self.messages[id].messages()
@@ -505,8 +672,8 @@ func (self *XEth) WhisperMessages(id int) []WhisperMessage {
// WhisperMessagesChanged retrieves all the new messages matched by a filter
// since the last retrieval
func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage {
- self.messagesMut.RLock()
- defer self.messagesMut.RUnlock()
+ self.messagesMu.RLock()
+ defer self.messagesMu.RUnlock()
if self.messages[id] != nil {
return self.messages[id].retrieve()
@@ -643,12 +810,18 @@ func (self *XEth) Call(fromStr, toStr, valueStr, gasStr, gasPriceStr, dataStr st
}
func (self *XEth) ConfirmTransaction(tx string) bool {
-
return self.frontend.ConfirmTransaction(tx)
-
}
func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceStr, codeStr string) (string, error) {
+
+ // this minimalistic recoding is enough (works for natspec.js)
+ var jsontx = fmt.Sprintf(`{"params":[{"to":"%s","data": "%s"}]}`, toStr, codeStr)
+ if !self.ConfirmTransaction(jsontx) {
+ err := fmt.Errorf("Transaction not confirmed")
+ return "", err
+ }
+
var (
from = common.HexToAddress(fromStr)
to = common.HexToAddress(toStr)
@@ -767,19 +940,36 @@ func (m callmsg) Gas() *big.Int { return m.gas }
func (m callmsg) Value() *big.Int { return m.value }
func (m callmsg) Data() []byte { return m.data }
-type logFilter struct {
+type logQueue struct {
logs state.Logs
timeout time.Time
id int
}
-func (l *logFilter) add(logs ...*state.Log) {
+func (l *logQueue) add(logs ...*state.Log) {
l.logs = append(l.logs, logs...)
}
-func (l *logFilter) get() state.Logs {
+func (l *logQueue) get() state.Logs {
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
return tmp
}
+
+type hashQueue struct {
+ hashes []common.Hash
+ timeout time.Time
+ id int
+}
+
+func (l *hashQueue) add(hashes ...common.Hash) {
+ l.hashes = append(l.hashes, hashes...)
+}
+
+func (l *hashQueue) get() []common.Hash {
+ l.timeout = time.Now()
+ tmp := l.hashes
+ l.hashes = nil
+ return tmp
+}