aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go341
-rw-r--r--eth/peer_util.go23
-rw-r--r--eth/protocol.go378
-rw-r--r--eth/protocol_test.go264
-rw-r--r--eth/wallet.go80
5 files changed, 1086 insertions, 0 deletions
diff --git a/eth/backend.go b/eth/backend.go
new file mode 100644
index 000000000..9c497a586
--- /dev/null
+++ b/eth/backend.go
@@ -0,0 +1,341 @@
+package eth
+
+import (
+ "crypto/ecdsa"
+ "fmt"
+ "io/ioutil"
+ "path"
+ "strings"
+
+ "github.com/ethereum/ethash"
+ "github.com/ethereum/go-ethereum/accounts"
+ "github.com/ethereum/go-ethereum/blockpool"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/miner"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/nat"
+ "github.com/ethereum/go-ethereum/vm"
+ "github.com/ethereum/go-ethereum/whisper"
+)
+
+var (
+ servlogger = logger.NewLogger("SERV")
+ jsonlogger = logger.NewJsonLogger()
+
+ defaultBootNodes = []*discover.Node{
+ // ETH/DEV cmd/bootnode
+ discover.MustParseNode("enode://6cdd090303f394a1cac34ecc9f7cda18127eafa2a3a06de39f6d920b0e583e062a7362097c7c65ee490a758b442acd5c80c6fce4b148c6a391e946b45131365b@54.169.166.226:30303"),
+ // ETH/DEV cpp-ethereum (poc-8.ethdev.com)
+ discover.MustParseNode("enode://4a44599974518ea5b0f14c31c4463692ac0329cb84851f3435e6d1b18ee4eae4aa495f846a0fa1219bd58035671881d44423876e57db2abd57254d0197da0ebe@5.1.83.226:30303"),
+ }
+)
+
+type Config struct {
+ Name string
+ DataDir string
+ LogFile string
+ LogLevel int
+ LogFormat string
+ VmDebug bool
+
+ MaxPeers int
+ Port string
+
+ // This should be a space-separated list of
+ // discovery node URLs.
+ BootNodes string
+
+ // This key is used to identify the node on the network.
+ // If nil, an ephemeral key is used.
+ NodeKey *ecdsa.PrivateKey
+
+ NAT nat.Interface
+ Shh bool
+ Dial bool
+
+ MinerThreads int
+ AccountManager *accounts.Manager
+}
+
+func (cfg *Config) parseBootNodes() []*discover.Node {
+ if cfg.BootNodes == "" {
+ return defaultBootNodes
+ }
+ var ns []*discover.Node
+ for _, url := range strings.Split(cfg.BootNodes, " ") {
+ if url == "" {
+ continue
+ }
+ n, err := discover.ParseNode(url)
+ if err != nil {
+ servlogger.Errorf("Bootstrap URL %s: %v\n", url, err)
+ continue
+ }
+ ns = append(ns, n)
+ }
+ return ns
+}
+
+func (cfg *Config) nodeKey() (*ecdsa.PrivateKey, error) {
+ // use explicit key from command line args if set
+ if cfg.NodeKey != nil {
+ return cfg.NodeKey, nil
+ }
+ // use persistent key if present
+ keyfile := path.Join(cfg.DataDir, "nodekey")
+ key, err := crypto.LoadECDSA(keyfile)
+ if err == nil {
+ return key, nil
+ }
+ // no persistent key, generate and store a new one
+ if key, err = crypto.GenerateKey(); err != nil {
+ return nil, fmt.Errorf("could not generate server key: %v", err)
+ }
+ if err := ioutil.WriteFile(keyfile, crypto.FromECDSA(key), 0600); err != nil {
+ servlogger.Errorln("could not persist nodekey: ", err)
+ }
+ return key, nil
+}
+
+type Ethereum struct {
+ // Channel for shutting down the ethereum
+ shutdownChan chan bool
+
+ // DB interface
+ blockDb ethutil.Database
+ stateDb ethutil.Database
+
+ //*** SERVICES ***
+ // State manager for processing new blocks and managing the over all states
+ blockProcessor *core.BlockProcessor
+ txPool *core.TxPool
+ chainManager *core.ChainManager
+ blockPool *blockpool.BlockPool
+ accountManager *accounts.Manager
+ whisper *whisper.Whisper
+
+ net *p2p.Server
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ blockSub event.Subscription
+ miner *miner.Miner
+
+ logger logger.LogSystem
+
+ Mining bool
+ DataDir string
+}
+
+func New(config *Config) (*Ethereum, error) {
+ // Boostrap database
+ servlogger := logger.New(config.DataDir, config.LogFile, config.LogLevel, config.LogFormat)
+
+ blockDb, err := ethdb.NewLDBDatabase(path.Join(config.DataDir, "blockchain"))
+ if err != nil {
+ return nil, err
+ }
+ stateDb, err := ethdb.NewLDBDatabase(path.Join(config.DataDir, "state"))
+ if err != nil {
+ return nil, err
+ }
+
+ // Perform database sanity checks
+ d, _ := blockDb.Get([]byte("ProtocolVersion"))
+ protov := ethutil.NewValue(d).Uint()
+ if protov != ProtocolVersion && protov != 0 {
+ path := path.Join(config.DataDir, "blockchain")
+ return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, ProtocolVersion, path)
+ }
+
+ saveProtocolVersion(blockDb)
+ //ethutil.Config.Db = db
+
+ eth := &Ethereum{
+ shutdownChan: make(chan bool),
+ blockDb: blockDb,
+ stateDb: stateDb,
+ eventMux: &event.TypeMux{},
+ logger: servlogger,
+ accountManager: config.AccountManager,
+ DataDir: config.DataDir,
+ }
+
+ eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
+ pow := ethash.New(eth.chainManager)
+ eth.txPool = core.NewTxPool(eth.EventMux())
+ eth.blockProcessor = core.NewBlockProcessor(stateDb, pow, eth.txPool, eth.chainManager, eth.EventMux())
+ eth.chainManager.SetProcessor(eth.blockProcessor)
+ eth.whisper = whisper.New()
+ eth.miner = miner.New(eth, pow, config.MinerThreads)
+
+ hasBlock := eth.chainManager.HasBlock
+ insertChain := eth.chainManager.InsertChain
+ eth.blockPool = blockpool.New(hasBlock, insertChain, pow.Verify)
+
+ netprv, err := config.nodeKey()
+ if err != nil {
+ return nil, err
+ }
+ ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
+ protocols := []p2p.Protocol{ethProto}
+ if config.Shh {
+ protocols = append(protocols, eth.whisper.Protocol())
+ }
+
+ eth.net = &p2p.Server{
+ PrivateKey: netprv,
+ Name: config.Name,
+ MaxPeers: config.MaxPeers,
+ Protocols: protocols,
+ NAT: config.NAT,
+ NoDial: !config.Dial,
+ BootstrapNodes: config.parseBootNodes(),
+ }
+ if len(config.Port) > 0 {
+ eth.net.ListenAddr = ":" + config.Port
+ }
+
+ vm.Debug = config.VmDebug
+
+ return eth, nil
+}
+
+func (s *Ethereum) StartMining() error {
+ cb, err := s.accountManager.Coinbase()
+ if err != nil {
+ servlogger.Errorf("Cannot start mining without coinbase: %v\n", err)
+ return fmt.Errorf("no coinbase: %v", err)
+ }
+ s.miner.Start(cb)
+ return nil
+}
+
+func (s *Ethereum) StopMining() { s.miner.Stop() }
+func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
+
+func (s *Ethereum) Logger() logger.LogSystem { return s.logger }
+func (s *Ethereum) Name() string { return s.net.Name }
+func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
+func (s *Ethereum) ChainManager() *core.ChainManager { return s.chainManager }
+func (s *Ethereum) BlockProcessor() *core.BlockProcessor { return s.blockProcessor }
+func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
+func (s *Ethereum) BlockPool() *blockpool.BlockPool { return s.blockPool }
+func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper }
+func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
+func (s *Ethereum) BlockDb() ethutil.Database { return s.blockDb }
+func (s *Ethereum) StateDb() ethutil.Database { return s.stateDb }
+func (s *Ethereum) IsListening() bool { return true } // Always listening
+func (s *Ethereum) PeerCount() int { return s.net.PeerCount() }
+func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() }
+func (s *Ethereum) MaxPeers() int { return s.net.MaxPeers }
+
+// Start the ethereum
+func (s *Ethereum) Start() error {
+ jsonlogger.LogJson(&logger.LogStarting{
+ ClientString: s.net.Name,
+ ProtocolVersion: ProtocolVersion,
+ })
+
+ err := s.net.Start()
+ if err != nil {
+ return err
+ }
+
+ // Start services
+ s.txPool.Start()
+ s.blockPool.Start()
+
+ if s.whisper != nil {
+ s.whisper.Start()
+ }
+
+ // broadcast transactions
+ s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
+ go s.txBroadcastLoop()
+
+ // broadcast mined blocks
+ s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go s.blockBroadcastLoop()
+
+ servlogger.Infoln("Server started")
+ return nil
+}
+
+func (s *Ethereum) StartForTest() {
+ jsonlogger.LogJson(&logger.LogStarting{
+ ClientString: s.net.Name,
+ ProtocolVersion: ProtocolVersion,
+ })
+
+ // Start services
+ s.txPool.Start()
+ s.blockPool.Start()
+}
+
+func (self *Ethereum) SuggestPeer(nodeURL string) error {
+ n, err := discover.ParseNode(nodeURL)
+ if err != nil {
+ return fmt.Errorf("invalid node URL: %v", err)
+ }
+ self.net.SuggestPeer(n)
+ return nil
+}
+
+func (s *Ethereum) Stop() {
+ // Close the database
+ defer s.blockDb.Close()
+ defer s.stateDb.Close()
+
+ s.txSub.Unsubscribe() // quits txBroadcastLoop
+ s.blockSub.Unsubscribe() // quits blockBroadcastLoop
+
+ s.txPool.Stop()
+ s.eventMux.Stop()
+ s.blockPool.Stop()
+ if s.whisper != nil {
+ s.whisper.Stop()
+ }
+
+ servlogger.Infoln("Server stopped")
+ close(s.shutdownChan)
+}
+
+// This function will wait for a shutdown and resumes main thread execution
+func (s *Ethereum) WaitForShutdown() {
+ <-s.shutdownChan
+}
+
+// now tx broadcasting is taken out of txPool
+// handled here via subscription, efficiency?
+func (self *Ethereum) txBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.TxPreEvent)
+ self.net.Broadcast("eth", TxMsg, event.Tx.RlpData())
+ }
+}
+
+func (self *Ethereum) blockBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.blockSub.Chan() {
+ switch ev := obj.(type) {
+ case core.NewMinedBlockEvent:
+ self.net.Broadcast("eth", NewBlockMsg, ev.Block.RlpData(), ev.Block.Td)
+ }
+ }
+}
+
+func saveProtocolVersion(db ethutil.Database) {
+ d, _ := db.Get([]byte("ProtocolVersion"))
+ protocolVersion := ethutil.NewValue(d).Uint()
+
+ if protocolVersion == 0 {
+ db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
+ }
+}
diff --git a/eth/peer_util.go b/eth/peer_util.go
new file mode 100644
index 000000000..6cf80cde2
--- /dev/null
+++ b/eth/peer_util.go
@@ -0,0 +1,23 @@
+package eth
+
+import (
+ "encoding/json"
+
+ "github.com/ethereum/go-ethereum/ethutil"
+)
+
+func WritePeers(path string, addresses []string) {
+ if len(addresses) > 0 {
+ data, _ := json.MarshalIndent(addresses, "", " ")
+ ethutil.WriteFile(path, data)
+ }
+}
+
+func ReadPeers(path string) (ips []string, err error) {
+ var data string
+ data, err = ethutil.ReadAllFile(path)
+ if err != nil {
+ json.Unmarshal([]byte(data), &ips)
+ }
+ return
+}
diff --git a/eth/protocol.go b/eth/protocol.go
new file mode 100644
index 000000000..b9b485292
--- /dev/null
+++ b/eth/protocol.go
@@ -0,0 +1,378 @@
+package eth
+
+import (
+ "bytes"
+ "fmt"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ ProtocolVersion = 56
+ NetworkId = 0
+ ProtocolLength = uint64(8)
+ ProtocolMaxMsgSize = 10 * 1024 * 1024
+ maxHashes = 256
+ maxBlocks = 64
+)
+
+// eth protocol message codes
+const (
+ StatusMsg = iota
+ GetTxMsg // unused
+ TxMsg
+ GetBlockHashesMsg
+ BlockHashesMsg
+ GetBlocksMsg
+ BlocksMsg
+ NewBlockMsg
+)
+
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrProtocolVersionMismatch
+ ErrNetworkIdMismatch
+ ErrGenesisBlockMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+)
+
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrProtocolVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrGenesisBlockMismatch: "Genesis block mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+}
+
+// ethProtocol represents the ethereum wire protocol
+// instance is running on each peer
+type ethProtocol struct {
+ txPool txPool
+ chainManager chainManager
+ blockPool blockPool
+ peer *p2p.Peer
+ id string
+ rw p2p.MsgReadWriter
+ errors *errs.Errors
+}
+
+// backend is the interface the ethereum protocol backend should implement
+// used as an argument to EthProtocol
+type txPool interface {
+ AddTransactions([]*types.Transaction)
+ GetTransactions() types.Transactions
+}
+
+type chainManager interface {
+ GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte)
+ GetBlock(hash []byte) (block *types.Block)
+ Status() (td *big.Int, currentBlock []byte, genesisBlock []byte)
+}
+
+type blockPool interface {
+ AddBlockHashes(next func() ([]byte, bool), peerId string)
+ AddBlock(block *types.Block, peerId string)
+ AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(*errs.Error)) (best bool)
+ RemovePeer(peerId string)
+}
+
+// message structs used for rlp decoding
+type newBlockMsgData struct {
+ Block *types.Block
+ TD *big.Int
+}
+
+type getBlockHashesMsgData struct {
+ Hash []byte
+ Amount uint64
+}
+
+// main entrypoint, wrappers starting a server running the eth protocol
+// use this constructor to attach the protocol ("class") to server caps
+// the Dev p2p layer then runs the protocol instance on each peer
+func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
+ return p2p.Protocol{
+ Name: "eth",
+ Version: ProtocolVersion,
+ Length: ProtocolLength,
+ Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ return runEthProtocol(txPool, chainManager, blockPool, peer, rw)
+ },
+ }
+}
+
+// the main loop that handles incoming messages
+// note RemovePeer in the post-disconnect hook
+func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+ id := peer.ID()
+ self := &ethProtocol{
+ txPool: txPool,
+ chainManager: chainManager,
+ blockPool: blockPool,
+ rw: rw,
+ peer: peer,
+ errors: &errs.Errors{
+ Package: "ETH",
+ Errors: errorToString,
+ },
+ id: fmt.Sprintf("%x", id[:8]),
+ }
+ err = self.handleStatus()
+ if err == nil {
+ self.propagateTxs()
+ for {
+ err = self.handle()
+ if err != nil {
+ self.blockPool.RemovePeer(self.id)
+ break
+ }
+ }
+ }
+ return
+}
+
+func (self *ethProtocol) handle() error {
+ msg, err := self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+ case GetTxMsg: // ignore
+ case StatusMsg:
+ return self.protoError(ErrExtraStatusMsg, "")
+
+ case TxMsg:
+ // TODO: rework using lazy RLP stream
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ }
+ for _, tx := range txs {
+ jsonlogger.LogJson(&logger.EthTxReceived{
+ TxHash: ethutil.Bytes2Hex(tx.Hash()),
+ RemoteId: self.peer.ID().String(),
+ })
+ }
+ self.txPool.AddTransactions(txs)
+
+ case GetBlockHashesMsg:
+ var request getBlockHashesMsgData
+ if err := msg.Decode(&request); err != nil {
+ return self.protoError(ErrDecode, "->msg %v: %v", msg, err)
+ }
+
+ if request.Amount > maxHashes {
+ request.Amount = maxHashes
+ }
+ hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
+ return p2p.EncodeMsg(self.rw, BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)
+
+ case BlockHashesMsg:
+ msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+
+ var i int
+ iter := func() (hash []byte, ok bool) {
+ hash, err := msgStream.Bytes()
+ if err == rlp.EOL {
+ return nil, false
+ } else if err != nil {
+ self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
+ return nil, false
+ }
+ i++
+ return hash, true
+ }
+ self.blockPool.AddBlockHashes(iter, self.id)
+
+ case GetBlocksMsg:
+ msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+
+ var blocks []interface{}
+ var i int
+ for {
+ i++
+ var hash []byte
+ if err := msgStream.Decode(&hash); err != nil {
+ if err == rlp.EOL {
+ break
+ } else {
+ return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ }
+ }
+ block := self.chainManager.GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block)
+ }
+ if i == maxBlocks {
+ break
+ }
+ }
+ return p2p.EncodeMsg(self.rw, BlocksMsg, blocks...)
+
+ case BlocksMsg:
+ msgStream := rlp.NewStream(msg.Payload)
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ for {
+ var block types.Block
+ if err := msgStream.Decode(&block); err != nil {
+ if err == rlp.EOL {
+ break
+ } else {
+ return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ }
+ }
+ self.blockPool.AddBlock(&block, self.id)
+ }
+
+ case NewBlockMsg:
+ var request newBlockMsgData
+ if err := msg.Decode(&request); err != nil {
+ return self.protoError(ErrDecode, "%v: %v", msg, err)
+ }
+ hash := request.Block.Hash()
+ _, chainHead, _ := self.chainManager.Status()
+
+ jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
+ BlockHash: ethutil.Bytes2Hex(hash),
+ BlockNumber: request.Block.Number(), // this surely must be zero
+ ChainHeadHash: ethutil.Bytes2Hex(chainHead),
+ BlockPrevHash: ethutil.Bytes2Hex(request.Block.ParentHash()),
+ RemoteId: self.peer.ID().String(),
+ })
+ // to simplify backend interface adding a new block
+ // uses AddPeer followed by AddBlock only if peer is the best peer
+ // (or selected as new best peer)
+ if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
+ self.blockPool.AddBlock(request.Block, self.id)
+ }
+
+ default:
+ return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+type statusMsgData struct {
+ ProtocolVersion uint32
+ NetworkId uint32
+ TD *big.Int
+ CurrentBlock []byte
+ GenesisBlock []byte
+}
+
+func (self *ethProtocol) statusMsg() p2p.Msg {
+ td, currentBlock, genesisBlock := self.chainManager.Status()
+
+ return p2p.NewMsg(StatusMsg,
+ uint32(ProtocolVersion),
+ uint32(NetworkId),
+ td,
+ currentBlock,
+ genesisBlock,
+ )
+}
+
+func (self *ethProtocol) handleStatus() error {
+ // send precanned status message
+ if err := self.rw.WriteMsg(self.statusMsg()); err != nil {
+ return err
+ }
+
+ // read and handle remote status
+ msg, err := self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+
+ if msg.Code != StatusMsg {
+ return self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+ }
+
+ if msg.Size > ProtocolMaxMsgSize {
+ return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+
+ var status statusMsgData
+ if err := msg.Decode(&status); err != nil {
+ return self.protoError(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ _, _, genesisBlock := self.chainManager.Status()
+
+ if !bytes.Equal(status.GenesisBlock, genesisBlock) {
+ return self.protoError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
+ }
+
+ if status.NetworkId != NetworkId {
+ return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
+ }
+
+ if ProtocolVersion != status.ProtocolVersion {
+ return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion)
+ }
+
+ self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
+
+ self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+
+ return nil
+}
+
+func (self *ethProtocol) requestBlockHashes(from []byte) error {
+ self.peer.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4])
+ return p2p.EncodeMsg(self.rw, GetBlockHashesMsg, interface{}(from), uint64(maxHashes))
+}
+
+func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
+ self.peer.Debugf("fetching %v blocks", len(hashes))
+ return p2p.EncodeMsg(self.rw, GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)...)
+}
+
+func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
+ err = self.errors.New(code, format, params...)
+ err.Log(self.peer.Logger)
+ return
+}
+
+func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
+ err.Log(self.peer.Logger)
+ if err.Fatal() {
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
+ }
+}
+
+func (self *ethProtocol) propagateTxs() {
+ transactions := self.txPool.GetTransactions()
+ iface := make([]interface{}, len(transactions))
+ for i, transaction := range transactions {
+ iface[i] = transaction
+ }
+
+ self.rw.WriteMsg(p2p.NewMsg(TxMsg, iface...))
+}
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
new file mode 100644
index 000000000..f499d033e
--- /dev/null
+++ b/eth/protocol_test.go
@@ -0,0 +1,264 @@
+package eth
+
+import (
+ "bytes"
+ "io"
+ "log"
+ "math/big"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/ethutil"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+var logsys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel))
+
+var ini = false
+
+func logInit() {
+ if !ini {
+ ethlogger.AddLogSystem(logsys)
+ ini = true
+ }
+}
+
+type testMsgReadWriter struct {
+ in chan p2p.Msg
+ out []p2p.Msg
+}
+
+func (self *testMsgReadWriter) In(msg p2p.Msg) {
+ self.in <- msg
+}
+
+func (self *testMsgReadWriter) Out() (msg p2p.Msg, ok bool) {
+ if len(self.out) > 0 {
+ msg = self.out[0]
+ self.out = self.out[1:]
+ ok = true
+ }
+ return
+}
+
+func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+ self.out = append(self.out, msg)
+ return nil
+}
+
+func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+ msg, ok := <-self.in
+ if !ok {
+ return msg, io.EOF
+ }
+ return msg, nil
+}
+
+type testTxPool struct {
+ getTransactions func() []*types.Transaction
+ addTransactions func(txs []*types.Transaction)
+}
+
+type testChainManager struct {
+ getBlockHashes func(hash []byte, amount uint64) (hashes [][]byte)
+ getBlock func(hash []byte) *types.Block
+ status func() (td *big.Int, currentBlock []byte, genesisBlock []byte)
+}
+
+type testBlockPool struct {
+ addBlockHashes func(next func() ([]byte, bool), peerId string)
+ addBlock func(block *types.Block, peerId string) (err error)
+ addPeer func(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(*errs.Error)) (best bool)
+ removePeer func(peerId string)
+}
+
+// func (self *testTxPool) GetTransactions() (txs []*types.Transaction) {
+// if self.getTransactions != nil {
+// txs = self.getTransactions()
+// }
+// return
+// }
+
+func (self *testTxPool) AddTransactions(txs []*types.Transaction) {
+ if self.addTransactions != nil {
+ self.addTransactions(txs)
+ }
+}
+
+func (self *testTxPool) GetTransactions() types.Transactions { return nil }
+
+func (self *testChainManager) GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte) {
+ if self.getBlockHashes != nil {
+ hashes = self.getBlockHashes(hash, amount)
+ }
+ return
+}
+
+func (self *testChainManager) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) {
+ if self.status != nil {
+ td, currentBlock, genesisBlock = self.status()
+ }
+ return
+}
+
+func (self *testChainManager) GetBlock(hash []byte) (block *types.Block) {
+ if self.getBlock != nil {
+ block = self.getBlock(hash)
+ }
+ return
+}
+
+func (self *testBlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+ if self.addBlockHashes != nil {
+ self.addBlockHashes(next, peerId)
+ }
+}
+
+func (self *testBlockPool) AddBlock(block *types.Block, peerId string) {
+ if self.addBlock != nil {
+ self.addBlock(block, peerId)
+ }
+}
+
+func (self *testBlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(*errs.Error)) (best bool) {
+ if self.addPeer != nil {
+ best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, peerError)
+ }
+ return
+}
+
+func (self *testBlockPool) RemovePeer(peerId string) {
+ if self.removePeer != nil {
+ self.removePeer(peerId)
+ }
+}
+
+func testPeer() *p2p.Peer {
+ var id discover.NodeID
+ pk := crypto.GenerateNewKeyPair().PublicKey
+ copy(id[:], pk)
+ return p2p.NewPeer(id, "test peer", []p2p.Cap{})
+}
+
+type ethProtocolTester struct {
+ quit chan error
+ rw *testMsgReadWriter // p2p.MsgReadWriter
+ txPool *testTxPool // txPool
+ chainManager *testChainManager // chainManager
+ blockPool *testBlockPool // blockPool
+ t *testing.T
+}
+
+func newEth(t *testing.T) *ethProtocolTester {
+ return &ethProtocolTester{
+ quit: make(chan error),
+ rw: &testMsgReadWriter{in: make(chan p2p.Msg, 10)},
+ txPool: &testTxPool{},
+ chainManager: &testChainManager{},
+ blockPool: &testBlockPool{},
+ t: t,
+ }
+}
+
+func (self *ethProtocolTester) reset() {
+ self.rw = &testMsgReadWriter{in: make(chan p2p.Msg, 10)}
+ self.quit = make(chan error)
+}
+
+func (self *ethProtocolTester) checkError(expCode int, delay time.Duration) (err error) {
+ var timer = time.After(delay)
+ select {
+ case err = <-self.quit:
+ case <-timer:
+ self.t.Errorf("no error after %v, expected %v", delay, expCode)
+ return
+ }
+ perr, ok := err.(*errs.Error)
+ if ok && perr != nil {
+ if code := perr.Code; code != expCode {
+ self.t.Errorf("expected protocol error (code %v), got %v (%v)", expCode, code, err)
+ }
+ } else {
+ self.t.Errorf("expected protocol error (code %v), got %v", expCode, err)
+ }
+ return
+}
+
+func (self *ethProtocolTester) In(msg p2p.Msg) {
+ self.rw.In(msg)
+}
+
+func (self *ethProtocolTester) Out() (p2p.Msg, bool) {
+ return self.rw.Out()
+}
+
+func (self *ethProtocolTester) checkMsg(i int, code uint64, val interface{}) (msg p2p.Msg) {
+ if i >= len(self.rw.out) {
+ self.t.Errorf("expected at least %v msgs, got %v", i, len(self.rw.out))
+ return
+ }
+ msg = self.rw.out[i]
+ if msg.Code != code {
+ self.t.Errorf("expected msg code %v, got %v", code, msg.Code)
+ }
+ if val != nil {
+ if err := msg.Decode(val); err != nil {
+ self.t.Errorf("rlp encoding error: %v", err)
+ }
+ }
+ return
+}
+
+func (self *ethProtocolTester) run() {
+ err := runEthProtocol(self.txPool, self.chainManager, self.blockPool, testPeer(), self.rw)
+ self.quit <- err
+}
+
+func TestStatusMsgErrors(t *testing.T) {
+ logInit()
+ eth := newEth(t)
+ td := ethutil.Big1
+ currentBlock := []byte{1}
+ genesis := []byte{2}
+ eth.chainManager.status = func() (*big.Int, []byte, []byte) { return td, currentBlock, genesis }
+ go eth.run()
+ statusMsg := p2p.NewMsg(4)
+ eth.In(statusMsg)
+ delay := 1 * time.Second
+ eth.checkError(ErrNoStatusMsg, delay)
+ var status statusMsgData
+ eth.checkMsg(0, StatusMsg, &status) // first outgoing msg should be StatusMsg
+ if status.TD.Cmp(td) != 0 ||
+ status.ProtocolVersion != ProtocolVersion ||
+ status.NetworkId != NetworkId ||
+ status.TD.Cmp(td) != 0 ||
+ bytes.Compare(status.CurrentBlock, currentBlock) != 0 ||
+ bytes.Compare(status.GenesisBlock, genesis) != 0 {
+ t.Errorf("incorrect outgoing status")
+ }
+
+ eth.reset()
+ go eth.run()
+ statusMsg = p2p.NewMsg(0, uint32(48), uint32(0), td, currentBlock, genesis)
+ eth.In(statusMsg)
+ eth.checkError(ErrProtocolVersionMismatch, delay)
+
+ eth.reset()
+ go eth.run()
+ statusMsg = p2p.NewMsg(0, uint32(49), uint32(1), td, currentBlock, genesis)
+ eth.In(statusMsg)
+ eth.checkError(ErrNetworkIdMismatch, delay)
+
+ eth.reset()
+ go eth.run()
+ statusMsg = p2p.NewMsg(0, uint32(49), uint32(0), td, currentBlock, []byte{3})
+ eth.In(statusMsg)
+ eth.checkError(ErrGenesisBlockMismatch, delay)
+
+}
diff --git a/eth/wallet.go b/eth/wallet.go
new file mode 100644
index 000000000..9ec834309
--- /dev/null
+++ b/eth/wallet.go
@@ -0,0 +1,80 @@
+package eth
+
+/*
+import (
+ "crypto/ecdsa"
+ "errors"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+)
+
+type Account struct {
+ w *Wallet
+}
+
+func (self *Account) Transact(to *Account, value, gas, price *big.Int, data []byte) error {
+ return self.w.transact(self, to, value, gas, price, data)
+}
+
+func (self *Account) Address() []byte {
+ return nil
+}
+
+func (self *Account) PrivateKey() *ecdsa.PrivateKey {
+ return nil
+}
+
+type Wallet struct{}
+
+func NewWallet() *Wallet {
+ return &Wallet{}
+}
+
+func (self *Wallet) GetAccount(i int) *Account {
+}
+
+func (self *Wallet) transact(from, to *Account, value, gas, price *big.Int, data []byte) error {
+ if from.PrivateKey() == nil {
+ return errors.New("accounts is not owned (no private key available)")
+ }
+
+ var createsContract bool
+ if to == nil {
+ createsContract = true
+ }
+
+ var msg *types.Transaction
+ if contractCreation {
+ msg = types.NewContractCreationTx(value, gas, price, data)
+ } else {
+ msg = types.NewTransactionMessage(to.Address(), value, gas, price, data)
+ }
+
+ state := self.chainManager.TransState()
+ nonce := state.GetNonce(key.Address())
+
+ msg.SetNonce(nonce)
+ msg.SignECDSA(from.PriateKey())
+
+ // Do some pre processing for our "pre" events and hooks
+ block := self.chainManager.NewBlock(from.Address())
+ coinbase := state.GetOrNewStateObject(from.Address())
+ coinbase.SetGasPool(block.GasLimit())
+ self.blockManager.ApplyTransactions(coinbase, state, block, types.Transactions{tx}, true)
+
+ err := self.obj.TxPool().Add(tx)
+ if err != nil {
+ return nil, err
+ }
+ state.SetNonce(key.Address(), nonce+1)
+
+ if contractCreation {
+ addr := core.AddressFromMessage(tx)
+ pipelogger.Infof("Contract addr %x\n", addr)
+ }
+
+ return tx, nil
+}
+*/