aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel A. Nagy <nagy.da@gmail.com>2015-05-11 18:47:14 +0800
committerDaniel A. Nagy <nagy.da@gmail.com>2015-05-11 18:47:14 +0800
commita9e1d38612cfde56c285a5de5b5bfe5326bdc9b5 (patch)
tree6c16d3e2b216fdf0027a477a8975c9052930e34a
parent1fe70a66ba2ef0f148affa7a72b4e65023474859 (diff)
parent5176fbc6faaa5e7f0305ad7f2b896c092781deaa (diff)
downloaddexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar
dexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.gz
dexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.bz2
dexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.lz
dexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.xz
dexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.tar.zst
dexon-a9e1d38612cfde56c285a5de5b5bfe5326bdc9b5.zip
Merge branch 'develop' of github.com:ethereum/go-ethereum into develop
Conflicts: rpc/jeth.go
-rw-r--r--cmd/geth/admin.go12
-rw-r--r--cmd/geth/main.go5
-rw-r--r--cmd/mist/main.go4
-rw-r--r--cmd/utils/flags.go15
-rw-r--r--common/size.go6
-rw-r--r--common/size_test.go14
-rw-r--r--core/events.go6
-rw-r--r--core/manager.go2
-rw-r--r--core/transaction_pool.go2
-rw-r--r--eth/backend.go26
-rw-r--r--eth/downloader/downloader.go62
-rw-r--r--eth/downloader/downloader_test.go43
-rw-r--r--eth/sync.go3
-rw-r--r--ethdb/database.go5
-rw-r--r--miner/miner.go13
-rw-r--r--miner/worker.go198
-rw-r--r--rpc/api.go33
-rw-r--r--rpc/http.go2
-rw-r--r--rpc/jeth.go2
-rw-r--r--rpc/types.go16
-rw-r--r--tests/block_test.go2
-rw-r--r--xeth/xeth.go4
22 files changed, 372 insertions, 103 deletions
diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go
index 49e2dc6f8..2b9956638 100644
--- a/cmd/geth/admin.go
+++ b/cmd/geth/admin.go
@@ -70,6 +70,7 @@ func (js *jsre) adminBindings() {
miner.Set("stop", js.stopMining)
miner.Set("hashrate", js.hashrate)
miner.Set("setExtra", js.setExtra)
+ miner.Set("setGasPrice", js.setGasPrice)
admin.Set("debug", struct{}{})
t, _ = admin.Get("debug")
@@ -236,6 +237,17 @@ func (js *jsre) setExtra(call otto.FunctionCall) otto.Value {
return otto.UndefinedValue()
}
+func (js *jsre) setGasPrice(call otto.FunctionCall) otto.Value {
+ gasPrice, err := call.Argument(0).ToString()
+ if err != nil {
+ fmt.Println(err)
+ return otto.UndefinedValue()
+ }
+
+ js.ethereum.Miner().SetGasPrice(common.String2Big(gasPrice))
+ return otto.UndefinedValue()
+}
+
func (js *jsre) hashrate(otto.FunctionCall) otto.Value {
return js.re.ToVal(js.ethereum.Miner().HashRate())
}
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index fd6925e6d..fd7aae4c2 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -51,7 +51,7 @@ import _ "net/http/pprof"
const (
ClientIdentifier = "Geth"
- Version = "0.9.17"
+ Version = "0.9.19"
)
var (
@@ -244,6 +244,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
utils.EtherbaseFlag,
+ utils.GasPriceFlag,
utils.MinerThreadsFlag,
utils.MiningEnabledFlag,
utils.NATFlag,
@@ -258,7 +259,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils.ProtocolVersionFlag,
utils.NetworkIdFlag,
utils.RPCCORSDomainFlag,
- utils.LogLevelFlag,
+ utils.VerbosityFlag,
utils.BacktraceAtFlag,
utils.LogToStdErrFlag,
utils.LogVModuleFlag,
diff --git a/cmd/mist/main.go b/cmd/mist/main.go
index 9d92cc175..4b55b3026 100644
--- a/cmd/mist/main.go
+++ b/cmd/mist/main.go
@@ -37,7 +37,7 @@ import (
const (
ClientIdentifier = "Mist"
- Version = "0.9.0"
+ Version = "0.9.19"
)
var (
@@ -73,7 +73,7 @@ func init() {
utils.DataDirFlag,
utils.ListenPortFlag,
utils.LogFileFlag,
- utils.LogLevelFlag,
+ utils.VerbosityFlag,
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
utils.MinerThreadsFlag,
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index b18d9851f..dd3b6c8a2 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"fmt"
"log"
+ "math/big"
"net/http"
"os"
"path"
@@ -116,6 +117,11 @@ var (
Usage: "Public address for block mining rewards. By default the address of your primary account is used",
Value: "primary",
}
+ GasPriceFlag = cli.StringFlag{
+ Name: "gasprice",
+ Usage: "Sets the minimal gasprice when mining transactions",
+ Value: new(big.Int).Mul(big.NewInt(10), common.Szabo).String(),
+ }
UnlockedAccountFlag = cli.StringFlag{
Name: "unlock",
@@ -133,8 +139,8 @@ var (
Name: "logfile",
Usage: "Send log output to a file",
}
- LogLevelFlag = cli.IntFlag{
- Name: "loglevel",
+ VerbosityFlag = cli.IntFlag{
+ Name: "verbosity",
Usage: "Logging verbosity: 0-6 (0=silent, 1=error, 2=warn, 3=info, 4=core, 5=debug, 6=debug detail)",
Value: int(logger.InfoLevel),
}
@@ -270,7 +276,7 @@ func GetNodeKey(ctx *cli.Context) (key *ecdsa.PrivateKey) {
func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
// Set verbosity on glog
- glog.SetV(ctx.GlobalInt(LogLevelFlag.Name))
+ glog.SetV(ctx.GlobalInt(VerbosityFlag.Name))
// Set the log type
//glog.SetToStderr(ctx.GlobalBool(LogToStdErrFlag.Name))
glog.SetToStderr(true)
@@ -290,7 +296,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
SkipBcVersionCheck: false,
NetworkId: ctx.GlobalInt(NetworkIdFlag.Name),
LogFile: ctx.GlobalString(LogFileFlag.Name),
- LogLevel: ctx.GlobalInt(LogLevelFlag.Name),
+ Verbosity: ctx.GlobalInt(VerbosityFlag.Name),
LogJSON: ctx.GlobalString(LogJSONFlag.Name),
Etherbase: ctx.GlobalString(EtherbaseFlag.Name),
MinerThreads: ctx.GlobalInt(MinerThreadsFlag.Name),
@@ -305,6 +311,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
Shh: ctx.GlobalBool(WhisperEnabledFlag.Name),
Dial: true,
BootNodes: ctx.GlobalString(BootnodesFlag.Name),
+ GasPrice: common.String2Big(ctx.GlobalString(GasPriceFlag.Name)),
}
}
diff --git a/common/size.go b/common/size.go
index 0d9dbf558..4ea7f7b11 100644
--- a/common/size.go
+++ b/common/size.go
@@ -44,12 +44,6 @@ func CurrencyToString(num *big.Int) string {
)
switch {
- case num.Cmp(Douglas) >= 0:
- fin = new(big.Int).Div(num, Douglas)
- denom = "Douglas"
- case num.Cmp(Einstein) >= 0:
- fin = new(big.Int).Div(num, Einstein)
- denom = "Einstein"
case num.Cmp(Ether) >= 0:
fin = new(big.Int).Div(num, Ether)
denom = "Ether"
diff --git a/common/size_test.go b/common/size_test.go
index 1cbeff0a8..cfe7efe31 100644
--- a/common/size_test.go
+++ b/common/size_test.go
@@ -25,8 +25,6 @@ func (s *SizeSuite) TestStorageSizeString(c *checker.C) {
}
func (s *CommonSuite) TestCommon(c *checker.C) {
- douglas := CurrencyToString(BigPow(10, 43))
- einstein := CurrencyToString(BigPow(10, 22))
ether := CurrencyToString(BigPow(10, 19))
finney := CurrencyToString(BigPow(10, 16))
szabo := CurrencyToString(BigPow(10, 13))
@@ -35,8 +33,6 @@ func (s *CommonSuite) TestCommon(c *checker.C) {
ada := CurrencyToString(BigPow(10, 4))
wei := CurrencyToString(big.NewInt(10))
- c.Assert(douglas, checker.Equals, "10 Douglas")
- c.Assert(einstein, checker.Equals, "10 Einstein")
c.Assert(ether, checker.Equals, "10 Ether")
c.Assert(finney, checker.Equals, "10 Finney")
c.Assert(szabo, checker.Equals, "10 Szabo")
@@ -45,13 +41,3 @@ func (s *CommonSuite) TestCommon(c *checker.C) {
c.Assert(ada, checker.Equals, "10 Ada")
c.Assert(wei, checker.Equals, "10 Wei")
}
-
-func (s *CommonSuite) TestLarge(c *checker.C) {
- douglaslarge := CurrencyToString(BigPow(100000000, 43))
- adalarge := CurrencyToString(BigPow(100000000, 4))
- weilarge := CurrencyToString(big.NewInt(100000000))
-
- c.Assert(douglaslarge, checker.Equals, "10000E298 Douglas")
- c.Assert(adalarge, checker.Equals, "10000E7 Einstein")
- c.Assert(weilarge, checker.Equals, "100 Babbage")
-}
diff --git a/core/events.go b/core/events.go
index 3da668af5..1ea35c2f4 100644
--- a/core/events.go
+++ b/core/events.go
@@ -1,8 +1,10 @@
package core
import (
- "github.com/ethereum/go-ethereum/core/types"
+ "math/big"
+
"github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
)
// TxPreEvent is posted when a transaction enters the transaction pool.
@@ -44,6 +46,8 @@ type ChainUncleEvent struct {
type ChainHeadEvent struct{ Block *types.Block }
+type GasPriceChanged struct{ Price *big.Int }
+
// Mining operation events
type StartMining struct{}
type TopMining struct{}
diff --git a/core/manager.go b/core/manager.go
index 9b5407a9e..433ada7ee 100644
--- a/core/manager.go
+++ b/core/manager.go
@@ -1,12 +1,14 @@
package core
import (
+ "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
)
type Backend interface {
+ AccountManager() *accounts.Manager
BlockProcessor() *BlockProcessor
ChainManager() *ChainManager
TxPool() *TxPool
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 6898a4bda..e68f7406a 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -21,7 +21,7 @@ var (
ErrInvalidSender = errors.New("Invalid sender")
ErrNonce = errors.New("Nonce too low")
ErrBalance = errors.New("Insufficient balance")
- ErrNonExistentAccount = errors.New("Account does not exist")
+ ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
ErrIntrinsicGas = errors.New("Intrinsic gas too low")
ErrGasLimit = errors.New("Exceeds block gas limit")
diff --git a/eth/backend.go b/eth/backend.go
index 0f23cde2f..cdbe35b26 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
+ "math/big"
"os"
"path"
"path/filepath"
@@ -53,12 +54,12 @@ type Config struct {
BlockChainVersion int
SkipBcVersionCheck bool // e.g. blockchain export
- DataDir string
- LogFile string
- LogLevel int
- LogJSON string
- VmDebug bool
- NatSpec bool
+ DataDir string
+ LogFile string
+ Verbosity int
+ LogJSON string
+ VmDebug bool
+ NatSpec bool
MaxPeers int
MaxPendingPeers int
@@ -76,6 +77,7 @@ type Config struct {
Dial bool
Etherbase string
+ GasPrice *big.Int
MinerThreads int
AccountManager *accounts.Manager
@@ -200,7 +202,7 @@ type Ethereum struct {
func New(config *Config) (*Ethereum, error) {
// Bootstrap database
- logger.New(config.DataDir, config.LogFile, config.LogLevel)
+ logger.New(config.DataDir, config.LogFile, config.Verbosity)
if len(config.LogJSON) > 0 {
logger.NewJSONsystem(config.DataDir, config.LogJSON)
}
@@ -266,6 +268,8 @@ func New(config *Config) (*Ethereum, error) {
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor)
eth.miner = miner.New(eth, eth.pow, config.MinerThreads)
+ eth.miner.SetGasPrice(config.GasPrice)
+
eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
if config.Shh {
eth.whisper = whisper.New()
@@ -447,6 +451,8 @@ func (s *Ethereum) Start() error {
return nil
}
+// sync databases every minute. If flushing fails we exit immediatly. The system
+// may not continue under any circumstances.
func (s *Ethereum) syncDatabases() {
ticker := time.NewTicker(1 * time.Minute)
done:
@@ -455,13 +461,13 @@ done:
case <-ticker.C:
// don't change the order of database flushes
if err := s.extraDb.Flush(); err != nil {
- glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+ glog.Fatalf("fatal error: flush extraDb: %v\n", err)
}
if err := s.stateDb.Flush(); err != nil {
- glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+ glog.Fatalf("fatal error: flush stateDb: %v\n", err)
}
if err := s.blockDb.Flush(); err != nil {
- glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+ glog.Fatalf("fatal error: flush blockDb: %v\n", err)
}
case <-s.shutdownChan:
break done
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 18f8d2ba8..14ca2cd3d 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,6 +34,9 @@ var (
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool")
errBlockNumberOverflow = errors.New("received block which overflows")
+ errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
+ errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
+ errNoSyncActive = errors.New("no sync active")
)
type hashCheckFn func(common.Hash) bool
@@ -74,6 +77,7 @@ type Downloader struct {
newPeerCh chan *peer
hashCh chan hashPack
blockCh chan blockPack
+ cancelCh chan struct{}
}
func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
@@ -129,6 +133,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
}
defer atomic.StoreInt32(&d.synchronising, 0)
+ // Create cancel channel for aborting midflight
+ d.cancelCh = make(chan struct{})
+
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
return errPendingQueue
@@ -161,7 +168,6 @@ func (d *Downloader) Has(hash common.Hash) bool {
}
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
-
d.activePeer = p.id
defer func() {
// reset on error
@@ -191,6 +197,42 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
return nil
}
+// Cancel cancels all of the operations and resets the queue. It returns true
+// if the cancel operation was completed.
+func (d *Downloader) Cancel() bool {
+ hs, bs := d.queue.Size()
+ // If we're not syncing just return.
+ if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
+ return false
+ }
+
+ close(d.cancelCh)
+
+ // clean up
+hashDone:
+ for {
+ select {
+ case <-d.hashCh:
+ default:
+ break hashDone
+ }
+ }
+
+blockDone:
+ for {
+ select {
+ case <-d.blockCh:
+ default:
+ break blockDone
+ }
+ }
+
+ // reset the queue
+ d.queue.Reset()
+
+ return true
+}
+
// XXX Make synchronous
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
@@ -217,6 +259,8 @@ func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial b
out:
for {
select {
+ case <-d.cancelCh:
+ return errCancelHashFetch
case hashPack := <-d.hashCh:
// Make sure the active peer is giving us the hashes
if hashPack.peerId != activePeer.id {
@@ -305,6 +349,8 @@ func (d *Downloader) startFetchingBlocks(p *peer) error {
out:
for {
select {
+ case <-d.cancelCh:
+ return errCancelBlockFetch
case blockPack := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
@@ -394,11 +440,23 @@ out:
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
// the protocol handler.
-func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) {
+func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error {
+ // Make sure the downloader is active
+ if atomic.LoadInt32(&d.synchronising) == 0 {
+ return errNoSyncActive
+ }
+
d.blockCh <- blockPack{id, blocks}
+
+ return nil
}
func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+ // Make sure the downloader is active
+ if atomic.LoadInt32(&d.synchronising) == 0 {
+ return errNoSyncActive
+ }
+
// make sure that the hashes that are being added are actually from the peer
// that's the current active peer. hashes that have been received from other
// peers are dropped and ignored.
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 8ccc4d1a5..d0f8d4c8f 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -182,6 +182,49 @@ func TestTaking(t *testing.T) {
}
}
+func TestInactiveDownloader(t *testing.T) {
+ targetBlocks := 1000
+ hashes := createHashes(0, targetBlocks)
+ blocks := createBlocksFromHashSet(createHashSet(hashes))
+ tester := newTester(t, hashes, nil)
+
+ err := tester.downloader.AddHashes("bad peer 001", hashes)
+ if err != errNoSyncActive {
+ t.Error("expected no sync error, got", err)
+ }
+
+ err = tester.downloader.DeliverChunk("bad peer 001", blocks)
+ if err != errNoSyncActive {
+ t.Error("expected no sync error, got", err)
+ }
+}
+
+func TestCancel(t *testing.T) {
+ minDesiredPeerCount = 4
+ blockTtl = 1 * time.Second
+
+ targetBlocks := 1000
+ hashes := createHashes(0, targetBlocks)
+ blocks := createBlocksFromHashes(hashes)
+ tester := newTester(t, hashes, blocks)
+
+ tester.newPeer("peer1", big.NewInt(10000), hashes[0])
+
+ err := tester.sync("peer1", hashes[0])
+ if err != nil {
+ t.Error("download error", err)
+ }
+
+ if !tester.downloader.Cancel() {
+ t.Error("cancel operation unsuccessfull")
+ }
+
+ hashSize, blockSize := tester.downloader.queue.Size()
+ if hashSize > 0 || blockSize > 0 {
+ t.Error("block (", blockSize, ") or hash (", hashSize, ") not 0")
+ }
+}
+
func TestThrottling(t *testing.T) {
minDesiredPeerCount = 4
blockTtl = 1 * time.Second
diff --git a/eth/sync.go b/eth/sync.go
index c49f5209d..d955eaa50 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -63,6 +63,9 @@ func (pm *ProtocolManager) processBlocks() error {
max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
_, err := pm.chainman.InsertChain(blocks[:max])
if err != nil {
+ // cancel download process
+ pm.downloader.Cancel()
+
return err
}
blocks = blocks[max:]
diff --git a/ethdb/database.go b/ethdb/database.go
index 57a3f9ee6..15af02fdf 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -8,8 +8,11 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
+ "github.com/syndtr/goleveldb/leveldb/opt"
)
+const openFileLimit = 128
+
type LDBDatabase struct {
fn string
@@ -23,7 +26,7 @@ type LDBDatabase struct {
func NewLDBDatabase(file string) (*LDBDatabase, error) {
// Open the db
- db, err := leveldb.OpenFile(file, nil)
+ db, err := leveldb.OpenFile(file, &opt.Options{OpenFilesCacheCapacity: openFileLimit})
if err != nil {
return nil, err
}
diff --git a/miner/miner.go b/miner/miner.go
index bff0026dc..efe6d3051 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -7,6 +7,8 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/pow"
)
@@ -37,7 +39,18 @@ func (self *Miner) Mining() bool {
return self.mining
}
+func (m *Miner) SetGasPrice(price *big.Int) {
+ // FIXME block tests set a nil gas price. Quick dirty fix
+ if price == nil {
+ return
+ }
+
+ m.worker.gasPrice = price
+}
+
func (self *Miner) Start(coinbase common.Address) {
+ glog.V(logger.Info).Infoln("Starting mining operation")
+
self.mining = true
self.worker.coinbase = coinbase
self.worker.start()
diff --git a/miner/worker.go b/miner/worker.go
index 87d17dfd6..e3dbae717 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"
+ "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
@@ -21,12 +22,18 @@ import (
var jsonlogger = logger.NewJsonLogger()
type environment struct {
- totalUsedGas *big.Int
- state *state.StateDB
- coinbase *state.StateObject
- block *types.Block
- family *set.Set
- uncles *set.Set
+ totalUsedGas *big.Int
+ state *state.StateDB
+ coinbase *state.StateObject
+ block *types.Block
+ family *set.Set
+ uncles *set.Set
+ remove *set.Set
+ tcount int
+ ignoredTransactors *set.Set
+ lowGasTransactors *set.Set
+ ownedAccounts *set.Set
+ lowGasTxs types.Transactions
}
func env(block *types.Block, eth core.Backend) *environment {
@@ -72,6 +79,7 @@ type worker struct {
proc *core.BlockProcessor
coinbase common.Address
+ gasPrice *big.Int
extra []byte
currentMu sync.Mutex
@@ -93,6 +101,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
eth: eth,
mux: eth.EventMux(),
recv: make(chan *types.Block),
+ gasPrice: new(big.Int),
chain: eth.ChainManager(),
proc: eth.BlockProcessor(),
possibleUncles: make(map[common.Hash]*types.Block),
@@ -123,15 +132,22 @@ func (self *worker) pendingBlock() *types.Block {
}
func (self *worker) start() {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ atomic.StoreInt32(&self.mining, 1)
+
// spin up agents
for _, agent := range self.agents {
agent.Start()
}
- atomic.StoreInt32(&self.mining, 1)
}
func (self *worker) stop() {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
if atomic.LoadInt32(&self.mining) == 1 {
// stop all agents
for _, agent := range self.agents {
@@ -144,6 +160,9 @@ func (self *worker) stop() {
}
func (self *worker) register(agent Agent) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
self.agents = append(self.agents, agent)
agent.SetReturnCh(self.recv)
}
@@ -163,8 +182,11 @@ out:
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
case core.TxPreEvent:
+ // Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
- self.commitNewWork()
+ self.mu.Lock()
+ self.commitTransactions(types.Transactions{ev.Tx})
+ self.mu.Unlock()
}
}
case <-self.quit:
@@ -230,13 +252,33 @@ func (self *worker) makeCurrent() {
}
block.Header().Extra = self.extra
- self.current = env(block, self.eth)
+ current := env(block, self.eth)
for _, ancestor := range self.chain.GetAncestors(block, 7) {
- self.current.family.Add(ancestor.Hash())
+ current.family.Add(ancestor.Hash())
}
+ accounts, _ := self.eth.AccountManager().Accounts()
+ // Keep track of transactions which return errors so they can be removed
+ current.remove = set.New()
+ current.tcount = 0
+ current.ignoredTransactors = set.New()
+ current.lowGasTransactors = set.New()
+ current.ownedAccounts = accountAddressesSet(accounts)
- parent := self.chain.GetBlock(self.current.block.ParentHash())
- self.current.coinbase.SetGasPool(core.CalcGasLimit(parent))
+ parent := self.chain.GetBlock(current.block.ParentHash())
+ current.coinbase.SetGasPool(core.CalcGasLimit(parent))
+
+ self.current = current
+}
+
+func (w *worker) setGasPrice(p *big.Int) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ // calculate the minimal gas price the miner accepts when sorting out transactions.
+ const pct = int64(90)
+ w.gasPrice = gasprice(p, pct)
+
+ w.mux.Post(core.GasPriceChanged{w.gasPrice})
}
func (self *worker) commitNewWork() {
@@ -248,54 +290,14 @@ func (self *worker) commitNewWork() {
defer self.currentMu.Unlock()
self.makeCurrent()
+ current := self.current
transactions := self.eth.TxPool().GetTransactions()
sort.Sort(types.TxByNonce{transactions})
- // Keep track of transactions which return errors so they can be removed
- var (
- remove = set.New()
- tcount = 0
- ignoredTransactors = set.New()
- )
-
- for _, tx := range transactions {
- // We can skip err. It has already been validated in the tx pool
- from, _ := tx.From()
- // Move on to the next transaction when the transactor is in ignored transactions set
- // This may occur when a transaction hits the gas limit. When a gas limit is hit and
- // the transaction is processed (that could potentially be included in the block) it
- // will throw a nonce error because the previous transaction hasn't been processed.
- // Therefor we need to ignore any transaction after the ignored one.
- if ignoredTransactors.Has(from) {
- continue
- }
-
- self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
-
- err := self.commitTransaction(tx)
- switch {
- case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
- // Remove invalid transactions
- from, _ := tx.From()
-
- self.chain.TxState().RemoveNonce(from, tx.Nonce())
- remove.Add(tx.Hash())
-
- if glog.V(logger.Detail) {
- glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
- }
- case state.IsGasLimitErr(err):
- from, _ := tx.From()
- // ignore the transactor so no nonce errors will be thrown for this account
- // next time the worker is run, they'll be picked up again.
- ignoredTransactors.Add(from)
-
- glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
- default:
- tcount++
- }
- }
+ // commit transactions for this run
+ self.commitTransactions(transactions)
+ self.eth.TxPool().RemoveTransactions(current.lowGasTxs)
var (
uncles []*types.Header
@@ -321,7 +323,7 @@ func (self *worker) commitNewWork() {
// We only care about logging if we're actually mining
if atomic.LoadInt32(&self.mining) == 1 {
- glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles))
+ glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles))
}
for _, hash := range badUncles {
@@ -361,6 +363,71 @@ func (self *worker) commitUncle(uncle *types.Header) error {
return nil
}
+func (self *worker) commitTransactions(transactions types.Transactions) {
+ current := self.current
+
+ for _, tx := range transactions {
+ // We can skip err. It has already been validated in the tx pool
+ from, _ := tx.From()
+
+ // check if it falls within margin
+ if tx.GasPrice().Cmp(self.gasPrice) < 0 {
+ // ignore the transaction and transactor. We ignore the transactor
+ // because nonce will fail after ignoring this transaction so there's
+ // no point
+ current.lowGasTransactors.Add(from)
+
+ glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4])
+ }
+
+ // Continue with the next transaction if the transaction sender is included in
+ // the low gas tx set. This will also remove the tx and all sequential transaction
+ // from this transactor
+ if current.lowGasTransactors.Has(from) {
+ // add tx to the low gas set. This will be removed at the end of the run
+ // owned accounts are ignored
+ if !current.ownedAccounts.Has(from) {
+ current.lowGasTxs = append(current.lowGasTxs, tx)
+ }
+ continue
+ }
+
+ // Move on to the next transaction when the transactor is in ignored transactions set
+ // This may occur when a transaction hits the gas limit. When a gas limit is hit and
+ // the transaction is processed (that could potentially be included in the block) it
+ // will throw a nonce error because the previous transaction hasn't been processed.
+ // Therefor we need to ignore any transaction after the ignored one.
+ if current.ignoredTransactors.Has(from) {
+ continue
+ }
+
+ self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
+
+ err := self.commitTransaction(tx)
+ switch {
+ case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
+ // Remove invalid transactions
+ from, _ := tx.From()
+
+ self.chain.TxState().RemoveNonce(from, tx.Nonce())
+ current.remove.Add(tx.Hash())
+
+ if glog.V(logger.Detail) {
+ glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
+ }
+ case state.IsGasLimitErr(err):
+ from, _ := tx.From()
+ // ignore the transactor so no nonce errors will be thrown for this account
+ // next time the worker is run, they'll be picked up again.
+ current.ignoredTransactors.Add(from)
+
+ glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
+ default:
+ current.tcount++
+ }
+ }
+}
+
func (self *worker) commitTransaction(tx *types.Transaction) error {
snap := self.current.state.Copy()
receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
@@ -383,3 +450,20 @@ func (self *worker) HashRate() int64 {
return tot
}
+
+// gasprice calculates a reduced gas price based on the pct
+// XXX Use big.Rat?
+func gasprice(price *big.Int, pct int64) *big.Int {
+ p := new(big.Int).Set(price)
+ p.Div(p, big.NewInt(100))
+ p.Mul(p, big.NewInt(pct))
+ return p
+}
+
+func accountAddressesSet(accounts []accounts.Account) *set.Set {
+ accountSet := set.New()
+ for _, account := range accounts {
+ accountSet.Add(common.BytesToAddress(account.Address))
+ }
+ return accountSet
+}
diff --git a/rpc/api.go b/rpc/api.go
index 7fab589f2..309c161ad 100644
--- a/rpc/api.go
+++ b/rpc/api.go
@@ -450,10 +450,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = newHexData(res)
case "shh_version":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Retrieves the currently running whisper protocol version
*reply = api.xeth().WhisperVersion()
case "shh_post":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Injects a new message into the whisper network
args := new(WhisperMessageArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
@@ -466,10 +474,18 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = true
case "shh_newIdentity":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Creates a new whisper identity to use for sending/receiving messages
*reply = api.xeth().Whisper().NewIdentity()
case "shh_hasIdentity":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Checks if an identity if owned or not
args := new(WhisperIdentityArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
@@ -478,6 +494,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = api.xeth().Whisper().HasIdentity(args.Identity)
case "shh_newFilter":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Create a new filter to watch and match messages with
args := new(WhisperFilterArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
@@ -487,6 +507,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = newHexNum(big.NewInt(int64(id)).Bytes())
case "shh_uninstallFilter":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Remove an existing filter watching messages
args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
@@ -495,6 +519,10 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = api.xeth().UninstallWhisperFilter(args.Id)
case "shh_getFilterChanges":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Retrieve all the new messages arrived since the last request
args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
@@ -503,12 +531,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
*reply = api.xeth().WhisperMessagesChanged(args.Id)
case "shh_getMessages":
+ // Short circuit if whisper is not running
+ if api.xeth().Whisper() == nil {
+ return NewNotAvailableError(req.Method, "whisper offline")
+ }
// Retrieve all the cached messages matching a specific, existing filter
args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil {
return err
}
*reply = api.xeth().WhisperMessages(args.Id)
+
case "eth_hashrate":
*reply = newHexNum(api.xeth().HashRate())
diff --git a/rpc/http.go b/rpc/http.go
index 4760601d8..c5bb10c80 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -116,7 +116,7 @@ func RpcResponse(api *EthereumApi, request *RpcRequest) *interface{} {
switch reserr.(type) {
case nil:
response = &RpcSuccessResponse{Jsonrpc: jsonrpcver, Id: request.Id, Result: reply}
- case *NotImplementedError:
+ case *NotImplementedError, *NotAvailableError:
jsonerr := &RpcErrorObject{-32601, reserr.Error()}
response = &RpcErrorResponse{Jsonrpc: jsonrpcver, Id: request.Id, Error: jsonerr}
case *DecodeParamError, *InsufficientParamsError, *ValidationError, *InvalidTypeError:
diff --git a/rpc/jeth.go b/rpc/jeth.go
index ad52b72d7..2097ac30d 100644
--- a/rpc/jeth.go
+++ b/rpc/jeth.go
@@ -2,6 +2,7 @@ package rpc
import (
"encoding/json"
+ "fmt"
"github.com/ethereum/go-ethereum/jsre"
"github.com/robertkrimen/otto"
)
@@ -50,6 +51,7 @@ func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) {
var respif interface{}
err = self.ethApi.GetRequestReply(&req, &respif)
if err != nil {
+ fmt.Println("Error response:", err)
return self.err(call, -32603, err.Error(), req.Id)
}
call.Otto.Set("ret_jsonrpc", jsonrpcver)
diff --git a/rpc/types.go b/rpc/types.go
index 1784759a4..e6eb4f856 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -209,6 +209,22 @@ func NewNotImplementedError(method string) *NotImplementedError {
}
}
+type NotAvailableError struct {
+ Method string
+ Reason string
+}
+
+func (e *NotAvailableError) Error() string {
+ return fmt.Sprintf("%s method not available: %s", e.Method, e.Reason)
+}
+
+func NewNotAvailableError(method string, reason string) *NotAvailableError {
+ return &NotAvailableError{
+ Method: method,
+ Reason: reason,
+ }
+}
+
type DecodeParamError struct {
err string
}
diff --git a/tests/block_test.go b/tests/block_test.go
index 79e3335b1..e72f2b548 100644
--- a/tests/block_test.go
+++ b/tests/block_test.go
@@ -103,7 +103,7 @@ func testEthConfig() *eth.Config {
return &eth.Config{
DataDir: common.DefaultDataDir(),
- LogLevel: 5,
+ Verbosity: 5,
Etherbase: "primary",
AccountManager: accounts.NewManager(ks),
NewDB: func(path string) (common.Database, error) { return ethdb.NewMemDatabase() },
diff --git a/xeth/xeth.go b/xeth/xeth.go
index dc2d4f06f..06cd9dc1b 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -79,7 +79,6 @@ func New(eth *eth.Ethereum, frontend Frontend) *XEth {
xeth := &XEth{
backend: eth,
frontend: frontend,
- whisper: NewWhisper(eth.Whisper()),
quit: make(chan struct{}),
filterManager: filter.NewFilterManager(eth.EventMux()),
logQueue: make(map[int]*logQueue),
@@ -88,6 +87,9 @@ func New(eth *eth.Ethereum, frontend Frontend) *XEth {
messages: make(map[int]*whisperFilter),
agent: miner.NewRemoteAgent(),
}
+ if eth.Whisper() != nil {
+ xeth.whisper = NewWhisper(eth.Whisper())
+ }
eth.Miner().Register(xeth.agent)
if frontend == nil {
xeth.frontend = dummyFrontend{}