diff options
54 files changed, 2541 insertions, 3005 deletions
diff --git a/block_pool.go b/block_pool.go deleted file mode 100644 index c618f6993..000000000 --- a/block_pool.go +++ /dev/null @@ -1,351 +0,0 @@ -package eth - -import ( - "bytes" - "container/list" - "fmt" - "math" - "math/big" - "sync" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/wire" -) - -var poollogger = logger.NewLogger("BPOOL") - -type block struct { - from *Peer - peer *Peer - block *types.Block - reqAt time.Time - requested int -} - -type BlockPool struct { - mut sync.Mutex - - eth *Ethereum - - hashes [][]byte - pool map[string]*block - - td *big.Int - quit chan bool - - fetchingHashes bool - downloadStartedAt time.Time - - ChainLength, BlocksProcessed int - - peer *Peer -} - -func NewBlockPool(eth *Ethereum) *BlockPool { - return &BlockPool{ - eth: eth, - pool: make(map[string]*block), - td: ethutil.Big0, - quit: make(chan bool), - } -} - -func (self *BlockPool) Len() int { - return len(self.hashes) -} - -func (self *BlockPool) Reset() { - self.pool = make(map[string]*block) - self.hashes = nil -} - -func (self *BlockPool) HasLatestHash() bool { - self.mut.Lock() - defer self.mut.Unlock() - - return self.pool[string(self.eth.ChainManager().CurrentBlock().Hash())] != nil -} - -func (self *BlockPool) HasCommonHash(hash []byte) bool { - return self.eth.ChainManager().GetBlock(hash) != nil -} - -func (self *BlockPool) Blocks() (blocks types.Blocks) { - for _, item := range self.pool { - if item.block != nil { - blocks = append(blocks, item.block) - } - } - - return -} - -func (self *BlockPool) FetchHashes(peer *Peer) bool { - highestTd := self.eth.HighestTDPeer() - - if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer { - if self.peer != peer { - poollogger.Infof("Found better suitable peer (%v vs %v)\n", self.td, peer.td) - - if self.peer != nil { - self.peer.doneFetchingHashes = true - } - } - - self.peer = peer - self.td = peer.td - - if !self.HasLatestHash() { - self.fetchHashes() - } - - return true - } - - return false -} - -func (self *BlockPool) fetchHashes() { - peer := self.peer - - peer.doneFetchingHashes = false - - const amount = 256 - peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4]) - peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)})) -} - -func (self *BlockPool) AddHash(hash []byte, peer *Peer) { - self.mut.Lock() - defer self.mut.Unlock() - - if self.pool[string(hash)] == nil { - self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0} - - self.hashes = append([][]byte{hash}, self.hashes...) - } -} - -func (self *BlockPool) Add(b *types.Block, peer *Peer) { - self.addBlock(b, peer, false) -} - -func (self *BlockPool) AddNew(b *types.Block, peer *Peer) { - self.addBlock(b, peer, true) -} - -func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) { - self.mut.Lock() - defer self.mut.Unlock() - - hash := string(b.Hash()) - - if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) { - poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4]) - - self.hashes = append(self.hashes, b.Hash()) - self.pool[hash] = &block{peer, peer, b, time.Now(), 0} - - // The following is only performed on an unrequested new block - if newBlock { - fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4])) - fmt.Println("2.", self.pool[string(b.PrevHash)] == nil) - fmt.Println("3.", !self.fetchingHashes) - if !self.eth.ChainManager().HasBlock(b.PrevHash) /*&& self.pool[string(b.PrevHash)] == nil*/ && !self.fetchingHashes { - poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)})) - } - } - } else if self.pool[hash] != nil { - self.pool[hash].block = b - } - - self.BlocksProcessed++ -} - -func (self *BlockPool) Remove(hash []byte) { - self.mut.Lock() - defer self.mut.Unlock() - - self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash) - delete(self.pool, string(hash)) -} - -func (self *BlockPool) DistributeHashes() { - self.mut.Lock() - defer self.mut.Unlock() - - var ( - peerLen = self.eth.peers.Len() - amount = 256 * peerLen - dist = make(map[*Peer][][]byte) - ) - - num := int(math.Min(float64(amount), float64(len(self.pool)))) - for i, j := 0, 0; i < len(self.hashes) && j < num; i++ { - hash := self.hashes[i] - item := self.pool[string(hash)] - - if item != nil && item.block == nil { - var peer *Peer - lastFetchFailed := time.Since(item.reqAt) > 5*time.Second - - // Handle failed requests - if lastFetchFailed && item.requested > 5 && item.peer != nil { - if item.requested < 100 { - // Select peer the hash was retrieved off - peer = item.from - } else { - // Remove it - self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash) - delete(self.pool, string(hash)) - } - } else if lastFetchFailed || item.peer == nil { - // Find a suitable, available peer - eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if peer == nil && len(dist[p]) < amount/peerLen && p.statusKnown { - peer = p - } - }) - } - - if peer != nil { - item.reqAt = time.Now() - item.peer = peer - item.requested++ - - dist[peer] = append(dist[peer], hash) - } - } - } - - for peer, hashes := range dist { - peer.FetchBlocks(hashes) - } - - if len(dist) > 0 { - self.downloadStartedAt = time.Now() - } -} - -func (self *BlockPool) Start() { - go self.downloadThread() - go self.chainThread() -} - -func (self *BlockPool) Stop() { - close(self.quit) -} - -func (self *BlockPool) downloadThread() { - serviceTimer := time.NewTicker(100 * time.Millisecond) -out: - for { - select { - case <-self.quit: - break out - case <-serviceTimer.C: - // Check if we're catching up. If not distribute the hashes to - // the peers and download the blockchain - self.fetchingHashes = false - eachPeer(self.eth.peers, func(p *Peer, v *list.Element) { - if p.statusKnown && p.FetchingHashes() { - self.fetchingHashes = true - } - }) - - if len(self.hashes) > 0 { - self.DistributeHashes() - } - - if self.ChainLength < len(self.hashes) { - self.ChainLength = len(self.hashes) - } - - if self.peer != nil && - !self.peer.doneFetchingHashes && - time.Since(self.peer.lastHashAt) > 10*time.Second && - time.Since(self.peer.lastHashRequestedAt) > 5*time.Second { - self.fetchHashes() - } - - /* - if !self.fetchingHashes { - blocks := self.Blocks() - chain.BlockBy(chain.Number).Sort(blocks) - - if len(blocks) > 0 { - if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes { - } - } - } - */ - } - } -} - -func (self *BlockPool) chainThread() { - procTimer := time.NewTicker(500 * time.Millisecond) -out: - for { - select { - case <-self.quit: - break out - case <-procTimer.C: - blocks := self.Blocks() - types.BlockBy(types.Number).Sort(blocks) - - // Find common block - for i, block := range blocks { - if self.eth.ChainManager().HasBlock(block.PrevHash) { - blocks = blocks[i:] - break - } - } - - if len(blocks) > 0 { - if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) { - for i, block := range blocks[1:] { - // NOTE: The Ith element in this loop refers to the previous block in - // outer "blocks" - if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 { - blocks = blocks[:i] - - break - } - } - } else { - blocks = nil - } - } - - if len(blocks) > 0 { - chainman := self.eth.ChainManager() - - err := chainman.InsertChain(blocks) - if err != nil { - poollogger.Debugln(err) - - self.Reset() - - if self.peer != nil && self.peer.conn != nil { - poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr()) - } - - // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished. - self.eth.BlacklistPeer(self.peer) - self.peer.StopWithReason(DiscBadPeer) - self.td = ethutil.Big0 - self.peer = nil - } - - for _, block := range blocks { - self.Remove(block.Hash()) - } - } - } - } -} diff --git a/cmd/ethereum/cmd.go b/cmd/ethereum/cmd.go index 8710d6136..d8b9ea487 100644 --- a/cmd/ethereum/cmd.go +++ b/cmd/ethereum/cmd.go @@ -21,9 +21,9 @@ import ( "io/ioutil" "os" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/cmd/ethereum/repl" "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/javascript" ) diff --git a/cmd/ethereum/flags.go b/cmd/ethereum/flags.go index 783944cf2..556735491 100644 --- a/cmd/ethereum/flags.go +++ b/cmd/ethereum/flags.go @@ -38,7 +38,8 @@ var ( StartRpc bool StartWebSockets bool RpcPort int - UseUPnP bool + NatType string + PMPGateway string OutboundPort string ShowGenesis bool AddPeer string @@ -84,7 +85,8 @@ func Init() { flag.StringVar(&KeyRing, "keyring", "", "identifier for keyring to use") flag.StringVar(&KeyStore, "keystore", "db", "system to store keyrings: db|file (db)") flag.StringVar(&OutboundPort, "port", "30303", "listening port") - flag.BoolVar(&UseUPnP, "upnp", false, "enable UPnP support") + flag.StringVar(&NatType, "nat", "", "NAT support (UPNP|PMP) (none)") + flag.StringVar(&PMPGateway, "pmp", "", "Gateway IP for PMP") flag.IntVar(&MaxPeer, "maxpeer", 10, "maximum desired peers") flag.IntVar(&RpcPort, "rpcport", 8080, "port to start json-rpc server on") flag.BoolVar(&StartRpc, "rpc", false, "start rpc server") diff --git a/cmd/ethereum/main.go b/cmd/ethereum/main.go index 9efc8e9dc..da09e0b58 100644 --- a/cmd/ethereum/main.go +++ b/cmd/ethereum/main.go @@ -69,9 +69,9 @@ func main() { // create, import, export keys utils.KeyTasks(keyManager, KeyRing, GenAddr, SecretFile, ExportDir, NonInteractive) - clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier) + clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier, string(keyManager.PublicKey())) - ethereum := utils.NewEthereum(db, clientIdentity, keyManager, UseUPnP, OutboundPort, MaxPeer) + ethereum := utils.NewEthereum(db, clientIdentity, keyManager, utils.NatType(NatType, PMPGateway), OutboundPort, MaxPeer) if Dump { var block *types.Block diff --git a/cmd/ethereum/repl/repl.go b/cmd/ethereum/repl/repl.go index a5146fecd..4a7880ff4 100644 --- a/cmd/ethereum/repl/repl.go +++ b/cmd/ethereum/repl/repl.go @@ -24,7 +24,7 @@ import ( "os" "path" - "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/javascript" "github.com/ethereum/go-ethereum/logger" diff --git a/cmd/mist/assets/qml/main.qml b/cmd/mist/assets/qml/main.qml index 9f1f214a6..285757080 100644 --- a/cmd/mist/assets/qml/main.qml +++ b/cmd/mist/assets/qml/main.qml @@ -50,6 +50,7 @@ ApplicationWindow { addPlugin("./views/miner.qml", {noAdd: true, close: false, section: "ethereum", active: true}); addPlugin("./views/transaction.qml", {noAdd: true, close: false, section: "legacy"}); + addPlugin("./views/whisper.qml", {noAdd: true, close: false, section: "legacy"}); addPlugin("./views/chain.qml", {noAdd: true, close: false, section: "legacy"}); addPlugin("./views/pending_tx.qml", {noAdd: true, close: false, section: "legacy"}); addPlugin("./views/info.qml", {noAdd: true, close: false, section: "legacy"}); diff --git a/cmd/mist/assets/qml/views/whisper.qml b/cmd/mist/assets/qml/views/whisper.qml new file mode 100644 index 000000000..b50841ba5 --- /dev/null +++ b/cmd/mist/assets/qml/views/whisper.qml @@ -0,0 +1,47 @@ + +import QtQuick 2.0 +import QtQuick.Controls 1.0; +import QtQuick.Layouts 1.0; +import QtQuick.Dialogs 1.0; +import QtQuick.Window 2.1; +import QtQuick.Controls.Styles 1.1 +import Ethereum 1.0 + +Rectangle { + id: root + property var title: "Whisper" + property var iconSource: "../facet.png" + property var menuItem + + objectName: "whisperView" + anchors.fill: parent + + property var identity: "" + Component.onCompleted: { + identity = shh.newIdentity() + console.log("New identity:", identity) + + var t = shh.watch({topics: ["chat"]}) + } + + RowLayout { + TextField { + id: to + placeholderText: "To" + } + TextField { + id: data + placeholderText: "Data" + } + TextField { + id: topics + placeholderText: "topic1, topic2, topic3, ..." + } + Button { + text: "Send" + onClicked: { + shh.post(eth.toHex(data.text), "", identity, topics.text.split(","), 500, 50) + } + } + } +} diff --git a/cmd/mist/flags.go b/cmd/mist/flags.go index 2ae0a0487..1d77532d9 100644 --- a/cmd/mist/flags.go +++ b/cmd/mist/flags.go @@ -36,10 +36,12 @@ var ( Identifier string KeyRing string KeyStore string + PMPGateway string StartRpc bool StartWebSockets bool RpcPort int UseUPnP bool + NatType string OutboundPort string ShowGenesis bool AddPeer string @@ -111,10 +113,12 @@ func Init() { flag.BoolVar(&NonInteractive, "y", false, "non-interactive mode (say yes to confirmations)") flag.BoolVar(&UseSeed, "seed", true, "seed peers") flag.BoolVar(&GenAddr, "genaddr", false, "create a new priv/pub key") + flag.StringVar(&NatType, "nat", "", "NAT support (UPNP|PMP) (none)") flag.StringVar(&SecretFile, "import", "", "imports the file given (hex or mnemonic formats)") flag.StringVar(&ExportDir, "export", "", "exports the session keyring to files in the directory given") flag.StringVar(&LogFile, "logfile", "", "log file (defaults to standard output)") flag.StringVar(&Datadir, "datadir", defaultDataDir(), "specifies the datadir to use") + flag.StringVar(&PMPGateway, "pmp", "", "Gateway IP for PMP") flag.StringVar(&ConfigFile, "conf", defaultConfigFile, "config file") flag.StringVar(&DebugFile, "debug", "", "debug file (no debugging if not set)") flag.IntVar(&LogLevel, "loglevel", int(logger.InfoLevel), "loglevel: 0-5: silent,error,warn,info,debug,debug detail)") diff --git a/cmd/mist/gui.go b/cmd/mist/gui.go index 7775889cc..3ab307174 100644 --- a/cmd/mist/gui.go +++ b/cmd/mist/gui.go @@ -30,14 +30,15 @@ import ( "strings" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/miner" - "github.com/ethereum/go-ethereum/wire" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/ui/qt/qwhisper" "github.com/ethereum/go-ethereum/xeth" "gopkg.in/qml.v1" ) @@ -87,7 +88,8 @@ type Gui struct { eth *eth.Ethereum // The public Ethereum library - uiLib *UiLib + uiLib *UiLib + whisper *qwhisper.Whisper txDb *ethdb.LDBDatabase @@ -97,7 +99,7 @@ type Gui struct { pipe *xeth.JSXEth Session string - clientIdentity *wire.SimpleClientIdentity + clientIdentity *p2p.SimpleClientIdentity config *ethutil.ConfigManager plugins map[string]plugin @@ -107,7 +109,7 @@ type Gui struct { } // Create GUI, but doesn't start it -func NewWindow(ethereum *eth.Ethereum, config *ethutil.ConfigManager, clientIdentity *wire.SimpleClientIdentity, session string, logLevel int) *Gui { +func NewWindow(ethereum *eth.Ethereum, config *ethutil.ConfigManager, clientIdentity *p2p.SimpleClientIdentity, session string, logLevel int) *Gui { db, err := ethdb.NewLDBDatabase("tx_database") if err != nil { panic(err) @@ -138,10 +140,12 @@ func (gui *Gui) Start(assetPath string) { gui.engine = qml.NewEngine() context := gui.engine.Context() gui.uiLib = NewUiLib(gui.engine, gui.eth, assetPath) + gui.whisper = qwhisper.New(gui.eth.Whisper()) // Expose the eth library and the ui library to QML context.SetVar("gui", gui) context.SetVar("eth", gui.uiLib) + context.SetVar("shh", gui.whisper) // Load the main QML interface data, _ := ethutil.Config.Db.Get([]byte("KeyRing")) @@ -391,6 +395,8 @@ func (gui *Gui) update() { gui.setPeerInfo() }() + gui.whisper.SetView(gui.win.Root().ObjectByName("whisperView")) + for _, plugin := range gui.plugins { guilogger.Infoln("Loading plugin ", plugin.Name) @@ -409,8 +415,7 @@ func (gui *Gui) update() { miningLabel := gui.getObjectByName("miningLabel") events := gui.eth.EventMux().Subscribe( - eth.ChainSyncEvent{}, - eth.PeerListEvent{}, + //eth.PeerListEvent{}, core.NewBlockEvent{}, core.TxPreEvent{}, core.TxPostEvent{}, @@ -460,9 +465,6 @@ func (gui *Gui) update() { gui.setWalletValue(object.Balance(), nil) state.UpdateStateObject(object) - - case eth.PeerListEvent: - gui.setPeerInfo() } case <-peerUpdateTicker.C: @@ -472,16 +474,18 @@ func (gui *Gui) update() { lastBlockLabel.Set("text", statusText) miningLabel.Set("text", "Mining @ "+strconv.FormatInt(gui.uiLib.miner.GetPow().GetHashrate(), 10)+"Khash") - blockLength := gui.eth.BlockPool().BlocksProcessed - chainLength := gui.eth.BlockPool().ChainLength + /* + blockLength := gui.eth.BlockPool().BlocksProcessed + chainLength := gui.eth.BlockPool().ChainLength - var ( - pct float64 = 1.0 / float64(chainLength) * float64(blockLength) - dlWidget = gui.win.Root().ObjectByName("downloadIndicator") - dlLabel = gui.win.Root().ObjectByName("downloadLabel") - ) - dlWidget.Set("value", pct) - dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength)) + var ( + pct float64 = 1.0 / float64(chainLength) * float64(blockLength) + dlWidget = gui.win.Root().ObjectByName("downloadIndicator") + dlLabel = gui.win.Root().ObjectByName("downloadLabel") + ) + dlWidget.Set("value", pct) + dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength)) + */ case <-statsUpdateTicker.C: gui.setStatsPane() @@ -509,7 +513,7 @@ Heap Alloc: %d CGNext: %x NumGC: %d `, Version, runtime.Version(), - eth.ProtocolVersion, eth.P2PVersion, + eth.ProtocolVersion, 2, runtime.NumCPU, runtime.NumGoroutine(), runtime.NumCgoCall(), memStats.Alloc, memStats.HeapAlloc, memStats.NextGC, memStats.NumGC, diff --git a/cmd/mist/main.go b/cmd/mist/main.go index 14336b4e8..3ea6e8e91 100644 --- a/cmd/mist/main.go +++ b/cmd/mist/main.go @@ -23,8 +23,8 @@ import ( "runtime" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/logger" "gopkg.in/qml.v1" ) @@ -58,8 +58,8 @@ func run() error { // create, import, export keys utils.KeyTasks(keyManager, KeyRing, GenAddr, SecretFile, ExportDir, NonInteractive) - clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier) - ethereum = utils.NewEthereum(db, clientIdentity, keyManager, UseUPnP, OutboundPort, MaxPeer) + clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier, string(keyManager.PublicKey())) + ethereum := utils.NewEthereum(db, clientIdentity, keyManager, utils.NatType(NatType, PMPGateway), OutboundPort, MaxPeer) if ShowGenesis { utils.ShowGenesis(ethereum) @@ -69,6 +69,10 @@ func run() error { utils.StartRpc(ethereum, RpcPort) } + if StartWebSockets { + utils.StartWebSockets(ethereum) + } + gui := NewWindow(ethereum, config, clientIdentity, KeyRing, LogLevel) gui.stdLog = stdLog @@ -100,16 +104,10 @@ func main() { utils.HandleInterrupt() - if StartWebSockets { - utils.StartWebSockets(ethereum) - } - // we need to run the interrupt callbacks in case gui is closed // this skips if we got here by actual interrupt stopping the GUI if !interrupted { utils.RunInterruptCallbacks(os.Interrupt) } - // this blocks the thread - ethereum.WaitForShutdown() logger.Flush() } diff --git a/cmd/mist/ui_lib.go b/cmd/mist/ui_lib.go index fdbde50fd..68f333563 100644 --- a/cmd/mist/ui_lib.go +++ b/cmd/mist/ui_lib.go @@ -24,11 +24,12 @@ import ( "strconv" "strings" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/javascript" "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/state" @@ -57,6 +58,7 @@ type UiLib struct { jsEngine *javascript.JSRE filterCallbacks map[int][]int + filterManager *filter.FilterManager miner *miner.Miner } @@ -64,6 +66,7 @@ type UiLib struct { func NewUiLib(engine *qml.Engine, eth *eth.Ethereum, assetPath string) *UiLib { lib := &UiLib{JSXEth: xeth.NewJSXEth(eth), engine: engine, eth: eth, assetPath: assetPath, jsEngine: javascript.NewJSRE(eth), filterCallbacks: make(map[int][]int)} //, filters: make(map[int]*xeth.JSFilter)} lib.miner = miner.New(eth.KeyManager().Address(), eth) + lib.filterManager = filter.NewFilterManager(eth.EventMux()) return lib } @@ -123,7 +126,8 @@ func (self *UiLib) LookupAddress(name string) string { } func (self *UiLib) PastPeers() *ethutil.List { - return ethutil.NewList(eth.PastPeers()) + return ethutil.NewList([]string{}) + //return ethutil.NewList(eth.PastPeers()) } func (self *UiLib) ImportTx(rlpTx string) { @@ -191,7 +195,7 @@ func (ui *UiLib) Connect(button qml.Object) { } func (ui *UiLib) ConnectToPeer(addr string) { - ui.eth.ConnectToPeer(addr) + ui.eth.SuggestPeer(addr) } func (ui *UiLib) AssetPath(p string) string { @@ -226,7 +230,7 @@ func (self *UiLib) NewFilter(object map[string]interface{}) (id int) { filter.MessageCallback = func(messages state.Messages) { self.win.Root().Call("invokeFilterCallback", xeth.ToJSMessages(messages), id) } - id = self.eth.InstallFilter(filter) + id = self.filterManager.InstallFilter(filter) return id } @@ -239,12 +243,12 @@ func (self *UiLib) NewFilterString(typ string) (id int) { fmt.Println("QML is lagging") } } - id = self.eth.InstallFilter(filter) + id = self.filterManager.InstallFilter(filter) return id } func (self *UiLib) Messages(id int) *ethutil.List { - filter := self.eth.GetFilter(id) + filter := self.filterManager.GetFilter(id) if filter != nil { messages := xeth.ToJSMessages(filter.Find()) @@ -255,7 +259,7 @@ func (self *UiLib) Messages(id int) *ethutil.List { } func (self *UiLib) UninstallFilter(id int) { - self.eth.UninstallFilter(id) + self.filterManager.UninstallFilter(id) } func mapToTxParams(object map[string]interface{}) map[string]string { @@ -372,3 +376,16 @@ func (self *UiLib) ToggleMining() bool { return false } } + +func (self *UiLib) ToHex(data string) string { + return "0x" + ethutil.Bytes2Hex([]byte(data)) +} + +/* +// XXX Refactor me & MOVE +func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) { + return self.filterManager.InstallFilter(filter) +} +func (self *Ethereum) UninstallFilter(id int) { self.filterManager.UninstallFilter(id) } +func (self *Ethereum) GetFilter(id int) *core.Filter { return self.filterManager.GetFilter(id) } +*/ diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index db7bcd35e..3e3ac617a 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -4,23 +4,23 @@ import ( "fmt" "io" "log" + "net" "os" "os/signal" "path" "path/filepath" "regexp" "runtime" - "time" "bitbucket.org/kardianos/osext" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/wire" "github.com/ethereum/go-ethereum/xeth" ) @@ -144,17 +144,32 @@ func NewDatabase() ethutil.Database { return db } -func NewClientIdentity(clientIdentifier, version, customIdentifier string) *wire.SimpleClientIdentity { - return wire.NewSimpleClientIdentity(clientIdentifier, version, customIdentifier) +func NewClientIdentity(clientIdentifier, version, customIdentifier string, pubkey string) *p2p.SimpleClientIdentity { + return p2p.NewSimpleClientIdentity(clientIdentifier, version, customIdentifier, pubkey) } -func NewEthereum(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, usePnp bool, OutboundPort string, MaxPeer int) *eth.Ethereum { - ethereum, err := eth.New(db, clientIdentity, keyManager, eth.CapDefault, usePnp) +func NatType(natType string, gateway string) (nat p2p.NAT) { + switch natType { + case "UPNP": + nat = p2p.UPNP() + case "PMP": + ip := net.ParseIP(gateway) + if ip == nil { + clilogger.Fatalf("cannot resolve PMP gateway IP %s", gateway) + } + nat = p2p.PMP(ip) + case "": + default: + clilogger.Fatalf("unrecognised NAT type '%s'", natType) + } + return +} + +func NewEthereum(db ethutil.Database, clientIdentity p2p.ClientIdentity, keyManager *crypto.KeyManager, nat p2p.NAT, OutboundPort string, MaxPeer int) *eth.Ethereum { + ethereum, err := eth.New(db, clientIdentity, keyManager, nat, OutboundPort, MaxPeer) if err != nil { clilogger.Fatalln("eth start err:", err) } - ethereum.Port = OutboundPort - ethereum.MaxPeers = MaxPeer return ethereum } @@ -268,11 +283,6 @@ func StartMining(ethereum *eth.Ethereum) bool { if gminer == nil { gminer = miner.New(addr, ethereum) } - // Give it some time to connect with peers - time.Sleep(3 * time.Second) - for !ethereum.IsUpToDate() { - time.Sleep(5 * time.Second) - } gminer.Start() }() RegisterInterrupt(func(os.Signal) { diff --git a/cmd/utils/websockets.go b/cmd/utils/websockets.go index d3ba50e78..29f9b8aeb 100644 --- a/cmd/utils/websockets.go +++ b/cmd/utils/websockets.go @@ -1,7 +1,7 @@ package utils import ( - "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/websocket" "github.com/ethereum/go-ethereum/xeth" diff --git a/core/block_manager.go b/core/block_manager.go index 8d319f84e..85b4891a5 100644 --- a/core/block_manager.go +++ b/core/block_manager.go @@ -2,7 +2,6 @@ package core import ( "bytes" - "container/list" "errors" "fmt" "math/big" @@ -14,10 +13,10 @@ import ( "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow/ezp" "github.com/ethereum/go-ethereum/state" - "github.com/ethereum/go-ethereum/wire" ) var statelogger = logger.NewLogger("BLOCK") @@ -38,13 +37,12 @@ type EthManager interface { BlockManager() *BlockManager ChainManager() *ChainManager TxPool() *TxPool - Broadcast(msgType wire.MsgType, data []interface{}) PeerCount() int IsMining() bool IsListening() bool - Peers() *list.List + Peers() []*p2p.Peer KeyManager() *crypto.KeyManager - ClientIdentity() wire.ClientIdentity + ClientIdentity() p2p.ClientIdentity Db() ethutil.Database EventMux() *event.TypeMux } diff --git a/core/chain_manager.go b/core/chain_manager.go index 794ae0011..e6268c01e 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -100,6 +100,13 @@ func NewChainManager(mux *event.TypeMux) *ChainManager { return bc } +func (self *ChainManager) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) { + self.mu.RLock() + defer self.mu.RUnlock() + + return self.td, self.currentBlock.Hash(), self.Genesis().Hash() +} + func (self *ChainManager) SetProcessor(proc types.BlockProcessor) { self.processor = proc } @@ -221,7 +228,7 @@ func (bc *ChainManager) HasBlock(hash []byte) bool { return len(data) != 0 } -func (self *ChainManager) GetChainHashesFromHash(hash []byte, max uint64) (chain [][]byte) { +func (self *ChainManager) GetBlockHashesFromHash(hash []byte, max uint64) (chain [][]byte) { block := self.GetBlock(hash) if block == nil { return diff --git a/core/events.go b/core/events.go index deeba3e98..fe106da49 100644 --- a/core/events.go +++ b/core/events.go @@ -10,3 +10,6 @@ type TxPostEvent struct{ Tx *types.Transaction } // NewBlockEvent is posted when a block has been imported. type NewBlockEvent struct{ Block *types.Block } + +// NewMinedBlockEvent is posted when a block has been imported. +type NewMinedBlockEvent struct{ Block *types.Block } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 58c2255a4..17fcdb86a 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/state" - "github.com/ethereum/go-ethereum/wire" ) var txplogger = logger.NewLogger("TXP") @@ -19,7 +18,9 @@ var txplogger = logger.NewLogger("TXP") const txPoolQueueSize = 50 type TxPoolHook chan *types.Transaction -type TxMsgTy byte +type TxMsg struct { + Tx *types.Transaction +} const ( minGasPrice = 1000000 @@ -27,11 +28,6 @@ const ( var MinGasPrice = big.NewInt(10000000000000) -type TxMsg struct { - Tx *types.Transaction - Type TxMsgTy -} - func EachTx(pool *list.List, it func(*types.Transaction, *list.Element) bool) { for e := pool.Front(); e != nil; e = e.Next() { if it(e.Value.(*types.Transaction), e) { @@ -76,19 +72,17 @@ type TxPool struct { subscribers []chan TxMsg - broadcaster types.Broadcaster chainManager *ChainManager eventMux *event.TypeMux } -func NewTxPool(chainManager *ChainManager, broadcaster types.Broadcaster, eventMux *event.TypeMux) *TxPool { +func NewTxPool(chainManager *ChainManager, eventMux *event.TypeMux) *TxPool { return &TxPool{ pool: list.New(), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), chainManager: chainManager, eventMux: eventMux, - broadcaster: broadcaster, } } @@ -100,7 +94,7 @@ func (pool *TxPool) addTransaction(tx *types.Transaction) { pool.pool.PushBack(tx) // Broadcast the transaction to the rest of the peers - pool.broadcaster.Broadcast(wire.MsgTxTy, []interface{}{tx.RlpData()}) + pool.eventMux.Post(TxPreEvent{tx}) } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { @@ -166,7 +160,17 @@ func (self *TxPool) Size() int { return self.pool.Len() } -func (pool *TxPool) CurrentTransactions() []*types.Transaction { +func (self *TxPool) AddTransactions(txs []*types.Transaction) { + for _, tx := range txs { + if err := self.Add(tx); err != nil { + txplogger.Infoln(err) + } else { + txplogger.Infof("tx %x\n", tx.Hash()[0:4]) + } + } +} + +func (pool *TxPool) GetTransactions() []*types.Transaction { pool.mutex.Lock() defer pool.mutex.Unlock() @@ -213,7 +217,7 @@ func (self *TxPool) RemoveSet(txs types.Transactions) { } func (pool *TxPool) Flush() []*types.Transaction { - txList := pool.CurrentTransactions() + txList := pool.GetTransactions() // Recreate a new list all together // XXX Is this the fastest way? diff --git a/core/types/common.go b/core/types/common.go index 89cb5f498..ba88b77e1 100644 --- a/core/types/common.go +++ b/core/types/common.go @@ -4,13 +4,8 @@ import ( "math/big" "github.com/ethereum/go-ethereum/state" - "github.com/ethereum/go-ethereum/wire" ) type BlockProcessor interface { Process(*Block) (*big.Int, state.Messages, error) } - -type Broadcaster interface { - Broadcast(wire.MsgType, []interface{}) -} diff --git a/eth/backend.go b/eth/backend.go new file mode 100644 index 000000000..0aad6a514 --- /dev/null +++ b/eth/backend.go @@ -0,0 +1,249 @@ +package eth + +import ( + "net" + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/event" + ethlogger "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/pow/ezp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/whisper" +) + +const ( + seedNodeAddress = "poc-7.ethdev.com:30300" +) + +var logger = ethlogger.NewLogger("SERV") + +type Ethereum struct { + // Channel for shutting down the ethereum + shutdownChan chan bool + quit chan bool + + // DB interface + db ethutil.Database + blacklist p2p.Blacklist + + //*** SERVICES *** + // State manager for processing new blocks and managing the over all states + blockManager *core.BlockManager + txPool *core.TxPool + chainManager *core.ChainManager + blockPool *BlockPool + whisper *whisper.Whisper + + server *p2p.Server + eventMux *event.TypeMux + txSub event.Subscription + blockSub event.Subscription + + RpcServer *rpc.JsonRpcServer + keyManager *crypto.KeyManager + + clientIdentity p2p.ClientIdentity + + synclock sync.Mutex + syncGroup sync.WaitGroup + + Mining bool +} + +func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.KeyManager, nat p2p.NAT, port string, maxPeers int) (*Ethereum, error) { + + saveProtocolVersion(db) + ethutil.Config.Db = db + + eth := &Ethereum{ + shutdownChan: make(chan bool), + quit: make(chan bool), + db: db, + keyManager: keyManager, + clientIdentity: identity, + blacklist: p2p.NewBlacklist(), + eventMux: &event.TypeMux{}, + } + + eth.chainManager = core.NewChainManager(eth.EventMux()) + eth.txPool = core.NewTxPool(eth.chainManager, eth.EventMux()) + eth.blockManager = core.NewBlockManager(eth.txPool, eth.chainManager, eth.EventMux()) + eth.chainManager.SetProcessor(eth.blockManager) + eth.whisper = whisper.New() + + hasBlock := eth.chainManager.HasBlock + insertChain := eth.chainManager.InsertChain + eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify) + + // Start services + eth.txPool.Start() + + ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool) + protocols := []p2p.Protocol{ethProto, eth.whisper.Protocol()} + + server := &p2p.Server{ + Identity: identity, + MaxPeers: maxPeers, + Protocols: protocols, + ListenAddr: ":" + port, + Blacklist: eth.blacklist, + NAT: nat, + } + + eth.server = server + + return eth, nil +} + +func (s *Ethereum) KeyManager() *crypto.KeyManager { + return s.keyManager +} + +func (s *Ethereum) ClientIdentity() p2p.ClientIdentity { + return s.clientIdentity +} + +func (s *Ethereum) ChainManager() *core.ChainManager { + return s.chainManager +} + +func (s *Ethereum) BlockManager() *core.BlockManager { + return s.blockManager +} + +func (s *Ethereum) TxPool() *core.TxPool { + return s.txPool +} + +func (s *Ethereum) BlockPool() *BlockPool { + return s.blockPool +} + +func (s *Ethereum) Whisper() *whisper.Whisper { + return s.whisper +} + +func (s *Ethereum) EventMux() *event.TypeMux { + return s.eventMux +} +func (self *Ethereum) Db() ethutil.Database { + return self.db +} + +func (s *Ethereum) IsMining() bool { + return s.Mining +} + +func (s *Ethereum) IsListening() bool { + // XXX TODO + return false +} + +func (s *Ethereum) PeerCount() int { + return s.server.PeerCount() +} + +func (s *Ethereum) Peers() []*p2p.Peer { + return s.server.Peers() +} + +func (s *Ethereum) MaxPeers() int { + return s.server.MaxPeers +} + +// Start the ethereum +func (s *Ethereum) Start(seed bool) error { + err := s.server.Start() + if err != nil { + return err + } + s.blockPool.Start() + s.whisper.Start() + + // broadcast transactions + s.txSub = s.eventMux.Subscribe(core.TxPreEvent{}) + go s.txBroadcastLoop() + + // broadcast mined blocks + s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go s.blockBroadcastLoop() + + // TODO: read peers here + if seed { + logger.Infof("Connect to seed node %v", seedNodeAddress) + if err := s.SuggestPeer(seedNodeAddress); err != nil { + return err + } + } + + logger.Infoln("Server started") + return nil +} + +func (self *Ethereum) SuggestPeer(addr string) error { + netaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + logger.Errorf("couldn't resolve %s:", addr, err) + return err + } + + self.server.SuggestPeer(netaddr.IP, netaddr.Port, nil) + return nil +} + +func (s *Ethereum) Stop() { + // Close the database + defer s.db.Close() + + close(s.quit) + + s.txSub.Unsubscribe() // quits txBroadcastLoop + s.blockSub.Unsubscribe() // quits blockBroadcastLoop + + if s.RpcServer != nil { + s.RpcServer.Stop() + } + s.txPool.Stop() + s.eventMux.Stop() + s.blockPool.Stop() + s.whisper.Stop() + + logger.Infoln("Server stopped") + close(s.shutdownChan) +} + +// This function will wait for a shutdown and resumes main thread execution +func (s *Ethereum) WaitForShutdown() { + <-s.shutdownChan +} + +// now tx broadcasting is taken out of txPool +// handled here via subscription, efficiency? +func (self *Ethereum) txBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.TxPreEvent) + self.server.Broadcast("eth", TxMsg, []interface{}{event.Tx.RlpData()}) + } +} + +func (self *Ethereum) blockBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.NewMinedBlockEvent) + self.server.Broadcast("eth", NewBlockMsg, event.Block.Value().Val) + } +} + +func saveProtocolVersion(db ethutil.Database) { + d, _ := db.Get([]byte("ProtocolVersion")) + protocolVersion := ethutil.NewValue(d).Uint() + + if protocolVersion == 0 { + db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes()) + } +} diff --git a/eth/block_pool.go b/eth/block_pool.go new file mode 100644 index 000000000..7cfbc63f8 --- /dev/null +++ b/eth/block_pool.go @@ -0,0 +1,1015 @@ +package eth + +import ( + "math" + "math/big" + "math/rand" + "sort" + "sync" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethutil" + ethlogger "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/pow" +) + +var poolLogger = ethlogger.NewLogger("Blockpool") + +const ( + blockHashesBatchSize = 256 + blockBatchSize = 64 + blocksRequestInterval = 10 // seconds + blocksRequestRepetition = 1 + blockHashesRequestInterval = 10 // seconds + blocksRequestMaxIdleRounds = 10 + cacheTimeout = 3 // minutes + blockTimeout = 5 // minutes +) + +type poolNode struct { + lock sync.RWMutex + hash []byte + block *types.Block + child *poolNode + parent *poolNode + section *section + knownParent bool + peer string + source string + complete bool +} + +type BlockPool struct { + lock sync.RWMutex + pool map[string]*poolNode + + peersLock sync.RWMutex + peers map[string]*peerInfo + peer *peerInfo + + quit chan bool + wg sync.WaitGroup + running bool + + // the minimal interface with blockchain + hasBlock func(hash []byte) bool + insertChain func(types.Blocks) error + verifyPoW func(pow.Block) bool +} + +type peerInfo struct { + lock sync.RWMutex + + td *big.Int + currentBlock []byte + id string + + requestBlockHashes func([]byte) error + requestBlocks func([][]byte) error + peerError func(int, string, ...interface{}) + + sections map[string]*section + roots []*poolNode + quitC chan bool +} + +func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(pow.Block) bool, +) *BlockPool { + return &BlockPool{ + hasBlock: hasBlock, + insertChain: insertChain, + verifyPoW: verifyPoW, + } +} + +// allows restart +func (self *BlockPool) Start() { + self.lock.Lock() + if self.running { + self.lock.Unlock() + return + } + self.running = true + self.quit = make(chan bool) + self.pool = make(map[string]*poolNode) + self.lock.Unlock() + + self.peersLock.Lock() + self.peers = make(map[string]*peerInfo) + self.peersLock.Unlock() + + poolLogger.Infoln("Started") + +} + +func (self *BlockPool) Stop() { + self.lock.Lock() + if !self.running { + self.lock.Unlock() + return + } + self.running = false + self.lock.Unlock() + + poolLogger.Infoln("Stopping") + + close(self.quit) + self.lock.Lock() + self.peersLock.Lock() + self.peers = nil + self.pool = nil + self.peer = nil + self.wg.Wait() + self.lock.Unlock() + self.peersLock.Unlock() + poolLogger.Infoln("Stopped") + +} + +// AddPeer is called by the eth protocol instance running on the peer after +// the status message has been received with total difficulty and current block hash +// AddPeer can only be used once, RemovePeer needs to be called when the peer disconnects +func (self *BlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) bool { + self.peersLock.Lock() + defer self.peersLock.Unlock() + if self.peers[peerId] != nil { + panic("peer already added") + } + peer := &peerInfo{ + td: td, + currentBlock: currentBlock, + id: peerId, //peer.Identity().Pubkey() + requestBlockHashes: requestBlockHashes, + requestBlocks: requestBlocks, + peerError: peerError, + } + self.peers[peerId] = peer + poolLogger.Debugf("add new peer %v with td %v", peerId, td) + currentTD := ethutil.Big0 + if self.peer != nil { + currentTD = self.peer.td + } + if td.Cmp(currentTD) > 0 { + self.peer.stop(peer) + peer.start(self.peer) + poolLogger.Debugf("peer %v promoted to best peer", peerId) + self.peer = peer + return true + } + return false +} + +// RemovePeer is called by the eth protocol when the peer disconnects +func (self *BlockPool) RemovePeer(peerId string) { + self.peersLock.Lock() + defer self.peersLock.Unlock() + peer := self.peers[peerId] + if peer == nil { + return + } + self.peers[peerId] = nil + poolLogger.Debugf("remove peer %v", peerId[0:4]) + + // if current best peer is removed, need find a better one + if self.peer != nil && peerId == self.peer.id { + var newPeer *peerInfo + max := ethutil.Big0 + // peer with the highest self-acclaimed TD is chosen + for _, info := range self.peers { + if info.td.Cmp(max) > 0 { + max = info.td + newPeer = info + } + } + self.peer.stop(peer) + peer.start(self.peer) + if newPeer != nil { + poolLogger.Debugf("peer %v with td %v promoted to best peer", newPeer.id[0:4], newPeer.td) + } else { + poolLogger.Warnln("no peers left") + } + } +} + +// Entry point for eth protocol to add block hashes received via BlockHashesMsg +// only hashes from the best peer is handled +// this method is always responsible to initiate further hash requests until +// a known parent is reached unless cancelled by a peerChange event +// this process also launches all request processes on each chain section +// this function needs to run asynchronously for one peer since the message is discarded??? +func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) { + + // check if this peer is the best + peer, best := self.getPeer(peerId) + if !best { + return + } + // peer is still the best + + var child *poolNode + var depth int + + // iterate using next (rlp stream lazy decoder) feeding hashesC + self.wg.Add(1) + go func() { + for { + select { + case <-self.quit: + return + case <-peer.quitC: + // if the peer is demoted, no more hashes taken + break + default: + hash, ok := next() + if !ok { + // message consumed chain skeleton built + break + } + // check if known block connecting the downloaded chain to our blockchain + if self.hasBlock(hash) { + poolLogger.Infof("known block (%x...)\n", hash[0:4]) + if child != nil { + child.Lock() + // mark child as absolute pool root with parent known to blockchain + child.knownParent = true + child.Unlock() + } + break + } + // + var parent *poolNode + // look up node in pool + parent = self.get(hash) + if parent != nil { + // reached a known chain in the pool + // request blocks on the newly added part of the chain + if child != nil { + self.link(parent, child) + + // activate the current chain + self.activateChain(parent, peer, true) + poolLogger.Debugf("potential chain of %v blocks added, reached blockpool, activate chain", depth) + break + } + // if this is the first hash, we expect to find it + parent.RLock() + grandParent := parent.parent + parent.RUnlock() + if grandParent != nil { + // activate the current chain + self.activateChain(parent, peer, true) + poolLogger.Debugf("block hash found, activate chain") + break + } + // the first node is the root of a chain in the pool, rejoice and continue + } + // if node does not exist, create it and index in the pool + section := §ion{} + if child == nil { + section.top = parent + } + parent = &poolNode{ + hash: hash, + child: child, + section: section, + peer: peerId, + } + self.set(hash, parent) + poolLogger.Debugf("create potential block for %x...", hash[0:4]) + + depth++ + child = parent + } + } + if child != nil { + poolLogger.Debugf("chain of %v hashes added", depth) + // start a processSection on the last node, but switch off asking + // hashes and blocks until next peer confirms this chain + section := self.processSection(child) + peer.addSection(child.hash, section) + section.start() + } + }() +} + +// AddBlock is the entry point for the eth protocol when blockmsg is received upon requests +// It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error (which can be ignored) +// block is checked for PoW +// only the first PoW-valid block for a hash is considered legit +func (self *BlockPool) AddBlock(block *types.Block, peerId string) { + hash := block.Hash() + node := self.get(hash) + node.RLock() + b := node.block + node.RUnlock() + if b != nil { + return + } + if node == nil && !self.hasBlock(hash) { + self.peerError(peerId, ErrUnrequestedBlock, "%x", hash) + return + } + // validate block for PoW + if !self.verifyPoW(block) { + self.peerError(peerId, ErrInvalidPoW, "%x", hash) + } + node.Lock() + node.block = block + node.source = peerId + node.Unlock() +} + +// iterates down a known poolchain and activates fetching processes +// on each chain section for the peer +// stops if the peer is demoted +// registers last section root as root for the peer (in case peer is promoted a second time, to remember) +func (self *BlockPool) activateChain(node *poolNode, peer *peerInfo, on bool) { + self.wg.Add(1) + go func() { + for { + node.sectionRLock() + bottom := node.section.bottom + if bottom == nil { // the chain section is being created or killed + break + } + // register this section with the peer + if peer != nil { + peer.addSection(bottom.hash, bottom.section) + if on { + bottom.section.start() + } else { + bottom.section.start() + } + } + if bottom.parent == nil { + node = bottom + break + } + // if peer demoted stop activation + select { + case <-peer.quitC: + break + default: + } + + node = bottom.parent + bottom.sectionRUnlock() + } + // remember root for this peer + peer.addRoot(node) + self.wg.Done() + }() +} + +// main worker thread on each section in the poolchain +// - kills the section if there are blocks missing after an absolute time +// - kills the section if there are maxIdleRounds of idle rounds of block requests with no response +// - periodically polls the chain section for missing blocks which are then requested from peers +// - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they expire and killed () +// - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking +// - when turned back on it recursively calls itself on the root of the next chain section +// - when exits, signals to +func (self *BlockPool) processSection(node *poolNode) *section { + // absolute time after which sub-chain is killed if not complete (some blocks are missing) + suicideTimer := time.After(blockTimeout * time.Minute) + var blocksRequestTimer, blockHashesRequestTimer <-chan time.Time + var nodeC, missingC, processC chan *poolNode + controlC := make(chan bool) + resetC := make(chan bool) + var hashes [][]byte + var i, total, missing, lastMissing, depth int + var blockHashesRequests, blocksRequests int + var idle int + var init, alarm, done, same, running, once bool + orignode := node + hash := node.hash + + node.sectionLock() + defer node.sectionUnlock() + section := §ion{controlC: controlC, resetC: resetC} + node.section = section + + go func() { + self.wg.Add(1) + for { + node.sectionRLock() + controlC = node.section.controlC + node.sectionRUnlock() + + if init { + // missing blocks read from nodeC + // initialized section + if depth == 0 { + break + } + // enable select case to read missing block when ready + processC = missingC + missingC = make(chan *poolNode, lastMissing) + nodeC = nil + // only do once + init = false + } else { + if !once { + missingC = nil + processC = nil + i = 0 + total = 0 + lastMissing = 0 + } + } + + // went through all blocks in section + if i != 0 && i == lastMissing { + if len(hashes) > 0 { + // send block requests to peers + self.requestBlocks(blocksRequests, hashes) + } + blocksRequests++ + poolLogger.Debugf("[%x] block request attempt %v: missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth) + if missing == lastMissing { + // idle round + if same { + // more than once + idle++ + // too many idle rounds + if idle > blocksRequestMaxIdleRounds { + poolLogger.Debugf("[%x] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", hash[0:4], idle, blocksRequests, missing, total, depth) + self.killChain(node, nil) + break + } + } else { + idle = 0 + } + same = true + } else { + if missing == 0 { + // no missing nodes + poolLogger.Debugf("block request process complete on section %x... (%v total blocksRequests): missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth) + node.Lock() + orignode.complete = true + node.Unlock() + blocksRequestTimer = nil + if blockHashesRequestTimer == nil { + // not waiting for hashes any more + poolLogger.Debugf("hash request on root %x... successful (%v total attempts)\nquitting...", hash[0:4], blockHashesRequests) + break + } // otherwise suicide if no hashes coming + } + same = false + } + lastMissing = missing + i = 0 + missing = 0 + // ready for next round + done = true + } + if done && alarm { + poolLogger.Debugf("start checking if new blocks arrived (attempt %v): missing %v/%v/%v", blocksRequests, missing, total, depth) + blocksRequestTimer = time.After(blocksRequestInterval * time.Second) + alarm = false + done = false + // processC supposed to be empty and never closed so just swap, no need to allocate + tempC := processC + processC = missingC + missingC = tempC + } + select { + case <-self.quit: + break + case <-suicideTimer: + self.killChain(node, nil) + poolLogger.Warnf("[%x] timeout. (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth) + break + case <-blocksRequestTimer: + alarm = true + case <-blockHashesRequestTimer: + orignode.RLock() + parent := orignode.parent + orignode.RUnlock() + if parent != nil { + // if not root of chain, switch off + poolLogger.Debugf("[%x] parent found, hash requests deactivated (after %v total attempts)\n", hash[0:4], blockHashesRequests) + blockHashesRequestTimer = nil + } else { + blockHashesRequests++ + poolLogger.Debugf("[%x] hash request on root (%v total attempts)\n", hash[0:4], blockHashesRequests) + self.requestBlockHashes(parent.hash) + blockHashesRequestTimer = time.After(blockHashesRequestInterval * time.Second) + } + case r, ok := <-controlC: + if !ok { + break + } + if running && !r { + poolLogger.Debugf("process on section %x... (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth) + + alarm = false + blocksRequestTimer = nil + blockHashesRequestTimer = nil + processC = nil + } + if !running && r { + poolLogger.Debugf("[%x] on", hash[0:4]) + + orignode.RLock() + parent := orignode.parent + complete := orignode.complete + knownParent := orignode.knownParent + orignode.RUnlock() + if !complete { + poolLogger.Debugf("[%x] activate block requests", hash[0:4]) + blocksRequestTimer = time.After(0) + } + if parent == nil && !knownParent { + // if no parent but not connected to blockchain + poolLogger.Debugf("[%x] activate block hashes requests", hash[0:4]) + blockHashesRequestTimer = time.After(0) + } else { + blockHashesRequestTimer = nil + } + alarm = true + processC = missingC + if !once { + // if not run at least once fully, launch iterator + processC = make(chan *poolNode) + missingC = make(chan *poolNode) + self.foldUp(orignode, processC) + once = true + } + } + total = lastMissing + case <-resetC: + once = false + init = false + done = false + case node, ok := <-processC: + if !ok { + // channel closed, first iteration finished + init = true + once = true + continue + } + i++ + // if node has no block + node.RLock() + block := node.block + nhash := node.hash + knownParent := node.knownParent + node.RUnlock() + if !init { + depth++ + } + if block == nil { + missing++ + if !init { + total++ + } + hashes = append(hashes, nhash) + if len(hashes) == blockBatchSize { + self.requestBlocks(blocksRequests, hashes) + hashes = nil + } + missingC <- node + } else { + // block is found + if knownParent { + // connected to the blockchain, insert the longest chain of blocks + var blocks types.Blocks + child := node + parent := node + node.sectionRLock() + for child != nil && child.block != nil { + parent = child + blocks = append(blocks, parent.block) + child = parent.child + } + node.sectionRUnlock() + poolLogger.Debugf("[%x] insert %v blocks into blockchain", hash[0:4], len(blocks)) + if err := self.insertChain(blocks); err != nil { + // TODO: not clear which peer we need to address + // peerError should dispatch to peer if still connected and disconnect + self.peerError(node.source, ErrInvalidBlock, "%v", err) + poolLogger.Debugf("invalid block %v", node.hash) + poolLogger.Debugf("penalise peers %v (hash), %v (block)", node.peer, node.source) + // penalise peer in node.source + self.killChain(node, nil) + // self.disconnect() + break + } + // if suceeded mark the next one (no block yet) as connected to blockchain + if child != nil { + child.Lock() + child.knownParent = true + child.Unlock() + } + // reset starting node to first node with missing block + orignode = child + // pop the inserted ancestors off the channel + for i := 1; i < len(blocks); i++ { + <-processC + } + // delink inserted chain section + self.killChain(node, parent) + } + } + } + } + poolLogger.Debugf("[%x] quit after\n%v block hashes requests\n%v block requests: missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth) + + self.wg.Done() + node.sectionLock() + node.section.controlC = nil + node.sectionUnlock() + // this signals that controller not available + }() + return section + +} + +func (self *BlockPool) peerError(peerId string, code int, format string, params ...interface{}) { + self.peersLock.RLock() + defer self.peersLock.RUnlock() + peer, ok := self.peers[peerId] + if ok { + peer.peerError(code, format, params...) + } +} + +func (self *BlockPool) requestBlockHashes(hash []byte) { + self.peersLock.Lock() + defer self.peersLock.Unlock() + if self.peer != nil { + self.peer.requestBlockHashes(hash) + } +} + +func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) { + // distribute block request among known peers + self.peersLock.Lock() + defer self.peersLock.Unlock() + peerCount := len(self.peers) + // on first attempt use the best peer + if attempts == 0 { + self.peer.requestBlocks(hashes) + return + } + repetitions := int(math.Min(float64(peerCount), float64(blocksRequestRepetition))) + poolLogger.Debugf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount) + i := 0 + indexes := rand.Perm(peerCount)[0:(repetitions - 1)] + sort.Ints(indexes) + for _, peer := range self.peers { + if i == indexes[0] { + peer.requestBlocks(hashes) + indexes = indexes[1:] + if len(indexes) == 0 { + break + } + } + i++ + } +} + +func (self *BlockPool) getPeer(peerId string) (*peerInfo, bool) { + self.peersLock.RLock() + defer self.peersLock.RUnlock() + if self.peer != nil && self.peer.id == peerId { + return self.peer, true + } + info, ok := self.peers[peerId] + if !ok { + panic("unknown peer") + } + return info, false +} + +func (self *peerInfo) addSection(hash []byte, section *section) { + self.lock.Lock() + defer self.lock.Unlock() + self.sections[string(hash)] = section +} + +func (self *peerInfo) addRoot(node *poolNode) { + self.lock.Lock() + defer self.lock.Unlock() + self.roots = append(self.roots, node) +} + +// (re)starts processes registered for this peer (self) +func (self *peerInfo) start(peer *peerInfo) { + self.lock.Lock() + defer self.lock.Unlock() + self.quitC = make(chan bool) + for _, root := range self.roots { + root.sectionRLock() + if root.section.bottom != nil { + if root.parent == nil { + self.requestBlockHashes(root.hash) + } + } + root.sectionRUnlock() + } + self.roots = nil + self.controlSections(peer, true) +} + +// (re)starts process without requests, only suicide timer +func (self *peerInfo) stop(peer *peerInfo) { + self.lock.RLock() + defer self.lock.RUnlock() + close(self.quitC) + self.controlSections(peer, false) +} + +func (self *peerInfo) controlSections(peer *peerInfo, on bool) { + if peer != nil { + peer.lock.RLock() + defer peer.lock.RUnlock() + } + for hash, section := range peer.sections { + if section.done() { + delete(self.sections, hash) + } + _, exists := peer.sections[hash] + if on || peer == nil || exists { + if on { + // self is best peer + section.start() + } else { + // (re)starts process without requests, only suicide timer + section.stop() + } + } + } +} + +// called when parent is found in pool +// parent and child are guaranteed to be on different sections +func (self *BlockPool) link(parent, child *poolNode) { + var top bool + parent.sectionLock() + if child != nil { + child.sectionLock() + } + if parent == parent.section.top && parent.section.top != nil { + top = true + } + var bottom bool + + if child == child.section.bottom { + bottom = true + } + if parent.child != child { + orphan := parent.child + if orphan != nil { + // got a fork in the chain + if top { + orphan.lock.Lock() + // make old child orphan + orphan.parent = nil + orphan.lock.Unlock() + } else { // we are under section lock + // make old child orphan + orphan.parent = nil + // reset section objects above the fork + nchild := orphan.child + node := orphan + section := §ion{bottom: orphan} + for node.section == nchild.section { + node = nchild + node.section = section + nchild = node.child + } + section.top = node + // set up a suicide + self.processSection(orphan).stop() + } + } else { + // child is on top of a chain need to close section + child.section.bottom = child + } + // adopt new child + parent.child = child + if !top { + parent.section.top = parent + // restart section process so that shorter section is scanned for blocks + parent.section.reset() + } + } + + if child != nil { + if child.parent != parent { + stepParent := child.parent + if stepParent != nil { + if bottom { + stepParent.Lock() + stepParent.child = nil + stepParent.Unlock() + } else { + // we are on the same section + // if it is a aberrant reverse fork, + stepParent.child = nil + node := stepParent + nparent := stepParent.child + section := §ion{top: stepParent} + for node.section == nparent.section { + node = nparent + node.section = section + node = node.parent + } + } + } else { + // linking to a root node, ie. parent is under the root of a chain + parent.section.top = parent + } + } + child.parent = parent + child.section.bottom = child + } + // this needed if someone lied about the parent before + child.knownParent = false + + parent.sectionUnlock() + if child != nil { + child.sectionUnlock() + } +} + +// this immediately kills the chain from node to end (inclusive) section by section +func (self *BlockPool) killChain(node *poolNode, end *poolNode) { + poolLogger.Debugf("kill chain section with root node %v", node) + + node.sectionLock() + node.section.abort() + self.set(node.hash, nil) + child := node.child + top := node.section.top + i := 1 + self.wg.Add(1) + go func() { + var quit bool + for node != top && node != end && child != nil { + node = child + select { + case <-self.quit: + quit = true + break + default: + } + self.set(node.hash, nil) + child = node.child + } + poolLogger.Debugf("killed chain section of %v blocks with root node %v", i, node) + if !quit { + if node == top { + if node != end && child != nil && end != nil { + // + self.killChain(child, end) + } + } else { + if child != nil { + // delink rest of this section if ended midsection + child.section.bottom = child + child.parent = nil + } + } + } + node.section.bottom = nil + node.sectionUnlock() + self.wg.Done() + }() +} + +// structure to store long range links on chain to skip along +type section struct { + lock sync.RWMutex + bottom *poolNode + top *poolNode + controlC chan bool + resetC chan bool +} + +func (self *section) start() { + self.lock.RLock() + defer self.lock.RUnlock() + if self.controlC != nil { + self.controlC <- true + } +} + +func (self *section) stop() { + self.lock.RLock() + defer self.lock.RUnlock() + if self.controlC != nil { + self.controlC <- false + } +} + +func (self *section) reset() { + self.lock.RLock() + defer self.lock.RUnlock() + if self.controlC != nil { + self.resetC <- true + self.controlC <- false + } +} + +func (self *section) abort() { + self.lock.Lock() + defer self.lock.Unlock() + if self.controlC != nil { + close(self.controlC) + self.controlC = nil + } +} + +func (self *section) done() bool { + self.lock.Lock() + defer self.lock.Unlock() + if self.controlC != nil { + return true + } + return false +} + +func (self *BlockPool) get(hash []byte) (node *poolNode) { + self.lock.Lock() + defer self.lock.Unlock() + return self.pool[string(hash)] +} + +func (self *BlockPool) set(hash []byte, node *poolNode) { + self.lock.Lock() + defer self.lock.Unlock() + self.pool[string(hash)] = node +} + +// first time for block request, this iteration retrieves nodes of the chain +// from node up to top (all the way if nil) via child links +// copies the controller +// and feeds nodeC channel +// this is performed under section readlock to prevent top from going away +// when +func (self *BlockPool) foldUp(node *poolNode, nodeC chan *poolNode) { + self.wg.Add(1) + go func() { + node.sectionRLock() + defer node.sectionRUnlock() + for node != nil { + select { + case <-self.quit: + break + case nodeC <- node: + if node == node.section.top { + break + } + node = node.child + } + } + close(nodeC) + self.wg.Done() + }() +} + +func (self *poolNode) Lock() { + self.sectionLock() + self.lock.Lock() +} + +func (self *poolNode) Unlock() { + self.lock.Unlock() + self.sectionUnlock() +} + +func (self *poolNode) RLock() { + self.lock.RLock() +} + +func (self *poolNode) RUnlock() { + self.lock.RUnlock() +} + +func (self *poolNode) sectionLock() { + self.lock.RLock() + defer self.lock.RUnlock() + self.section.lock.Lock() +} + +func (self *poolNode) sectionUnlock() { + self.lock.RLock() + defer self.lock.RUnlock() + self.section.lock.Unlock() +} + +func (self *poolNode) sectionRLock() { + self.lock.RLock() + defer self.lock.RUnlock() + self.section.lock.RLock() +} + +func (self *poolNode) sectionRUnlock() { + self.lock.RLock() + defer self.lock.RUnlock() + self.section.lock.RUnlock() +} diff --git a/eth/block_pool_test.go b/eth/block_pool_test.go new file mode 100644 index 000000000..315cc748d --- /dev/null +++ b/eth/block_pool_test.go @@ -0,0 +1,198 @@ +package eth + +import ( + "bytes" + "fmt" + "log" + "os" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethutil" + ethlogger "github.com/ethereum/go-ethereum/logger" +) + +var sys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel)) + +type testChainManager struct { + knownBlock func(hash []byte) bool + addBlock func(*types.Block) error + checkPoW func(*types.Block) bool +} + +func (self *testChainManager) KnownBlock(hash []byte) bool { + if self.knownBlock != nil { + return self.knownBlock(hash) + } + return false +} + +func (self *testChainManager) AddBlock(block *types.Block) error { + if self.addBlock != nil { + return self.addBlock(block) + } + return nil +} + +func (self *testChainManager) CheckPoW(block *types.Block) bool { + if self.checkPoW != nil { + return self.checkPoW(block) + } + return false +} + +func knownBlock(hashes ...[]byte) (f func([]byte) bool) { + f = func(block []byte) bool { + for _, hash := range hashes { + if bytes.Compare(block, hash) == 0 { + return true + } + } + return false + } + return +} + +func addBlock(hashes ...[]byte) (f func(*types.Block) error) { + f = func(block *types.Block) error { + for _, hash := range hashes { + if bytes.Compare(block.Hash(), hash) == 0 { + return fmt.Errorf("invalid by test") + } + } + return nil + } + return +} + +func checkPoW(hashes ...[]byte) (f func(*types.Block) bool) { + f = func(block *types.Block) bool { + for _, hash := range hashes { + if bytes.Compare(block.Hash(), hash) == 0 { + return false + } + } + return true + } + return +} + +func newTestChainManager(knownBlocks [][]byte, invalidBlocks [][]byte, invalidPoW [][]byte) *testChainManager { + return &testChainManager{ + knownBlock: knownBlock(knownBlocks...), + addBlock: addBlock(invalidBlocks...), + checkPoW: checkPoW(invalidPoW...), + } +} + +type intToHash map[int][]byte + +type hashToInt map[string]int + +type testHashPool struct { + intToHash + hashToInt +} + +func newHash(i int) []byte { + return crypto.Sha3([]byte(string(i))) +} + +func newTestBlockPool(knownBlockIndexes []int, invalidBlockIndexes []int, invalidPoWIndexes []int) (hashPool *testHashPool, blockPool *BlockPool) { + hashPool = &testHashPool{make(intToHash), make(hashToInt)} + knownBlocks := hashPool.indexesToHashes(knownBlockIndexes) + invalidBlocks := hashPool.indexesToHashes(invalidBlockIndexes) + invalidPoW := hashPool.indexesToHashes(invalidPoWIndexes) + blockPool = NewBlockPool(newTestChainManager(knownBlocks, invalidBlocks, invalidPoW)) + return +} + +func (self *testHashPool) indexesToHashes(indexes []int) (hashes [][]byte) { + for _, i := range indexes { + hash, found := self.intToHash[i] + if !found { + hash = newHash(i) + self.intToHash[i] = hash + self.hashToInt[string(hash)] = i + } + hashes = append(hashes, hash) + } + return +} + +func (self *testHashPool) hashesToIndexes(hashes [][]byte) (indexes []int) { + for _, hash := range hashes { + i, found := self.hashToInt[string(hash)] + if !found { + i = -1 + } + indexes = append(indexes, i) + } + return +} + +type protocolChecker struct { + blockHashesRequests []int + blocksRequests [][]int + invalidBlocks []error + hashPool *testHashPool + lock sync.Mutex +} + +// -1 is special: not found (a hash never seen) +func (self *protocolChecker) requestBlockHashesCallBack() (requestBlockHashesCallBack func([]byte) error) { + requestBlockHashesCallBack = func(hash []byte) error { + indexes := self.hashPool.hashesToIndexes([][]byte{hash}) + self.lock.Lock() + defer self.lock.Unlock() + self.blockHashesRequests = append(self.blockHashesRequests, indexes[0]) + return nil + } + return +} + +func (self *protocolChecker) requestBlocksCallBack() (requestBlocksCallBack func([][]byte) error) { + requestBlocksCallBack = func(hashes [][]byte) error { + indexes := self.hashPool.hashesToIndexes(hashes) + self.lock.Lock() + defer self.lock.Unlock() + self.blocksRequests = append(self.blocksRequests, indexes) + return nil + } + return +} + +func (self *protocolChecker) invalidBlockCallBack() (invalidBlockCallBack func(error)) { + invalidBlockCallBack = func(err error) { + self.invalidBlocks = append(self.invalidBlocks, err) + } + return +} + +func TestAddPeer(t *testing.T) { + ethlogger.AddLogSystem(sys) + knownBlockIndexes := []int{0, 1} + invalidBlockIndexes := []int{2, 3} + invalidPoWIndexes := []int{4, 5} + hashPool, blockPool := newTestBlockPool(knownBlockIndexes, invalidBlockIndexes, invalidPoWIndexes) + // TODO: + // hashPool, blockPool, blockChainChecker = newTestBlockPool(knownBlockIndexes, invalidBlockIndexes, invalidPoWIndexes) + peer0 := &protocolChecker{ + // blockHashesRequests: make([]int), + // blocksRequests: make([][]int), + // invalidBlocks: make([]error), + hashPool: hashPool, + } + best := blockPool.AddPeer(ethutil.Big1, newHash(100), "0", + peer0.requestBlockHashesCallBack(), + peer0.requestBlocksCallBack(), + peer0.invalidBlockCallBack(), + ) + if !best { + t.Errorf("peer not accepted as best") + } + blockPool.Stop() + +} diff --git a/eth/error.go b/eth/error.go new file mode 100644 index 000000000..d1daad575 --- /dev/null +++ b/eth/error.go @@ -0,0 +1,71 @@ +package eth + +import ( + "fmt" +) + +const ( + ErrMsgTooLarge = iota + ErrDecode + ErrInvalidMsgCode + ErrProtocolVersionMismatch + ErrNetworkIdMismatch + ErrGenesisBlockMismatch + ErrNoStatusMsg + ErrExtraStatusMsg + ErrInvalidBlock + ErrInvalidPoW + ErrUnrequestedBlock +) + +var errorToString = map[int]string{ + ErrMsgTooLarge: "Message too long", + ErrDecode: "Invalid message", + ErrInvalidMsgCode: "Invalid message code", + ErrProtocolVersionMismatch: "Protocol version mismatch", + ErrNetworkIdMismatch: "NetworkId mismatch", + ErrGenesisBlockMismatch: "Genesis block mismatch", + ErrNoStatusMsg: "No status message", + ErrExtraStatusMsg: "Extra status message", + ErrInvalidBlock: "Invalid block", + ErrInvalidPoW: "Invalid PoW", + ErrUnrequestedBlock: "Unrequested block", +} + +type protocolError struct { + Code int + fatal bool + message string + format string + params []interface{} + // size int +} + +func newProtocolError(code int, format string, params ...interface{}) *protocolError { + return &protocolError{Code: code, format: format, params: params} +} + +func ProtocolError(code int, format string, params ...interface{}) (err *protocolError) { + err = newProtocolError(code, format, params...) + // report(err) + return +} + +func (self protocolError) Error() (message string) { + message = self.message + if message == "" { + message, ok := errorToString[self.Code] + if !ok { + panic("invalid error code") + } + if self.format != "" { + message += ": " + fmt.Sprintf(self.format, self.params...) + } + self.message = message + } + return +} + +func (self *protocolError) Fatal() bool { + return self.fatal +} diff --git a/eth/peer_util.go b/eth/peer_util.go new file mode 100644 index 000000000..6cf80cde2 --- /dev/null +++ b/eth/peer_util.go @@ -0,0 +1,23 @@ +package eth + +import ( + "encoding/json" + + "github.com/ethereum/go-ethereum/ethutil" +) + +func WritePeers(path string, addresses []string) { + if len(addresses) > 0 { + data, _ := json.MarshalIndent(addresses, "", " ") + ethutil.WriteFile(path, data) + } +} + +func ReadPeers(path string) (ips []string, err error) { + var data string + data, err = ethutil.ReadAllFile(path) + if err != nil { + json.Unmarshal([]byte(data), &ips) + } + return +} diff --git a/eth/protocol.go b/eth/protocol.go new file mode 100644 index 000000000..3b6f95d44 --- /dev/null +++ b/eth/protocol.go @@ -0,0 +1,319 @@ +package eth + +import ( + "bytes" + "fmt" + "math" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + ProtocolVersion = 49 + NetworkId = 0 + ProtocolLength = uint64(8) + ProtocolMaxMsgSize = 10 * 1024 * 1024 +) + +// eth protocol message codes +const ( + StatusMsg = iota + GetTxMsg // unused + TxMsg + GetBlockHashesMsg + BlockHashesMsg + GetBlocksMsg + BlocksMsg + NewBlockMsg +) + +// ethProtocol represents the ethereum wire protocol +// instance is running on each peer +type ethProtocol struct { + txPool txPool + chainManager chainManager + blockPool blockPool + peer *p2p.Peer + id string + rw p2p.MsgReadWriter +} + +// backend is the interface the ethereum protocol backend should implement +// used as an argument to EthProtocol +type txPool interface { + AddTransactions([]*types.Transaction) +} + +type chainManager interface { + GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte) + GetBlock(hash []byte) (block *types.Block) + Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) +} + +type blockPool interface { + AddBlockHashes(next func() ([]byte, bool), peerId string) + AddBlock(block *types.Block, peerId string) + AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) (best bool) + RemovePeer(peerId string) +} + +// message structs used for rlp decoding +type newBlockMsgData struct { + Block *types.Block + TD *big.Int +} + +type getBlockHashesMsgData struct { + Hash []byte + Amount uint64 +} + +// main entrypoint, wrappers starting a server running the eth protocol +// use this constructor to attach the protocol ("class") to server caps +// the Dev p2p layer then runs the protocol instance on each peer +func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { + return p2p.Protocol{ + Name: "eth", + Version: ProtocolVersion, + Length: ProtocolLength, + Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + return runEthProtocol(txPool, chainManager, blockPool, peer, rw) + }, + } +} + +// the main loop that handles incoming messages +// note RemovePeer in the post-disconnect hook +func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { + self := ðProtocol{ + txPool: txPool, + chainManager: chainManager, + blockPool: blockPool, + rw: rw, + peer: peer, + id: (string)(peer.Identity().Pubkey()), + } + err = self.handleStatus() + if err == nil { + for { + err = self.handle() + if err != nil { + fmt.Println(err) + self.blockPool.RemovePeer(self.id) + break + } + } + } + return +} + +func (self *ethProtocol) handle() error { + msg, err := self.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > ProtocolMaxMsgSize { + return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + // make sure that the payload has been fully consumed + defer msg.Discard() + + switch msg.Code { + + case StatusMsg: + return ProtocolError(ErrExtraStatusMsg, "") + + case TxMsg: + // TODO: rework using lazy RLP stream + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + self.txPool.AddTransactions(txs) + + case GetBlockHashesMsg: + var request getBlockHashesMsgData + if err := msg.Decode(&request); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) + return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) + + case BlockHashesMsg: + // TODO: redo using lazy decode , this way very inefficient on known chains + msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + var err error + iter := func() (hash []byte, ok bool) { + hash, err = msgStream.Bytes() + if err == nil { + ok = true + } + return + } + self.blockPool.AddBlockHashes(iter, self.id) + if err != nil && err != rlp.EOL { + return ProtocolError(ErrDecode, "%v", err) + } + + case GetBlocksMsg: + var blockHashes [][]byte + if err := msg.Decode(&blockHashes); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize)) + var blocks []interface{} + for i, hash := range blockHashes { + if i >= max { + break + } + block := self.chainManager.GetBlock(hash) + if block != nil { + blocks = append(blocks, block.Value().Raw()) + } + } + return self.rw.EncodeMsg(BlocksMsg, blocks...) + + case BlocksMsg: + msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + for { + var block *types.Block + if err := msgStream.Decode(&block); err != nil { + if err == rlp.EOL { + break + } else { + return ProtocolError(ErrDecode, "%v", err) + } + } + self.blockPool.AddBlock(block, self.id) + } + + case NewBlockMsg: + var request newBlockMsgData + if err := msg.Decode(&request); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + hash := request.Block.Hash() + // to simplify backend interface adding a new block + // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer + // (or selected as new best peer) + if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { + called := true + iter := func() (hash []byte, ok bool) { + if called { + called = false + return hash, true + } else { + return + } + } + self.blockPool.AddBlockHashes(iter, self.id) + self.blockPool.AddBlock(request.Block, self.id) + } + + default: + return ProtocolError(ErrInvalidMsgCode, "%v", msg.Code) + } + return nil +} + +type statusMsgData struct { + ProtocolVersion uint + NetworkId uint + TD *big.Int + CurrentBlock []byte + GenesisBlock []byte +} + +func (self *ethProtocol) statusMsg() p2p.Msg { + td, currentBlock, genesisBlock := self.chainManager.Status() + + return p2p.NewMsg(StatusMsg, + uint32(ProtocolVersion), + uint32(NetworkId), + td, + currentBlock, + genesisBlock, + ) +} + +func (self *ethProtocol) handleStatus() error { + // send precanned status message + if err := self.rw.WriteMsg(self.statusMsg()); err != nil { + return err + } + + // read and handle remote status + msg, err := self.rw.ReadMsg() + if err != nil { + return err + } + + if msg.Code != StatusMsg { + return ProtocolError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) + } + + if msg.Size > ProtocolMaxMsgSize { + return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + + var status statusMsgData + if err := msg.Decode(&status); err != nil { + return ProtocolError(ErrDecode, "%v", err) + } + + _, _, genesisBlock := self.chainManager.Status() + + if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { + return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) + } + + if status.NetworkId != NetworkId { + return ProtocolError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId) + } + + if ProtocolVersion != status.ProtocolVersion { + return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion) + } + + self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4]) + + //self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) + self.peer.Infoln("AddPeer(IGNORED)") + + return nil +} + +func (self *ethProtocol) requestBlockHashes(from []byte) error { + self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) + return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) +} + +func (self *ethProtocol) requestBlocks(hashes [][]byte) error { + self.peer.Debugf("fetching %v blocks", len(hashes)) + return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) +} + +func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { + err = ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + } else { + self.peer.Debugln(err) + } + return +} + +func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) { + err := ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + // disconnect + } else { + self.peer.Debugln(err) + } + +} diff --git a/eth/protocol_test.go b/eth/protocol_test.go new file mode 100644 index 000000000..322aec7b7 --- /dev/null +++ b/eth/protocol_test.go @@ -0,0 +1,232 @@ +package eth + +import ( + "io" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" +) + +type testMsgReadWriter struct { + in chan p2p.Msg + out chan p2p.Msg +} + +func (self *testMsgReadWriter) In(msg p2p.Msg) { + self.in <- msg +} + +func (self *testMsgReadWriter) Out(msg p2p.Msg) { + self.in <- msg +} + +func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error { + self.out <- msg + return nil +} + +func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error { + return self.WriteMsg(p2p.NewMsg(code, data)) +} + +func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) { + msg, ok := <-self.in + if !ok { + return msg, io.EOF + } + return msg, nil +} + +func errorCheck(t *testing.T, expCode int, err error) { + perr, ok := err.(*protocolError) + if ok && perr != nil { + if code := perr.Code; code != expCode { + ok = false + } + } + if !ok { + t.Errorf("expected error code %v, got %v", ErrNoStatusMsg, err) + } +} + +type TestBackend struct { + getTransactions func() []*types.Transaction + addTransactions func(txs []*types.Transaction) + getBlockHashes func(hash []byte, amount uint32) (hashes [][]byte) + addBlockHashes func(next func() ([]byte, bool), peerId string) + getBlock func(hash []byte) *types.Block + addBlock func(block *types.Block, peerId string) (err error) + addPeer func(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) + removePeer func(peerId string) + status func() (td *big.Int, currentBlock []byte, genesisBlock []byte) +} + +func (self *TestBackend) GetTransactions() (txs []*types.Transaction) { + if self.getTransactions != nil { + txs = self.getTransactions() + } + return +} + +func (self *TestBackend) AddTransactions(txs []*types.Transaction) { + if self.addTransactions != nil { + self.addTransactions(txs) + } +} + +func (self *TestBackend) GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) { + if self.getBlockHashes != nil { + hashes = self.getBlockHashes(hash, amount) + } + return +} + +<<<<<<< HEAD +<<<<<<< HEAD +func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) { + if self.addBlockHashes != nil { + self.addBlockHashes(next, peerId) + } +} + +======= +func (self *TestBackend) AddHash(hash []byte, peer *p2p.Peer) (more bool) { + if self.addHash != nil { + more = self.addHash(hash, peer) +======= +func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) { + if self.addBlockHashes != nil { + self.addBlockHashes(next, peerId) +>>>>>>> eth protocol changes + } +} +<<<<<<< HEAD +>>>>>>> initial commit for eth-p2p integration +======= + +>>>>>>> eth protocol changes +func (self *TestBackend) GetBlock(hash []byte) (block *types.Block) { + if self.getBlock != nil { + block = self.getBlock(hash) + } + return +} + +<<<<<<< HEAD +<<<<<<< HEAD +func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) { + if self.addBlock != nil { + err = self.addBlock(block, peerId) +======= +func (self *TestBackend) AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) { + if self.addBlock != nil { + fetchHashes, err = self.addBlock(td, block, peer) +>>>>>>> initial commit for eth-p2p integration +======= +func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) { + if self.addBlock != nil { + err = self.addBlock(block, peerId) +>>>>>>> eth protocol changes + } + return +} + +<<<<<<< HEAD +<<<<<<< HEAD +func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) { + if self.addPeer != nil { + best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock) +======= +func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) { + if self.addPeer != nil { + fetchHashes = self.addPeer(td, currentBlock, peer) +>>>>>>> initial commit for eth-p2p integration +======= +func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) { + if self.addPeer != nil { + best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock) +>>>>>>> eth protocol changes + } + return +} + +<<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> eth protocol changes +func (self *TestBackend) RemovePeer(peerId string) { + if self.removePeer != nil { + self.removePeer(peerId) + } +} + +<<<<<<< HEAD +======= +>>>>>>> initial commit for eth-p2p integration +======= +>>>>>>> eth protocol changes +func (self *TestBackend) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) { + if self.status != nil { + td, currentBlock, genesisBlock = self.status() + } + return +} + +<<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> eth protocol changes +// TODO: refactor this into p2p/client_identity +type peerId struct { + pubkey []byte +} + +func (self *peerId) String() string { + return "test peer" +} + +func (self *peerId) Pubkey() (pubkey []byte) { + pubkey = self.pubkey + if len(pubkey) == 0 { + pubkey = crypto.GenerateNewKeyPair().PublicKey + self.pubkey = pubkey + } + return +} + +func testPeer() *p2p.Peer { + return p2p.NewPeer(&peerId{}, []p2p.Cap{}) +} + +func TestErrNoStatusMsg(t *testing.T) { +<<<<<<< HEAD +======= +func TestEth(t *testing.T) { +>>>>>>> initial commit for eth-p2p integration +======= +>>>>>>> eth protocol changes + quit := make(chan bool) + rw := &testMsgReadWriter{make(chan p2p.Msg, 10), make(chan p2p.Msg, 10)} + testBackend := &TestBackend{} + var err error + go func() { +<<<<<<< HEAD +<<<<<<< HEAD + err = runEthProtocol(testBackend, testPeer(), rw) +======= + err = runEthProtocol(testBackend, nil, rw) +>>>>>>> initial commit for eth-p2p integration +======= + err = runEthProtocol(testBackend, testPeer(), rw) +>>>>>>> eth protocol changes + close(quit) + }() + statusMsg := p2p.NewMsg(4) + rw.In(statusMsg) + <-quit + errorCheck(t, ErrNoStatusMsg, err) + // read(t, remote, []byte("hello, world"), nil) +} diff --git a/ethereum.go b/ethereum.go deleted file mode 100644 index 5d74e28e9..000000000 --- a/ethereum.go +++ /dev/null @@ -1,659 +0,0 @@ -package eth - -import ( - "container/list" - "encoding/json" - "fmt" - "math/big" - "math/rand" - "net" - "path" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/state" - "github.com/ethereum/go-ethereum/wire" -) - -const ( - seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt" - seedNodeAddress = "poc-7.ethdev.com:30303" -) - -var loggerger = logger.NewLogger("SERV") - -func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) { - // Loop thru the peers and close them (if we had them) - for e := peers.Front(); e != nil; e = e.Next() { - callback(e.Value.(*Peer), e) - } -} - -const ( - processReapingTimeout = 60 // TODO increase -) - -type Ethereum struct { - // Channel for shutting down the ethereum - shutdownChan chan bool - quit chan bool - - // DB interface - db ethutil.Database - // State manager for processing new blocks and managing the over all states - blockManager *core.BlockManager - // The transaction pool. Transaction can be pushed on this pool - // for later including in the blocks - txPool *core.TxPool - // The canonical chain - blockChain *core.ChainManager - // The block pool - blockPool *BlockPool - // Eventer - eventMux event.TypeMux - // Peers - peers *list.List - // Nonce - Nonce uint64 - - Addr net.Addr - Port string - - blacklist [][]byte - - peerMut sync.Mutex - - // Capabilities for outgoing peers - serverCaps Caps - - nat NAT - - // Specifies the desired amount of maximum peers - MaxPeers int - - Mining bool - - listening bool - - RpcServer *rpc.JsonRpcServer - - keyManager *crypto.KeyManager - - clientIdentity wire.ClientIdentity - - isUpToDate bool - - filterMu sync.RWMutex - filterId int - filters map[int]*core.Filter -} - -func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { - var err error - var nat NAT - - if usePnp { - nat, err = Discover() - if err != nil { - loggerger.Debugln("UPnP failed", err) - } - } - - bootstrapDb(db) - - ethutil.Config.Db = db - - nonce, _ := ethutil.RandomUint64() - ethereum := &Ethereum{ - shutdownChan: make(chan bool), - quit: make(chan bool), - db: db, - peers: list.New(), - Nonce: nonce, - serverCaps: caps, - nat: nat, - keyManager: keyManager, - clientIdentity: clientIdentity, - isUpToDate: true, - filters: make(map[int]*core.Filter), - } - - ethereum.blockPool = NewBlockPool(ethereum) - ethereum.blockChain = core.NewChainManager(ethereum.EventMux()) - ethereum.txPool = core.NewTxPool(ethereum.blockChain, ethereum, ethereum.EventMux()) - ethereum.blockManager = core.NewBlockManager(ethereum.txPool, ethereum.blockChain, ethereum.EventMux()) - ethereum.blockChain.SetProcessor(ethereum.blockManager) - - // Start the tx pool - ethereum.txPool.Start() - - return ethereum, nil -} - -func (s *Ethereum) KeyManager() *crypto.KeyManager { - return s.keyManager -} - -func (s *Ethereum) ClientIdentity() wire.ClientIdentity { - return s.clientIdentity -} - -func (s *Ethereum) ChainManager() *core.ChainManager { - return s.blockChain -} - -func (s *Ethereum) BlockManager() *core.BlockManager { - return s.blockManager -} - -func (s *Ethereum) TxPool() *core.TxPool { - return s.txPool -} -func (s *Ethereum) BlockPool() *BlockPool { - return s.blockPool -} -func (s *Ethereum) EventMux() *event.TypeMux { - return &s.eventMux -} -func (self *Ethereum) Db() ethutil.Database { - return self.db -} - -func (s *Ethereum) ServerCaps() Caps { - return s.serverCaps -} -func (s *Ethereum) IsMining() bool { - return s.Mining -} -func (s *Ethereum) PeerCount() int { - return s.peers.Len() -} -func (s *Ethereum) IsUpToDate() bool { - upToDate := true - eachPeer(s.peers, func(peer *Peer, e *list.Element) { - if atomic.LoadInt32(&peer.connected) == 1 { - if peer.catchingUp == true && peer.versionKnown { - upToDate = false - } - } - }) - return upToDate -} -func (s *Ethereum) PushPeer(peer *Peer) { - s.peers.PushBack(peer) -} -func (s *Ethereum) IsListening() bool { - return s.listening -} - -func (s *Ethereum) HighestTDPeer() (td *big.Int) { - td = big.NewInt(0) - - eachPeer(s.peers, func(p *Peer, v *list.Element) { - if p.td.Cmp(td) > 0 { - td = p.td - } - }) - - return -} - -func (self *Ethereum) BlacklistPeer(peer *Peer) { - self.blacklist = append(self.blacklist, peer.pubkey) -} - -func (s *Ethereum) AddPeer(conn net.Conn) { - peer := NewPeer(conn, s, true) - - if peer != nil { - if s.peers.Len() < s.MaxPeers { - peer.Start() - } else { - loggerger.Debugf("Max connected peers reached. Not adding incoming peer.") - } - } -} - -func (s *Ethereum) ProcessPeerList(addrs []string) { - for _, addr := range addrs { - // TODO Probably requires some sanity checks - s.ConnectToPeer(addr) - } -} - -func (s *Ethereum) ConnectToPeer(addr string) error { - if s.peers.Len() < s.MaxPeers { - var alreadyConnected bool - - ahost, aport, _ := net.SplitHostPort(addr) - var chost string - - ips, err := net.LookupIP(ahost) - - if err != nil { - return err - } else { - // If more then one ip is available try stripping away the ipv6 ones - if len(ips) > 1 { - var ipsv4 []net.IP - // For now remove the ipv6 addresses - for _, ip := range ips { - if strings.Contains(ip.String(), "::") { - continue - } else { - ipsv4 = append(ipsv4, ip) - } - } - if len(ipsv4) == 0 { - return fmt.Errorf("[SERV] No IPV4 addresses available for hostname") - } - - // Pick a random ipv4 address, simulating round-robin DNS. - rand.Seed(time.Now().UTC().UnixNano()) - i := rand.Intn(len(ipsv4)) - chost = ipsv4[i].String() - } else { - if len(ips) == 0 { - return fmt.Errorf("[SERV] No IPs resolved for the given hostname") - return nil - } - chost = ips[0].String() - } - } - - eachPeer(s.peers, func(p *Peer, v *list.Element) { - if p.conn == nil { - return - } - phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String()) - - if phost == chost && pport == aport { - alreadyConnected = true - //loggerger.Debugf("Peer %s already added.\n", chost) - return - } - }) - - if alreadyConnected { - return nil - } - - NewOutboundPeer(addr, s, s.serverCaps) - } - - return nil -} - -func (s *Ethereum) OutboundPeers() []*Peer { - // Create a new peer slice with at least the length of the total peers - outboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if !p.inbound && p.conn != nil { - outboundPeers[length] = p - length++ - } - }) - - return outboundPeers[:length] -} - -func (s *Ethereum) InboundPeers() []*Peer { - // Create a new peer slice with at least the length of the total peers - inboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if p.inbound { - inboundPeers[length] = p - length++ - } - }) - - return inboundPeers[:length] -} - -func (s *Ethereum) InOutPeers() []*Peer { - // Reap the dead peers first - s.reapPeers() - - // Create a new peer slice with at least the length of the total peers - inboundPeers := make([]*Peer, s.peers.Len()) - length := 0 - eachPeer(s.peers, func(p *Peer, e *list.Element) { - // Only return peers with an actual ip - if len(p.host) > 0 { - inboundPeers[length] = p - length++ - } - }) - - return inboundPeers[:length] -} - -func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) { - msg := wire.NewMessage(msgType, data) - s.BroadcastMsg(msg) -} - -func (s *Ethereum) BroadcastMsg(msg *wire.Msg) { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.QueueMessage(msg) - }) -} - -func (s *Ethereum) Peers() *list.List { - return s.peers -} - -func (s *Ethereum) reapPeers() { - eachPeer(s.peers, func(p *Peer, e *list.Element) { - if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) { - s.removePeerElement(e) - } - }) -} - -func (s *Ethereum) removePeerElement(e *list.Element) { - s.peerMut.Lock() - defer s.peerMut.Unlock() - - s.peers.Remove(e) - - s.eventMux.Post(PeerListEvent{s.peers}) -} - -func (s *Ethereum) RemovePeer(p *Peer) { - eachPeer(s.peers, func(peer *Peer, e *list.Element) { - if peer == p { - s.removePeerElement(e) - } - }) -} - -func (s *Ethereum) reapDeadPeerHandler() { - reapTimer := time.NewTicker(processReapingTimeout * time.Second) - - for { - select { - case <-reapTimer.C: - s.reapPeers() - } - } -} - -// Start the ethereum -func (s *Ethereum) Start(seed bool) { - s.blockPool.Start() - - // Bind to addr and port - ln, err := net.Listen("tcp", ":"+s.Port) - if err != nil { - loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port) - s.listening = false - } else { - s.listening = true - // Starting accepting connections - loggerger.Infoln("Ready and accepting connections") - // Start the peer handler - go s.peerHandler(ln) - } - - if s.nat != nil { - go s.upnpUpdateThread() - } - - // Start the reaping processes - go s.reapDeadPeerHandler() - go s.update() - go s.filterLoop() - - if seed { - s.Seed() - } - s.ConnectToPeer("localhost:40404") - loggerger.Infoln("Server started") -} - -func (s *Ethereum) Seed() { - // Sorry Py person. I must blacklist. you perform badly - s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031")) - ips := PastPeers() - if len(ips) > 0 { - for _, ip := range ips { - loggerger.Infoln("Connecting to previous peer ", ip) - s.ConnectToPeer(ip) - } - } else { - loggerger.Debugln("Retrieving seed nodes") - - // Eth-Go Bootstrapping - ips, er := net.LookupIP("seed.bysh.me") - if er == nil { - peers := []string{} - for _, ip := range ips { - node := fmt.Sprintf("%s:%d", ip.String(), 30303) - loggerger.Debugln("Found DNS Go Peer:", node) - peers = append(peers, node) - } - s.ProcessPeerList(peers) - } - - // Official DNS Bootstrapping - _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org") - if err == nil { - peers := []string{} - // Iterate SRV nodes - for _, n := range nodes { - target := n.Target - port := strconv.Itoa(int(n.Port)) - // Resolve target to ip (Go returns list, so may resolve to multiple ips?) - addr, err := net.LookupHost(target) - if err == nil { - for _, a := range addr { - // Build string out of SRV port and Resolved IP - peer := net.JoinHostPort(a, port) - loggerger.Debugln("Found DNS Bootstrap Peer:", peer) - peers = append(peers, peer) - } - } else { - loggerger.Debugln("Couldn't resolve :", target) - } - } - // Connect to Peer list - s.ProcessPeerList(peers) - } - - s.ConnectToPeer(seedNodeAddress) - } -} - -func (s *Ethereum) peerHandler(listener net.Listener) { - for { - conn, err := listener.Accept() - if err != nil { - loggerger.Debugln(err) - - continue - } - - go s.AddPeer(conn) - } -} - -func (s *Ethereum) Stop() { - // Stop eventMux first, it will close all subscriptions. - s.eventMux.Stop() - - // Close the database - defer s.db.Close() - - var ips []string - eachPeer(s.peers, func(p *Peer, e *list.Element) { - ips = append(ips, p.conn.RemoteAddr().String()) - }) - - if len(ips) > 0 { - d, _ := json.MarshalIndent(ips, "", " ") - ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d) - } - - eachPeer(s.peers, func(p *Peer, e *list.Element) { - p.Stop() - }) - - close(s.quit) - - if s.RpcServer != nil { - s.RpcServer.Stop() - } - s.txPool.Stop() - s.blockPool.Stop() - - loggerger.Infoln("Server stopped") - close(s.shutdownChan) -} - -// This function will wait for a shutdown and resumes main thread execution -func (s *Ethereum) WaitForShutdown() { - <-s.shutdownChan -} - -func (s *Ethereum) upnpUpdateThread() { - // Go off immediately to prevent code duplication, thereafter we renew - // lease every 15 minutes. - timer := time.NewTimer(5 * time.Minute) - lport, _ := strconv.ParseInt(s.Port, 10, 16) - first := true -out: - for { - select { - case <-timer.C: - var err error - _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60) - if err != nil { - loggerger.Debugln("can't add UPnP port mapping:", err) - break out - } - if first && err == nil { - _, err = s.nat.GetExternalAddress() - if err != nil { - loggerger.Debugln("UPnP can't get external address:", err) - continue out - } - first = false - } - timer.Reset(time.Minute * 15) - case <-s.quit: - break out - } - } - - timer.Stop() - - if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil { - loggerger.Debugln("unable to remove UPnP port mapping:", err) - } else { - loggerger.Debugln("succesfully disestablished UPnP port mapping") - } -} - -func (self *Ethereum) update() { - upToDateTimer := time.NewTicker(1 * time.Second) - -out: - for { - select { - case <-upToDateTimer.C: - if self.IsUpToDate() && !self.isUpToDate { - self.eventMux.Post(ChainSyncEvent{false}) - self.isUpToDate = true - } else if !self.IsUpToDate() && self.isUpToDate { - self.eventMux.Post(ChainSyncEvent{true}) - self.isUpToDate = false - } - case <-self.quit: - break out - } - } -} - -// InstallFilter adds filter for blockchain events. -// The filter's callbacks will run for matching blocks and messages. -// The filter should not be modified after it has been installed. -func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) { - self.filterMu.Lock() - id = self.filterId - self.filters[id] = filter - self.filterId++ - self.filterMu.Unlock() - return id -} - -func (self *Ethereum) UninstallFilter(id int) { - self.filterMu.Lock() - delete(self.filters, id) - self.filterMu.Unlock() -} - -// GetFilter retrieves a filter installed using InstallFilter. -// The filter may not be modified. -func (self *Ethereum) GetFilter(id int) *core.Filter { - self.filterMu.RLock() - defer self.filterMu.RUnlock() - return self.filters[id] -} - -func (self *Ethereum) filterLoop() { - // Subscribe to events - events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) - for event := range events.Chan() { - switch event := event.(type) { - case core.NewBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case state.Messages: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(event) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } - } - } - self.filterMu.RUnlock() - } - } -} - -func bootstrapDb(db ethutil.Database) { - d, _ := db.Get([]byte("ProtocolVersion")) - protov := ethutil.NewValue(d).Uint() - - if protov == 0 { - db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes()) - } -} - -func PastPeers() []string { - var ips []string - data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json")) - json.Unmarshal([]byte(data), &ips) - - return ips -} diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go index b04b4801e..2ce0f0642 100644 --- a/event/filter/generic_filter.go +++ b/event/filter/generic_filter.go @@ -2,19 +2,29 @@ package filter type Generic struct { Str1, Str2, Str3 string + Data map[string]struct{} Fn func(data interface{}) } +// self = registered, f = incoming func (self Generic) Compare(f Filter) bool { + var strMatch, dataMatch = true, true + filter := f.(Generic) - if (len(self.Str1) == 0 || filter.Str1 == self.Str1) && - (len(self.Str2) == 0 || filter.Str2 == self.Str2) && - (len(self.Str3) == 0 || filter.Str3 == self.Str3) { - return true + if (len(self.Str1) > 0 && filter.Str1 != self.Str1) || + (len(self.Str2) > 0 && filter.Str2 != self.Str2) || + (len(self.Str3) > 0 && filter.Str3 != self.Str3) { + strMatch = false + } + + for k, _ := range self.Data { + if _, ok := filter.Data[k]; !ok { + return false + } } - return false + return strMatch && dataMatch } func (self Generic) Trigger(data interface{}) { diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go new file mode 100644 index 000000000..1a9a88173 --- /dev/null +++ b/event/filter/old_filter.go @@ -0,0 +1,94 @@ +// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring +package filter + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/state" +) + +type FilterManager struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*core.Filter + + quit chan struct{} +} + +func NewFilterManager(mux *event.TypeMux) *FilterManager { + return &FilterManager{ + eventMux: mux, + filters: make(map[int]*core.Filter), + } +} + +func (self *FilterManager) Start() { + go self.filterLoop() +} + +func (self *FilterManager) Stop() { + close(self.quit) +} + +func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id +} + +func (self *FilterManager) UninstallFilter(id int) { + self.filterMu.Lock() + delete(self.filters, id) + self.filterMu.Unlock() +} + +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. +func (self *FilterManager) GetFilter(id int) *core.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() + return self.filters[id] +} + +func (self *FilterManager) filterLoop() { + // Subscribe to events + events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) + +out: + for { + select { + case <-self.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case state.Messages: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(event) + if len(msgs) > 0 { + filter.MessageCallback(msgs) + } + } + } + self.filterMu.RUnlock() + } + } + } +} diff --git a/events.go b/events.go deleted file mode 100644 index 5fff1d831..000000000 --- a/events.go +++ /dev/null @@ -1,11 +0,0 @@ -package eth - -import "container/list" - -type PeerListEvent struct { - Peers *list.List -} - -type ChainSyncEvent struct { - InSync bool -} diff --git a/javascript/javascript_runtime.go b/javascript/javascript_runtime.go index a26f0154e..af1405049 100644 --- a/javascript/javascript_runtime.go +++ b/javascript/javascript_runtime.go @@ -7,10 +7,10 @@ import ( "path" "path/filepath" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" @@ -203,7 +203,7 @@ func (self *JSRE) addPeer(call otto.FunctionCall) otto.Value { if err != nil { return otto.FalseValue() } - self.ethereum.ConnectToPeer(host) + self.ethereum.SuggestPeer(host) return otto.TrueValue() } diff --git a/javascript/types.go b/javascript/types.go index d5acaecce..cf5a6677b 100644 --- a/javascript/types.go +++ b/javascript/types.go @@ -3,7 +3,7 @@ package javascript import ( "fmt" - "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/ui" diff --git a/miner/miner.go b/miner/miner.go index f63096b63..d909c228b 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -27,7 +27,7 @@ import ( "math/big" "sort" - "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/pow/ezp" @@ -36,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/wire" ) type LocalTx struct { @@ -217,7 +216,7 @@ func (self *Miner) mine() { if err != nil { minerlogger.Infoln(err) } else { - self.eth.Broadcast(wire.MsgBlockTy, []interface{}{block.Value().Val}) + self.eth.EventMux().Post(core.NewMinedBlockEvent{block}) minerlogger.Infof("🔨 Mined block %x\n", block.Hash()) minerlogger.Infoln(block) @@ -246,7 +245,7 @@ func (self *Miner) finiliseTxs() types.Transactions { } // Faster than append - for _, tx := range self.eth.TxPool().CurrentTransactions() { + for _, tx := range self.eth.TxPool().GetTransactions() { if tx.GasPrice().Cmp(self.MinAcceptedGasPrice) >= 0 { txs[actualSize] = tx actualSize++ diff --git a/nat.go b/nat.go deleted file mode 100644 index 999308eb2..000000000 --- a/nat.go +++ /dev/null @@ -1,12 +0,0 @@ -package eth - -import ( - "net" -) - -// protocol is either "udp" or "tcp" -type NAT interface { - GetExternalAddress() (addr net.IP, err error) - AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) - DeletePortMapping(protocol string, externalPort, internalPort int) (err error) -} diff --git a/natpmp.go b/natpmp.go deleted file mode 100644 index 489342a4b..000000000 --- a/natpmp.go +++ /dev/null @@ -1,55 +0,0 @@ -package eth - -import ( - "fmt" - "net" - - natpmp "github.com/jackpal/go-nat-pmp" -) - -// Adapt the NAT-PMP protocol to the NAT interface - -// TODO: -// + Register for changes to the external address. -// + Re-register port mapping when router reboots. -// + A mechanism for keeping a port mapping registered. - -type natPMPClient struct { - client *natpmp.Client -} - -func NewNatPMP(gateway net.IP) (nat NAT) { - return &natPMPClient{natpmp.NewClient(gateway)} -} - -func (n *natPMPClient) GetExternalAddress() (addr net.IP, err error) { - response, err := n.client.GetExternalAddress() - if err != nil { - return - } - ip := response.ExternalIPAddress - addr = net.IPv4(ip[0], ip[1], ip[2], ip[3]) - return -} - -func (n *natPMPClient) AddPortMapping(protocol string, externalPort, internalPort int, - description string, timeout int) (mappedExternalPort int, err error) { - if timeout <= 0 { - err = fmt.Errorf("timeout must not be <= 0") - return - } - // Note order of port arguments is switched between our AddPortMapping and the client's AddPortMapping. - response, err := n.client.AddPortMapping(protocol, internalPort, externalPort, timeout) - if err != nil { - return - } - mappedExternalPort = int(response.MappedExternalPort) - return -} - -func (n *natPMPClient) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) { - // To destroy a mapping, send an add-port with - // an internalPort of the internal port to destroy, an external port of zero and a time of zero. - _, err = n.client.AddPortMapping(protocol, internalPort, 0, 0) - return -} diff --git a/natupnp.go b/natupnp.go deleted file mode 100644 index c7f9eeb62..000000000 --- a/natupnp.go +++ /dev/null @@ -1,338 +0,0 @@ -package eth - -// Just enough UPnP to be able to forward ports -// - -import ( - "bytes" - "encoding/xml" - "errors" - "net" - "net/http" - "os" - "strconv" - "strings" - "time" -) - -type upnpNAT struct { - serviceURL string - ourIP string -} - -func Discover() (nat NAT, err error) { - ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900") - if err != nil { - return - } - conn, err := net.ListenPacket("udp4", ":0") - if err != nil { - return - } - socket := conn.(*net.UDPConn) - defer socket.Close() - - err = socket.SetDeadline(time.Now().Add(10 * time.Second)) - if err != nil { - return - } - - st := "ST: urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n" - buf := bytes.NewBufferString( - "M-SEARCH * HTTP/1.1\r\n" + - "HOST: 239.255.255.250:1900\r\n" + - st + - "MAN: \"ssdp:discover\"\r\n" + - "MX: 2\r\n\r\n") - message := buf.Bytes() - answerBytes := make([]byte, 1024) - for i := 0; i < 3; i++ { - _, err = socket.WriteToUDP(message, ssdp) - if err != nil { - return - } - var n int - n, _, err = socket.ReadFromUDP(answerBytes) - if err != nil { - continue - // socket.Close() - // return - } - answer := string(answerBytes[0:n]) - if strings.Index(answer, "\r\n"+st) < 0 { - continue - } - // HTTP header field names are case-insensitive. - // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 - locString := "\r\nlocation: " - answer = strings.ToLower(answer) - locIndex := strings.Index(answer, locString) - if locIndex < 0 { - continue - } - loc := answer[locIndex+len(locString):] - endIndex := strings.Index(loc, "\r\n") - if endIndex < 0 { - continue - } - locURL := loc[0:endIndex] - var serviceURL string - serviceURL, err = getServiceURL(locURL) - if err != nil { - return - } - var ourIP string - ourIP, err = getOurIP() - if err != nil { - return - } - nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP} - return - } - err = errors.New("UPnP port discovery failed.") - return -} - -// service represents the Service type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type service struct { - ServiceType string `xml:"serviceType"` - ControlURL string `xml:"controlURL"` -} - -// deviceList represents the deviceList type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type deviceList struct { - XMLName xml.Name `xml:"deviceList"` - Device []device `xml:"device"` -} - -// serviceList represents the serviceList type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type serviceList struct { - XMLName xml.Name `xml:"serviceList"` - Service []service `xml:"service"` -} - -// device represents the device type in an UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type device struct { - XMLName xml.Name `xml:"device"` - DeviceType string `xml:"deviceType"` - DeviceList deviceList `xml:"deviceList"` - ServiceList serviceList `xml:"serviceList"` -} - -// specVersion represents the specVersion in a UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type specVersion struct { - XMLName xml.Name `xml:"specVersion"` - Major int `xml:"major"` - Minor int `xml:"minor"` -} - -// root represents the Root document for a UPnP xml description. -// Only the parts we care about are present and thus the xml may have more -// fields than present in the structure. -type root struct { - XMLName xml.Name `xml:"root"` - SpecVersion specVersion - Device device -} - -func getChildDevice(d *device, deviceType string) *device { - dl := d.DeviceList.Device - for i := 0; i < len(dl); i++ { - if dl[i].DeviceType == deviceType { - return &dl[i] - } - } - return nil -} - -func getChildService(d *device, serviceType string) *service { - sl := d.ServiceList.Service - for i := 0; i < len(sl); i++ { - if sl[i].ServiceType == serviceType { - return &sl[i] - } - } - return nil -} - -func getOurIP() (ip string, err error) { - hostname, err := os.Hostname() - if err != nil { - return - } - p, err := net.LookupIP(hostname) - if err != nil && len(p) > 0 { - return - } - return p[0].String(), nil -} - -func getServiceURL(rootURL string) (url string, err error) { - r, err := http.Get(rootURL) - if err != nil { - return - } - defer r.Body.Close() - if r.StatusCode >= 400 { - err = errors.New(string(r.StatusCode)) - return - } - var root root - err = xml.NewDecoder(r.Body).Decode(&root) - - if err != nil { - return - } - a := &root.Device - if a.DeviceType != "urn:schemas-upnp-org:device:InternetGatewayDevice:1" { - err = errors.New("No InternetGatewayDevice") - return - } - b := getChildDevice(a, "urn:schemas-upnp-org:device:WANDevice:1") - if b == nil { - err = errors.New("No WANDevice") - return - } - c := getChildDevice(b, "urn:schemas-upnp-org:device:WANConnectionDevice:1") - if c == nil { - err = errors.New("No WANConnectionDevice") - return - } - d := getChildService(c, "urn:schemas-upnp-org:service:WANIPConnection:1") - if d == nil { - err = errors.New("No WANIPConnection") - return - } - url = combineURL(rootURL, d.ControlURL) - return -} - -func combineURL(rootURL, subURL string) string { - protocolEnd := "://" - protoEndIndex := strings.Index(rootURL, protocolEnd) - a := rootURL[protoEndIndex+len(protocolEnd):] - rootIndex := strings.Index(a, "/") - return rootURL[0:protoEndIndex+len(protocolEnd)+rootIndex] + subURL -} - -func soapRequest(url, function, message string) (r *http.Response, err error) { - fullMessage := "<?xml version=\"1.0\" ?>" + - "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\r\n" + - "<s:Body>" + message + "</s:Body></s:Envelope>" - - req, err := http.NewRequest("POST", url, strings.NewReader(fullMessage)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "text/xml ; charset=\"utf-8\"") - req.Header.Set("User-Agent", "Darwin/10.0.0, UPnP/1.0, MiniUPnPc/1.3") - //req.Header.Set("Transfer-Encoding", "chunked") - req.Header.Set("SOAPAction", "\"urn:schemas-upnp-org:service:WANIPConnection:1#"+function+"\"") - req.Header.Set("Connection", "Close") - req.Header.Set("Cache-Control", "no-cache") - req.Header.Set("Pragma", "no-cache") - - // log.Stderr("soapRequest ", req) - //fmt.Println(fullMessage) - - r, err = http.DefaultClient.Do(req) - if err != nil { - return - } - - if r.Body != nil { - defer r.Body.Close() - } - - if r.StatusCode >= 400 { - // log.Stderr(function, r.StatusCode) - err = errors.New("Error " + strconv.Itoa(r.StatusCode) + " for " + function) - r = nil - return - } - return -} - -type statusInfo struct { - externalIpAddress string -} - -func (n *upnpNAT) getStatusInfo() (info statusInfo, err error) { - - message := "<u:GetStatusInfo xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" + - "</u:GetStatusInfo>" - - var response *http.Response - response, err = soapRequest(n.serviceURL, "GetStatusInfo", message) - if err != nil { - return - } - - // TODO: Write a soap reply parser. It has to eat the Body and envelope tags... - - response.Body.Close() - return -} - -func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) { - info, err := n.getStatusInfo() - if err != nil { - return - } - addr = net.ParseIP(info.externalIpAddress) - return -} - -func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) { - // A single concatenation would break ARM compilation. - message := "<u:AddPortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" + - "<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) - message += "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" - message += "<NewInternalPort>" + strconv.Itoa(internalPort) + "</NewInternalPort>" + - "<NewInternalClient>" + n.ourIP + "</NewInternalClient>" + - "<NewEnabled>1</NewEnabled><NewPortMappingDescription>" - message += description + - "</NewPortMappingDescription><NewLeaseDuration>" + strconv.Itoa(timeout) + - "</NewLeaseDuration></u:AddPortMapping>" - - var response *http.Response - response, err = soapRequest(n.serviceURL, "AddPortMapping", message) - if err != nil { - return - } - - // TODO: check response to see if the port was forwarded - // log.Println(message, response) - mappedExternalPort = externalPort - _ = response - return -} - -func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) { - - message := "<u:DeletePortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" + - "<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) + - "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" + - "</u:DeletePortMapping>" - - var response *http.Response - response, err = soapRequest(n.serviceURL, "DeletePortMapping", message) - if err != nil { - return - } - - // TODO: check response to see if the port was deleted - // log.Println(message, response) - _ = response - return -} diff --git a/p2p/server.go b/p2p/server.go index 8a6087566..326781234 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -246,12 +246,7 @@ func (srv *Server) Stop() { func (srv *Server) discLoop() { for peer := range srv.peerDisconnect { - // peer has just disconnected. free up its slot. - srvlog.Infof("%v is gone", peer) - srv.peerSlots <- peer.slot - srv.lock.Lock() - srv.peers[peer.slot] = nil - srv.lock.Unlock() + srv.removePeer(peer) } } @@ -384,7 +379,7 @@ func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer { func (srv *Server) removePeer(peer *Peer) { srv.lock.Lock() defer srv.lock.Unlock() - srvlog.Debugf("Removing peer %v %v (slot %v)\n", peer, peer.slot) + srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot) if srv.peers[peer.slot] != peer { srvlog.Warnln("Invalid peer to remove:", peer) return @@ -416,6 +411,7 @@ func (srv *Server) verifyPeer(addr *peerAddr) error { return nil } +// TODO replace with "Set" type Blacklist interface { Get([]byte) (bool, error) Put([]byte) error diff --git a/peer.go b/peer.go deleted file mode 100644 index 13f0239d4..000000000 --- a/peer.go +++ /dev/null @@ -1,881 +0,0 @@ -package eth - -import ( - "bytes" - "container/list" - "fmt" - "math" - "math/big" - "net" - "strconv" - "strings" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethutil" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/wire" -) - -var peerlogger = logger.NewLogger("PEER") - -const ( - // The size of the output buffer for writing messages - outputBufferSize = 50 - // Current protocol version - ProtocolVersion = 49 - // Current P2P version - P2PVersion = 2 - // Ethereum network version - NetVersion = 0 - // Interval for ping/pong message - pingPongTimer = 2 * time.Second -) - -type DiscReason byte - -const ( - // Values are given explicitly instead of by iota because these values are - // defined by the wire protocol spec; it is easier for humans to ensure - // correctness when values are explicit. - DiscRequested DiscReason = iota - DiscReTcpSysErr - DiscBadProto - DiscBadPeer - DiscTooManyPeers - DiscConnDup - DiscGenesisErr - DiscProtoErr - DiscQuitting -) - -var discReasonToString = []string{ - "requested", - "TCP sys error", - "bad protocol", - "useless peer", - "too many peers", - "already connected", - "wrong genesis block", - "incompatible network", - "quitting", -} - -func (d DiscReason) String() string { - if len(discReasonToString) < int(d) { - return "Unknown" - } - - return discReasonToString[d] -} - -// Peer capabilities -type Caps byte - -const ( - CapPeerDiscTy Caps = 1 << iota - CapTxTy - CapChainTy - - CapDefault = CapChainTy | CapTxTy | CapPeerDiscTy -) - -var capsToString = map[Caps]string{ - CapPeerDiscTy: "Peer discovery", - CapTxTy: "Transaction relaying", - CapChainTy: "Block chain relaying", -} - -func (c Caps) IsCap(cap Caps) bool { - return c&cap > 0 -} - -func (c Caps) String() string { - var caps []string - if c.IsCap(CapPeerDiscTy) { - caps = append(caps, capsToString[CapPeerDiscTy]) - } - if c.IsCap(CapChainTy) { - caps = append(caps, capsToString[CapChainTy]) - } - if c.IsCap(CapTxTy) { - caps = append(caps, capsToString[CapTxTy]) - } - - return strings.Join(caps, " | ") -} - -type Peer struct { - // Ethereum interface - ethereum *Ethereum - // Net connection - conn net.Conn - // Output queue which is used to communicate and handle messages - outputQueue chan *wire.Msg - // Quit channel - quit chan bool - // Determines whether it's an inbound or outbound peer - inbound bool - // Flag for checking the peer's connectivity state - connected int32 - disconnect int32 - // Last known message send - lastSend time.Time - // Indicated whether a verack has been send or not - // This flag is used by writeMessage to check if messages are allowed - // to be send or not. If no version is known all messages are ignored. - versionKnown bool - statusKnown bool - - // Last received pong message - lastPong int64 - lastBlockReceived time.Time - doneFetchingHashes bool - lastHashAt time.Time - lastHashRequestedAt time.Time - - host []byte - port uint16 - caps Caps - td *big.Int - bestHash []byte - lastReceivedHash []byte - requestedHashes [][]byte - - // This peer's public key - pubkey []byte - - // Indicated whether the node is catching up or not - catchingUp bool - diverted bool - blocksRequested int - - version string - - // We use this to give some kind of pingtime to a node, not very accurate, could be improved. - pingTime time.Duration - pingStartTime time.Time - - lastRequestedBlock *types.Block - - protocolCaps *ethutil.Value -} - -func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer { - pubkey := ethereum.KeyManager().PublicKey()[1:] - - return &Peer{ - outputQueue: make(chan *wire.Msg, outputBufferSize), - quit: make(chan bool), - ethereum: ethereum, - conn: conn, - inbound: inbound, - disconnect: 0, - connected: 1, - port: 30303, - pubkey: pubkey, - blocksRequested: 10, - caps: ethereum.ServerCaps(), - version: ethereum.ClientIdentity().String(), - protocolCaps: ethutil.NewValue(nil), - td: big.NewInt(0), - doneFetchingHashes: true, - } -} - -func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer { - p := &Peer{ - outputQueue: make(chan *wire.Msg, outputBufferSize), - quit: make(chan bool), - ethereum: ethereum, - inbound: false, - connected: 0, - disconnect: 0, - port: 30303, - caps: caps, - version: ethereum.ClientIdentity().String(), - protocolCaps: ethutil.NewValue(nil), - td: big.NewInt(0), - doneFetchingHashes: true, - } - - // Set up the connection in another goroutine so we don't block the main thread - go func() { - conn, err := p.Connect(addr) - if err != nil { - //peerlogger.Debugln("Connection to peer failed. Giving up.", err) - p.Stop() - return - } - p.conn = conn - - // Atomically set the connection state - atomic.StoreInt32(&p.connected, 1) - atomic.StoreInt32(&p.disconnect, 0) - - p.Start() - }() - - return p -} - -func (self *Peer) Connect(addr string) (conn net.Conn, err error) { - const maxTries = 3 - for attempts := 0; attempts < maxTries; attempts++ { - conn, err = net.DialTimeout("tcp", addr, 10*time.Second) - if err != nil { - time.Sleep(time.Duration(attempts*20) * time.Second) - continue - } - - // Success - return - } - - return -} - -// Getters -func (p *Peer) PingTime() string { - return p.pingTime.String() -} -func (p *Peer) Inbound() bool { - return p.inbound -} -func (p *Peer) LastSend() time.Time { - return p.lastSend -} -func (p *Peer) LastPong() int64 { - return p.lastPong -} -func (p *Peer) Host() []byte { - return p.host -} -func (p *Peer) Port() uint16 { - return p.port -} -func (p *Peer) Version() string { - return p.version -} -func (p *Peer) Connected() *int32 { - return &p.connected -} - -// Setters -func (p *Peer) SetVersion(version string) { - p.version = version -} - -// Outputs any RLP encoded data to the peer -func (p *Peer) QueueMessage(msg *wire.Msg) { - if atomic.LoadInt32(&p.connected) != 1 { - return - } - p.outputQueue <- msg -} - -func (p *Peer) writeMessage(msg *wire.Msg) { - // Ignore the write if we're not connected - if atomic.LoadInt32(&p.connected) != 1 { - return - } - - if !p.versionKnown { - switch msg.Type { - case wire.MsgHandshakeTy: // Ok - default: // Anything but ack is allowed - return - } - } else { - /* - if !p.statusKnown { - switch msg.Type { - case wire.MsgStatusTy: // Ok - default: // Anything but ack is allowed - return - } - } - */ - } - - peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg)) - - err := wire.WriteMessage(p.conn, msg) - if err != nil { - peerlogger.Debugln(" Can't send message:", err) - // Stop the client if there was an error writing to it - p.Stop() - return - } -} - -// Outbound message handler. Outbound messages are handled here -func (p *Peer) HandleOutbound() { - // The ping timer. Makes sure that every 2 minutes a ping is send to the peer - pingTimer := time.NewTicker(pingPongTimer) - serviceTimer := time.NewTicker(10 * time.Second) - -out: - for { - skip: - select { - // Main message queue. All outbound messages are processed through here - case msg := <-p.outputQueue: - if !p.statusKnown { - switch msg.Type { - case wire.MsgTxTy, wire.MsgGetBlockHashesTy, wire.MsgBlockHashesTy, wire.MsgGetBlocksTy, wire.MsgBlockTy: - break skip - } - } - - switch msg.Type { - case wire.MsgGetBlockHashesTy: - p.lastHashRequestedAt = time.Now() - } - - p.writeMessage(msg) - p.lastSend = time.Now() - - // Ping timer - case <-pingTimer.C: - p.writeMessage(wire.NewMessage(wire.MsgPingTy, "")) - p.pingStartTime = time.Now() - - // Service timer takes care of peer broadcasting, transaction - // posting or block posting - case <-serviceTimer.C: - p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, "")) - - case <-p.quit: - // Break out of the for loop if a quit message is posted - break out - } - } - -clean: - // This loop is for draining the output queue and anybody waiting for us - for { - select { - case <-p.outputQueue: - // TODO - default: - break clean - } - } -} - -func formatMessage(msg *wire.Msg) (ret string) { - ret = fmt.Sprintf("%v %v", msg.Type, msg.Data) - - /* - XXX Commented out because I need the log level here to determine - if i should or shouldn't generate this message - */ - /* - switch msg.Type { - case wire.MsgPeersTy: - ret += fmt.Sprintf("(%d entries)", msg.Data.Len()) - case wire.MsgBlockTy: - b1, b2 := chain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1)) - ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4]) - case wire.MsgBlockHashesTy: - h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes() - ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2) - } - */ - - return -} - -// Inbound handler. Inbound messages are received here and passed to the appropriate methods -func (p *Peer) HandleInbound() { - for atomic.LoadInt32(&p.disconnect) == 0 { - - // HMM? - time.Sleep(50 * time.Millisecond) - // Wait for a message from the peer - msgs, err := wire.ReadMessages(p.conn) - if err != nil { - peerlogger.Debugln(err) - } - for _, msg := range msgs { - peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg)) - - switch msg.Type { - case wire.MsgHandshakeTy: - // Version message - p.handleHandshake(msg) - - //if p.caps.IsCap(CapPeerDiscTy) { - p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, "")) - //} - - case wire.MsgDiscTy: - p.Stop() - peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint())) - case wire.MsgPingTy: - // Respond back with pong - p.QueueMessage(wire.NewMessage(wire.MsgPongTy, "")) - case wire.MsgPongTy: - // If we received a pong back from a peer we set the - // last pong so the peer handler knows this peer is still - // active. - p.lastPong = time.Now().Unix() - p.pingTime = time.Since(p.pingStartTime) - case wire.MsgTxTy: - // If the message was a transaction queue the transaction - // in the TxPool where it will undergo validation and - // processing when a new block is found - for i := 0; i < msg.Data.Len(); i++ { - tx := types.NewTransactionFromValue(msg.Data.Get(i)) - err := p.ethereum.TxPool().Add(tx) - if err != nil { - peerlogger.Infoln(err) - } else { - peerlogger.Infof("tx OK (%x)\n", tx.Hash()[0:4]) - } - } - case wire.MsgGetPeersTy: - // Peer asked for list of connected peers - //p.pushPeers() - case wire.MsgPeersTy: - // Received a list of peers (probably because MsgGetPeersTy was send) - data := msg.Data - // Create new list of possible peers for the ethereum to process - peers := make([]string, data.Len()) - // Parse each possible peer - for i := 0; i < data.Len(); i++ { - value := data.Get(i) - peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint()) - } - - // Connect to the list of peers - p.ethereum.ProcessPeerList(peers) - - case wire.MsgStatusTy: - // Handle peer's status msg - p.handleStatus(msg) - } - - // TMP - if p.statusKnown { - switch msg.Type { - - case wire.MsgGetBlockHashesTy: - if msg.Data.Len() < 2 { - peerlogger.Debugln("err: argument length invalid ", msg.Data.Len()) - } - - hash := msg.Data.Get(0).Bytes() - amount := msg.Data.Get(1).Uint() - - hashes := p.ethereum.ChainManager().GetChainHashesFromHash(hash, amount) - - p.QueueMessage(wire.NewMessage(wire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes))) - - case wire.MsgGetBlocksTy: - // Limit to max 300 blocks - max := int(math.Min(float64(msg.Data.Len()), 300.0)) - var blocks []interface{} - - for i := 0; i < max; i++ { - hash := msg.Data.Get(i).Bytes() - block := p.ethereum.ChainManager().GetBlock(hash) - if block != nil { - blocks = append(blocks, block.Value().Raw()) - } - } - - p.QueueMessage(wire.NewMessage(wire.MsgBlockTy, blocks)) - - case wire.MsgBlockHashesTy: - p.catchingUp = true - - blockPool := p.ethereum.blockPool - - foundCommonHash := false - p.lastHashAt = time.Now() - - it := msg.Data.NewIterator() - for it.Next() { - hash := it.Value().Bytes() - p.lastReceivedHash = hash - - if blockPool.HasCommonHash(hash) { - foundCommonHash = true - - break - } - - blockPool.AddHash(hash, p) - } - - if !foundCommonHash { - p.FetchHashes() - } else { - peerlogger.Infof("Found common hash (%x...)\n", p.lastReceivedHash[0:4]) - p.doneFetchingHashes = true - } - - case wire.MsgBlockTy: - p.catchingUp = true - - blockPool := p.ethereum.blockPool - - it := msg.Data.NewIterator() - for it.Next() { - block := types.NewBlockFromRlpValue(it.Value()) - blockPool.Add(block, p) - - p.lastBlockReceived = time.Now() - } - case wire.MsgNewBlockTy: - var ( - blockPool = p.ethereum.blockPool - block = types.NewBlockFromRlpValue(msg.Data.Get(0)) - td = msg.Data.Get(1).BigInt() - ) - - if td.Cmp(blockPool.td) > 0 { - p.ethereum.blockPool.AddNew(block, p) - } - } - - } - } - } - - p.Stop() -} - -func (self *Peer) FetchBlocks(hashes [][]byte) { - if len(hashes) > 0 { - peerlogger.Debugf("Fetching blocks (%d)\n", len(hashes)) - - self.QueueMessage(wire.NewMessage(wire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes))) - } -} - -func (self *Peer) FetchHashes() bool { - blockPool := self.ethereum.blockPool - - return blockPool.FetchHashes(self) -} - -func (self *Peer) FetchingHashes() bool { - return !self.doneFetchingHashes -} - -// General update method -func (self *Peer) update() { - serviceTimer := time.NewTicker(100 * time.Millisecond) - -out: - for { - select { - case <-serviceTimer.C: - if self.IsCap("eth") { - var ( - sinceBlock = time.Since(self.lastBlockReceived) - ) - - if sinceBlock > 5*time.Second { - self.catchingUp = false - } - } - case <-self.quit: - break out - } - } - - serviceTimer.Stop() -} - -func (p *Peer) Start() { - peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String()) - servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String()) - - if p.inbound { - p.host, p.port = packAddr(peerHost, peerPort) - } else { - p.host, p.port = packAddr(servHost, servPort) - } - - err := p.pushHandshake() - if err != nil { - peerlogger.Debugln("Peer can't send outbound version ack", err) - - p.Stop() - - return - } - - go p.HandleOutbound() - // Run the inbound handler in a new goroutine - go p.HandleInbound() - // Run the general update handler - go p.update() - - // Wait a few seconds for startup and then ask for an initial ping - time.Sleep(2 * time.Second) - p.writeMessage(wire.NewMessage(wire.MsgPingTy, "")) - p.pingStartTime = time.Now() - -} - -func (p *Peer) Stop() { - p.StopWithReason(DiscRequested) -} - -func (p *Peer) StopWithReason(reason DiscReason) { - if atomic.AddInt32(&p.disconnect, 1) != 1 { - return - } - - // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here - p.ethereum.RemovePeer(p) - - close(p.quit) - if atomic.LoadInt32(&p.connected) != 0 { - p.writeMessage(wire.NewMessage(wire.MsgDiscTy, reason)) - p.conn.Close() - } -} - -func (p *Peer) peersMessage() *wire.Msg { - outPeers := make([]interface{}, len(p.ethereum.InOutPeers())) - // Serialise each peer - for i, peer := range p.ethereum.InOutPeers() { - // Don't return localhost as valid peer - if !net.ParseIP(peer.conn.RemoteAddr().String()).IsLoopback() { - outPeers[i] = peer.RlpData() - } - } - - // Return the message to the peer with the known list of connected clients - return wire.NewMessage(wire.MsgPeersTy, outPeers) -} - -// Pushes the list of outbound peers to the client when requested -func (p *Peer) pushPeers() { - p.QueueMessage(p.peersMessage()) -} - -func (self *Peer) pushStatus() { - msg := wire.NewMessage(wire.MsgStatusTy, []interface{}{ - uint32(ProtocolVersion), - uint32(NetVersion), - self.ethereum.ChainManager().Td(), - self.ethereum.ChainManager().CurrentBlock().Hash(), - self.ethereum.ChainManager().Genesis().Hash(), - }) - - self.QueueMessage(msg) -} - -func (self *Peer) handleStatus(msg *wire.Msg) { - c := msg.Data - - var ( - //protoVersion = c.Get(0).Uint() - netVersion = c.Get(1).Uint() - td = c.Get(2).BigInt() - bestHash = c.Get(3).Bytes() - genesis = c.Get(4).Bytes() - ) - - if bytes.Compare(self.ethereum.ChainManager().Genesis().Hash(), genesis) != 0 { - loggerger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis) - return - } - - if netVersion != NetVersion { - loggerger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion) - return - } - - /* - if protoVersion != ProtocolVersion { - loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion) - return - } - */ - - // Get the td and last hash - self.td = td - self.bestHash = bestHash - self.lastReceivedHash = bestHash - - self.statusKnown = true - - // Compare the total TD with the blockchain TD. If remote is higher - // fetch hashes from highest TD node. - self.FetchHashes() - - loggerger.Infof("Peer is [eth] capable. (TD = %v ~ %x)", self.td, self.bestHash) - -} - -func (p *Peer) pushHandshake() error { - pubkey := p.ethereum.KeyManager().PublicKey() - msg := wire.NewMessage(wire.MsgHandshakeTy, []interface{}{ - P2PVersion, []byte(p.version), []interface{}{[]interface{}{"eth", ProtocolVersion}}, p.port, pubkey[1:], - }) - - p.QueueMessage(msg) - - return nil -} - -func (p *Peer) handleHandshake(msg *wire.Msg) { - c := msg.Data - - var ( - p2pVersion = c.Get(0).Uint() - clientId = c.Get(1).Str() - caps = c.Get(2) - port = c.Get(3).Uint() - pub = c.Get(4).Bytes() - ) - - // Check correctness of p2p protocol version - if p2pVersion != P2PVersion { - peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion) - p.Stop() - return - } - - // Handle the pub key (validation, uniqueness) - if len(pub) == 0 { - peerlogger.Warnln("Pubkey required, not supplied in handshake.") - p.Stop() - return - } - - // Self connect detection - pubkey := p.ethereum.KeyManager().PublicKey() - if bytes.Compare(pubkey[1:], pub) == 0 { - p.Stop() - - return - } - - // Check for blacklisting - for _, pk := range p.ethereum.blacklist { - if bytes.Compare(pk, pub) == 0 { - peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4]) - p.StopWithReason(DiscBadPeer) - - return - } - } - - usedPub := 0 - // This peer is already added to the peerlist so we expect to find a double pubkey at least once - eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) { - if bytes.Compare(pub, peer.pubkey) == 0 { - usedPub++ - } - }) - - if usedPub > 0 { - peerlogger.Debugf("Pubkey %x found more then once. Already connected to client.", p.pubkey) - p.Stop() - return - } - p.pubkey = pub - - // If this is an inbound connection send an ack back - if p.inbound { - p.port = uint16(port) - } - - p.SetVersion(clientId) - - p.versionKnown = true - - p.ethereum.PushPeer(p) - p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()}) - - p.protocolCaps = caps - - it := caps.NewIterator() - var capsStrs []string - for it.Next() { - cap := it.Value().Get(0).Str() - ver := it.Value().Get(1).Uint() - switch cap { - case "eth": - if ver != ProtocolVersion { - loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", ver) - continue - } - p.pushStatus() - } - - capsStrs = append(capsStrs, fmt.Sprintf("%s/%d", cap, ver)) - } - - peerlogger.Infof("Added peer (%s) %d / %d (%v)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, capsStrs) - - peerlogger.Debugln(p) -} - -func (self *Peer) IsCap(cap string) bool { - capsIt := self.protocolCaps.NewIterator() - for capsIt.Next() { - if capsIt.Value().Str() == cap { - return true - } - } - - return false -} - -func (self *Peer) Caps() *ethutil.Value { - return self.protocolCaps -} - -func (p *Peer) String() string { - var strBoundType string - if p.inbound { - strBoundType = "inbound" - } else { - strBoundType = "outbound" - } - var strConnectType string - if atomic.LoadInt32(&p.disconnect) == 0 { - strConnectType = "connected" - } else { - strConnectType = "disconnected" - } - - return fmt.Sprintf("[%s] (%s) %v %s", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version) - -} - -func (p *Peer) RlpData() []interface{} { - return []interface{}{p.host, p.port, p.pubkey} -} - -func packAddr(address, _port string) (host []byte, port uint16) { - p, _ := strconv.Atoi(_port) - port = uint16(p) - - h := net.ParseIP(address) - if ip := h.To4(); ip != nil { - host = []byte(ip) - } else { - host = []byte(h) - } - - return -} - -func unpackAddr(value *ethutil.Value, p uint64) string { - host, _ := net.IP(value.Bytes()).MarshalText() - prt := strconv.Itoa(int(p)) - - return net.JoinHostPort(string(host), prt) -} diff --git a/pow/ezp/pow.go b/pow/ezp/pow.go index bfe3ea098..f669f8aa4 100644 --- a/pow/ezp/pow.go +++ b/pow/ezp/pow.go @@ -59,7 +59,7 @@ func (pow *EasyPow) Search(block pow.Block, stop <-chan struct{}) []byte { } sha := crypto.Sha3(big.NewInt(r.Int63()).Bytes()) - if pow.verify(hash, diff, sha) { + if verify(hash, diff, sha) { return sha } } @@ -72,7 +72,11 @@ func (pow *EasyPow) Search(block pow.Block, stop <-chan struct{}) []byte { return nil } -func (pow *EasyPow) verify(hash []byte, diff *big.Int, nonce []byte) bool { +func (pow *EasyPow) Verify(block pow.Block) bool { + return Verify(block) +} + +func verify(hash []byte, diff *big.Int, nonce []byte) bool { sha := sha3.NewKeccak256() d := append(hash, nonce...) @@ -84,6 +88,6 @@ func (pow *EasyPow) verify(hash []byte, diff *big.Int, nonce []byte) bool { return res.Cmp(verification) <= 0 } -func (pow *EasyPow) Verify(block pow.Block) bool { - return pow.verify(block.HashNoNonce(), block.Diff(), block.N()) +func Verify(block pow.Block) bool { + return verify(block.HashNoNonce(), block.Diff(), block.N()) } diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go index bed23c8a7..8f05c0695 100644 --- a/ui/qt/qwhisper/whisper.go +++ b/ui/qt/qwhisper/whisper.go @@ -1,11 +1,14 @@ package qwhisper import ( + "fmt" "time" + "unsafe" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/whisper" + "gopkg.in/qml.v1" ) func fromHex(s string) []byte { @@ -16,27 +19,44 @@ func fromHex(s string) []byte { } func toHex(b []byte) string { return "0x" + ethutil.Bytes2Hex(b) } +type Watch struct { +} + +func (self *Watch) Arrived(v unsafe.Pointer) { + fmt.Println(v) +} + type Whisper struct { *whisper.Whisper + view qml.Object + + watches map[int]*Watch } func New(w *whisper.Whisper) *Whisper { - return &Whisper{w} + return &Whisper{w, nil, make(map[int]*Watch)} } -func (self *Whisper) Post(data string, pow, ttl uint32, to, from string) { +func (self *Whisper) SetView(view qml.Object) { + self.view = view +} + +func (self *Whisper) Post(data string, to, from string, topics []string, pow, ttl uint32) { msg := whisper.NewMessage(fromHex(data)) envelope, err := msg.Seal(time.Duration(pow), whisper.Opts{ - Ttl: time.Duration(ttl), - To: crypto.ToECDSAPub(fromHex(to)), - From: crypto.ToECDSA(fromHex(from)), + Ttl: time.Duration(ttl), + To: crypto.ToECDSAPub(fromHex(to)), + From: crypto.ToECDSA(fromHex(from)), + Topics: whisper.TopicsFromString(topics...), }) if err != nil { + fmt.Println(err) // handle error return } if err := self.Whisper.Send(envelope); err != nil { + fmt.Println(err) // handle error return } @@ -46,16 +66,19 @@ func (self *Whisper) NewIdentity() string { return toHex(self.Whisper.NewIdentity().D.Bytes()) } -func (self *Whisper) HasIdentify(key string) bool { +func (self *Whisper) HasIdentity(key string) bool { return self.Whisper.HasIdentity(crypto.ToECDSA(fromHex(key))) } -func (self *Whisper) Watch(opts map[string]interface{}) { +func (self *Whisper) Watch(opts map[string]interface{}) *Watch { filter := filterFromMap(opts) filter.Fn = func(msg *whisper.Message) { - // TODO POST TO QT WINDOW + fmt.Println(msg) } - self.Whisper.Watch(filter) + i := self.Whisper.Watch(filter) + self.watches[i] = &Watch{} + + return self.watches[i] } func filterFromMap(opts map[string]interface{}) (f whisper.Filter) { @@ -65,6 +88,11 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) { if from, ok := opts["from"].(string); ok { f.From = crypto.ToECDSAPub(fromHex(from)) } + if topicList, ok := opts["topics"].(*qml.List); ok { + var topics []string + topicList.Convert(&topics) + f.Topics = whisper.TopicsFromString(topics...) + } return } diff --git a/ui/qt/qwhisper/whisper_test.go b/ui/qt/qwhisper/whisper_test.go new file mode 100644 index 000000000..efa4e6238 --- /dev/null +++ b/ui/qt/qwhisper/whisper_test.go @@ -0,0 +1,15 @@ +package qwhisper + +import ( + "testing" + + "github.com/ethereum/go-ethereum/whisper" +) + +func TestHasIdentity(t *testing.T) { + qw := New(whisper.New()) + id := qw.NewIdentity() + if !qw.HasIdentity(id) { + t.Error("expected to have identity") + } +} diff --git a/whisper/doc.go b/whisper/doc.go new file mode 100644 index 000000000..986df8fb9 --- /dev/null +++ b/whisper/doc.go @@ -0,0 +1,16 @@ +/* +Package whisper implements the Whisper PoC-1. + +(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec) + +Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP). +As such it may be likened and compared to both, not dissimilar to the +matter/energy duality (apologies to physicists for the blatant abuse of a +fundamental and beautiful natural principle). + +Whisper is a pure identity-based messaging system. Whisper provides a low-level +(non-application-specific) but easily-accessible API without being based upon +or prejudiced by the low-level hardware attributes and characteristics, +particularly the notion of singular endpoints. +*/ +package whisper diff --git a/whisper/envelope.go b/whisper/envelope.go index 683e88128..066e20f6a 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/rlp" + "github.com/obscuren/ecies" ) const ( @@ -73,10 +74,15 @@ func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) { message.Flags = data[0] message.Signature = data[1:66] } - message.Payload = data[dataStart:] + + payload := data[dataStart:] if prv != nil { - message.Payload, err = crypto.Decrypt(prv, message.Payload) - if err != nil { + message.Payload, err = crypto.Decrypt(prv, payload) + switch err { + case ecies.ErrInvalidPublicKey: // Payload isn't encrypted + message.Payload = payload + return &message, err + default: return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) } } diff --git a/whisper/main.go b/whisper/main.go index 2ee2f3ff1..edd5f7004 100644 --- a/whisper/main.go +++ b/whisper/main.go @@ -5,10 +5,8 @@ package main import ( "fmt" "log" - "net" "os" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/whisper" @@ -20,12 +18,12 @@ func main() { pub, _ := secp256k1.GenerateKeyPair() - whisper := whisper.New(&event.TypeMux{}) + whisper := whisper.New() srv := p2p.Server{ MaxPeers: 10, Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)), - ListenAddr: ":30303", + ListenAddr: ":30300", NAT: p2p.UPNP(), Protocols: []p2p.Protocol{whisper.Protocol()}, @@ -35,13 +33,5 @@ func main() { os.Exit(1) } - // add seed peers - seed, err := net.ResolveTCPAddr("tcp", "poc-7.ethdev.com:30300") - if err != nil { - fmt.Println("couldn't resolve:", err) - os.Exit(1) - } - srv.SuggestPeer(seed.IP, seed.Port, nil) - select {} } diff --git a/whisper/util.go b/whisper/util.go index abef1d667..7a222395f 100644 --- a/whisper/util.go +++ b/whisper/util.go @@ -18,10 +18,19 @@ func Topics(data [][]byte) [][]byte { return d } -func TopicsFromString(data []string) [][]byte { +func TopicsFromString(data ...string) [][]byte { d := make([][]byte, len(data)) for i, str := range data { d[i] = hashTopic([]byte(str)) } return d } + +func bytesToMap(s [][]byte) map[string]struct{} { + m := make(map[string]struct{}) + for _, topic := range s { + m[string(topic)] = struct{}{} + } + + return m +} diff --git a/whisper/whisper.go b/whisper/whisper.go index 356debd1c..9721ca9f9 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -4,13 +4,14 @@ import ( "bytes" "crypto/ecdsa" "errors" - "fmt" "sync" "time" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" + "github.com/obscuren/ecies" "gopkg.in/fatih/set.v0" ) @@ -47,6 +48,8 @@ type MessageEvent struct { const DefaultTtl = 50 * time.Second +var wlogger = logger.NewLogger("SHH") + type Whisper struct { protocol p2p.Protocol filters *filter.Filters @@ -68,17 +71,6 @@ func New() *Whisper { quit: make(chan struct{}), } whisper.filters.Start() - go whisper.update() - - // XXX TODO REMOVE TESTING CODE - msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) - envelope, _ := msg.Seal(DefaultPow, Opts{ - Ttl: DefaultTtl, - }) - if err := whisper.Send(envelope); err != nil { - fmt.Println(err) - } - // XXX TODO REMOVE TESTING CODE // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ @@ -91,6 +83,11 @@ func New() *Whisper { return whisper } +func (self *Whisper) Start() { + wlogger.Infoln("Whisper started") + go self.update() +} + func (self *Whisper) Stop() { close(self.quit) } @@ -122,6 +119,7 @@ func (self *Whisper) Watch(opts Filter) int { return self.filters.Install(filter.Generic{ Str1: string(crypto.FromECDSA(opts.To)), Str2: string(crypto.FromECDSAPub(opts.From)), + Data: bytesToMap(opts.Topics), Fn: func(data interface{}) { opts.Fn(data.(*Message)) }, @@ -230,13 +228,14 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) { func (self *Whisper) postEvent(envelope *Envelope) { for _, key := range self.keys { - if message, err := envelope.Open(key); err == nil { + if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) { // Create a custom filter? self.filters.Notify(filter.Generic{ Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())), + Data: bytesToMap(envelope.Topics), }, message) } else { - fmt.Println(err) + wlogger.Infoln(err) } } } diff --git a/wire/.gitignore b/wire/.gitignore deleted file mode 100644 index f725d58d1..000000000 --- a/wire/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -# See http://help.github.com/ignore-files/ for more about ignoring files. -# -# If you find yourself ignoring temporary files generated by your text editor -# or operating system, you probably want to add a global ignore instead: -# git config --global core.excludesfile ~/.gitignore_global - -/tmp -*/**/*un~ -*un~ -.DS_Store -*/**/.DS_Store - diff --git a/wire/README.md b/wire/README.md deleted file mode 100644 index 7f63688b3..000000000 --- a/wire/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# ethwire - -The ethwire package contains the ethereum wire protocol. The ethwire -package is required to write and read from the ethereum network. - -# Installation - -`go get github.com/ethereum/ethwire-go` - -# Messaging overview - -The Ethereum Wire protocol defines the communication between the nodes -running Ethereum. Further reader reading can be done on the -[Wiki](http://wiki.ethereum.org/index.php/Wire_Protocol). - -# Reading Messages - -```go -// Read and validate the next eth message from the provided connection. -// returns a error message with the details. -msg, err := ethwire.ReadMessage(conn) -if err != nil { - // Handle error -} -``` - -# Writing Messages - -```go -// Constructs a message which can be interpreted by the eth network. -// Write the inventory to network -err := ethwire.WriteMessage(conn, &Msg{ - Type: ethwire.MsgInvTy, - Data : []interface{}{...}, -}) -``` diff --git a/wire/client_identity.go b/wire/client_identity.go deleted file mode 100644 index 0a268024a..000000000 --- a/wire/client_identity.go +++ /dev/null @@ -1,56 +0,0 @@ -package wire - -import ( - "fmt" - "runtime" -) - -// should be used in Peer handleHandshake, incorporate Caps, ProtocolVersion, Pubkey etc. -type ClientIdentity interface { - String() string -} - -type SimpleClientIdentity struct { - clientIdentifier string - version string - customIdentifier string - os string - implementation string -} - -func NewSimpleClientIdentity(clientIdentifier string, version string, customIdentifier string) *SimpleClientIdentity { - clientIdentity := &SimpleClientIdentity{ - clientIdentifier: clientIdentifier, - version: version, - customIdentifier: customIdentifier, - os: runtime.GOOS, - implementation: runtime.Version(), - } - - return clientIdentity -} - -func (c *SimpleClientIdentity) init() { -} - -func (c *SimpleClientIdentity) String() string { - var id string - if len(c.customIdentifier) > 0 { - id = "/" + c.customIdentifier - } - - return fmt.Sprintf("%s/v%s%s/%s/%s", - c.clientIdentifier, - c.version, - id, - c.os, - c.implementation) -} - -func (c *SimpleClientIdentity) SetCustomIdentifier(customIdentifier string) { - c.customIdentifier = customIdentifier -} - -func (c *SimpleClientIdentity) GetCustomIdentifier() string { - return c.customIdentifier -} diff --git a/wire/client_identity_test.go b/wire/client_identity_test.go deleted file mode 100644 index c0e7a0159..000000000 --- a/wire/client_identity_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package wire - -import ( - "fmt" - "runtime" - "testing" -) - -func TestClientIdentity(t *testing.T) { - clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test") - clientString := clientIdentity.String() - expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version()) - if clientString != expected { - t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) - } - customIdentifier := clientIdentity.GetCustomIdentifier() - if customIdentifier != "test" { - t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test', got %q", customIdentifier) - } - clientIdentity.SetCustomIdentifier("test2") - customIdentifier = clientIdentity.GetCustomIdentifier() - if customIdentifier != "test2" { - t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test2', got %q", customIdentifier) - } - clientString = clientIdentity.String() - expected = fmt.Sprintf("Ethereum(G)/v0.5.16/test2/%s/%s", runtime.GOOS, runtime.Version()) - if clientString != expected { - t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString) - } -} diff --git a/wire/messages2.go b/wire/messages2.go deleted file mode 100644 index acbd9e0d5..000000000 --- a/wire/messages2.go +++ /dev/null @@ -1,199 +0,0 @@ -package wire - -import ( - "bytes" - "errors" - "fmt" - "net" - "time" - - "github.com/ethereum/go-ethereum/ethutil" -) - -// The connection object allows you to set up a connection to the Ethereum network. -// The Connection object takes care of all encoding and sending objects properly over -// the network. -type Connection struct { - conn net.Conn - nTimeout time.Duration - pendingMessages Messages -} - -// Create a new connection to the Ethereum network -func New(conn net.Conn) *Connection { - return &Connection{conn: conn, nTimeout: 500} -} - -// Read, reads from the network. It will block until the next message is received. -func (self *Connection) Read() *Msg { - if len(self.pendingMessages) == 0 { - self.readMessages() - } - - ret := self.pendingMessages[0] - self.pendingMessages = self.pendingMessages[1:] - - return ret - -} - -// Write to the Ethereum network specifying the type of the message and -// the data. Data can be of type RlpEncodable or []interface{}. Returns -// nil or if something went wrong an error. -func (self *Connection) Write(typ MsgType, v ...interface{}) error { - var pack []byte - - slice := [][]interface{}{[]interface{}{byte(typ)}} - for _, value := range v { - if encodable, ok := value.(ethutil.RlpEncodeDecode); ok { - slice = append(slice, encodable.RlpValue()) - } else if raw, ok := value.([]interface{}); ok { - slice = append(slice, raw) - } else { - panic(fmt.Sprintf("Unable to 'write' object of type %T", value)) - } - } - - // Encode the type and the (RLP encoded) data for sending over the wire - encoded := ethutil.NewValue(slice).Encode() - payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) - - // Write magic token and payload length (first 8 bytes) - pack = append(MagicToken, payloadLength...) - pack = append(pack, encoded...) - - // Write to the connection - _, err := self.conn.Write(pack) - if err != nil { - return err - } - - return nil -} - -func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - if len(data) == 0 { - return nil, nil, true, nil - } - - if len(data) <= 8 { - return nil, remaining, false, errors.New("Invalid message") - } - - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, data[:4]) != 0 { - return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) - } - - messageLength := ethutil.BytesToNumber(data[4:8]) - remaining = data[8+messageLength:] - if int(messageLength) > len(data[8:]) { - return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) - } - - message := data[8 : 8+messageLength] - decoder := ethutil.NewValueFromBytes(message) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msg = &Msg{ - Type: MsgType(t), - Data: d, - } - - return -} - -// The basic message reader waits for data on the given connection, decoding -// and doing a few sanity checks such as if there's a data type and -// unmarhals the given data -func (self *Connection) readMessages() (err error) { - // The recovering function in case anything goes horribly wrong - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("wire.ReadMessage error: %v", r) - } - }() - - // Buff for writing network message to - //buff := make([]byte, 1440) - var buff []byte - var totalBytes int - for { - // Give buffering some time - self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond)) - // Create a new temporarily buffer - b := make([]byte, 1440) - // Wait for a message from this peer - n, _ := self.conn.Read(b) - if err != nil && n == 0 { - if err.Error() != "EOF" { - fmt.Println("err now", err) - return err - } else { - break - } - - // Messages can't be empty - } else if n == 0 { - break - } - - buff = append(buff, b[:n]...) - totalBytes += n - } - - // Reslice buffer - buff = buff[:totalBytes] - msg, remaining, done, err := self.readMessage(buff) - for ; done != true; msg, remaining, done, err = self.readMessage(remaining) { - //log.Println("rx", msg) - - if msg != nil { - self.pendingMessages = append(self.pendingMessages, msg) - } - } - - return -} - -func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) { - if len(data) == 0 { - return nil, nil, true, nil - } - - if len(data) <= 8 { - return nil, remaining, false, errors.New("Invalid message") - } - - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, data[:4]) != 0 { - return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4]) - } - - messageLength := ethutil.BytesToNumber(data[4:8]) - remaining = data[8+messageLength:] - if int(messageLength) > len(data[8:]) { - return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength) - } - - message := data[8 : 8+messageLength] - decoder := ethutil.NewValueFromBytes(message) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msg = &Msg{ - Type: MsgType(t), - Data: d, - } - - return -} - -func bufferedRead(conn net.Conn) ([]byte, error) { - return nil, nil -} diff --git a/wire/messaging.go b/wire/messaging.go deleted file mode 100644 index 9c6cb5944..000000000 --- a/wire/messaging.go +++ /dev/null @@ -1,178 +0,0 @@ -// Package wire provides low level access to the Ethereum network and allows -// you to broadcast data over the network. -package wire - -import ( - "bytes" - "fmt" - "net" - "time" - - "github.com/ethereum/go-ethereum/ethutil" -) - -// Connection interface describing the methods required to implement the wire protocol. -type Conn interface { - Write(typ MsgType, v ...interface{}) error - Read() *Msg -} - -// The magic token which should be the first 4 bytes of every message and can be used as separator between messages. -var MagicToken = []byte{34, 64, 8, 145} - -type MsgType byte - -const ( - // Values are given explicitly instead of by iota because these values are - // defined by the wire protocol spec; it is easier for humans to ensure - // correctness when values are explicit. - MsgHandshakeTy = 0x00 - MsgDiscTy = 0x01 - MsgPingTy = 0x02 - MsgPongTy = 0x03 - MsgGetPeersTy = 0x04 - MsgPeersTy = 0x05 - - MsgStatusTy = 0x10 - MsgTxTy = 0x12 - MsgGetBlockHashesTy = 0x13 - MsgBlockHashesTy = 0x14 - MsgGetBlocksTy = 0x15 - MsgBlockTy = 0x16 - MsgNewBlockTy = 0x17 -) - -var msgTypeToString = map[MsgType]string{ - MsgHandshakeTy: "Handshake", - MsgDiscTy: "Disconnect", - MsgPingTy: "Ping", - MsgPongTy: "Pong", - MsgGetPeersTy: "Get peers", - MsgStatusTy: "Status", - MsgPeersTy: "Peers", - MsgTxTy: "Transactions", - MsgBlockTy: "Blocks", - //MsgGetTxsTy: "Get Txs", - MsgGetBlockHashesTy: "Get block hashes", - MsgBlockHashesTy: "Block hashes", - MsgGetBlocksTy: "Get blocks", -} - -func (mt MsgType) String() string { - return msgTypeToString[mt] -} - -type Msg struct { - Type MsgType // Specifies how the encoded data should be interpreted - //Data []byte - Data *ethutil.Value -} - -func NewMessage(msgType MsgType, data interface{}) *Msg { - return &Msg{ - Type: msgType, - Data: ethutil.NewValue(data), - } -} - -type Messages []*Msg - -// The basic message reader waits for data on the given connection, decoding -// and doing a few sanity checks such as if there's a data type and -// unmarhals the given data -func ReadMessages(conn net.Conn) (msgs []*Msg, err error) { - // The recovering function in case anything goes horribly wrong - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf("wire.ReadMessage error: %v", r) - } - }() - - var ( - buff []byte - messages [][]byte - msgLength int - ) - - for { - // Give buffering some time - conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond)) - // Create a new temporarily buffer - b := make([]byte, 1440) - n, _ := conn.Read(b) - if err != nil && n == 0 { - if err.Error() != "EOF" { - fmt.Println("err now", err) - return nil, err - } else { - break - } - } - - if n == 0 && len(buff) == 0 { - // If there's nothing on the wire wait for a bit - time.Sleep(200 * time.Millisecond) - - continue - } - - buff = append(buff, b[:n]...) - if msgLength == 0 { - // Check if the received 4 first bytes are the magic token - if bytes.Compare(MagicToken, buff[:4]) != 0 { - return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4]) - } - - // Read the length of the message - msgLength = int(ethutil.BytesToNumber(buff[4:8])) - - // Remove the token and length - buff = buff[8:] - } - - if len(buff) >= msgLength { - messages = append(messages, buff[:msgLength]) - buff = buff[msgLength:] - msgLength = 0 - - if len(buff) == 0 { - break - } - } - } - - for _, m := range messages { - decoder := ethutil.NewValueFromBytes(m) - // Type of message - t := decoder.Get(0).Uint() - // Actual data - d := decoder.SliceFrom(1) - - msgs = append(msgs, &Msg{Type: MsgType(t), Data: d}) - } - - return -} - -// The basic message writer takes care of writing data over the given -// connection and does some basic error checking -func WriteMessage(conn net.Conn, msg *Msg) error { - var pack []byte - - // Encode the type and the (RLP encoded) data for sending over the wire - encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode() - payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32) - - // Write magic token and payload length (first 8 bytes) - pack = append(MagicToken, payloadLength...) - pack = append(pack, encoded...) - //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded) - - // Write to the connection - _, err := conn.Write(pack) - if err != nil { - return err - } - - return nil -} diff --git a/xeth/hexface.go b/xeth/hexface.go index bfd2dddd9..6c084f947 100644 --- a/xeth/hexface.go +++ b/xeth/hexface.go @@ -3,7 +3,6 @@ package xeth import ( "bytes" "encoding/json" - "sync/atomic" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -63,12 +62,8 @@ func (self *JSXEth) PeerCount() int { func (self *JSXEth) Peers() []JSPeer { var peers []JSPeer - for peer := self.obj.Peers().Front(); peer != nil; peer = peer.Next() { - p := peer.Value.(core.Peer) - // we only want connected peers - if atomic.LoadInt32(p.Connected()) != 0 { - peers = append(peers, *NewJSPeer(p)) - } + for _, peer := range self.obj.Peers() { + peers = append(peers, *NewJSPeer(peer)) } return peers diff --git a/xeth/js_types.go b/xeth/js_types.go index 62867d6a9..987edce37 100644 --- a/xeth/js_types.go +++ b/xeth/js_types.go @@ -1,14 +1,13 @@ package xeth import ( - "fmt" - "strconv" "strings" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/state" ) @@ -155,38 +154,36 @@ func NewPReciept(contractCreation bool, creationAddress, hash, address []byte) * // Peer interface exposed to QML type JSPeer struct { - ref *core.Peer - Inbound bool `json:"isInbound"` - LastSend int64 `json:"lastSend"` - LastPong int64 `json:"lastPong"` - Ip string `json:"ip"` - Port int `json:"port"` - Version string `json:"version"` - LastResponse string `json:"lastResponse"` - Latency string `json:"latency"` - Caps string `json:"caps"` -} - -func NewJSPeer(peer core.Peer) *JSPeer { - if peer == nil { - return nil - } - - var ip []string - for _, i := range peer.Host() { - ip = append(ip, strconv.Itoa(int(i))) - } - ipAddress := strings.Join(ip, ".") - - var caps []string - capsIt := peer.Caps().NewIterator() - for capsIt.Next() { - cap := capsIt.Value().Get(0).Str() - ver := capsIt.Value().Get(1).Uint() - caps = append(caps, fmt.Sprintf("%s/%d", cap, ver)) - } - - return &JSPeer{ref: &peer, Inbound: peer.Inbound(), LastSend: peer.LastSend().Unix(), LastPong: peer.LastPong(), Version: peer.Version(), Ip: ipAddress, Port: int(peer.Port()), Latency: peer.PingTime(), Caps: "[" + strings.Join(caps, ", ") + "]"} + ref *p2p.Peer + // Inbound bool `json:"isInbound"` + // LastSend int64 `json:"lastSend"` + // LastPong int64 `json:"lastPong"` + // Ip string `json:"ip"` + // Port int `json:"port"` + // Version string `json:"version"` + // LastResponse string `json:"lastResponse"` + // Latency string `json:"latency"` + // Caps string `json:"caps"` +} + +func NewJSPeer(peer *p2p.Peer) *JSPeer { + + // var ip []string + // for _, i := range peer.Host() { + // ip = append(ip, strconv.Itoa(int(i))) + // } + // ipAddress := strings.Join(ip, ".") + + // var caps []string + // capsIt := peer.Caps().NewIterator() + // for capsIt.Next() { + // cap := capsIt.Value().Get(0).Str() + // ver := capsIt.Value().Get(1).Uint() + // caps = append(caps, fmt.Sprintf("%s/%d", cap, ver)) + // } + + return &JSPeer{ref: peer} + // return &JSPeer{ref: &peer, Inbound: peer.Inbound(), LastSend: peer.LastSend().Unix(), LastPong: peer.LastPong(), Version: peer.Version(), Ip: ipAddress, Port: int(peer.Port()), Latency: peer.PingTime(), Caps: "[" + strings.Join(caps, ", ") + "]"} } type JSReceipt struct { diff --git a/xeth/world.go b/xeth/world.go index 956ef1e15..008a08423 100644 --- a/xeth/world.go +++ b/xeth/world.go @@ -1,8 +1,7 @@ package xeth import ( - "container/list" - + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/state" ) @@ -55,7 +54,7 @@ func (self *World) IsListening() bool { return self.pipe.obj.IsListening() } -func (self *World) Peers() *list.List { +func (self *World) Peers() []*p2p.Peer { return self.pipe.obj.Peers() } |