diff options
50 files changed, 1085 insertions, 857 deletions
diff --git a/.travis.yml b/.travis.yml index 2dfb7e283..1b3104826 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,18 +6,10 @@ before_install: - sudo apt-get update -qq - sudo apt-get install -yqq libgmp3-dev libreadline6-dev qt54quickcontrols qt54webengine install: - - go get code.google.com/p/go.tools/cmd/goimports - - go get github.com/golang/lint/golint # - go get golang.org/x/tools/cmd/vet - if ! go get code.google.com/p/go.tools/cmd/cover; then go get golang.org/x/tools/cmd/cover; fi - go get github.com/mattn/goveralls - - go get gopkg.in/check.v1 - - go get github.com/tools/godep before_script: - - godep restore - - gofmt -l -w . - - goimports -l -w . - - golint . # - go vet ./... # - go test -race ./... script: diff --git a/accounts/accounts_test.go b/accounts/accounts_test.go index da9406ebe..30e8c6285 100644 --- a/accounts/accounts_test.go +++ b/accounts/accounts_test.go @@ -1,8 +1,10 @@ package accounts import ( - "github.com/ethereum/go-ethereum/crypto" "testing" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/randentropy" ) func TestAccountManager(t *testing.T) { @@ -10,7 +12,7 @@ func TestAccountManager(t *testing.T) { am := NewAccountManager(ks) pass := "" // not used but required by API a1, err := am.NewAccount(pass) - toSign := crypto.GetEntropyCSPRNG(32) + toSign := randentropy.GetEntropyCSPRNG(32) _, err = am.Sign(a1, pass, toSign) if err != nil { t.Fatal(err) diff --git a/cmd/ethereum/main.go b/cmd/ethereum/main.go index 1ffd4b6e9..1f1a0b761 100644 --- a/cmd/ethereum/main.go +++ b/cmd/ethereum/main.go @@ -67,6 +67,7 @@ func main() { DataDir: Datadir, LogFile: LogFile, LogLevel: LogLevel, + LogFormat: LogFormat, MaxPeers: MaxPeer, Port: OutboundPort, NAT: NAT, diff --git a/cmd/ethtest/main.go b/cmd/ethtest/main.go index e1c4806ad..40874616c 100644 --- a/cmd/ethtest/main.go +++ b/cmd/ethtest/main.go @@ -51,8 +51,8 @@ func StateObjectFromAccount(db ethutil.Database, addr string, account Account) * if ethutil.IsHex(account.Code) { account.Code = account.Code[2:] } - obj.Code = ethutil.Hex2Bytes(account.Code) - obj.Nonce = ethutil.Big(account.Nonce).Uint64() + obj.SetCode(ethutil.Hex2Bytes(account.Code)) + obj.SetNonce(ethutil.Big(account.Nonce).Uint64()) return obj } diff --git a/cmd/mist/assets/examples/bomb.html b/cmd/mist/assets/examples/bomb.html new file mode 100644 index 000000000..62540f9bb --- /dev/null +++ b/cmd/mist/assets/examples/bomb.html @@ -0,0 +1,22 @@ +<html> +<head> +<script src="../ext/bignumber.min.js"></script> +<script src="../ext/ethereum.js/dist/ethereum.js"></script> + +<script> +var web3 = require('web3'); +web3.setProvider(new web3.providers.HttpSyncProvider('http://localhost:8545')); +var eth = web3.eth; + +function bomb() { + for (var i = 0; i < 200; i++) { + eth.transact({}) + } +} +</script> +</head> + +<body> +<button onclick="bomb();">BOOM!</button> +</body> +</html> diff --git a/cmd/mist/assets/ext/mist.js b/cmd/mist/assets/ext/mist.js index 8734f8dc7..2fc38cdfa 100644 --- a/cmd/mist/assets/ext/mist.js +++ b/cmd/mist/assets/ext/mist.js @@ -20,16 +20,18 @@ console.log("loaded?"); document.onkeydown = function(evt) { + // This functions keeps track of keyboard inputs in order to allow copy, paste and other features + evt = evt || window.event; if (evt.ctrlKey && evt.keyCode == 67) { window.document.execCommand("copy"); - console.log("Ctrl-C"); } else if (evt.ctrlKey && evt.keyCode == 88) { window.document.execCommand("cut"); - console.log("Ctrl-X"); - } if (evt.ctrlKey && evt.keyCode == 86) { - console.log("Ctrl-V"); - } if (evt.ctrlKey && evt.keyCode == 90) { - console.log("Ctrl-Z"); + } else if (evt.ctrlKey && evt.keyCode == 86) { + window.document.execCommand("paste"); + } else if (evt.ctrlKey && evt.keyCode == 90) { + window.document.execCommand("undo"); + } else if (evt.ctrlKey && evt.shiftKey && evt.keyCode == 90) { + window.document.execCommand("redo"); } };
\ No newline at end of file diff --git a/cmd/mist/assets/qml/main.qml b/cmd/mist/assets/qml/main.qml index f9bfd9b8d..937670bd2 100644 --- a/cmd/mist/assets/qml/main.qml +++ b/cmd/mist/assets/qml/main.qml @@ -131,7 +131,11 @@ ApplicationWindow { var existingDomain = matches && matches[1]; if (requestedDomain == existingDomain) { domainAlreadyOpen = true; - mainSplit.views[i].view.url = url; + + if (mainSplit.views[i].view.url != url){ + mainSplit.views[i].view.url = url; + } + activeView(mainSplit.views[i].view, mainSplit.views[i].menuItem); } } @@ -246,6 +250,7 @@ ApplicationWindow { } } } + } property var blockModel: ListModel { @@ -927,7 +932,8 @@ ApplicationWindow { model: peerModel TableViewColumn{width: 180; role: "addr" ; title: "Remote Address" } TableViewColumn{width: 280; role: "nodeID" ; title: "Node ID" } - TableViewColumn{width: 180; role: "caps" ; title: "Capabilities" } + TableViewColumn{width: 100; role: "name" ; title: "Name" } + TableViewColumn{width: 40; role: "caps" ; title: "Capabilities" } } } } diff --git a/cmd/mist/assets/qml/views/browser.qml b/cmd/mist/assets/qml/views/browser.qml index 54f5d755e..edecc8696 100644 --- a/cmd/mist/assets/qml/views/browser.qml +++ b/cmd/mist/assets/qml/views/browser.qml @@ -3,7 +3,7 @@ import QtQuick.Controls 1.0; import QtQuick.Controls.Styles 1.0 import QtQuick.Layouts 1.0; import QtWebEngine 1.0 -//import QtWebEngine.experimental 1.0 +import QtWebEngine.experimental 1.0 import QtQuick.Window 2.0; Rectangle { @@ -340,7 +340,7 @@ Rectangle { WebEngineView { objectName: "webView" id: webview - //experimental.settings.javascriptCanAccessClipboard: true + experimental.settings.javascriptCanAccessClipboard: true //experimental.settings.localContentCanAccessRemoteUrls: true anchors { left: parent.left @@ -399,7 +399,8 @@ Rectangle { onLoadingChanged: { if (loadRequest.status == WebEngineView.LoadSucceededStatus) { - webview.runJavaScript("document.title", function(pageTitle) { + + webview.runJavaScript("document.title", function(pageTitle) { menuItem.title = pageTitle; }); @@ -441,7 +442,8 @@ Rectangle { webview.runJavaScript(eth.readFile("bignumber.min.js")); webview.runJavaScript(eth.readFile("ethereum.js/dist/ethereum.js")); - + webview.runJavaScript(eth.readFile("mist.js")); + var cleanTitle = webview.url.toString() var matches = cleanTitle.match(/^[a-z]*\:\/\/([^\/?#]+)(?:[\/?#]|$)/i); var domain = matches && matches[1]; diff --git a/cmd/mist/assets/qml/views/catalog.qml b/cmd/mist/assets/qml/views/catalog.qml index 497d69ed1..29e133074 100644 --- a/cmd/mist/assets/qml/views/catalog.qml +++ b/cmd/mist/assets/qml/views/catalog.qml @@ -3,7 +3,7 @@ import QtQuick.Controls 1.0; import QtQuick.Controls.Styles 1.0 import QtQuick.Layouts 1.0; import QtWebEngine 1.0 -//import QtWebEngine.experimental 1.0 +import QtWebEngine.experimental 1.0 import QtQuick.Window 2.0; @@ -21,8 +21,6 @@ Rectangle { property alias windowTitle: webview.title property alias webView: webview - - property var cleanPath: false property var open: function(url) { if(!window.cleanPath) { @@ -66,9 +64,6 @@ Rectangle { } } - Component.onCompleted: { - } - Item { objectName: "root" id: root @@ -85,7 +80,7 @@ Rectangle { property var domain: "ethereum-dapp-catalog.meteor.com" url: protocol + domain - //experimental.settings.javascriptCanAccessClipboard: true + experimental.settings.javascriptCanAccessClipboard: true onJavaScriptConsoleMessage: { @@ -112,11 +107,11 @@ Rectangle { } } - // onLoadingChanged: { - // if (loadRequest.status == WebEngineView.LoadSucceededStatus) { - // webview.runJavaScript(eth.readFile("mist.js")); - // } - // } + onLoadingChanged: { + if (loadRequest.status == WebEngineView.LoadSucceededStatus) { + webview.runJavaScript(eth.readFile("mist.js")); + } + } } diff --git a/cmd/mist/flags.go b/cmd/mist/flags.go index eb280f71b..d9487de9e 100644 --- a/cmd/mist/flags.go +++ b/cmd/mist/flags.go @@ -63,6 +63,7 @@ var ( DebugFile string LogLevel int VmType int + MinerThreads int ) // flags specific to gui client @@ -137,6 +138,8 @@ func Init() { flag.StringVar(&BootNodes, "bootnodes", "", "space-separated node URLs for discovery bootstrap") flag.IntVar(&MaxPeer, "maxpeer", 30, "maximum desired peers") + flag.IntVar(&MinerThreads, "minerthreads", runtime.NumCPU(), "number of miner threads") + flag.Parse() var err error diff --git a/cmd/mist/gui.go b/cmd/mist/gui.go index c9419473c..4af0cff43 100644 --- a/cmd/mist/gui.go +++ b/cmd/mist/gui.go @@ -131,6 +131,7 @@ func (gui *Gui) Start(assetPath string) { context.SetVar("gui", gui) context.SetVar("eth", gui.uiLib) context.SetVar("shh", gui.whisper) + //clipboard.SetQMLClipboard(context) win, err := gui.showWallet(context) if err != nil { @@ -386,14 +387,11 @@ func (gui *Gui) update() { generalUpdateTicker := time.NewTicker(500 * time.Millisecond) statsUpdateTicker := time.NewTicker(5 * time.Second) - state := gui.eth.ChainManager().TransState() - - gui.win.Root().Call("setWalletValue", fmt.Sprintf("%v", ethutil.CurrencyToString(state.GetAccount(gui.address()).Balance()))) - lastBlockLabel := gui.getObjectByName("lastBlockLabel") miningLabel := gui.getObjectByName("miningLabel") events := gui.eth.EventMux().Subscribe( + core.ChainEvent{}, core.TxPreEvent{}, core.TxPostEvent{}, ) @@ -406,6 +404,8 @@ func (gui *Gui) update() { return } switch ev := ev.(type) { + case core.ChainEvent: + gui.processBlock(ev.Block, false) case core.TxPreEvent: gui.insertTransaction("pre", ev.Tx) @@ -421,19 +421,6 @@ func (gui *Gui) update() { lastBlockLabel.Set("text", statusText) miningLabel.Set("text", "Mining @ "+strconv.FormatInt(gui.uiLib.Miner().HashRate(), 10)+"/Khash") - /* - blockLength := gui.eth.BlockPool().BlocksProcessed - chainLength := gui.eth.BlockPool().ChainLength - - var ( - pct float64 = 1.0 / float64(chainLength) * float64(blockLength) - dlWidget = gui.win.Root().ObjectByName("downloadIndicator") - dlLabel = gui.win.Root().ObjectByName("downloadLabel") - ) - dlWidget.Set("value", pct) - dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength)) - */ - case <-statsUpdateTicker.C: gui.setStatsPane() } @@ -466,7 +453,7 @@ NumGC: %d )) } -type qmlpeer struct{ Addr, NodeID, Caps string } +type qmlpeer struct{ Addr, NodeID, Name, Caps string } type peersByID []*qmlpeer @@ -481,6 +468,7 @@ func (gui *Gui) setPeerInfo() { qpeers[i] = &qmlpeer{ NodeID: p.ID().String(), Addr: p.RemoteAddr().String(), + Name: p.Name(), Caps: fmt.Sprint(p.Caps()), } } diff --git a/cmd/mist/main.go b/cmd/mist/main.go index 14f561e99..d41aa34bf 100644 --- a/cmd/mist/main.go +++ b/cmd/mist/main.go @@ -52,19 +52,20 @@ func run() error { config := utils.InitConfig(VmType, ConfigFile, Datadir, "ETH") ethereum, err := eth.New(ð.Config{ - Name: p2p.MakeName(ClientIdentifier, Version), - KeyStore: KeyStore, - DataDir: Datadir, - LogFile: LogFile, - LogLevel: LogLevel, - MaxPeers: MaxPeer, - Port: OutboundPort, - NAT: NAT, - Shh: true, - BootNodes: BootNodes, - NodeKey: NodeKey, - KeyRing: KeyRing, - Dial: true, + Name: p2p.MakeName(ClientIdentifier, Version), + KeyStore: KeyStore, + DataDir: Datadir, + LogFile: LogFile, + LogLevel: LogLevel, + MaxPeers: MaxPeer, + Port: OutboundPort, + NAT: NAT, + Shh: true, + BootNodes: BootNodes, + NodeKey: NodeKey, + KeyRing: KeyRing, + Dial: true, + MinerThreads: MinerThreads, }) if err != nil { mainlogger.Fatalln(err) diff --git a/cmd/mist/ui_lib.go b/cmd/mist/ui_lib.go index 1a4d21012..4fa6e8e55 100644 --- a/cmd/mist/ui_lib.go +++ b/cmd/mist/ui_lib.go @@ -146,8 +146,8 @@ func (ui *UiLib) AssetPath(p string) string { func (self *UiLib) StartDbWithContractAndData(contractHash, data string) { dbWindow := NewDebuggerWindow(self) object := self.eth.ChainManager().State().GetStateObject(ethutil.Hex2Bytes(contractHash)) - if len(object.Code) > 0 { - dbWindow.SetCode(ethutil.Bytes2Hex(object.Code)) + if len(object.Code()) > 0 { + dbWindow.SetCode(ethutil.Bytes2Hex(object.Code())) } dbWindow.SetData(data) diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index ecb847fc3..d252f3ab2 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -225,7 +225,7 @@ func StartMining(ethereum *eth.Ethereum) bool { go func() { clilogger.Infoln("Start mining") if gminer == nil { - gminer = miner.New(addr, ethereum) + gminer = miner.New(addr, ethereum, 4) } gminer.Start() }() diff --git a/core/block_processor.go b/core/block_processor.go index b4449100f..bfd9d4560 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -73,24 +73,27 @@ func (sm *BlockProcessor) TransitionState(statedb *state.StateDB, parent, block return receipts, nil } -func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, state *state.StateDB, block *types.Block, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) { +func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, statedb *state.StateDB, block *types.Block, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) { // If we are mining this block and validating we want to set the logs back to 0 - state.EmptyLogs() + statedb.EmptyLogs() txGas := new(big.Int).Set(tx.Gas()) - cb := state.GetStateObject(coinbase.Address()) - st := NewStateTransition(NewEnv(state, self.bc, tx, block), tx, cb) + cb := statedb.GetStateObject(coinbase.Address()) + st := NewStateTransition(NewEnv(statedb, self.bc, tx, block), tx, cb) _, err := st.TransitionState() + if err != nil && (IsNonceErr(err) || state.IsGasLimitErr(err)) { + return nil, nil, err + } txGas.Sub(txGas, st.gas) // Update the state with pending changes - state.Update(txGas) + statedb.Update(txGas) cumulative := new(big.Int).Set(usedGas.Add(usedGas, txGas)) - receipt := types.NewReceipt(state.Root(), cumulative) - receipt.SetLogs(state.Logs()) + receipt := types.NewReceipt(statedb.Root(), cumulative) + receipt.SetLogs(statedb.Logs()) receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) chainlogger.Debugln(receipt) @@ -99,12 +102,12 @@ func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, state go self.eventMux.Post(TxPostEvent{tx}) } - go self.eventMux.Post(state.Logs()) + go self.eventMux.Post(statedb.Logs()) return receipt, txGas, err } -func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, types.Transactions, types.Transactions, types.Transactions, error) { +func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, statedb *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, types.Transactions, types.Transactions, types.Transactions, error) { var ( receipts types.Receipts handled, unhandled types.Transactions @@ -115,12 +118,12 @@ func (self *BlockProcessor) ApplyTransactions(coinbase *state.StateObject, state ) for _, tx := range txs { - receipt, txGas, err := self.ApplyTransaction(coinbase, state, block, tx, totalUsedGas, transientProcess) + receipt, txGas, err := self.ApplyTransaction(coinbase, statedb, block, tx, totalUsedGas, transientProcess) if err != nil { switch { case IsNonceErr(err): return nil, nil, nil, nil, err - case IsGasLimitErr(err): + case state.IsGasLimitErr(err): return nil, nil, nil, nil, err default: statelogger.Infoln(err) @@ -293,16 +296,13 @@ func (sm *BlockProcessor) AccumulateRewards(statedb *state.StateDB, block, paren r := new(big.Int) r.Mul(BlockReward, big.NewInt(15)).Div(r, big.NewInt(16)) - uncleAccount := statedb.GetAccount(uncle.Coinbase) - uncleAccount.AddAmount(r) + statedb.AddBalance(uncle.Coinbase, r) reward.Add(reward, new(big.Int).Div(BlockReward, big.NewInt(32))) } // Get the account associated with the coinbase - account := statedb.GetAccount(block.Header().Coinbase) - // Reward amount of ether to the coinbase address - account.AddAmount(reward) + statedb.AddBalance(block.Header().Coinbase, reward) return nil } diff --git a/core/chain_manager.go b/core/chain_manager.go index 286282064..47cad825d 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -134,14 +134,11 @@ func (self *ChainManager) State() *state.StateDB { func (self *ChainManager) TransState() *state.StateDB { self.tsmu.RLock() defer self.tsmu.RUnlock() - //tmp := self.transState return self.transState } func (self *ChainManager) setTransState(statedb *state.StateDB) { - self.tsmu.Lock() - defer self.tsmu.Unlock() self.transState = statedb } @@ -361,6 +358,9 @@ func (bc *ChainManager) Stop() { } func (self *ChainManager) InsertChain(chain types.Blocks) error { + self.tsmu.Lock() + defer self.tsmu.Unlock() + for _, block := range chain { td, err := self.processor.Process(block) if err != nil { @@ -376,6 +376,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } block.Td = td + var chain, split bool self.mu.Lock() { self.write(block) @@ -383,16 +384,27 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { if td.Cmp(self.td) > 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) + split = true } self.setTotalDifficulty(td) self.insert(block) - self.setTransState(state.New(cblock.Root(), self.db)) - self.eventMux.Post(ChainEvent{block, td}) + chain = true } } self.mu.Unlock() + + if chain { + fmt.Println("POST START") + self.eventMux.Post(ChainEvent{block, td}) + fmt.Println("POST END") + } + + if split { + self.setTransState(state.New(block.Root(), self.db)) + self.eventMux.Post(ChainSplitEvent{block}) + } } return nil diff --git a/core/error.go b/core/error.go index 6af48ac2d..e86bacb2d 100644 --- a/core/error.go +++ b/core/error.go @@ -68,23 +68,6 @@ func IsValidationErr(err error) bool { return ok } -type GasLimitErr struct { - Message string - Is, Max *big.Int -} - -func IsGasLimitErr(err error) bool { - _, ok := err.(*GasLimitErr) - - return ok -} -func (err *GasLimitErr) Error() string { - return err.Message -} -func GasLimitError(is, max *big.Int) *GasLimitErr { - return &GasLimitErr{Message: fmt.Sprintf("GasLimit error. Max %s, transaction would take it to %s", max, is), Is: is, Max: max} -} - type NonceErr struct { Message string Is, Exp uint64 diff --git a/core/events.go b/core/events.go index fe106da49..4cbbc609c 100644 --- a/core/events.go +++ b/core/events.go @@ -13,3 +13,6 @@ type NewBlockEvent struct{ Block *types.Block } // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } + +// ChainSplit is posted when a new head is detected +type ChainSplitEvent struct{ Block *types.Block } diff --git a/core/state_transition.go b/core/state_transition.go index 33dd45f02..36ffa23d9 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -138,8 +138,8 @@ func (self *StateTransition) preCheck() (err error) { ) // Make sure this transaction's nonce is correct - if sender.Nonce != msg.Nonce() { - return NonceError(msg.Nonce(), sender.Nonce) + if sender.Nonce() != msg.Nonce() { + return NonceError(msg.Nonce(), sender.Nonce()) } // Pre-pay gas / Buy gas of the coinbase account @@ -166,7 +166,8 @@ func (self *StateTransition) TransitionState() (ret []byte, err error) { defer self.RefundGas() // Increment the nonce for the next transaction - sender.Nonce += 1 + self.state.SetNonce(sender.Address(), sender.Nonce()+1) + //sender.Nonce += 1 // Transaction gas if err = self.UseGas(vm.GasTx); err != nil { @@ -241,7 +242,7 @@ func MakeContract(msg Message, state *state.StateDB) *state.StateObject { addr := AddressFromMessage(msg) contract := state.GetOrNewStateObject(addr) - contract.InitCode = msg.Data() + contract.SetInitCode(msg.Data()) return contract } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 7a901fcae..050cff3d8 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -3,6 +3,7 @@ package core import ( "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethutil" @@ -35,6 +36,7 @@ type TxProcessor interface { // guarantee a non blocking pool we use a queue channel which can be // independently read without needing access to the actual pool. type TxPool struct { + mu sync.RWMutex // Queueing channel for reading and writing incoming // transactions to queueChan chan *types.Transaction @@ -97,7 +99,7 @@ func (self *TxPool) addTx(tx *types.Transaction) { self.txs[string(tx.Hash())] = tx } -func (self *TxPool) Add(tx *types.Transaction) error { +func (self *TxPool) add(tx *types.Transaction) error { if self.txs[string(tx.Hash())] != nil { return fmt.Errorf("Known transaction (%x)", tx.Hash()[0:4]) } @@ -128,17 +130,28 @@ func (self *TxPool) Size() int { return len(self.txs) } +func (self *TxPool) Add(tx *types.Transaction) error { + self.mu.Lock() + defer self.mu.Unlock() + return self.add(tx) +} func (self *TxPool) AddTransactions(txs []*types.Transaction) { + self.mu.Lock() + defer self.mu.Unlock() + for _, tx := range txs { - if err := self.Add(tx); err != nil { - txplogger.Infoln(err) + if err := self.add(tx); err != nil { + txplogger.Debugln(err) } else { - txplogger.Infof("tx %x\n", tx.Hash()[0:4]) + txplogger.Debugf("tx %x\n", tx.Hash()[0:4]) } } } func (self *TxPool) GetTransactions() (txs types.Transactions) { + self.mu.RLock() + defer self.mu.RUnlock() + txs = make(types.Transactions, self.Size()) i := 0 for _, tx := range self.txs { @@ -150,30 +163,32 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { } func (pool *TxPool) RemoveInvalid(query StateQuery) { + pool.mu.Lock() + var removedTxs types.Transactions for _, tx := range pool.txs { sender := query.GetAccount(tx.From()) err := pool.ValidateTransaction(tx) - fmt.Println(err, sender.Nonce, tx.Nonce()) - if err != nil || sender.Nonce >= tx.Nonce() { + if err != nil || sender.Nonce() >= tx.Nonce() { removedTxs = append(removedTxs, tx) } } + pool.mu.Unlock() pool.RemoveSet(removedTxs) } func (self *TxPool) RemoveSet(txs types.Transactions) { + self.mu.Lock() + defer self.mu.Unlock() + for _, tx := range txs { delete(self.txs, string(tx.Hash())) } } -func (pool *TxPool) Flush() []*types.Transaction { - txList := pool.GetTransactions() +func (pool *TxPool) Flush() { pool.txs = make(map[string]*types.Transaction) - - return txList } func (pool *TxPool) Start() { diff --git a/eth/backend.go b/eth/backend.go index d109ab98e..b57e9fd69 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -53,6 +53,8 @@ type Config struct { Shh bool Dial bool + MinerThreads int + KeyManager *crypto.KeyManager } @@ -153,7 +155,7 @@ func New(config *Config) (*Ethereum, error) { eth.blockProcessor = core.NewBlockProcessor(db, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) eth.whisper = whisper.New() - eth.miner = miner.New(keyManager.Address(), eth) + eth.miner = miner.New(keyManager.Address(), eth, config.MinerThreads) hasBlock := eth.chainManager.HasBlock insertChain := eth.chainManager.InsertChain @@ -209,9 +211,7 @@ func (s *Ethereum) Coinbase() []byte { return nil } // TODO func (s *Ethereum) Start() error { jsonlogger.LogJson(ðlogger.LogStarting{ ClientString: s.net.Name, - Coinbase: ethutil.Bytes2Hex(s.KeyManager().Address()), ProtocolVersion: ProtocolVersion, - LogEvent: ethlogger.LogEvent{Guid: ethutil.Bytes2Hex(crypto.FromECDSAPub(&s.net.PrivateKey.PublicKey))}, }) err := s.net.Start() diff --git a/eth/protocol.go b/eth/protocol.go index fb694c877..44a1184f2 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -13,7 +13,7 @@ import ( ) const ( - ProtocolVersion = 52 + ProtocolVersion = 53 NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 diff --git a/ethutil/common.go b/ethutil/common.go index 271c56fd5..2ef2440c7 100644 --- a/ethutil/common.go +++ b/ethutil/common.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "runtime" + "time" ) func IsWindows() bool { @@ -86,3 +87,9 @@ var ( Big256 = big.NewInt(0xff) Big257 = big.NewInt(257) ) + +func Bench(pre string, cb func()) { + start := time.Now() + cb() + fmt.Println(pre, ": took:", time.Since(start)) +} diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go index d298d914d..73d2cd935 100644 --- a/event/filter/eth_filter.go +++ b/event/filter/eth_filter.go @@ -3,6 +3,7 @@ package filter // TODO make use of the generic filtering system import ( + "fmt" "sync" "github.com/ethereum/go-ethereum/core" @@ -37,17 +38,18 @@ func (self *FilterManager) Stop() { func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { self.filterMu.Lock() + defer self.filterMu.Unlock() id = self.filterId self.filters[id] = filter self.filterId++ - self.filterMu.Unlock() + return id } func (self *FilterManager) UninstallFilter(id int) { self.filterMu.Lock() + defer self.filterMu.Unlock() delete(self.filters, id) - self.filterMu.Unlock() } // GetFilter retrieves a filter installed using InstallFilter. @@ -62,7 +64,7 @@ func (self *FilterManager) filterLoop() { // Subscribe to events events := self.eventMux.Subscribe( core.PendingBlockEvent{}, - core.ChainEvent{}, + //core.ChainEvent{}, state.Logs(nil)) out: @@ -73,6 +75,7 @@ out: case event := <-events.Chan(): switch event := event.(type) { case core.ChainEvent: + fmt.Println("filter start") self.filterMu.RLock() for _, filter := range self.filters { if filter.BlockCallback != nil { @@ -80,6 +83,7 @@ out: } } self.filterMu.RUnlock() + fmt.Println("filter stop") case core.PendingBlockEvent: self.filterMu.RLock() diff --git a/gocoverage.sh b/gocoverage.sh index 4245e3901..e54a5cab0 100755 --- a/gocoverage.sh +++ b/gocoverage.sh @@ -1,11 +1,16 @@ #!/bin/bash -# The script does automatic checking on a Go package and its sub-packages, including: -# 6. test coverage (http://blog.golang.org/cover) set -e -# Run test coverage on each subdirectories and merge the coverage profile. +# Add godep workspace to GOPATH. We do it manually instead of using +# 'godep go test' or 'godep restore' so godep doesn't need to be installed. +GOPATH="$PWD/Godeps/_workspace:$GOPATH" + +# Install packages before testing. Not doing this would cause +# 'go test' to recompile all package dependencies before testing each package. +go install ./... +# Run test coverage on each subdirectories and merge the coverage profile. echo "mode: count" > profile.cov # Standard go tooling behavior is to ignore dirs with leading underscors @@ -13,7 +18,7 @@ for dir in $(find . -maxdepth 10 -not -path './.git*' -not -path '*/_*' -type d) do if ls $dir/*.go &> /dev/null; then # echo $dir - if [[ $dir != "./tests/vm" ]] + if [[ $dir != "./tests/vm" && $dir != "." ]] then go test -covermode=count -coverprofile=$dir/profile.tmp $dir fi diff --git a/logger/types.go b/logger/types.go index 419382231..7ab4a2b8c 100644 --- a/logger/types.go +++ b/logger/types.go @@ -7,7 +7,6 @@ import ( type utctime8601 struct{} func (utctime8601) MarshalJSON() ([]byte, error) { - // FIX This should be re-formated for proper ISO 8601 return []byte(`"` + time.Now().UTC().Format(time.RFC3339Nano)[:26] + `Z"`), nil } @@ -16,14 +15,13 @@ type JsonLog interface { } type LogEvent struct { - Guid string `json:"guid"` - Ts utctime8601 `json:"ts"` + // Guid string `json:"guid"` + Ts utctime8601 `json:"ts"` // Level string `json:"level"` } type LogStarting struct { - ClientString string `json:"version_string"` - Coinbase string `json:"coinbase"` + ClientString string `json:"client_impl"` ProtocolVersion int `json:"eth_version"` LogEvent } @@ -32,17 +30,6 @@ func (l *LogStarting) EventName() string { return "starting" } -type P2PConnecting struct { - RemoteId string `json:"remote_id"` - RemoteEndpoint string `json:"remote_endpoint"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PConnecting) EventName() string { - return "p2p.connecting" -} - type P2PConnected struct { RemoteId string `json:"remote_id"` RemoteAddress string `json:"remote_addr"` @@ -55,17 +42,6 @@ func (l *P2PConnected) EventName() string { return "p2p.connected" } -type P2PHandshaked struct { - RemoteCapabilities []string `json:"remote_capabilities"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PHandshaked) EventName() string { - return "p2p.handshaked" -} - type P2PDisconnected struct { NumConnections int `json:"num_connections"` RemoteId string `json:"remote_id"` @@ -76,247 +52,46 @@ func (l *P2PDisconnected) EventName() string { return "p2p.disconnected" } -type P2PDisconnecting struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PDisconnecting) EventName() string { - return "p2p.disconnecting" -} - -type P2PDisconnectingBadHandshake struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PDisconnectingBadHandshake) EventName() string { - return "p2p.disconnecting.bad_handshake" -} - -type P2PDisconnectingBadProtocol struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PDisconnectingBadProtocol) EventName() string { - return "p2p.disconnecting.bad_protocol" -} - -type P2PDisconnectingReputation struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PDisconnectingReputation) EventName() string { - return "p2p.disconnecting.reputation" -} - -type P2PDisconnectingDHT struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PDisconnectingDHT) EventName() string { - return "p2p.disconnecting.dht" -} - -type P2PEthDisconnectingBadBlock struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PEthDisconnectingBadBlock) EventName() string { - return "p2p.eth.disconnecting.bad_block" -} - -type P2PEthDisconnectingBadTx struct { - Reason string `json:"reason"` - RemoteId string `json:"remote_id"` - NumConnections int `json:"num_connections"` - LogEvent -} - -func (l *P2PEthDisconnectingBadTx) EventName() string { - return "p2p.eth.disconnecting.bad_tx" -} - -type EthNewBlockMined struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockHexRlp string `json:"block_hexrlp"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockMined) EventName() string { - return "eth.newblock.mined" -} - -type EthNewBlockBroadcasted struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockBroadcasted) EventName() string { - return "eth.newblock.broadcasted" -} - -type EthNewBlockReceived struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockReceived) EventName() string { - return "eth.newblock.received" -} - -type EthNewBlockIsKnown struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockIsKnown) EventName() string { - return "eth.newblock.is_known" -} - -type EthNewBlockIsNew struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockIsNew) EventName() string { - return "eth.newblock.is_new" -} - -type EthNewBlockMissingParent struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockMissingParent) EventName() string { - return "eth.newblock.missing_parent" -} - -type EthNewBlockIsInvalid struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockIsInvalid) EventName() string { - return "eth.newblock.is_invalid" -} - -type EthNewBlockChainIsOlder struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockChainIsOlder) EventName() string { - return "eth.newblock.chain.is_older" -} - -type EthNewBlockChainIsCanonical struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` - LogEvent -} - -func (l *EthNewBlockChainIsCanonical) EventName() string { - return "eth.newblock.chain.is_cannonical" -} - -type EthNewBlockChainNotCanonical struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` +type EthMinerNewBlock struct { + BlockHash string `json:"block_hash"` + BlockNumber int `json:"block_number"` + ChainHeadHash string `json:"chain_head_hash"` + BlockPrevHash string `json:"block_prev_hash"` LogEvent } -func (l *EthNewBlockChainNotCanonical) EventName() string { - return "eth.newblock.chain.not_cannonical" +func (l *EthMinerNewBlock) EventName() string { + return "eth.miner.new_block" } -type EthNewBlockChainSwitched struct { - BlockNumber int `json:"block_number"` - HeadHash string `json:"head_hash"` - OldHeadHash string `json:"old_head_hash"` - BlockHash string `json:"block_hash"` - BlockDifficulty int `json:"block_difficulty"` - BlockPrevHash string `json:"block_prev_hash"` +type EthChainReceivedNewBlock struct { + BlockHash string `json:"block_hash"` + BlockNumber int `json:"block_number"` + ChainHeadHash string `json:"chain_head_hash"` + BlockPrevHash string `json:"block_prev_hash"` + RemoteId int `json:"remote_id"` LogEvent } -func (l *EthNewBlockChainSwitched) EventName() string { - return "eth.newblock.chain.switched" +func (l *EthChainReceivedNewBlock) EventName() string { + return "eth.chain.received.new_block" } -type EthTxCreated struct { - TxHash string `json:"tx_hash"` - TxSender string `json:"tx_sender"` - TxAddress string `json:"tx_address"` - TxHexRLP string `json:"tx_hexrlp"` - TxNonce int `json:"tx_nonce"` +type EthChainNewHead struct { + BlockHash string `json:"block_hash"` + BlockNumber int `json:"block_number"` + ChainHeadHash string `json:"chain_head_hash"` + BlockPrevHash string `json:"block_prev_hash"` LogEvent } -func (l *EthTxCreated) EventName() string { - return "eth.tx.created" +func (l *EthChainNewHead) EventName() string { + return "eth.chain.new_head" } type EthTxReceived struct { - TxHash string `json:"tx_hash"` - TxAddress string `json:"tx_address"` - TxHexRLP string `json:"tx_hexrlp"` - RemoteId string `json:"remote_id"` - TxNonce int `json:"tx_nonce"` + TxHash string `json:"tx_hash"` + RemoteId string `json:"remote_id"` LogEvent } @@ -324,39 +99,261 @@ func (l *EthTxReceived) EventName() string { return "eth.tx.received" } -type EthTxBroadcasted struct { - TxHash string `json:"tx_hash"` - TxSender string `json:"tx_sender"` - TxAddress string `json:"tx_address"` - TxNonce int `json:"tx_nonce"` - LogEvent -} - -func (l *EthTxBroadcasted) EventName() string { - return "eth.tx.broadcasted" -} - -type EthTxValidated struct { - TxHash string `json:"tx_hash"` - TxSender string `json:"tx_sender"` - TxAddress string `json:"tx_address"` - TxNonce int `json:"tx_nonce"` - LogEvent -} - -func (l *EthTxValidated) EventName() string { - return "eth.tx.validated" -} - -type EthTxIsInvalid struct { - TxHash string `json:"tx_hash"` - TxSender string `json:"tx_sender"` - TxAddress string `json:"tx_address"` - Reason string `json:"reason"` - TxNonce int `json:"tx_nonce"` - LogEvent -} - -func (l *EthTxIsInvalid) EventName() string { - return "eth.tx.is_invalid" -} +// +// +// The types below are legacy and need to be converted to new format or deleted +// +// + +// type P2PConnecting struct { +// RemoteId string `json:"remote_id"` +// RemoteEndpoint string `json:"remote_endpoint"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PConnecting) EventName() string { +// return "p2p.connecting" +// } + +// type P2PHandshaked struct { +// RemoteCapabilities []string `json:"remote_capabilities"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PHandshaked) EventName() string { +// return "p2p.handshaked" +// } + +// type P2PDisconnecting struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PDisconnecting) EventName() string { +// return "p2p.disconnecting" +// } + +// type P2PDisconnectingBadHandshake struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PDisconnectingBadHandshake) EventName() string { +// return "p2p.disconnecting.bad_handshake" +// } + +// type P2PDisconnectingBadProtocol struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PDisconnectingBadProtocol) EventName() string { +// return "p2p.disconnecting.bad_protocol" +// } + +// type P2PDisconnectingReputation struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PDisconnectingReputation) EventName() string { +// return "p2p.disconnecting.reputation" +// } + +// type P2PDisconnectingDHT struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PDisconnectingDHT) EventName() string { +// return "p2p.disconnecting.dht" +// } + +// type P2PEthDisconnectingBadBlock struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PEthDisconnectingBadBlock) EventName() string { +// return "p2p.eth.disconnecting.bad_block" +// } + +// type P2PEthDisconnectingBadTx struct { +// Reason string `json:"reason"` +// RemoteId string `json:"remote_id"` +// NumConnections int `json:"num_connections"` +// LogEvent +// } + +// func (l *P2PEthDisconnectingBadTx) EventName() string { +// return "p2p.eth.disconnecting.bad_tx" +// } + +// type EthNewBlockBroadcasted struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockBroadcasted) EventName() string { +// return "eth.newblock.broadcasted" +// } + +// type EthNewBlockIsKnown struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockIsKnown) EventName() string { +// return "eth.newblock.is_known" +// } + +// type EthNewBlockIsNew struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockIsNew) EventName() string { +// return "eth.newblock.is_new" +// } + +// type EthNewBlockMissingParent struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockMissingParent) EventName() string { +// return "eth.newblock.missing_parent" +// } + +// type EthNewBlockIsInvalid struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockIsInvalid) EventName() string { +// return "eth.newblock.is_invalid" +// } + +// type EthNewBlockChainIsOlder struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockChainIsOlder) EventName() string { +// return "eth.newblock.chain.is_older" +// } + +// type EthNewBlockChainIsCanonical struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockChainIsCanonical) EventName() string { +// return "eth.newblock.chain.is_cannonical" +// } + +// type EthNewBlockChainNotCanonical struct { +// BlockNumber int `json:"block_number"` +// HeadHash string `json:"head_hash"` +// BlockHash string `json:"block_hash"` +// BlockDifficulty int `json:"block_difficulty"` +// BlockPrevHash string `json:"block_prev_hash"` +// LogEvent +// } + +// func (l *EthNewBlockChainNotCanonical) EventName() string { +// return "eth.newblock.chain.not_cannonical" +// } + +// type EthTxCreated struct { +// TxHash string `json:"tx_hash"` +// TxSender string `json:"tx_sender"` +// TxAddress string `json:"tx_address"` +// TxHexRLP string `json:"tx_hexrlp"` +// TxNonce int `json:"tx_nonce"` +// LogEvent +// } + +// func (l *EthTxCreated) EventName() string { +// return "eth.tx.created" +// } + +// type EthTxBroadcasted struct { +// TxHash string `json:"tx_hash"` +// TxSender string `json:"tx_sender"` +// TxAddress string `json:"tx_address"` +// TxNonce int `json:"tx_nonce"` +// LogEvent +// } + +// func (l *EthTxBroadcasted) EventName() string { +// return "eth.tx.broadcasted" +// } + +// type EthTxValidated struct { +// TxHash string `json:"tx_hash"` +// TxSender string `json:"tx_sender"` +// TxAddress string `json:"tx_address"` +// TxNonce int `json:"tx_nonce"` +// LogEvent +// } + +// func (l *EthTxValidated) EventName() string { +// return "eth.tx.validated" +// } + +// type EthTxIsInvalid struct { +// TxHash string `json:"tx_hash"` +// TxSender string `json:"tx_sender"` +// TxAddress string `json:"tx_address"` +// Reason string `json:"reason"` +// TxNonce int `json:"tx_nonce"` +// LogEvent +// } + +// func (l *EthTxIsInvalid) EventName() string { +// return "eth.tx.is_invalid" +// } diff --git a/miner/miner.go b/miner/miner.go index 27afcf684..0cc2361c8 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -20,13 +20,13 @@ type Miner struct { mining bool } -func New(coinbase []byte, eth core.Backend) *Miner { +func New(coinbase []byte, eth core.Backend, minerThreads int) *Miner { miner := &Miner{ Coinbase: coinbase, worker: newWorker(coinbase, eth), } - for i := 0; i < 4; i++ { + for i := 0; i < minerThreads; i++ { miner.worker.register(NewCpuMiner(i, ezp.New())) } diff --git a/miner/worker.go b/miner/worker.go index 47b462e53..aea5cc535 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -109,14 +109,20 @@ func (self *worker) register(agent Agent) { } func (self *worker) update() { - events := self.mux.Subscribe(core.ChainEvent{}, core.TxPreEvent{}) + events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{}) out: for { select { case event := <-events.Chan(): - switch event.(type) { - case core.ChainEvent, core.TxPreEvent: + switch ev := event.(type) { + case core.ChainEvent: + println("miner start") + if self.current.block != ev.Block { + self.commitNewWork() + } + println("miner end") + case core.NewMinedBlockEvent: self.commitNewWork() } case <-self.quit: @@ -172,17 +178,19 @@ func (self *worker) commitNewWork() { transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) + minerlogger.Infof("committing new work with %d txs\n", len(transactions)) // Keep track of transactions which return errors so they can be removed var remove types.Transactions +gasLimit: for _, tx := range transactions { err := self.commitTransaction(tx) switch { case core.IsNonceErr(err): // Remove invalid transactions remove = append(remove, tx) - case core.IsGasLimitErr(err): + case state.IsGasLimitErr(err): // Break on gas limit - break + break gasLimit } if err != nil { @@ -227,11 +235,9 @@ func (self *worker) commitUncle(uncle *types.Header) error { } func (self *worker) commitTransaction(tx *types.Transaction) error { - snapshot := self.current.state.Copy() + //fmt.Printf("proc %x %v\n", tx.Hash()[:3], tx.Nonce()) receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) - if err != nil && (core.IsNonceErr(err) || core.IsGasLimitErr(err)) { - self.current.state.Set(snapshot) - + if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err)) { return err } diff --git a/p2p/crypto.go b/p2p/handshake.go index 7e4b43712..614711eaf 100644 --- a/p2p/crypto.go +++ b/p2p/handshake.go @@ -1,21 +1,20 @@ package p2p import ( - // "binary" "crypto/ecdsa" "crypto/rand" + "errors" "fmt" "io" + "net" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" "github.com/ethereum/go-ethereum/crypto/secp256k1" - ethlogger "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rlp" ) -var clogger = ethlogger.NewLogger("CRYPTOID") - const ( sskLen = 16 // ecies.MaxSharedKeyLength(pubKey) / 2 sigLen = 65 // elliptic S256 @@ -30,26 +29,76 @@ const ( rHSLen = authRespLen + eciesBytes // size of the final ECIES payload sent as receiver's handshake ) -type hexkey []byte +type conn struct { + *frameRW + *protoHandshake +} -func (self hexkey) String() string { - return fmt.Sprintf("(%d) %x", len(self), []byte(self)) +func newConn(fd net.Conn, hs *protoHandshake) *conn { + return &conn{newFrameRW(fd, msgWriteTimeout), hs} } -func encHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, dial *discover.Node) ( - remoteID discover.NodeID, - sessionToken []byte, - err error, -) { +// encHandshake represents information about the remote end +// of a connection that is negotiated during the encryption handshake. +type encHandshake struct { + ID discover.NodeID + IngressMAC []byte + EgressMAC []byte + Token []byte +} + +// protoHandshake is the RLP structure of the protocol handshake. +type protoHandshake struct { + Version uint64 + Name string + Caps []Cap + ListenPort uint64 + ID discover.NodeID +} + +// setupConn starts a protocol session on the given connection. +// It runs the encryption handshake and the protocol handshake. +// If dial is non-nil, the connection the local node is the initiator. +func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { if dial == nil { - var remotePubkey []byte - sessionToken, remotePubkey, err = inboundEncHandshake(conn, prv, nil) - copy(remoteID[:], remotePubkey) + return setupInboundConn(fd, prv, our) } else { - remoteID = dial.ID - sessionToken, err = outboundEncHandshake(conn, prv, remoteID[:], nil) + return setupOutboundConn(fd, prv, our, dial) + } +} + +func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (*conn, error) { + // var remotePubkey []byte + // sessionToken, remotePubkey, err = inboundEncHandshake(fd, prv, nil) + // copy(remoteID[:], remotePubkey) + + rw := newFrameRW(fd, msgWriteTimeout) + rhs, err := readProtocolHandshake(rw, our) + if err != nil { + return nil, err + } + if err := writeProtocolHandshake(rw, our); err != nil { + return nil, fmt.Errorf("protocol write error: %v", err) + } + return &conn{rw, rhs}, nil +} + +func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { + // remoteID = dial.ID + // sessionToken, err = outboundEncHandshake(fd, prv, remoteID[:], nil) + + rw := newFrameRW(fd, msgWriteTimeout) + if err := writeProtocolHandshake(rw, our); err != nil { + return nil, fmt.Errorf("protocol write error: %v", err) } - return remoteID, sessionToken, err + rhs, err := readProtocolHandshake(rw, our) + if err != nil { + return nil, fmt.Errorf("protocol handshake read error: %v", err) + } + if rhs.ID != dial.ID { + return nil, errors.New("dialed node id mismatch") + } + return &conn{rw, rhs}, nil } // outboundEncHandshake negotiates a session token on conn. @@ -66,18 +115,9 @@ func outboundEncHandshake(conn io.ReadWriter, prvKey *ecdsa.PrivateKey, remotePu if err != nil { return nil, err } - if sessionToken != nil { - clogger.Debugf("session-token: %v", hexkey(sessionToken)) - } - - clogger.Debugf("initiator-nonce: %v", hexkey(initNonce)) - clogger.Debugf("initiator-random-private-key: %v", hexkey(crypto.FromECDSA(randomPrivKey))) - randomPublicKeyS, _ := exportPublicKey(&randomPrivKey.PublicKey) - clogger.Debugf("initiator-random-public-key: %v", hexkey(randomPublicKeyS)) if _, err = conn.Write(auth); err != nil { return nil, err } - clogger.Debugf("initiator handshake: %v", hexkey(auth)) response := make([]byte, rHSLen) if _, err = io.ReadFull(conn, response); err != nil { @@ -88,9 +128,6 @@ func outboundEncHandshake(conn io.ReadWriter, prvKey *ecdsa.PrivateKey, remotePu return nil, err } - clogger.Debugf("receiver-nonce: %v", hexkey(recNonce)) - remoteRandomPubKeyS, _ := exportPublicKey(remoteRandomPubKey) - clogger.Debugf("receiver-random-public-key: %v", hexkey(remoteRandomPubKeyS)) return newSession(initNonce, recNonce, randomPrivKey, remoteRandomPubKey) } @@ -221,12 +258,9 @@ func inboundEncHandshake(conn io.ReadWriter, prvKey *ecdsa.PrivateKey, sessionTo if err != nil { return nil, nil, err } - clogger.Debugf("receiver-nonce: %v", hexkey(recNonce)) - clogger.Debugf("receiver-random-priv-key: %v", hexkey(crypto.FromECDSA(randomPrivKey))) if _, err = conn.Write(response); err != nil { return nil, nil, err } - clogger.Debugf("receiver handshake:\n%v", hexkey(response)) token, err = newSession(initNonce, recNonce, randomPrivKey, remoteRandomPubKey) return token, remotePubKey, err } @@ -361,3 +395,40 @@ func xor(one, other []byte) (xor []byte) { } return xor } + +func writeProtocolHandshake(w MsgWriter, our *protoHandshake) error { + return EncodeMsg(w, handshakeMsg, our.Version, our.Name, our.Caps, our.ListenPort, our.ID[:]) +} + +func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, error) { + // read and handle remote handshake + msg, err := r.ReadMsg() + if err != nil { + return nil, err + } + if msg.Code == discMsg { + // disconnect before protocol handshake is valid according to the + // spec and we send it ourself if Server.addPeer fails. + var reason DiscReason + rlp.Decode(msg.Payload, &reason) + return nil, discRequestedError(reason) + } + if msg.Code != handshakeMsg { + return nil, fmt.Errorf("expected handshake, got %x", msg.Code) + } + if msg.Size > baseProtocolMaxMsgSize { + return nil, fmt.Errorf("message too big (%d > %d)", msg.Size, baseProtocolMaxMsgSize) + } + var hs protoHandshake + if err := msg.Decode(&hs); err != nil { + return nil, err + } + // validate handshake info + if hs.Version != our.Version { + return nil, newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", baseProtocolVersion, hs.Version) + } + if (hs.ID == discover.NodeID{}) { + return nil, newPeerError(errPubkeyInvalid, "missing") + } + return &hs, nil +} diff --git a/p2p/crypto_test.go b/p2p/handshake_test.go index 6cf0f4818..06c6a6932 100644 --- a/p2p/crypto_test.go +++ b/p2p/handshake_test.go @@ -5,10 +5,12 @@ import ( "crypto/ecdsa" "crypto/rand" "net" + "reflect" "testing" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethereum/go-ethereum/p2p/discover" ) func TestPublicKeyEncoding(t *testing.T) { @@ -91,14 +93,14 @@ func testCryptoHandshake(prv0, prv1 *ecdsa.PrivateKey, sessionToken []byte, t *t if err != nil { t.Errorf("%v", err) } - t.Logf("-> %v", hexkey(auth)) + // t.Logf("-> %v", hexkey(auth)) // receiver reads auth and responds with response response, remoteRecNonce, remoteInitNonce, _, remoteRandomPrivKey, remoteInitRandomPubKey, err := authResp(auth, sessionToken, prv1) if err != nil { t.Errorf("%v", err) } - t.Logf("<- %v\n", hexkey(response)) + // t.Logf("<- %v\n", hexkey(response)) // initiator reads receiver's response and the key exchange completes recNonce, remoteRandomPubKey, _, err := completeHandshake(response, prv0) @@ -132,7 +134,7 @@ func testCryptoHandshake(prv0, prv1 *ecdsa.PrivateKey, sessionToken []byte, t *t } } -func TestHandshake(t *testing.T) { +func TestEncHandshake(t *testing.T) { defer testlog(t).detach() prv0, _ := crypto.GenerateKey() @@ -165,3 +167,58 @@ func TestHandshake(t *testing.T) { t.Error("session token mismatch") } } + +func TestSetupConn(t *testing.T) { + prv0, _ := crypto.GenerateKey() + prv1, _ := crypto.GenerateKey() + node0 := &discover.Node{ + ID: discover.PubkeyID(&prv0.PublicKey), + IP: net.IP{1, 2, 3, 4}, + TCPPort: 33, + } + node1 := &discover.Node{ + ID: discover.PubkeyID(&prv1.PublicKey), + IP: net.IP{5, 6, 7, 8}, + TCPPort: 44, + } + hs0 := &protoHandshake{ + Version: baseProtocolVersion, + ID: node0.ID, + Caps: []Cap{{"a", 0}, {"b", 2}}, + } + hs1 := &protoHandshake{ + Version: baseProtocolVersion, + ID: node1.ID, + Caps: []Cap{{"c", 1}, {"d", 3}}, + } + fd0, fd1 := net.Pipe() + + done := make(chan struct{}) + go func() { + defer close(done) + conn0, err := setupConn(fd0, prv0, hs0, node1) + if err != nil { + t.Errorf("outbound side error: %v", err) + return + } + if conn0.ID != node1.ID { + t.Errorf("outbound conn id mismatch: got %v, want %v", conn0.ID, node1.ID) + } + if !reflect.DeepEqual(conn0.Caps, hs1.Caps) { + t.Errorf("outbound caps mismatch: got %v, want %v", conn0.Caps, hs1.Caps) + } + }() + + conn1, err := setupConn(fd1, prv1, hs1, nil) + if err != nil { + t.Fatalf("inbound side error: %v", err) + } + if conn1.ID != node0.ID { + t.Errorf("inbound conn id mismatch: got %v, want %v", conn1.ID, node0.ID) + } + if !reflect.DeepEqual(conn1.Caps, hs0.Caps) { + t.Errorf("inbound caps mismatch: got %v, want %v", conn1.Caps, hs0.Caps) + } + + <-done +} diff --git a/p2p/message.go b/p2p/message.go index 07916f7b3..7adad4b09 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -197,7 +197,7 @@ func (rw *frameRW) ReadMsg() (msg Msg, err error) { return msg, err } if !bytes.HasPrefix(start, magicToken) { - return msg, fmt.Errorf("bad magic token %x", start[:4], magicToken) + return msg, fmt.Errorf("bad magic token %x", start[:4]) } size := binary.BigEndian.Uint32(start[4:]) diff --git a/p2p/peer.go b/p2p/peer.go index fd5bec7d5..fb027c834 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -21,6 +21,7 @@ const ( baseProtocolMaxMsgSize = 10 * 1024 * 1024 disconnectGracePeriod = 2 * time.Second + pingInterval = 15 * time.Second ) const ( @@ -33,37 +34,14 @@ const ( peersMsg = 0x05 ) -// handshake is the RLP structure of the protocol handshake. -type handshake struct { - Version uint64 - Name string - Caps []Cap - ListenPort uint64 - NodeID discover.NodeID -} - // Peer represents a connected remote node. type Peer struct { // Peers have all the log methods. // Use them to display messages related to the peer. *logger.Logger - infoMu sync.Mutex - name string - caps []Cap - - ourID, remoteID *discover.NodeID - ourName string - - rw *frameRW - - // These fields maintain the running protocols. - protocols []Protocol - runlock sync.RWMutex // protects running - running map[string]*proto - - // disables protocol handshake, for testing - noHandshake bool + rw *conn + running map[string]*protoRW protoWG sync.WaitGroup protoErr chan error @@ -73,36 +51,27 @@ type Peer struct { // NewPeer returns a peer for testing purposes. func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer { - conn, _ := net.Pipe() - peer := newPeer(conn, nil, "", nil, &id) - peer.setHandshakeInfo(name, caps) + pipe, _ := net.Pipe() + conn := newConn(pipe, &protoHandshake{ID: id, Name: name, Caps: caps}) + peer := newPeer(conn, nil) close(peer.closed) // ensures Disconnect doesn't block return peer } // ID returns the node's public key. func (p *Peer) ID() discover.NodeID { - return *p.remoteID + return p.rw.ID } // Name returns the node name that the remote node advertised. func (p *Peer) Name() string { - // this needs a lock because the information is part of the - // protocol handshake. - p.infoMu.Lock() - name := p.name - p.infoMu.Unlock() - return name + return p.rw.Name } // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { - // this needs a lock because the information is part of the - // protocol handshake. - p.infoMu.Lock() - caps := p.caps - p.infoMu.Unlock() - return caps + // TODO: maybe return copy + return p.rw.Caps } // RemoteAddr returns the remote address of the network connection. @@ -126,30 +95,20 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - return fmt.Sprintf("Peer %.8x %v", p.remoteID[:], p.RemoteAddr()) + return fmt.Sprintf("Peer %.8x %v", p.rw.ID[:], p.RemoteAddr()) } -func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer { - logtag := fmt.Sprintf("Peer %.8x %v", remoteID[:], conn.RemoteAddr()) - return &Peer{ - Logger: logger.NewLogger(logtag), - rw: newFrameRW(conn, msgWriteTimeout), - ourID: ourID, - ourName: ourName, - remoteID: remoteID, - protocols: protocols, - running: make(map[string]*proto), - disc: make(chan DiscReason), - protoErr: make(chan error), - closed: make(chan struct{}), +func newPeer(conn *conn, protocols []Protocol) *Peer { + logtag := fmt.Sprintf("Peer %.8x %v", conn.ID[:], conn.RemoteAddr()) + p := &Peer{ + Logger: logger.NewLogger(logtag), + rw: conn, + running: matchProtocols(protocols, conn.Caps, conn), + disc: make(chan DiscReason), + protoErr: make(chan error), + closed: make(chan struct{}), } -} - -func (p *Peer) setHandshakeInfo(name string, caps []Cap) { - p.infoMu.Lock() - p.name = name - p.caps = caps - p.infoMu.Unlock() + return p } func (p *Peer) run() DiscReason { @@ -157,29 +116,36 @@ func (p *Peer) run() DiscReason { defer p.closeProtocols() defer close(p.closed) + p.startProtocols() go func() { readErr <- p.readLoop() }() - if !p.noHandshake { - if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil { - p.DebugDetailf("Protocol handshake error: %v\n", err) - p.rw.Close() - return DiscProtocolError - } - } + ping := time.NewTicker(pingInterval) + defer ping.Stop() // Wait for an error or disconnect. var reason DiscReason - select { - case err := <-readErr: - // We rely on protocols to abort if there is a write error. It - // might be more robust to handle them here as well. - p.DebugDetailf("Read error: %v\n", err) - p.rw.Close() - return DiscNetworkError - - case err := <-p.protoErr: - reason = discReasonForError(err) - case reason = <-p.disc: +loop: + for { + select { + case <-ping.C: + go func() { + if err := EncodeMsg(p.rw, pingMsg, nil); err != nil { + p.protoErr <- err + return + } + }() + case err := <-readErr: + // We rely on protocols to abort if there is a write error. It + // might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + p.rw.Close() + return DiscNetworkError + case err := <-p.protoErr: + reason = discReasonForError(err) + break loop + case reason = <-p.disc: + break loop + } } p.politeDisconnect(reason) @@ -206,11 +172,6 @@ func (p *Peer) politeDisconnect(reason DiscReason) { } func (p *Peer) readLoop() error { - if !p.noHandshake { - if err := readProtocolHandshake(p, p.rw); err != nil { - return err - } - } for { msg, err := p.rw.ReadMsg() if err != nil { @@ -249,105 +210,51 @@ func (p *Peer) handle(msg Msg) error { return nil } -func readProtocolHandshake(p *Peer, rw MsgReadWriter) error { - // read and handle remote handshake - msg, err := rw.ReadMsg() - if err != nil { - return err - } - if msg.Code == discMsg { - // disconnect before protocol handshake is valid according to the - // spec and we send it ourself if Server.addPeer fails. - var reason DiscReason - rlp.Decode(msg.Payload, &reason) - return discRequestedError(reason) - } - if msg.Code != handshakeMsg { - return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code) - } - if msg.Size > baseProtocolMaxMsgSize { - return newPeerError(errInvalidMsg, "message too big") - } - var hs handshake - if err := msg.Decode(&hs); err != nil { - return err - } - // validate handshake info - if hs.Version != baseProtocolVersion { - return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", - baseProtocolVersion, hs.Version) - } - if hs.NodeID == *p.remoteID { - return newPeerError(errPubkeyForbidden, "node ID mismatch") - } - // TODO: remove Caps with empty name - p.setHandshakeInfo(hs.Name, hs.Caps) - p.startSubprotocols(hs.Caps) - return nil -} - -func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error { - var caps []interface{} - for _, proto := range ps { - caps = append(caps, proto.cap()) - } - return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id) -} - -// startProtocols starts matching named subprotocols. -func (p *Peer) startSubprotocols(caps []Cap) { +// matchProtocols creates structures for matching named subprotocols. +func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { sort.Sort(capsByName(caps)) - p.runlock.Lock() - defer p.runlock.Unlock() offset := baseProtocolLength + result := make(map[string]*protoRW) outer: for _, cap := range caps { - for _, proto := range p.protocols { - if proto.Name == cap.Name && - proto.Version == cap.Version && - p.running[cap.Name] == nil { - p.running[cap.Name] = p.startProto(offset, proto) + for _, proto := range protocols { + if proto.Name == cap.Name && proto.Version == cap.Version && result[cap.Name] == nil { + result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} offset += proto.Length continue outer } } } + return result } -func (p *Peer) startProto(offset uint64, impl Protocol) *proto { - p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version) - rw := &proto{ - name: impl.Name, - in: make(chan Msg), - offset: offset, - maxcode: impl.Length, - w: p.rw, +func (p *Peer) startProtocols() { + for _, proto := range p.running { + proto := proto + p.DebugDetailf("Starting protocol %s/%d\n", proto.Name, proto.Version) + p.protoWG.Add(1) + go func() { + err := proto.Run(p, proto) + if err == nil { + p.DebugDetailf("Protocol %s/%d returned\n", proto.Name, proto.Version) + err = errors.New("protocol returned") + } else { + p.DebugDetailf("Protocol %s/%d error: %v\n", proto.Name, proto.Version, err) + } + select { + case p.protoErr <- err: + case <-p.closed: + } + p.protoWG.Done() + }() } - p.protoWG.Add(1) - go func() { - err := impl.Run(p, rw) - if err == nil { - p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version) - err = errors.New("protocol returned") - } else { - p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err) - } - select { - case p.protoErr <- err: - case <-p.closed: - } - p.protoWG.Done() - }() - return rw } // getProto finds the protocol responsible for handling // the given message code. -func (p *Peer) getProto(code uint64) (*proto, error) { - p.runlock.RLock() - defer p.runlock.RUnlock() +func (p *Peer) getProto(code uint64) (*protoRW, error) { for _, proto := range p.running { - if code >= proto.offset && code < proto.offset+proto.maxcode { + if code >= proto.offset && code < proto.offset+proto.Length { return proto, nil } } @@ -355,46 +262,43 @@ func (p *Peer) getProto(code uint64) (*proto, error) { } func (p *Peer) closeProtocols() { - p.runlock.RLock() for _, p := range p.running { close(p.in) } - p.runlock.RUnlock() p.protoWG.Wait() } // writeProtoMsg sends the given message on behalf of the given named protocol. // this exists because of Server.Broadcast. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { - p.runlock.RLock() proto, ok := p.running[protoName] - p.runlock.RUnlock() if !ok { return fmt.Errorf("protocol %s not handled by peer", protoName) } - if msg.Code >= proto.maxcode { + if msg.Code >= proto.Length { return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) } msg.Code += proto.offset return p.rw.WriteMsg(msg) } -type proto struct { - name string - in chan Msg - maxcode, offset uint64 - w MsgWriter +type protoRW struct { + Protocol + + in chan Msg + offset uint64 + w MsgWriter } -func (rw *proto) WriteMsg(msg Msg) error { - if msg.Code >= rw.maxcode { +func (rw *protoRW) WriteMsg(msg Msg) error { + if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset return rw.w.WriteMsg(msg) } -func (rw *proto) ReadMsg() (Msg, error) { +func (rw *protoRW) ReadMsg() (Msg, error) { msg, ok := <-rw.in if !ok { return msg, io.EOF diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 68c9910a2..a1260adbd 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -6,11 +6,9 @@ import ( "io/ioutil" "net" "reflect" - "sort" "testing" "time" - "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" ) @@ -23,6 +21,7 @@ var discard = Protocol{ if err != nil { return err } + fmt.Printf("discarding %d\n", msg.Code) if err = msg.Discard(); err != nil { return err } @@ -30,13 +29,20 @@ var discard = Protocol{ }, } -func testPeer(noHandshake bool, protos []Protocol) (*frameRW, *Peer, <-chan DiscReason) { - conn1, conn2 := net.Pipe() - peer := newPeer(conn1, protos, "name", &discover.NodeID{}, &discover.NodeID{}) - peer.noHandshake = noHandshake +func testPeer(protos []Protocol) (*conn, *Peer, <-chan DiscReason) { + fd1, fd2 := net.Pipe() + hs1 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion} + hs2 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion} + for _, p := range protos { + hs1.Caps = append(hs1.Caps, p.cap()) + hs2.Caps = append(hs2.Caps, p.cap()) + } + + peer := newPeer(newConn(fd1, hs1), protos) errc := make(chan DiscReason, 1) go func() { errc <- peer.run() }() - return newFrameRW(conn2, msgWriteTimeout), peer, errc + + return newConn(fd2, hs2), peer, errc } func TestPeerProtoReadMsg(t *testing.T) { @@ -61,9 +67,8 @@ func TestPeerProtoReadMsg(t *testing.T) { }, } - rw, peer, errc := testPeer(true, []Protocol{proto}) + rw, _, errc := testPeer([]Protocol{proto}) defer rw.Close() - peer.startSubprotocols([]Cap{proto.cap()}) EncodeMsg(rw, baseProtocolLength+2, 1) EncodeMsg(rw, baseProtocolLength+3, 2) @@ -100,9 +105,8 @@ func TestPeerProtoReadLargeMsg(t *testing.T) { }, } - rw, peer, errc := testPeer(true, []Protocol{proto}) + rw, _, errc := testPeer([]Protocol{proto}) defer rw.Close() - peer.startSubprotocols([]Cap{proto.cap()}) EncodeMsg(rw, 18, make([]byte, msgsize)) select { @@ -130,9 +134,8 @@ func TestPeerProtoEncodeMsg(t *testing.T) { return nil }, } - rw, peer, _ := testPeer(true, []Protocol{proto}) + rw, _, _ := testPeer([]Protocol{proto}) defer rw.Close() - peer.startSubprotocols([]Cap{proto.cap()}) if err := expectMsg(rw, 17, []string{"foo", "bar"}); err != nil { t.Error(err) @@ -142,9 +145,8 @@ func TestPeerProtoEncodeMsg(t *testing.T) { func TestPeerWriteForBroadcast(t *testing.T) { defer testlog(t).detach() - rw, peer, peerErr := testPeer(true, []Protocol{discard}) + rw, peer, peerErr := testPeer([]Protocol{discard}) defer rw.Close() - peer.startSubprotocols([]Cap{discard.cap()}) // test write errors if err := peer.writeProtoMsg("b", NewMsg(3)); err == nil { @@ -160,7 +162,7 @@ func TestPeerWriteForBroadcast(t *testing.T) { read := make(chan struct{}) go func() { if err := expectMsg(rw, 16, nil); err != nil { - t.Error() + t.Error(err) } close(read) }() @@ -179,7 +181,7 @@ func TestPeerWriteForBroadcast(t *testing.T) { func TestPeerPing(t *testing.T) { defer testlog(t).detach() - rw, _, _ := testPeer(true, nil) + rw, _, _ := testPeer(nil) defer rw.Close() if err := EncodeMsg(rw, pingMsg); err != nil { t.Fatal(err) @@ -192,7 +194,7 @@ func TestPeerPing(t *testing.T) { func TestPeerDisconnect(t *testing.T) { defer testlog(t).detach() - rw, _, disc := testPeer(true, nil) + rw, _, disc := testPeer(nil) defer rw.Close() if err := EncodeMsg(rw, discMsg, DiscQuitting); err != nil { t.Fatal(err) @@ -206,73 +208,6 @@ func TestPeerDisconnect(t *testing.T) { } } -func TestPeerHandshake(t *testing.T) { - defer testlog(t).detach() - - // remote has two matching protocols: a and c - remote := NewPeer(randomID(), "", []Cap{{"a", 1}, {"b", 999}, {"c", 3}}) - remoteID := randomID() - remote.ourID = &remoteID - remote.ourName = "remote peer" - - start := make(chan string) - stop := make(chan struct{}) - run := func(p *Peer, rw MsgReadWriter) error { - name := rw.(*proto).name - if name != "a" && name != "c" { - t.Errorf("protocol %q should not be started", name) - } else { - start <- name - } - <-stop - return nil - } - protocols := []Protocol{ - {Name: "a", Version: 1, Length: 1, Run: run}, - {Name: "b", Version: 2, Length: 1, Run: run}, - {Name: "c", Version: 3, Length: 1, Run: run}, - {Name: "d", Version: 4, Length: 1, Run: run}, - } - rw, p, disc := testPeer(false, protocols) - p.remoteID = remote.ourID - defer rw.Close() - - // run the handshake - remoteProtocols := []Protocol{protocols[0], protocols[2]} - if err := writeProtocolHandshake(rw, "remote peer", remoteID, remoteProtocols); err != nil { - t.Fatalf("handshake write error: %v", err) - } - if err := readProtocolHandshake(remote, rw); err != nil { - t.Fatalf("handshake read error: %v", err) - } - - // check that all protocols have been started - var started []string - for i := 0; i < 2; i++ { - select { - case name := <-start: - started = append(started, name) - case <-time.After(100 * time.Millisecond): - } - } - sort.Strings(started) - if !reflect.DeepEqual(started, []string{"a", "c"}) { - t.Errorf("wrong protocols started: %v", started) - } - - // check that metadata has been set - if p.ID() != remoteID { - t.Errorf("peer has wrong node ID: got %v, want %v", p.ID(), remoteID) - } - if p.Name() != remote.ourName { - t.Errorf("peer has wrong node name: got %q, want %q", p.Name(), remote.ourName) - } - - close(stop) - expectMsg(rw, discMsg, nil) - t.Logf("disc reason: %v", <-disc) -} - func TestNewPeer(t *testing.T) { name := "nodename" caps := []Cap{{"foo", 2}, {"bar", 3}} diff --git a/p2p/server.go b/p2p/server.go index 35b584a27..3ea2538d1 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -5,7 +5,6 @@ import ( "crypto/ecdsa" "errors" "fmt" - "io" "net" "runtime" "sync" @@ -23,6 +22,7 @@ const ( ) var srvlog = logger.NewLogger("P2P Server") +var srvjslog = logger.NewJsonLogger() // MakeName creates a node name that follows the ethereum convention // for such names. It adds the operation system name and Go runtime version @@ -83,9 +83,11 @@ type Server struct { // Hooks for testing. These are useful because we can inhibit // the whole protocol stack. - handshakeFunc + setupFunc newPeerHook + ourHandshake *protoHandshake + lock sync.RWMutex running bool listener net.Listener @@ -99,7 +101,7 @@ type Server struct { peerConnect chan *discover.Node } -type handshakeFunc func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (discover.NodeID, []byte, error) +type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error) type newPeerHook func(*Peer) // Peers returns all connected peers. @@ -159,7 +161,7 @@ func (srv *Server) Start() (err error) { } srvlog.Infoln("Starting Server") - // initialize all the fields + // static fields if srv.PrivateKey == nil { return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") } @@ -169,25 +171,32 @@ func (srv *Server) Start() (err error) { srv.quit = make(chan struct{}) srv.peers = make(map[discover.NodeID]*Peer) srv.peerConnect = make(chan *discover.Node) - - if srv.handshakeFunc == nil { - srv.handshakeFunc = encHandshake + if srv.setupFunc == nil { + srv.setupFunc = setupConn } if srv.Blacklist == nil { srv.Blacklist = NewBlacklist() } + + // node table + ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT) + if err != nil { + return err + } + srv.ntab = ntab + + // handshake + srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self()} + for _, p := range srv.Protocols { + srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) + } + + // listen/dial if srv.ListenAddr != "" { if err := srv.startListening(); err != nil { return err } } - - // dial stuff - dt, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT) - if err != nil { - return err - } - srv.ntab = dt if srv.Dialer == nil { srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} } @@ -347,30 +356,41 @@ func (srv *Server) findPeers() { } } -func (srv *Server) startPeer(conn net.Conn, dest *discover.Node) { +func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token - conn.SetDeadline(time.Now().Add(handshakeTimeout)) - remoteID, _, err := srv.handshakeFunc(conn, srv.PrivateKey, dest) + fd.SetDeadline(time.Now().Add(handshakeTimeout)) + conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) if err != nil { - conn.Close() - srvlog.Debugf("Encryption Handshake with %v failed: %v", conn.RemoteAddr(), err) + fd.Close() + srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err) return } - ourID := srv.ntab.Self() - p := newPeer(conn, srv.Protocols, srv.Name, &ourID, &remoteID) - if ok, reason := srv.addPeer(remoteID, p); !ok { + p := newPeer(conn, srv.Protocols) + if ok, reason := srv.addPeer(conn.ID, p); !ok { srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason) p.politeDisconnect(reason) return } + srvlog.Debugf("Added %v\n", p) + srvjslog.LogJson(&logger.P2PConnected{ + RemoteId: fmt.Sprintf("%x", conn.ID[:]), + RemoteAddress: conn.RemoteAddr().String(), + RemoteVersionString: conn.Name, + NumConnections: srv.PeerCount(), + }) if srv.newPeerHook != nil { srv.newPeerHook(p) } discreason := p.run() srv.removePeer(p) + srvlog.Debugf("Removed %v (%v)\n", p, discreason) + srvjslog.LogJson(&logger.P2PDisconnected{ + RemoteId: fmt.Sprintf("%x", conn.ID[:]), + NumConnections: srv.PeerCount(), + }) } func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { @@ -394,7 +414,7 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { func (srv *Server) removePeer(p *Peer) { srv.lock.Lock() - delete(srv.peers, *p.remoteID) + delete(srv.peers, p.ID()) srv.lock.Unlock() srv.peerWG.Done() } diff --git a/p2p/server_test.go b/p2p/server_test.go index aa2b3d243..c109fffb9 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -21,8 +21,12 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server { ListenAddr: "127.0.0.1:0", PrivateKey: newkey(), newPeerHook: pf, - handshakeFunc: func(io.ReadWriter, *ecdsa.PrivateKey, *discover.Node) (id discover.NodeID, st []byte, err error) { - return randomID(), nil, err + setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { + id := randomID() + return &conn{ + frameRW: newFrameRW(fd, msgWriteTimeout), + protoHandshake: &protoHandshake{ID: id, Version: baseProtocolVersion}, + }, nil }, } if err := server.Start(); err != nil { @@ -116,9 +120,7 @@ func TestServerBroadcast(t *testing.T) { var connected sync.WaitGroup srv := startTestServer(t, func(p *Peer) { - p.protocols = []Protocol{discard} - p.startSubprotocols([]Cap{discard.cap()}) - p.noHandshake = true + p.running = matchProtocols([]Protocol{discard}, []Cap{discard.cap()}, p.rw) connected.Done() }) defer srv.Stop() diff --git a/pow/ezp/pow.go b/pow/ezp/pow.go index 540381243..f4a8b80e5 100644 --- a/pow/ezp/pow.go +++ b/pow/ezp/pow.go @@ -21,7 +21,7 @@ type EasyPow struct { } func New() *EasyPow { - return &EasyPow{turbo: false} + return &EasyPow{turbo: true} } func (pow *EasyPow) GetHashrate() int64 { diff --git a/rpc/args.go b/rpc/args.go index 429b385d5..f730819fd 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -289,7 +289,7 @@ type WhisperMessageArgs struct { Payload string To string From string - Topics []string + Topic []string Priority uint32 Ttl uint32 } diff --git a/rpc/message.go b/rpc/message.go index d02acef0e..825ede05b 100644 --- a/rpc/message.go +++ b/rpc/message.go @@ -346,7 +346,7 @@ func (req *RpcRequest) ToWhisperFilterArgs() (*xeth.Options, error) { return &args, nil } -func (req *RpcRequest) ToWhisperIdArgs() (int, error) { +func (req *RpcRequest) ToIdArgs() (int, error) { if len(req.Params) < 1 { return 0, NewErrorResponse(ErrorArguments) } diff --git a/rpc/packages.go b/rpc/packages.go index d82538c92..56c1751d7 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -13,6 +13,7 @@ import ( "math/big" "strings" "sync" + "time" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -31,13 +32,14 @@ const ( type EthereumApi struct { xeth *xeth.XEth + quit chan struct{} filterManager *filter.FilterManager logMut sync.RWMutex - logs map[int]state.Logs + logs map[int]*logFilter messagesMut sync.RWMutex - messages map[int][]xeth.WhisperMessage + messages map[int]*whisperFilter // Register keeps a list of accounts and transaction data regmut sync.Mutex register map[string][]*NewTxArgs @@ -49,12 +51,14 @@ func NewEthereumApi(eth *xeth.XEth) *EthereumApi { db, _ := ethdb.NewLDBDatabase("dapps") api := &EthereumApi{ xeth: eth, + quit: make(chan struct{}), filterManager: filter.NewFilterManager(eth.Backend().EventMux()), - logs: make(map[int]state.Logs), - messages: make(map[int][]xeth.WhisperMessage), + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), db: db, } go api.filterManager.Start() + go api.start() return api } @@ -97,7 +101,11 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro self.logMut.Lock() defer self.logMut.Unlock() - self.logs[id] = append(self.logs[id], logs...) + if self.logs[id] == nil { + self.logs[id] = &logFilter{timeout: time.Now()} + } + + self.logs[id].add(logs...) } id = self.filterManager.InstallFilter(filter) *reply = id @@ -120,7 +128,11 @@ func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error self.logMut.Lock() defer self.logMut.Unlock() - self.logs[id] = append(self.logs[id], &state.StateLog{}) + if self.logs[id] == nil { + self.logs[id] = &logFilter{timeout: time.Now()} + } + + self.logs[id].add(&state.StateLog{}) } if args == "pending" { filter.PendingCallback = callback @@ -138,9 +150,9 @@ func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { self.logMut.Lock() defer self.logMut.Unlock() - *reply = toLogs(self.logs[id]) - - self.logs[id] = nil // empty the logs + if self.logs[id] != nil { + *reply = toLogs(self.logs[id].get()) + } return nil } @@ -187,6 +199,7 @@ func (p *EthereumApi) Transact(args *NewTxArgs, reply *interface{}) error { result, _ := p.xeth.Transact( /* TODO specify account */ args.To, args.Value, args.Gas, args.GasPrice, args.Data) *reply = result } + return nil } @@ -357,7 +370,10 @@ func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) e args.Fn = func(msg xeth.WhisperMessage) { p.messagesMut.Lock() defer p.messagesMut.Unlock() - p.messages[id] = append(p.messages[id], msg) + if p.messages[id] == nil { + p.messages[id] = &whisperFilter{timeout: time.Now()} + } + p.messages[id].add(msg) // = append(p.messages[id], msg) } id = p.xeth.Whisper().Watch(args) *reply = id @@ -368,15 +384,15 @@ func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error { self.messagesMut.Lock() defer self.messagesMut.Unlock() - *reply = self.messages[id] - - self.messages[id] = nil // empty the messages + if self.messages[id] != nil { + *reply = self.messages[id].get() + } return nil } func (p *EthereumApi) WhisperPost(args *WhisperMessageArgs, reply *interface{}) error { - err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl) + err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topic, args.Priority, args.Ttl) if err != nil { return err } @@ -484,13 +500,13 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error } return p.UninstallFilter(args, reply) case "eth_changed": - args, err := req.ToFilterChangedArgs() + args, err := req.ToIdArgs() if err != nil { return err } return p.FilterChanged(args, reply) case "eth_filterLogs": - args, err := req.ToFilterChangedArgs() + args, err := req.ToIdArgs() if err != nil { return err } @@ -551,7 +567,7 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error } return p.NewWhisperFilter(args, reply) case "shh_changed": - args, err := req.ToWhisperIdArgs() + args, err := req.ToIdArgs() if err != nil { return err } @@ -569,7 +585,7 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error } return p.HasWhisperIdentity(args, reply) case "shh_getMessages": - args, err := req.ToWhisperIdArgs() + args, err := req.ToIdArgs() if err != nil { return err } @@ -581,3 +597,34 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error rpclogger.DebugDetailf("Reply: %T %s", reply, reply) return nil } + +var filterTickerTime = 15 * time.Second + +func (self *EthereumApi) start() { + timer := time.NewTicker(filterTickerTime) +done: + for { + select { + case <-timer.C: + self.logMut.Lock() + self.messagesMut.Lock() + for id, filter := range self.logs { + if time.Since(filter.timeout) > 20*time.Second { + delete(self.logs, id) + } + } + + for id, filter := range self.messages { + if time.Since(filter.timeout) > 20*time.Second { + delete(self.messages, id) + } + } + case <-self.quit: + break done + } + } +} + +func (self *EthereumApi) stop() { + close(self.quit) +} diff --git a/rpc/packages_test.go b/rpc/packages_test.go new file mode 100644 index 000000000..037fd78b3 --- /dev/null +++ b/rpc/packages_test.go @@ -0,0 +1,37 @@ +package rpc + +import ( + "sync" + "testing" + "time" +) + +func TestFilterClose(t *testing.T) { + api := &EthereumApi{ + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), + quit: make(chan struct{}), + } + + filterTickerTime = 1 + api.logs[0] = &logFilter{} + api.messages[0] = &whisperFilter{} + var wg sync.WaitGroup + wg.Add(1) + go api.start() + go func() { + select { + case <-time.After(500 * time.Millisecond): + api.stop() + wg.Done() + } + }() + wg.Wait() + if len(api.logs) != 0 { + t.Error("expected logs to be empty") + } + + if len(api.messages) != 0 { + t.Error("expected messages to be empty") + } +} diff --git a/rpc/util.go b/rpc/util.go index 679d83754..29824bcdb 100644 --- a/rpc/util.go +++ b/rpc/util.go @@ -20,10 +20,12 @@ import ( "encoding/json" "io" "net/http" + "time" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/state" + "github.com/ethereum/go-ethereum/xeth" ) var rpclogger = logger.NewLogger("RPC") @@ -80,7 +82,7 @@ type RpcServer interface { type Log struct { Address string `json:"address"` - Topics []string `json:"topics"` + Topic []string `json:"topics"` Data string `json:"data"` } @@ -89,14 +91,45 @@ func toLogs(logs state.Logs) (ls []Log) { for i, log := range logs { var l Log - l.Topics = make([]string, len(log.Topics())) + l.Topic = make([]string, len(log.Topics())) l.Address = toHex(log.Address()) l.Data = toHex(log.Data()) for j, topic := range log.Topics() { - l.Topics[j] = toHex(topic) + l.Topic[j] = toHex(topic) } ls[i] = l } return } + +type whisperFilter struct { + messages []xeth.WhisperMessage + timeout time.Time +} + +func (w *whisperFilter) add(msgs ...xeth.WhisperMessage) { + w.messages = append(w.messages, msgs...) +} +func (w *whisperFilter) get() []xeth.WhisperMessage { + w.timeout = time.Now() + tmp := w.messages + w.messages = nil + return tmp +} + +type logFilter struct { + logs state.Logs + timeout time.Time +} + +func (l *logFilter) add(logs ...state.Log) { + l.logs = append(l.logs, logs...) +} + +func (l *logFilter) get() state.Logs { + l.timeout = time.Now() + tmp := l.logs + l.logs = nil + return tmp +} diff --git a/state/dump.go b/state/dump.go index ac646480c..81895f1a3 100644 --- a/state/dump.go +++ b/state/dump.go @@ -30,7 +30,7 @@ func (self *StateDB) Dump() []byte { for it.Next() { stateObject := NewStateObjectFromBytes(it.Key, it.Value, self.db) - account := Account{Balance: stateObject.balance.String(), Nonce: stateObject.Nonce, Root: ethutil.Bytes2Hex(stateObject.Root()), CodeHash: ethutil.Bytes2Hex(stateObject.codeHash)} + account := Account{Balance: stateObject.balance.String(), Nonce: stateObject.nonce, Root: ethutil.Bytes2Hex(stateObject.Root()), CodeHash: ethutil.Bytes2Hex(stateObject.codeHash)} account.Storage = make(map[string]string) storageIt := stateObject.State.trie.Iterator() @@ -50,7 +50,7 @@ func (self *StateDB) Dump() []byte { // Debug stuff func (self *StateObject) CreateOutputForDiff() { - fmt.Printf("%x %x %x %x\n", self.Address(), self.State.Root(), self.balance.Bytes(), self.Nonce) + fmt.Printf("%x %x %x %x\n", self.Address(), self.State.Root(), self.balance.Bytes(), self.nonce) it := self.State.trie.Iterator() for it.Next() { fmt.Printf("%x %x\n", it.Key, it.Value) diff --git a/state/state_object.go b/state/state_object.go index 0c157403c..477b912a1 100644 --- a/state/state_object.go +++ b/state/state_object.go @@ -36,11 +36,11 @@ type StateObject struct { // Shared attributes balance *big.Int codeHash []byte - Nonce uint64 + nonce uint64 // Contract related attributes State *StateDB - Code Code - InitCode Code + code Code + initCode Code storage Storage @@ -53,6 +53,7 @@ type StateObject struct { // When an object is marked for deletion it will be delete from the trie // during the "update" phase of the state transition remove bool + dirty bool } func (self *StateObject) Reset() { @@ -64,7 +65,7 @@ func NewStateObject(addr []byte, db ethutil.Database) *StateObject { // This to ensure that it has 20 bytes (and not 0 bytes), thus left or right pad doesn't matter. address := ethutil.Address(addr) - object := &StateObject{db: db, address: address, balance: new(big.Int), gasPool: new(big.Int)} + object := &StateObject{db: db, address: address, balance: new(big.Int), gasPool: new(big.Int), dirty: true} object.State = New(nil, db) //New(trie.New(ethutil.Config.Db, "")) object.storage = make(Storage) object.gasPool = new(big.Int) @@ -88,20 +89,21 @@ func NewStateObjectFromBytes(address, data []byte, db ethutil.Database) *StateOb object := &StateObject{address: address, db: db} //object.RlpDecode(data) - object.Nonce = extobject.Nonce + object.nonce = extobject.Nonce object.balance = extobject.Balance object.codeHash = extobject.CodeHash object.State = New(extobject.Root, db) object.storage = make(map[string]*ethutil.Value) object.gasPool = new(big.Int) - object.Code, _ = db.Get(extobject.CodeHash) + object.code, _ = db.Get(extobject.CodeHash) return object } func (self *StateObject) MarkForDeletion() { self.remove = true - statelogger.DebugDetailf("%x: #%d %v (deletion)\n", self.Address(), self.Nonce, self.balance) + self.dirty = true + statelogger.DebugDetailf("%x: #%d %v (deletion)\n", self.Address(), self.nonce, self.balance) } func (c *StateObject) getAddr(addr []byte) *ethutil.Value { @@ -117,6 +119,7 @@ func (self *StateObject) GetStorage(key *big.Int) *ethutil.Value { } func (self *StateObject) SetStorage(key *big.Int, value *ethutil.Value) { self.SetState(key.Bytes(), value) + self.dirty = true } func (self *StateObject) Storage() map[string]*ethutil.Value { @@ -141,6 +144,7 @@ func (self *StateObject) GetState(k []byte) *ethutil.Value { func (self *StateObject) SetState(k []byte, value *ethutil.Value) { key := ethutil.LeftPadBytes(k, 32) self.storage[string(key)] = value.Copy() + self.dirty = true } func (self *StateObject) Sync() { @@ -152,36 +156,36 @@ func (self *StateObject) Sync() { self.setAddr([]byte(key), value) } + self.storage = make(Storage) } func (c *StateObject) GetInstr(pc *big.Int) *ethutil.Value { - if int64(len(c.Code)-1) < pc.Int64() { + if int64(len(c.code)-1) < pc.Int64() { return ethutil.NewValue(0) } - return ethutil.NewValueFromBytes([]byte{c.Code[pc.Int64()]}) + return ethutil.NewValueFromBytes([]byte{c.code[pc.Int64()]}) } func (c *StateObject) AddBalance(amount *big.Int) { c.SetBalance(new(big.Int).Add(c.balance, amount)) - statelogger.Debugf("%x: #%d %v (+ %v)\n", c.Address(), c.Nonce, c.balance, amount) + statelogger.Debugf("%x: #%d %v (+ %v)\n", c.Address(), c.nonce, c.balance, amount) } func (c *StateObject) AddAmount(amount *big.Int) { c.AddBalance(amount) } func (c *StateObject) SubBalance(amount *big.Int) { c.SetBalance(new(big.Int).Sub(c.balance, amount)) - statelogger.Debugf("%x: #%d %v (- %v)\n", c.Address(), c.Nonce, c.balance, amount) + statelogger.Debugf("%x: #%d %v (- %v)\n", c.Address(), c.nonce, c.balance, amount) } func (c *StateObject) SubAmount(amount *big.Int) { c.SubBalance(amount) } func (c *StateObject) SetBalance(amount *big.Int) { c.balance = amount + c.dirty = true } -func (self *StateObject) Balance() *big.Int { return self.balance } - // // Gas setters and getters // @@ -196,6 +200,8 @@ func (c *StateObject) ConvertGas(gas, price *big.Int) error { c.SubAmount(total) + c.dirty = true + return nil } @@ -210,11 +216,15 @@ func (self *StateObject) BuyGas(gas, price *big.Int) error { return GasLimitError(self.gasPool, gas) } + self.gasPool.Sub(self.gasPool, gas) + rGas := new(big.Int).Set(gas) rGas.Mul(rGas, price) self.AddAmount(rGas) + self.dirty = true + return nil } @@ -231,15 +241,16 @@ func (self *StateObject) Copy() *StateObject { stateObject := NewStateObject(self.Address(), self.db) stateObject.balance.Set(self.balance) stateObject.codeHash = ethutil.CopyBytes(self.codeHash) - stateObject.Nonce = self.Nonce + stateObject.nonce = self.nonce if self.State != nil { stateObject.State = self.State.Copy() } - stateObject.Code = ethutil.CopyBytes(self.Code) - stateObject.InitCode = ethutil.CopyBytes(self.InitCode) + stateObject.code = ethutil.CopyBytes(self.code) + stateObject.initCode = ethutil.CopyBytes(self.initCode) stateObject.storage = self.storage.Copy() stateObject.gasPool.Set(self.gasPool) stateObject.remove = self.remove + stateObject.dirty = self.dirty return stateObject } @@ -252,8 +263,12 @@ func (self *StateObject) Set(stateObject *StateObject) { // Attribute accessors // +func (self *StateObject) Balance() *big.Int { + return self.balance +} + func (c *StateObject) N() *big.Int { - return big.NewInt(int64(c.Nonce)) + return big.NewInt(int64(c.nonce)) } // Returns the address of the contract/account @@ -263,7 +278,7 @@ func (c *StateObject) Address() []byte { // Returns the initialization Code func (c *StateObject) Init() Code { - return c.InitCode + return c.initCode } func (self *StateObject) Trie() *trie.Trie { @@ -274,8 +289,27 @@ func (self *StateObject) Root() []byte { return self.Trie().Root() } +func (self *StateObject) Code() []byte { + return self.code +} + func (self *StateObject) SetCode(code []byte) { - self.Code = code + self.code = code + self.dirty = true +} + +func (self *StateObject) SetInitCode(code []byte) { + self.initCode = code + self.dirty = true +} + +func (self *StateObject) SetNonce(nonce uint64) { + self.nonce = nonce + self.dirty = true +} + +func (self *StateObject) Nonce() uint64 { + return self.nonce } // @@ -284,16 +318,16 @@ func (self *StateObject) SetCode(code []byte) { // State object encoding methods func (c *StateObject) RlpEncode() []byte { - return ethutil.Encode([]interface{}{c.Nonce, c.balance, c.Root(), c.CodeHash()}) + return ethutil.Encode([]interface{}{c.nonce, c.balance, c.Root(), c.CodeHash()}) } func (c *StateObject) CodeHash() ethutil.Bytes { - return crypto.Sha3(c.Code) + return crypto.Sha3(c.code) } func (c *StateObject) RlpDecode(data []byte) { decoder := ethutil.NewValueFromBytes(data) - c.Nonce = decoder.Get(0).Uint() + c.nonce = decoder.Get(0).Uint() c.balance = decoder.Get(1).BigInt() c.State = New(decoder.Get(2).Bytes(), c.db) //New(trie.New(ethutil.Config.Db, decoder.Get(2).Interface())) c.storage = make(map[string]*ethutil.Value) @@ -301,7 +335,7 @@ func (c *StateObject) RlpDecode(data []byte) { c.codeHash = decoder.Get(3).Bytes() - c.Code, _ = c.db.Get(c.codeHash) + c.code, _ = c.db.Get(c.codeHash) } // Storage change object. Used by the manifest for notifying changes to diff --git a/state/state_test.go b/state/state_test.go index 7c54cedc0..ee1cf9286 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -1,6 +1,8 @@ package state import ( + "math/big" + checker "gopkg.in/check.v1" "github.com/ethereum/go-ethereum/ethdb" @@ -16,11 +18,42 @@ var _ = checker.Suite(&StateSuite{}) // var ZeroHash256 = make([]byte, 32) func (s *StateSuite) TestDump(c *checker.C) { - key := []byte{0x01} - value := []byte("foo") - s.state.trie.Update(key, value) - dump := s.state.Dump() - c.Assert(dump, checker.NotNil) + // generate a few entries + obj1 := s.state.GetOrNewStateObject([]byte{0x01}) + obj1.AddBalance(big.NewInt(22)) + obj2 := s.state.GetOrNewStateObject([]byte{0x01, 0x02}) + obj2.SetCode([]byte{3, 3, 3, 3, 3, 3, 3}) + obj3 := s.state.GetOrNewStateObject([]byte{0x02}) + obj3.SetBalance(big.NewInt(44)) + + // write some of them to the trie + s.state.UpdateStateObject(obj1) + s.state.UpdateStateObject(obj2) + + // check that dump contains the state objects that are in trie + got := string(s.state.Dump()) + want := `{ + "root": "4e3a59299745ba6752247c8b91d0f716dac9ec235861c91f5ac1894a361d87ba", + "accounts": { + "0000000000000000000000000000000000000001": { + "balance": "22", + "nonce": 0, + "root": "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "codeHash": "c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470", + "storage": {} + }, + "0000000000000000000000000000000000000102": { + "balance": "0", + "nonce": 0, + "root": "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "codeHash": "87874902497a5bb968da31a2998d8f22e949d1ef6214bcdedd8bae24cca4b9e3", + "storage": {} + } + } +}` + if got != want { + c.Errorf("dump mismatch:\ngot: %s\nwant: %s\n", got, want) + } } func (s *StateSuite) SetUpTest(c *checker.C) { diff --git a/state/statedb.go b/state/statedb.go index c83d59ed7..7e2b24b94 100644 --- a/state/statedb.go +++ b/state/statedb.go @@ -72,43 +72,42 @@ func (self *StateDB) AddBalance(addr []byte, amount *big.Int) { func (self *StateDB) GetNonce(addr []byte) uint64 { stateObject := self.GetStateObject(addr) if stateObject != nil { - return stateObject.Nonce + return stateObject.nonce } return 0 } -func (self *StateDB) SetNonce(addr []byte, nonce uint64) { +func (self *StateDB) GetCode(addr []byte) []byte { stateObject := self.GetStateObject(addr) if stateObject != nil { - stateObject.Nonce = nonce + return stateObject.code } + + return nil } -func (self *StateDB) GetCode(addr []byte) []byte { - stateObject := self.GetStateObject(addr) +func (self *StateDB) GetState(a, b []byte) []byte { + stateObject := self.GetStateObject(a) if stateObject != nil { - return stateObject.Code + return stateObject.GetState(b).Bytes() } return nil } -func (self *StateDB) SetCode(addr, code []byte) { +func (self *StateDB) SetNonce(addr []byte, nonce uint64) { stateObject := self.GetStateObject(addr) if stateObject != nil { - stateObject.SetCode(code) + stateObject.SetNonce(nonce) } } -// TODO vars -func (self *StateDB) GetState(a, b []byte) []byte { - stateObject := self.GetStateObject(a) +func (self *StateDB) SetCode(addr, code []byte) { + stateObject := self.GetStateObject(addr) if stateObject != nil { - return stateObject.GetState(b).Bytes() + stateObject.SetCode(code) } - - return nil } func (self *StateDB) SetState(addr, key []byte, value interface{}) { @@ -138,7 +137,7 @@ func (self *StateDB) UpdateStateObject(stateObject *StateObject) { addr := stateObject.Address() if len(stateObject.CodeHash()) > 0 { - self.db.Put(stateObject.CodeHash(), stateObject.Code) + self.db.Put(stateObject.CodeHash(), stateObject.code) } self.trie.Update(addr, stateObject.RlpEncode()) @@ -282,16 +281,18 @@ func (self *StateDB) Refunds() map[string]*big.Int { } func (self *StateDB) Update(gasUsed *big.Int) { - self.refund = make(map[string]*big.Int) for _, stateObject := range self.stateObjects { - if stateObject.remove { - self.DeleteStateObject(stateObject) - } else { - stateObject.Sync() - - self.UpdateStateObject(stateObject) + if stateObject.dirty { + if stateObject.remove { + self.DeleteStateObject(stateObject) + } else { + stateObject.Sync() + + self.UpdateStateObject(stateObject) + } + stateObject.dirty = false } } } diff --git a/tests/vm/gh_test.go b/tests/vm/gh_test.go index 17f945910..2151cf9a5 100644 --- a/tests/vm/gh_test.go +++ b/tests/vm/gh_test.go @@ -46,8 +46,8 @@ func StateObjectFromAccount(db ethutil.Database, addr string, account Account) * if ethutil.IsHex(account.Code) { account.Code = account.Code[2:] } - obj.Code = ethutil.Hex2Bytes(account.Code) - obj.Nonce = ethutil.Big(account.Nonce).Uint64() + obj.SetCode(ethutil.Hex2Bytes(account.Code)) + obj.SetNonce(ethutil.Big(account.Nonce).Uint64()) return obj } diff --git a/tests/vm/nowarn.go b/tests/vm/nowarn.go new file mode 100644 index 000000000..2a45a6cc6 --- /dev/null +++ b/tests/vm/nowarn.go @@ -0,0 +1,3 @@ +// This silences the warning given by 'go install ./...'. + +package vm diff --git a/update-license.go b/update-license.go index d5e21fdd3..832a94712 100644 --- a/update-license.go +++ b/update-license.go @@ -1,4 +1,5 @@ // +build none + /* This command generates GPL license headers on top of all source files. You can run it once per month, before cutting a release or just diff --git a/xeth/types.go b/xeth/types.go index a903fccbb..5b2d16018 100644 --- a/xeth/types.go +++ b/xeth/types.go @@ -150,7 +150,7 @@ type Transaction struct { func NewTx(tx *types.Transaction) *Transaction { hash := toHex(tx.Hash()) receiver := toHex(tx.To()) - if receiver == "0000000000000000000000000000000000000000" { + if len(receiver) == 0 { receiver = toHex(core.AddressFromMessage(tx)) } sender := toHex(tx.From()) diff --git a/xeth/xeth.go b/xeth/xeth.go index 06915c5e0..f3569e454 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -7,6 +7,7 @@ package xeth import ( "bytes" "encoding/json" + "fmt" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -138,15 +139,15 @@ func (self *XEth) BalanceAt(addr string) string { } func (self *XEth) TxCountAt(address string) int { - return int(self.State().SafeGet(address).Nonce) + return int(self.State().SafeGet(address).Nonce()) } func (self *XEth) CodeAt(address string) string { - return toHex(self.State().SafeGet(address).Code) + return toHex(self.State().SafeGet(address).Code()) } func (self *XEth) IsContract(address string) bool { - return len(self.State().SafeGet(address).Code) > 0 + return len(self.State().SafeGet(address).Code()) > 0 } func (self *XEth) SecretToAddress(key string) string { @@ -252,7 +253,6 @@ func (self *XEth) Call(toStr, valueStr, gasStr, gasPriceStr, dataStr string) (st } func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string) (string, error) { - var ( to []byte value = ethutil.NewValue(valueStr) @@ -276,29 +276,32 @@ func (self *XEth) Transact(toStr, valueStr, gasStr, gasPriceStr, codeStr string) tx = types.NewTransactionMessage(to, value.BigInt(), gas.BigInt(), price.BigInt(), data) } - state := self.chainManager.TransState() + var err error + state := self.eth.ChainManager().TransState() + if balance := state.GetBalance(key.Address()); balance.Cmp(tx.Value()) < 0 { + return "", fmt.Errorf("insufficient balance. balance=%v tx=%v", balance, tx.Value()) + } nonce := state.GetNonce(key.Address()) tx.SetNonce(nonce) tx.Sign(key.PrivateKey) - // Do some pre processing for our "pre" events and hooks - block := self.chainManager.NewBlock(key.Address()) - coinbase := state.GetOrNewStateObject(key.Address()) - coinbase.SetGasPool(block.GasLimit()) - self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true) + //fmt.Printf("create tx: %x %v\n", tx.Hash()[:4], tx.Nonce()) - err := self.eth.TxPool().Add(tx) + /* + // Do some pre processing for our "pre" events and hooks + block := self.chainManager.NewBlock(key.Address()) + coinbase := state.GetOrNewStateObject(key.Address()) + coinbase.SetGasPool(block.GasLimit()) + self.blockProcessor.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true) + */ + + err = self.eth.TxPool().Add(tx) if err != nil { return "", err } state.SetNonce(key.Address(), nonce+1) - if contractCreation { - addr := core.AddressFromMessage(tx) - pipelogger.Infof("Contract addr %x\n", addr) - } - if types.IsContractAddr(to) { return toHex(core.AddressFromMessage(tx)), nil } |