diff options
Diffstat (limited to 'miner')
-rw-r--r-- | miner/agent.go | 98 | ||||
-rw-r--r-- | miner/miner.go | 63 | ||||
-rw-r--r-- | miner/remote_agent.go | 85 | ||||
-rw-r--r-- | miner/worker.go | 327 |
4 files changed, 573 insertions, 0 deletions
diff --git a/miner/agent.go b/miner/agent.go new file mode 100644 index 000000000..c650fa2f3 --- /dev/null +++ b/miner/agent.go @@ -0,0 +1,98 @@ +package miner + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/pow" +) + +type CpuMiner struct { + chMu sync.Mutex + c chan *types.Block + quit chan struct{} + quitCurrentOp chan struct{} + returnCh chan<- *types.Block + + index int + pow pow.PoW +} + +func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { + miner := &CpuMiner{ + pow: pow, + index: index, + } + + return miner +} + +func (self *CpuMiner) Work() chan<- *types.Block { return self.c } +func (self *CpuMiner) Pow() pow.PoW { return self.pow } +func (self *CpuMiner) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } + +func (self *CpuMiner) Stop() { + close(self.quit) + close(self.quitCurrentOp) +} + +func (self *CpuMiner) Start() { + self.quit = make(chan struct{}) + self.quitCurrentOp = make(chan struct{}, 1) + self.c = make(chan *types.Block, 1) + + go self.update() +} + +func (self *CpuMiner) update() { +out: + for { + select { + case block := <-self.c: + self.chMu.Lock() + self.quitCurrentOp <- struct{}{} + self.chMu.Unlock() + + go self.mine(block) + case <-self.quit: + break out + } + } + + close(self.quitCurrentOp) +done: + // Empty channel + for { + select { + case <-self.c: + default: + close(self.c) + + break done + } + } +} + +func (self *CpuMiner) mine(block *types.Block) { + minerlogger.Debugf("(re)started agent[%d]. mining...\n", self.index) + + // Reset the channel + self.chMu.Lock() + self.quitCurrentOp = make(chan struct{}, 1) + self.chMu.Unlock() + + // Mine + nonce, mixDigest, _ := self.pow.Search(block, self.quitCurrentOp) + if nonce != 0 { + block.SetNonce(nonce) + block.Header().MixDigest = common.BytesToHash(mixDigest) + self.returnCh <- block + } else { + self.returnCh <- nil + } +} + +func (self *CpuMiner) GetHashRate() int64 { + return self.pow.GetHashrate() +} diff --git a/miner/miner.go b/miner/miner.go new file mode 100644 index 000000000..cf84c11f3 --- /dev/null +++ b/miner/miner.go @@ -0,0 +1,63 @@ +package miner + +import ( + "math/big" + + "github.com/ethereum/ethash" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/pow" +) + +var minerlogger = logger.NewLogger("MINER") + +type Miner struct { + worker *worker + + MinAcceptedGasPrice *big.Int + Extra string + + mining bool + eth core.Backend + pow pow.PoW +} + +func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner { + // note: minerThreads is currently ignored because + // ethash is not thread safe. + miner := &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} + for i := 0; i < minerThreads; i++ { + miner.worker.register(NewCpuMiner(i, pow)) + } + return miner +} + +func (self *Miner) Mining() bool { + return self.mining +} + +func (self *Miner) Start(coinbase common.Address) { + self.mining = true + self.worker.coinbase = coinbase + + self.pow.(*ethash.Ethash).UpdateDAG() + + self.worker.start() + self.worker.commitNewWork() +} + +func (self *Miner) Register(agent Agent) { + self.worker.register(agent) +} + +func (self *Miner) Stop() { + self.mining = false + self.worker.stop() + + //self.pow.(*ethash.Ethash).Stop() +} + +func (self *Miner) HashRate() int64 { + return self.worker.HashRate() +} diff --git a/miner/remote_agent.go b/miner/remote_agent.go new file mode 100644 index 000000000..aa04a58aa --- /dev/null +++ b/miner/remote_agent.go @@ -0,0 +1,85 @@ +package miner + +import ( + "github.com/ethereum/ethash" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +type RemoteAgent struct { + work *types.Block + currentWork *types.Block + + quit chan struct{} + workCh chan *types.Block + returnCh chan<- *types.Block +} + +func NewRemoteAgent() *RemoteAgent { + agent := &RemoteAgent{} + + return agent +} + +func (a *RemoteAgent) Work() chan<- *types.Block { + return a.workCh +} + +func (a *RemoteAgent) SetReturnCh(returnCh chan<- *types.Block) { + a.returnCh = returnCh +} + +func (a *RemoteAgent) Start() { + a.quit = make(chan struct{}) + a.workCh = make(chan *types.Block, 1) + go a.run() +} + +func (a *RemoteAgent) Stop() { + close(a.quit) + close(a.workCh) +} + +func (a *RemoteAgent) GetHashRate() int64 { return 0 } + +func (a *RemoteAgent) run() { +out: + for { + select { + case <-a.quit: + break out + case work := <-a.workCh: + a.work = work + a.returnCh <- nil + } + } +} + +func (a *RemoteAgent) GetWork() [3]string { + var res [3]string + + // XXX Wait here until work != nil ? + if a.work != nil { + res[0] = a.work.HashNoNonce().Hex() + seedHash, _ := ethash.GetSeedHash(a.currentWork.NumberU64()) + res[1] = common.Bytes2Hex(seedHash) + res[2] = common.Bytes2Hex(a.work.Difficulty().Bytes()) + } + + return res +} + +func (a *RemoteAgent) SubmitWork(nonce uint64, mixDigest, seedHash common.Hash) bool { + // Return true or false, but does not indicate if the PoW was correct + + // Make sure the external miner was working on the right hash + if a.currentWork != nil && a.work != nil { + a.currentWork.SetNonce(nonce) + a.currentWork.Header().MixDigest = mixDigest + a.returnCh <- a.currentWork + //a.returnCh <- Work{a.currentWork.Number().Uint64(), nonce, mixDigest.Bytes(), seedHash.Bytes()} + return true + } + + return false +} diff --git a/miner/worker.go b/miner/worker.go new file mode 100644 index 000000000..e3680dea3 --- /dev/null +++ b/miner/worker.go @@ -0,0 +1,327 @@ +package miner + +import ( + "fmt" + "math/big" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/pow" + "gopkg.in/fatih/set.v0" +) + +var jsonlogger = logger.NewJsonLogger() + +type environment struct { + totalUsedGas *big.Int + state *state.StateDB + coinbase *state.StateObject + block *types.Block + family *set.Set + uncles *set.Set +} + +func env(block *types.Block, eth core.Backend) *environment { + state := state.New(block.Root(), eth.StateDb()) + env := &environment{ + totalUsedGas: new(big.Int), + state: state, + block: block, + family: set.New(), + uncles: set.New(), + coinbase: state.GetOrNewStateObject(block.Coinbase()), + } + + return env +} + +type Work struct { + Number uint64 + Nonce uint64 + MixDigest []byte + SeedHash []byte +} + +type Agent interface { + Work() chan<- *types.Block + SetReturnCh(chan<- *types.Block) + Stop() + Start() + GetHashRate() int64 +} + +type worker struct { + mu sync.Mutex + + agents []Agent + recv chan *types.Block + mux *event.TypeMux + quit chan struct{} + pow pow.PoW + atWork int64 + + eth core.Backend + chain *core.ChainManager + proc *core.BlockProcessor + coinbase common.Address + + current *environment + + uncleMu sync.Mutex + possibleUncles map[common.Hash]*types.Block + + mining bool +} + +func newWorker(coinbase common.Address, eth core.Backend) *worker { + return &worker{ + eth: eth, + mux: eth.EventMux(), + recv: make(chan *types.Block), + chain: eth.ChainManager(), + proc: eth.BlockProcessor(), + possibleUncles: make(map[common.Hash]*types.Block), + coinbase: coinbase, + } +} + +func (self *worker) start() { + self.mining = true + + self.quit = make(chan struct{}) + + // spin up agents + for _, agent := range self.agents { + agent.Start() + } + + go self.update() + go self.wait() +} + +func (self *worker) stop() { + self.mining = false + atomic.StoreInt64(&self.atWork, 0) + + close(self.quit) +} + +func (self *worker) register(agent Agent) { + self.agents = append(self.agents, agent) + agent.SetReturnCh(self.recv) +} + +func (self *worker) update() { + events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}) + + timer := time.NewTicker(2 * time.Second) + +out: + for { + select { + case event := <-events.Chan(): + switch ev := event.(type) { + case core.ChainHeadEvent: + self.commitNewWork() + case core.ChainSideEvent: + self.uncleMu.Lock() + self.possibleUncles[ev.Block.Hash()] = ev.Block + self.uncleMu.Unlock() + } + + case <-self.quit: + // stop all agents + for _, agent := range self.agents { + agent.Stop() + } + break out + case <-timer.C: + minerlogger.Infoln("Hash rate:", self.HashRate(), "Khash") + + // XXX In case all mined a possible uncle + if atomic.LoadInt64(&self.atWork) == 0 { + self.commitNewWork() + } + } + } + + events.Unsubscribe() +} + +func (self *worker) wait() { + for { + for block := range self.recv { + atomic.AddInt64(&self.atWork, -1) + + if block == nil { + continue + } + + if err := self.chain.InsertChain(types.Blocks{block}); err == nil { + for _, uncle := range block.Uncles() { + delete(self.possibleUncles, uncle.Hash()) + } + self.mux.Post(core.NewMinedBlockEvent{block}) + + jsonlogger.LogJson(&logger.EthMinerNewBlock{ + BlockHash: block.Hash().Hex(), + BlockNumber: block.Number(), + ChainHeadHash: block.ParentHeaderHash.Hex(), + BlockPrevHash: block.ParentHeaderHash.Hex(), + }) + } else { + self.commitNewWork() + } + } + } +} + +func (self *worker) push() { + if self.mining { + self.current.block.Header().GasUsed = self.current.totalUsedGas + self.current.block.SetRoot(self.current.state.Root()) + + // push new work to agents + for _, agent := range self.agents { + atomic.AddInt64(&self.atWork, 1) + + agent.Work() <- self.current.block.Copy() + } + } +} + +func (self *worker) commitNewWork() { + self.mu.Lock() + defer self.mu.Unlock() + self.uncleMu.Lock() + defer self.uncleMu.Unlock() + + block := self.chain.NewBlock(self.coinbase) + + self.current = env(block, self.eth) + for _, ancestor := range self.chain.GetAncestors(block, 7) { + self.current.family.Add(ancestor.Hash()) + } + + parent := self.chain.GetBlock(self.current.block.ParentHash()) + self.current.coinbase.SetGasPool(core.CalcGasLimit(parent, self.current.block)) + + transactions := self.eth.TxPool().GetTransactions() + sort.Sort(types.TxByNonce{transactions}) + + // Keep track of transactions which return errors so they can be removed + var ( + remove types.Transactions + tcount = 0 + ) +gasLimit: + for i, tx := range transactions { + err := self.commitTransaction(tx) + switch { + case core.IsNonceErr(err): + fallthrough + case core.IsInvalidTxErr(err): + // Remove invalid transactions + from, _ := tx.From() + self.chain.TxState().RemoveNonce(from, tx.Nonce()) + remove = append(remove, tx) + minerlogger.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) + minerlogger.Infoln(tx) + case state.IsGasLimitErr(err): + minerlogger.Infof("Gas limit reached for block. %d TXs included in this block\n", i) + // Break on gas limit + break gasLimit + default: + tcount++ + } + } + self.eth.TxPool().RemoveSet(remove) + + var ( + uncles []*types.Header + badUncles []common.Hash + ) + for hash, uncle := range self.possibleUncles { + if len(uncles) == 2 { + break + } + + if err := self.commitUncle(uncle.Header()); err != nil { + minerlogger.Infof("Bad uncle found and will be removed (%x)\n", hash[:4]) + minerlogger.Debugln(uncle) + badUncles = append(badUncles, hash) + } else { + minerlogger.Infof("commiting %x as uncle\n", hash[:4]) + uncles = append(uncles, uncle.Header()) + } + } + minerlogger.Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles)) + for _, hash := range badUncles { + delete(self.possibleUncles, hash) + } + + self.current.block.SetUncles(uncles) + + self.current.state.AddBalance(self.coinbase, core.BlockReward) + + self.current.state.Update(common.Big0) + self.push() +} + +var ( + inclusionReward = new(big.Int).Div(core.BlockReward, big.NewInt(32)) + _uncleReward = new(big.Int).Mul(core.BlockReward, big.NewInt(15)) + uncleReward = new(big.Int).Div(_uncleReward, big.NewInt(16)) +) + +func (self *worker) commitUncle(uncle *types.Header) error { + if self.current.uncles.Has(uncle.Hash()) { + // Error not unique + return core.UncleError("Uncle not unique") + } + self.current.uncles.Add(uncle.Hash()) + + if !self.current.family.Has(uncle.ParentHash) { + return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4])) + } + + if self.current.family.Has(uncle.Hash()) { + return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", uncle.Hash())) + } + + self.current.state.AddBalance(uncle.Coinbase, uncleReward) + self.current.state.AddBalance(self.coinbase, inclusionReward) + + return nil +} + +func (self *worker) commitTransaction(tx *types.Transaction) error { + snap := self.current.state.Copy() + receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) + if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err) || core.IsInvalidTxErr(err)) { + self.current.state.Set(snap) + return err + } + + self.current.block.AddTransaction(tx) + self.current.block.AddReceipt(receipt) + + return nil +} + +func (self *worker) HashRate() int64 { + var tot int64 + for _, agent := range self.agents { + tot += agent.GetHashRate() + } + + return tot +} |