aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--block_pool.go351
-rw-r--r--cmd/ethereum/cmd.go2
-rw-r--r--cmd/ethereum/flags.go6
-rw-r--r--cmd/ethereum/main.go4
-rw-r--r--cmd/ethereum/repl/repl.go2
-rw-r--r--cmd/mist/assets/qml/main.qml1
-rw-r--r--cmd/mist/assets/qml/views/whisper.qml47
-rw-r--r--cmd/mist/flags.go4
-rw-r--r--cmd/mist/gui.go44
-rw-r--r--cmd/mist/main.go16
-rw-r--r--cmd/mist/ui_lib.go31
-rw-r--r--cmd/utils/cmd.go38
-rw-r--r--cmd/utils/websockets.go2
-rw-r--r--core/block_manager.go8
-rw-r--r--core/chain_manager.go9
-rw-r--r--core/events.go3
-rw-r--r--core/transaction_pool.go30
-rw-r--r--core/types/common.go5
-rw-r--r--eth/backend.go249
-rw-r--r--eth/block_pool.go1015
-rw-r--r--eth/block_pool_test.go198
-rw-r--r--eth/error.go71
-rw-r--r--eth/peer_util.go23
-rw-r--r--eth/protocol.go319
-rw-r--r--eth/protocol_test.go232
-rw-r--r--ethereum.go659
-rw-r--r--event/filter/generic_filter.go20
-rw-r--r--event/filter/old_filter.go94
-rw-r--r--events.go11
-rw-r--r--javascript/javascript_runtime.go4
-rw-r--r--javascript/types.go2
-rw-r--r--miner/miner.go7
-rw-r--r--nat.go12
-rw-r--r--natpmp.go55
-rw-r--r--natupnp.go338
-rw-r--r--p2p/server.go10
-rw-r--r--peer.go881
-rw-r--r--pow/ezp/pow.go12
-rw-r--r--ui/qt/qwhisper/whisper.go46
-rw-r--r--ui/qt/qwhisper/whisper_test.go15
-rw-r--r--whisper/doc.go16
-rw-r--r--whisper/envelope.go12
-rw-r--r--whisper/main.go14
-rw-r--r--whisper/util.go11
-rw-r--r--whisper/whisper.go27
-rw-r--r--wire/.gitignore12
-rw-r--r--wire/README.md36
-rw-r--r--wire/client_identity.go56
-rw-r--r--wire/client_identity_test.go30
-rw-r--r--wire/messages2.go199
-rw-r--r--wire/messaging.go178
-rw-r--r--xeth/hexface.go9
-rw-r--r--xeth/js_types.go65
-rw-r--r--xeth/world.go5
54 files changed, 2541 insertions, 3005 deletions
diff --git a/block_pool.go b/block_pool.go
deleted file mode 100644
index c618f6993..000000000
--- a/block_pool.go
+++ /dev/null
@@ -1,351 +0,0 @@
-package eth
-
-import (
- "bytes"
- "container/list"
- "fmt"
- "math"
- "math/big"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethutil"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/wire"
-)
-
-var poollogger = logger.NewLogger("BPOOL")
-
-type block struct {
- from *Peer
- peer *Peer
- block *types.Block
- reqAt time.Time
- requested int
-}
-
-type BlockPool struct {
- mut sync.Mutex
-
- eth *Ethereum
-
- hashes [][]byte
- pool map[string]*block
-
- td *big.Int
- quit chan bool
-
- fetchingHashes bool
- downloadStartedAt time.Time
-
- ChainLength, BlocksProcessed int
-
- peer *Peer
-}
-
-func NewBlockPool(eth *Ethereum) *BlockPool {
- return &BlockPool{
- eth: eth,
- pool: make(map[string]*block),
- td: ethutil.Big0,
- quit: make(chan bool),
- }
-}
-
-func (self *BlockPool) Len() int {
- return len(self.hashes)
-}
-
-func (self *BlockPool) Reset() {
- self.pool = make(map[string]*block)
- self.hashes = nil
-}
-
-func (self *BlockPool) HasLatestHash() bool {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- return self.pool[string(self.eth.ChainManager().CurrentBlock().Hash())] != nil
-}
-
-func (self *BlockPool) HasCommonHash(hash []byte) bool {
- return self.eth.ChainManager().GetBlock(hash) != nil
-}
-
-func (self *BlockPool) Blocks() (blocks types.Blocks) {
- for _, item := range self.pool {
- if item.block != nil {
- blocks = append(blocks, item.block)
- }
- }
-
- return
-}
-
-func (self *BlockPool) FetchHashes(peer *Peer) bool {
- highestTd := self.eth.HighestTDPeer()
-
- if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) > 0) || self.peer == peer {
- if self.peer != peer {
- poollogger.Infof("Found better suitable peer (%v vs %v)\n", self.td, peer.td)
-
- if self.peer != nil {
- self.peer.doneFetchingHashes = true
- }
- }
-
- self.peer = peer
- self.td = peer.td
-
- if !self.HasLatestHash() {
- self.fetchHashes()
- }
-
- return true
- }
-
- return false
-}
-
-func (self *BlockPool) fetchHashes() {
- peer := self.peer
-
- peer.doneFetchingHashes = false
-
- const amount = 256
- peerlogger.Debugf("Fetching hashes (%d) %x...\n", amount, peer.lastReceivedHash[0:4])
- peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)}))
-}
-
-func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- if self.pool[string(hash)] == nil {
- self.pool[string(hash)] = &block{peer, nil, nil, time.Now(), 0}
-
- self.hashes = append([][]byte{hash}, self.hashes...)
- }
-}
-
-func (self *BlockPool) Add(b *types.Block, peer *Peer) {
- self.addBlock(b, peer, false)
-}
-
-func (self *BlockPool) AddNew(b *types.Block, peer *Peer) {
- self.addBlock(b, peer, true)
-}
-
-func (self *BlockPool) addBlock(b *types.Block, peer *Peer, newBlock bool) {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- hash := string(b.Hash())
-
- if self.pool[hash] == nil && !self.eth.ChainManager().HasBlock(b.Hash()) {
- poollogger.Infof("Got unrequested block (%x...)\n", hash[0:4])
-
- self.hashes = append(self.hashes, b.Hash())
- self.pool[hash] = &block{peer, peer, b, time.Now(), 0}
-
- // The following is only performed on an unrequested new block
- if newBlock {
- fmt.Println("1.", !self.eth.ChainManager().HasBlock(b.PrevHash), ethutil.Bytes2Hex(b.Hash()[0:4]), ethutil.Bytes2Hex(b.PrevHash[0:4]))
- fmt.Println("2.", self.pool[string(b.PrevHash)] == nil)
- fmt.Println("3.", !self.fetchingHashes)
- if !self.eth.ChainManager().HasBlock(b.PrevHash) /*&& self.pool[string(b.PrevHash)] == nil*/ && !self.fetchingHashes {
- poollogger.Infof("Unknown chain, requesting (%x...)\n", b.PrevHash[0:4])
- peer.QueueMessage(wire.NewMessage(wire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
- }
- }
- } else if self.pool[hash] != nil {
- self.pool[hash].block = b
- }
-
- self.BlocksProcessed++
-}
-
-func (self *BlockPool) Remove(hash []byte) {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
- delete(self.pool, string(hash))
-}
-
-func (self *BlockPool) DistributeHashes() {
- self.mut.Lock()
- defer self.mut.Unlock()
-
- var (
- peerLen = self.eth.peers.Len()
- amount = 256 * peerLen
- dist = make(map[*Peer][][]byte)
- )
-
- num := int(math.Min(float64(amount), float64(len(self.pool))))
- for i, j := 0, 0; i < len(self.hashes) && j < num; i++ {
- hash := self.hashes[i]
- item := self.pool[string(hash)]
-
- if item != nil && item.block == nil {
- var peer *Peer
- lastFetchFailed := time.Since(item.reqAt) > 5*time.Second
-
- // Handle failed requests
- if lastFetchFailed && item.requested > 5 && item.peer != nil {
- if item.requested < 100 {
- // Select peer the hash was retrieved off
- peer = item.from
- } else {
- // Remove it
- self.hashes = ethutil.DeleteFromByteSlice(self.hashes, hash)
- delete(self.pool, string(hash))
- }
- } else if lastFetchFailed || item.peer == nil {
- // Find a suitable, available peer
- eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
- if peer == nil && len(dist[p]) < amount/peerLen && p.statusKnown {
- peer = p
- }
- })
- }
-
- if peer != nil {
- item.reqAt = time.Now()
- item.peer = peer
- item.requested++
-
- dist[peer] = append(dist[peer], hash)
- }
- }
- }
-
- for peer, hashes := range dist {
- peer.FetchBlocks(hashes)
- }
-
- if len(dist) > 0 {
- self.downloadStartedAt = time.Now()
- }
-}
-
-func (self *BlockPool) Start() {
- go self.downloadThread()
- go self.chainThread()
-}
-
-func (self *BlockPool) Stop() {
- close(self.quit)
-}
-
-func (self *BlockPool) downloadThread() {
- serviceTimer := time.NewTicker(100 * time.Millisecond)
-out:
- for {
- select {
- case <-self.quit:
- break out
- case <-serviceTimer.C:
- // Check if we're catching up. If not distribute the hashes to
- // the peers and download the blockchain
- self.fetchingHashes = false
- eachPeer(self.eth.peers, func(p *Peer, v *list.Element) {
- if p.statusKnown && p.FetchingHashes() {
- self.fetchingHashes = true
- }
- })
-
- if len(self.hashes) > 0 {
- self.DistributeHashes()
- }
-
- if self.ChainLength < len(self.hashes) {
- self.ChainLength = len(self.hashes)
- }
-
- if self.peer != nil &&
- !self.peer.doneFetchingHashes &&
- time.Since(self.peer.lastHashAt) > 10*time.Second &&
- time.Since(self.peer.lastHashRequestedAt) > 5*time.Second {
- self.fetchHashes()
- }
-
- /*
- if !self.fetchingHashes {
- blocks := self.Blocks()
- chain.BlockBy(chain.Number).Sort(blocks)
-
- if len(blocks) > 0 {
- if !self.eth.ChainManager().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
- }
- }
- }
- */
- }
- }
-}
-
-func (self *BlockPool) chainThread() {
- procTimer := time.NewTicker(500 * time.Millisecond)
-out:
- for {
- select {
- case <-self.quit:
- break out
- case <-procTimer.C:
- blocks := self.Blocks()
- types.BlockBy(types.Number).Sort(blocks)
-
- // Find common block
- for i, block := range blocks {
- if self.eth.ChainManager().HasBlock(block.PrevHash) {
- blocks = blocks[i:]
- break
- }
- }
-
- if len(blocks) > 0 {
- if self.eth.ChainManager().HasBlock(blocks[0].PrevHash) {
- for i, block := range blocks[1:] {
- // NOTE: The Ith element in this loop refers to the previous block in
- // outer "blocks"
- if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 {
- blocks = blocks[:i]
-
- break
- }
- }
- } else {
- blocks = nil
- }
- }
-
- if len(blocks) > 0 {
- chainman := self.eth.ChainManager()
-
- err := chainman.InsertChain(blocks)
- if err != nil {
- poollogger.Debugln(err)
-
- self.Reset()
-
- if self.peer != nil && self.peer.conn != nil {
- poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr())
- }
-
- // This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished.
- self.eth.BlacklistPeer(self.peer)
- self.peer.StopWithReason(DiscBadPeer)
- self.td = ethutil.Big0
- self.peer = nil
- }
-
- for _, block := range blocks {
- self.Remove(block.Hash())
- }
- }
- }
- }
-}
diff --git a/cmd/ethereum/cmd.go b/cmd/ethereum/cmd.go
index 8710d6136..d8b9ea487 100644
--- a/cmd/ethereum/cmd.go
+++ b/cmd/ethereum/cmd.go
@@ -21,9 +21,9 @@ import (
"io/ioutil"
"os"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/cmd/ethereum/repl"
"github.com/ethereum/go-ethereum/cmd/utils"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/javascript"
)
diff --git a/cmd/ethereum/flags.go b/cmd/ethereum/flags.go
index 783944cf2..556735491 100644
--- a/cmd/ethereum/flags.go
+++ b/cmd/ethereum/flags.go
@@ -38,7 +38,8 @@ var (
StartRpc bool
StartWebSockets bool
RpcPort int
- UseUPnP bool
+ NatType string
+ PMPGateway string
OutboundPort string
ShowGenesis bool
AddPeer string
@@ -84,7 +85,8 @@ func Init() {
flag.StringVar(&KeyRing, "keyring", "", "identifier for keyring to use")
flag.StringVar(&KeyStore, "keystore", "db", "system to store keyrings: db|file (db)")
flag.StringVar(&OutboundPort, "port", "30303", "listening port")
- flag.BoolVar(&UseUPnP, "upnp", false, "enable UPnP support")
+ flag.StringVar(&NatType, "nat", "", "NAT support (UPNP|PMP) (none)")
+ flag.StringVar(&PMPGateway, "pmp", "", "Gateway IP for PMP")
flag.IntVar(&MaxPeer, "maxpeer", 10, "maximum desired peers")
flag.IntVar(&RpcPort, "rpcport", 8080, "port to start json-rpc server on")
flag.BoolVar(&StartRpc, "rpc", false, "start rpc server")
diff --git a/cmd/ethereum/main.go b/cmd/ethereum/main.go
index 9efc8e9dc..da09e0b58 100644
--- a/cmd/ethereum/main.go
+++ b/cmd/ethereum/main.go
@@ -69,9 +69,9 @@ func main() {
// create, import, export keys
utils.KeyTasks(keyManager, KeyRing, GenAddr, SecretFile, ExportDir, NonInteractive)
- clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier)
+ clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier, string(keyManager.PublicKey()))
- ethereum := utils.NewEthereum(db, clientIdentity, keyManager, UseUPnP, OutboundPort, MaxPeer)
+ ethereum := utils.NewEthereum(db, clientIdentity, keyManager, utils.NatType(NatType, PMPGateway), OutboundPort, MaxPeer)
if Dump {
var block *types.Block
diff --git a/cmd/ethereum/repl/repl.go b/cmd/ethereum/repl/repl.go
index a5146fecd..4a7880ff4 100644
--- a/cmd/ethereum/repl/repl.go
+++ b/cmd/ethereum/repl/repl.go
@@ -24,7 +24,7 @@ import (
"os"
"path"
- "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/javascript"
"github.com/ethereum/go-ethereum/logger"
diff --git a/cmd/mist/assets/qml/main.qml b/cmd/mist/assets/qml/main.qml
index 9f1f214a6..285757080 100644
--- a/cmd/mist/assets/qml/main.qml
+++ b/cmd/mist/assets/qml/main.qml
@@ -50,6 +50,7 @@ ApplicationWindow {
addPlugin("./views/miner.qml", {noAdd: true, close: false, section: "ethereum", active: true});
addPlugin("./views/transaction.qml", {noAdd: true, close: false, section: "legacy"});
+ addPlugin("./views/whisper.qml", {noAdd: true, close: false, section: "legacy"});
addPlugin("./views/chain.qml", {noAdd: true, close: false, section: "legacy"});
addPlugin("./views/pending_tx.qml", {noAdd: true, close: false, section: "legacy"});
addPlugin("./views/info.qml", {noAdd: true, close: false, section: "legacy"});
diff --git a/cmd/mist/assets/qml/views/whisper.qml b/cmd/mist/assets/qml/views/whisper.qml
new file mode 100644
index 000000000..b50841ba5
--- /dev/null
+++ b/cmd/mist/assets/qml/views/whisper.qml
@@ -0,0 +1,47 @@
+
+import QtQuick 2.0
+import QtQuick.Controls 1.0;
+import QtQuick.Layouts 1.0;
+import QtQuick.Dialogs 1.0;
+import QtQuick.Window 2.1;
+import QtQuick.Controls.Styles 1.1
+import Ethereum 1.0
+
+Rectangle {
+ id: root
+ property var title: "Whisper"
+ property var iconSource: "../facet.png"
+ property var menuItem
+
+ objectName: "whisperView"
+ anchors.fill: parent
+
+ property var identity: ""
+ Component.onCompleted: {
+ identity = shh.newIdentity()
+ console.log("New identity:", identity)
+
+ var t = shh.watch({topics: ["chat"]})
+ }
+
+ RowLayout {
+ TextField {
+ id: to
+ placeholderText: "To"
+ }
+ TextField {
+ id: data
+ placeholderText: "Data"
+ }
+ TextField {
+ id: topics
+ placeholderText: "topic1, topic2, topic3, ..."
+ }
+ Button {
+ text: "Send"
+ onClicked: {
+ shh.post(eth.toHex(data.text), "", identity, topics.text.split(","), 500, 50)
+ }
+ }
+ }
+}
diff --git a/cmd/mist/flags.go b/cmd/mist/flags.go
index 2ae0a0487..1d77532d9 100644
--- a/cmd/mist/flags.go
+++ b/cmd/mist/flags.go
@@ -36,10 +36,12 @@ var (
Identifier string
KeyRing string
KeyStore string
+ PMPGateway string
StartRpc bool
StartWebSockets bool
RpcPort int
UseUPnP bool
+ NatType string
OutboundPort string
ShowGenesis bool
AddPeer string
@@ -111,10 +113,12 @@ func Init() {
flag.BoolVar(&NonInteractive, "y", false, "non-interactive mode (say yes to confirmations)")
flag.BoolVar(&UseSeed, "seed", true, "seed peers")
flag.BoolVar(&GenAddr, "genaddr", false, "create a new priv/pub key")
+ flag.StringVar(&NatType, "nat", "", "NAT support (UPNP|PMP) (none)")
flag.StringVar(&SecretFile, "import", "", "imports the file given (hex or mnemonic formats)")
flag.StringVar(&ExportDir, "export", "", "exports the session keyring to files in the directory given")
flag.StringVar(&LogFile, "logfile", "", "log file (defaults to standard output)")
flag.StringVar(&Datadir, "datadir", defaultDataDir(), "specifies the datadir to use")
+ flag.StringVar(&PMPGateway, "pmp", "", "Gateway IP for PMP")
flag.StringVar(&ConfigFile, "conf", defaultConfigFile, "config file")
flag.StringVar(&DebugFile, "debug", "", "debug file (no debugging if not set)")
flag.IntVar(&LogLevel, "loglevel", int(logger.InfoLevel), "loglevel: 0-5: silent,error,warn,info,debug,debug detail)")
diff --git a/cmd/mist/gui.go b/cmd/mist/gui.go
index 7775889cc..3ab307174 100644
--- a/cmd/mist/gui.go
+++ b/cmd/mist/gui.go
@@ -30,14 +30,15 @@ import (
"strings"
"time"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/miner"
- "github.com/ethereum/go-ethereum/wire"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/ui/qt/qwhisper"
"github.com/ethereum/go-ethereum/xeth"
"gopkg.in/qml.v1"
)
@@ -87,7 +88,8 @@ type Gui struct {
eth *eth.Ethereum
// The public Ethereum library
- uiLib *UiLib
+ uiLib *UiLib
+ whisper *qwhisper.Whisper
txDb *ethdb.LDBDatabase
@@ -97,7 +99,7 @@ type Gui struct {
pipe *xeth.JSXEth
Session string
- clientIdentity *wire.SimpleClientIdentity
+ clientIdentity *p2p.SimpleClientIdentity
config *ethutil.ConfigManager
plugins map[string]plugin
@@ -107,7 +109,7 @@ type Gui struct {
}
// Create GUI, but doesn't start it
-func NewWindow(ethereum *eth.Ethereum, config *ethutil.ConfigManager, clientIdentity *wire.SimpleClientIdentity, session string, logLevel int) *Gui {
+func NewWindow(ethereum *eth.Ethereum, config *ethutil.ConfigManager, clientIdentity *p2p.SimpleClientIdentity, session string, logLevel int) *Gui {
db, err := ethdb.NewLDBDatabase("tx_database")
if err != nil {
panic(err)
@@ -138,10 +140,12 @@ func (gui *Gui) Start(assetPath string) {
gui.engine = qml.NewEngine()
context := gui.engine.Context()
gui.uiLib = NewUiLib(gui.engine, gui.eth, assetPath)
+ gui.whisper = qwhisper.New(gui.eth.Whisper())
// Expose the eth library and the ui library to QML
context.SetVar("gui", gui)
context.SetVar("eth", gui.uiLib)
+ context.SetVar("shh", gui.whisper)
// Load the main QML interface
data, _ := ethutil.Config.Db.Get([]byte("KeyRing"))
@@ -391,6 +395,8 @@ func (gui *Gui) update() {
gui.setPeerInfo()
}()
+ gui.whisper.SetView(gui.win.Root().ObjectByName("whisperView"))
+
for _, plugin := range gui.plugins {
guilogger.Infoln("Loading plugin ", plugin.Name)
@@ -409,8 +415,7 @@ func (gui *Gui) update() {
miningLabel := gui.getObjectByName("miningLabel")
events := gui.eth.EventMux().Subscribe(
- eth.ChainSyncEvent{},
- eth.PeerListEvent{},
+ //eth.PeerListEvent{},
core.NewBlockEvent{},
core.TxPreEvent{},
core.TxPostEvent{},
@@ -460,9 +465,6 @@ func (gui *Gui) update() {
gui.setWalletValue(object.Balance(), nil)
state.UpdateStateObject(object)
-
- case eth.PeerListEvent:
- gui.setPeerInfo()
}
case <-peerUpdateTicker.C:
@@ -472,16 +474,18 @@ func (gui *Gui) update() {
lastBlockLabel.Set("text", statusText)
miningLabel.Set("text", "Mining @ "+strconv.FormatInt(gui.uiLib.miner.GetPow().GetHashrate(), 10)+"Khash")
- blockLength := gui.eth.BlockPool().BlocksProcessed
- chainLength := gui.eth.BlockPool().ChainLength
+ /*
+ blockLength := gui.eth.BlockPool().BlocksProcessed
+ chainLength := gui.eth.BlockPool().ChainLength
- var (
- pct float64 = 1.0 / float64(chainLength) * float64(blockLength)
- dlWidget = gui.win.Root().ObjectByName("downloadIndicator")
- dlLabel = gui.win.Root().ObjectByName("downloadLabel")
- )
- dlWidget.Set("value", pct)
- dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength))
+ var (
+ pct float64 = 1.0 / float64(chainLength) * float64(blockLength)
+ dlWidget = gui.win.Root().ObjectByName("downloadIndicator")
+ dlLabel = gui.win.Root().ObjectByName("downloadLabel")
+ )
+ dlWidget.Set("value", pct)
+ dlLabel.Set("text", fmt.Sprintf("%d / %d", blockLength, chainLength))
+ */
case <-statsUpdateTicker.C:
gui.setStatsPane()
@@ -509,7 +513,7 @@ Heap Alloc: %d
CGNext: %x
NumGC: %d
`, Version, runtime.Version(),
- eth.ProtocolVersion, eth.P2PVersion,
+ eth.ProtocolVersion, 2,
runtime.NumCPU, runtime.NumGoroutine(), runtime.NumCgoCall(),
memStats.Alloc, memStats.HeapAlloc,
memStats.NextGC, memStats.NumGC,
diff --git a/cmd/mist/main.go b/cmd/mist/main.go
index 14336b4e8..3ea6e8e91 100644
--- a/cmd/mist/main.go
+++ b/cmd/mist/main.go
@@ -23,8 +23,8 @@ import (
"runtime"
"time"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/cmd/utils"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/logger"
"gopkg.in/qml.v1"
)
@@ -58,8 +58,8 @@ func run() error {
// create, import, export keys
utils.KeyTasks(keyManager, KeyRing, GenAddr, SecretFile, ExportDir, NonInteractive)
- clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier)
- ethereum = utils.NewEthereum(db, clientIdentity, keyManager, UseUPnP, OutboundPort, MaxPeer)
+ clientIdentity := utils.NewClientIdentity(ClientIdentifier, Version, Identifier, string(keyManager.PublicKey()))
+ ethereum := utils.NewEthereum(db, clientIdentity, keyManager, utils.NatType(NatType, PMPGateway), OutboundPort, MaxPeer)
if ShowGenesis {
utils.ShowGenesis(ethereum)
@@ -69,6 +69,10 @@ func run() error {
utils.StartRpc(ethereum, RpcPort)
}
+ if StartWebSockets {
+ utils.StartWebSockets(ethereum)
+ }
+
gui := NewWindow(ethereum, config, clientIdentity, KeyRing, LogLevel)
gui.stdLog = stdLog
@@ -100,16 +104,10 @@ func main() {
utils.HandleInterrupt()
- if StartWebSockets {
- utils.StartWebSockets(ethereum)
- }
-
// we need to run the interrupt callbacks in case gui is closed
// this skips if we got here by actual interrupt stopping the GUI
if !interrupted {
utils.RunInterruptCallbacks(os.Interrupt)
}
- // this blocks the thread
- ethereum.WaitForShutdown()
logger.Flush()
}
diff --git a/cmd/mist/ui_lib.go b/cmd/mist/ui_lib.go
index fdbde50fd..68f333563 100644
--- a/cmd/mist/ui_lib.go
+++ b/cmd/mist/ui_lib.go
@@ -24,11 +24,12 @@ import (
"strconv"
"strings"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/event/filter"
"github.com/ethereum/go-ethereum/javascript"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/state"
@@ -57,6 +58,7 @@ type UiLib struct {
jsEngine *javascript.JSRE
filterCallbacks map[int][]int
+ filterManager *filter.FilterManager
miner *miner.Miner
}
@@ -64,6 +66,7 @@ type UiLib struct {
func NewUiLib(engine *qml.Engine, eth *eth.Ethereum, assetPath string) *UiLib {
lib := &UiLib{JSXEth: xeth.NewJSXEth(eth), engine: engine, eth: eth, assetPath: assetPath, jsEngine: javascript.NewJSRE(eth), filterCallbacks: make(map[int][]int)} //, filters: make(map[int]*xeth.JSFilter)}
lib.miner = miner.New(eth.KeyManager().Address(), eth)
+ lib.filterManager = filter.NewFilterManager(eth.EventMux())
return lib
}
@@ -123,7 +126,8 @@ func (self *UiLib) LookupAddress(name string) string {
}
func (self *UiLib) PastPeers() *ethutil.List {
- return ethutil.NewList(eth.PastPeers())
+ return ethutil.NewList([]string{})
+ //return ethutil.NewList(eth.PastPeers())
}
func (self *UiLib) ImportTx(rlpTx string) {
@@ -191,7 +195,7 @@ func (ui *UiLib) Connect(button qml.Object) {
}
func (ui *UiLib) ConnectToPeer(addr string) {
- ui.eth.ConnectToPeer(addr)
+ ui.eth.SuggestPeer(addr)
}
func (ui *UiLib) AssetPath(p string) string {
@@ -226,7 +230,7 @@ func (self *UiLib) NewFilter(object map[string]interface{}) (id int) {
filter.MessageCallback = func(messages state.Messages) {
self.win.Root().Call("invokeFilterCallback", xeth.ToJSMessages(messages), id)
}
- id = self.eth.InstallFilter(filter)
+ id = self.filterManager.InstallFilter(filter)
return id
}
@@ -239,12 +243,12 @@ func (self *UiLib) NewFilterString(typ string) (id int) {
fmt.Println("QML is lagging")
}
}
- id = self.eth.InstallFilter(filter)
+ id = self.filterManager.InstallFilter(filter)
return id
}
func (self *UiLib) Messages(id int) *ethutil.List {
- filter := self.eth.GetFilter(id)
+ filter := self.filterManager.GetFilter(id)
if filter != nil {
messages := xeth.ToJSMessages(filter.Find())
@@ -255,7 +259,7 @@ func (self *UiLib) Messages(id int) *ethutil.List {
}
func (self *UiLib) UninstallFilter(id int) {
- self.eth.UninstallFilter(id)
+ self.filterManager.UninstallFilter(id)
}
func mapToTxParams(object map[string]interface{}) map[string]string {
@@ -372,3 +376,16 @@ func (self *UiLib) ToggleMining() bool {
return false
}
}
+
+func (self *UiLib) ToHex(data string) string {
+ return "0x" + ethutil.Bytes2Hex([]byte(data))
+}
+
+/*
+// XXX Refactor me & MOVE
+func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
+ return self.filterManager.InstallFilter(filter)
+}
+func (self *Ethereum) UninstallFilter(id int) { self.filterManager.UninstallFilter(id) }
+func (self *Ethereum) GetFilter(id int) *core.Filter { return self.filterManager.GetFilter(id) }
+*/
diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go
index db7bcd35e..3e3ac617a 100644
--- a/cmd/utils/cmd.go
+++ b/cmd/utils/cmd.go
@@ -4,23 +4,23 @@ import (
"fmt"
"io"
"log"
+ "net"
"os"
"os/signal"
"path"
"path/filepath"
"regexp"
"runtime"
- "time"
"bitbucket.org/kardianos/osext"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/miner"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/wire"
"github.com/ethereum/go-ethereum/xeth"
)
@@ -144,17 +144,32 @@ func NewDatabase() ethutil.Database {
return db
}
-func NewClientIdentity(clientIdentifier, version, customIdentifier string) *wire.SimpleClientIdentity {
- return wire.NewSimpleClientIdentity(clientIdentifier, version, customIdentifier)
+func NewClientIdentity(clientIdentifier, version, customIdentifier string, pubkey string) *p2p.SimpleClientIdentity {
+ return p2p.NewSimpleClientIdentity(clientIdentifier, version, customIdentifier, pubkey)
}
-func NewEthereum(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, usePnp bool, OutboundPort string, MaxPeer int) *eth.Ethereum {
- ethereum, err := eth.New(db, clientIdentity, keyManager, eth.CapDefault, usePnp)
+func NatType(natType string, gateway string) (nat p2p.NAT) {
+ switch natType {
+ case "UPNP":
+ nat = p2p.UPNP()
+ case "PMP":
+ ip := net.ParseIP(gateway)
+ if ip == nil {
+ clilogger.Fatalf("cannot resolve PMP gateway IP %s", gateway)
+ }
+ nat = p2p.PMP(ip)
+ case "":
+ default:
+ clilogger.Fatalf("unrecognised NAT type '%s'", natType)
+ }
+ return
+}
+
+func NewEthereum(db ethutil.Database, clientIdentity p2p.ClientIdentity, keyManager *crypto.KeyManager, nat p2p.NAT, OutboundPort string, MaxPeer int) *eth.Ethereum {
+ ethereum, err := eth.New(db, clientIdentity, keyManager, nat, OutboundPort, MaxPeer)
if err != nil {
clilogger.Fatalln("eth start err:", err)
}
- ethereum.Port = OutboundPort
- ethereum.MaxPeers = MaxPeer
return ethereum
}
@@ -268,11 +283,6 @@ func StartMining(ethereum *eth.Ethereum) bool {
if gminer == nil {
gminer = miner.New(addr, ethereum)
}
- // Give it some time to connect with peers
- time.Sleep(3 * time.Second)
- for !ethereum.IsUpToDate() {
- time.Sleep(5 * time.Second)
- }
gminer.Start()
}()
RegisterInterrupt(func(os.Signal) {
diff --git a/cmd/utils/websockets.go b/cmd/utils/websockets.go
index d3ba50e78..29f9b8aeb 100644
--- a/cmd/utils/websockets.go
+++ b/cmd/utils/websockets.go
@@ -1,7 +1,7 @@
package utils
import (
- "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/websocket"
"github.com/ethereum/go-ethereum/xeth"
diff --git a/core/block_manager.go b/core/block_manager.go
index 8d319f84e..85b4891a5 100644
--- a/core/block_manager.go
+++ b/core/block_manager.go
@@ -2,7 +2,6 @@ package core
import (
"bytes"
- "container/list"
"errors"
"fmt"
"math/big"
@@ -14,10 +13,10 @@ import (
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/pow/ezp"
"github.com/ethereum/go-ethereum/state"
- "github.com/ethereum/go-ethereum/wire"
)
var statelogger = logger.NewLogger("BLOCK")
@@ -38,13 +37,12 @@ type EthManager interface {
BlockManager() *BlockManager
ChainManager() *ChainManager
TxPool() *TxPool
- Broadcast(msgType wire.MsgType, data []interface{})
PeerCount() int
IsMining() bool
IsListening() bool
- Peers() *list.List
+ Peers() []*p2p.Peer
KeyManager() *crypto.KeyManager
- ClientIdentity() wire.ClientIdentity
+ ClientIdentity() p2p.ClientIdentity
Db() ethutil.Database
EventMux() *event.TypeMux
}
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 794ae0011..e6268c01e 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -100,6 +100,13 @@ func NewChainManager(mux *event.TypeMux) *ChainManager {
return bc
}
+func (self *ChainManager) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) {
+ self.mu.RLock()
+ defer self.mu.RUnlock()
+
+ return self.td, self.currentBlock.Hash(), self.Genesis().Hash()
+}
+
func (self *ChainManager) SetProcessor(proc types.BlockProcessor) {
self.processor = proc
}
@@ -221,7 +228,7 @@ func (bc *ChainManager) HasBlock(hash []byte) bool {
return len(data) != 0
}
-func (self *ChainManager) GetChainHashesFromHash(hash []byte, max uint64) (chain [][]byte) {
+func (self *ChainManager) GetBlockHashesFromHash(hash []byte, max uint64) (chain [][]byte) {
block := self.GetBlock(hash)
if block == nil {
return
diff --git a/core/events.go b/core/events.go
index deeba3e98..fe106da49 100644
--- a/core/events.go
+++ b/core/events.go
@@ -10,3 +10,6 @@ type TxPostEvent struct{ Tx *types.Transaction }
// NewBlockEvent is posted when a block has been imported.
type NewBlockEvent struct{ Block *types.Block }
+
+// NewMinedBlockEvent is posted when a block has been imported.
+type NewMinedBlockEvent struct{ Block *types.Block }
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index 58c2255a4..17fcdb86a 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/state"
- "github.com/ethereum/go-ethereum/wire"
)
var txplogger = logger.NewLogger("TXP")
@@ -19,7 +18,9 @@ var txplogger = logger.NewLogger("TXP")
const txPoolQueueSize = 50
type TxPoolHook chan *types.Transaction
-type TxMsgTy byte
+type TxMsg struct {
+ Tx *types.Transaction
+}
const (
minGasPrice = 1000000
@@ -27,11 +28,6 @@ const (
var MinGasPrice = big.NewInt(10000000000000)
-type TxMsg struct {
- Tx *types.Transaction
- Type TxMsgTy
-}
-
func EachTx(pool *list.List, it func(*types.Transaction, *list.Element) bool) {
for e := pool.Front(); e != nil; e = e.Next() {
if it(e.Value.(*types.Transaction), e) {
@@ -76,19 +72,17 @@ type TxPool struct {
subscribers []chan TxMsg
- broadcaster types.Broadcaster
chainManager *ChainManager
eventMux *event.TypeMux
}
-func NewTxPool(chainManager *ChainManager, broadcaster types.Broadcaster, eventMux *event.TypeMux) *TxPool {
+func NewTxPool(chainManager *ChainManager, eventMux *event.TypeMux) *TxPool {
return &TxPool{
pool: list.New(),
queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool),
chainManager: chainManager,
eventMux: eventMux,
- broadcaster: broadcaster,
}
}
@@ -100,7 +94,7 @@ func (pool *TxPool) addTransaction(tx *types.Transaction) {
pool.pool.PushBack(tx)
// Broadcast the transaction to the rest of the peers
- pool.broadcaster.Broadcast(wire.MsgTxTy, []interface{}{tx.RlpData()})
+ pool.eventMux.Post(TxPreEvent{tx})
}
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
@@ -166,7 +160,17 @@ func (self *TxPool) Size() int {
return self.pool.Len()
}
-func (pool *TxPool) CurrentTransactions() []*types.Transaction {
+func (self *TxPool) AddTransactions(txs []*types.Transaction) {
+ for _, tx := range txs {
+ if err := self.Add(tx); err != nil {
+ txplogger.Infoln(err)
+ } else {
+ txplogger.Infof("tx %x\n", tx.Hash()[0:4])
+ }
+ }
+}
+
+func (pool *TxPool) GetTransactions() []*types.Transaction {
pool.mutex.Lock()
defer pool.mutex.Unlock()
@@ -213,7 +217,7 @@ func (self *TxPool) RemoveSet(txs types.Transactions) {
}
func (pool *TxPool) Flush() []*types.Transaction {
- txList := pool.CurrentTransactions()
+ txList := pool.GetTransactions()
// Recreate a new list all together
// XXX Is this the fastest way?
diff --git a/core/types/common.go b/core/types/common.go
index 89cb5f498..ba88b77e1 100644
--- a/core/types/common.go
+++ b/core/types/common.go
@@ -4,13 +4,8 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/state"
- "github.com/ethereum/go-ethereum/wire"
)
type BlockProcessor interface {
Process(*Block) (*big.Int, state.Messages, error)
}
-
-type Broadcaster interface {
- Broadcast(wire.MsgType, []interface{})
-}
diff --git a/eth/backend.go b/eth/backend.go
new file mode 100644
index 000000000..0aad6a514
--- /dev/null
+++ b/eth/backend.go
@@ -0,0 +1,249 @@
+package eth
+
+import (
+ "net"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/event"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/pow/ezp"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethereum/go-ethereum/whisper"
+)
+
+const (
+ seedNodeAddress = "poc-7.ethdev.com:30300"
+)
+
+var logger = ethlogger.NewLogger("SERV")
+
+type Ethereum struct {
+ // Channel for shutting down the ethereum
+ shutdownChan chan bool
+ quit chan bool
+
+ // DB interface
+ db ethutil.Database
+ blacklist p2p.Blacklist
+
+ //*** SERVICES ***
+ // State manager for processing new blocks and managing the over all states
+ blockManager *core.BlockManager
+ txPool *core.TxPool
+ chainManager *core.ChainManager
+ blockPool *BlockPool
+ whisper *whisper.Whisper
+
+ server *p2p.Server
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ blockSub event.Subscription
+
+ RpcServer *rpc.JsonRpcServer
+ keyManager *crypto.KeyManager
+
+ clientIdentity p2p.ClientIdentity
+
+ synclock sync.Mutex
+ syncGroup sync.WaitGroup
+
+ Mining bool
+}
+
+func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.KeyManager, nat p2p.NAT, port string, maxPeers int) (*Ethereum, error) {
+
+ saveProtocolVersion(db)
+ ethutil.Config.Db = db
+
+ eth := &Ethereum{
+ shutdownChan: make(chan bool),
+ quit: make(chan bool),
+ db: db,
+ keyManager: keyManager,
+ clientIdentity: identity,
+ blacklist: p2p.NewBlacklist(),
+ eventMux: &event.TypeMux{},
+ }
+
+ eth.chainManager = core.NewChainManager(eth.EventMux())
+ eth.txPool = core.NewTxPool(eth.chainManager, eth.EventMux())
+ eth.blockManager = core.NewBlockManager(eth.txPool, eth.chainManager, eth.EventMux())
+ eth.chainManager.SetProcessor(eth.blockManager)
+ eth.whisper = whisper.New()
+
+ hasBlock := eth.chainManager.HasBlock
+ insertChain := eth.chainManager.InsertChain
+ eth.blockPool = NewBlockPool(hasBlock, insertChain, ezp.Verify)
+
+ // Start services
+ eth.txPool.Start()
+
+ ethProto := EthProtocol(eth.txPool, eth.chainManager, eth.blockPool)
+ protocols := []p2p.Protocol{ethProto, eth.whisper.Protocol()}
+
+ server := &p2p.Server{
+ Identity: identity,
+ MaxPeers: maxPeers,
+ Protocols: protocols,
+ ListenAddr: ":" + port,
+ Blacklist: eth.blacklist,
+ NAT: nat,
+ }
+
+ eth.server = server
+
+ return eth, nil
+}
+
+func (s *Ethereum) KeyManager() *crypto.KeyManager {
+ return s.keyManager
+}
+
+func (s *Ethereum) ClientIdentity() p2p.ClientIdentity {
+ return s.clientIdentity
+}
+
+func (s *Ethereum) ChainManager() *core.ChainManager {
+ return s.chainManager
+}
+
+func (s *Ethereum) BlockManager() *core.BlockManager {
+ return s.blockManager
+}
+
+func (s *Ethereum) TxPool() *core.TxPool {
+ return s.txPool
+}
+
+func (s *Ethereum) BlockPool() *BlockPool {
+ return s.blockPool
+}
+
+func (s *Ethereum) Whisper() *whisper.Whisper {
+ return s.whisper
+}
+
+func (s *Ethereum) EventMux() *event.TypeMux {
+ return s.eventMux
+}
+func (self *Ethereum) Db() ethutil.Database {
+ return self.db
+}
+
+func (s *Ethereum) IsMining() bool {
+ return s.Mining
+}
+
+func (s *Ethereum) IsListening() bool {
+ // XXX TODO
+ return false
+}
+
+func (s *Ethereum) PeerCount() int {
+ return s.server.PeerCount()
+}
+
+func (s *Ethereum) Peers() []*p2p.Peer {
+ return s.server.Peers()
+}
+
+func (s *Ethereum) MaxPeers() int {
+ return s.server.MaxPeers
+}
+
+// Start the ethereum
+func (s *Ethereum) Start(seed bool) error {
+ err := s.server.Start()
+ if err != nil {
+ return err
+ }
+ s.blockPool.Start()
+ s.whisper.Start()
+
+ // broadcast transactions
+ s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
+ go s.txBroadcastLoop()
+
+ // broadcast mined blocks
+ s.blockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go s.blockBroadcastLoop()
+
+ // TODO: read peers here
+ if seed {
+ logger.Infof("Connect to seed node %v", seedNodeAddress)
+ if err := s.SuggestPeer(seedNodeAddress); err != nil {
+ return err
+ }
+ }
+
+ logger.Infoln("Server started")
+ return nil
+}
+
+func (self *Ethereum) SuggestPeer(addr string) error {
+ netaddr, err := net.ResolveTCPAddr("tcp", addr)
+ if err != nil {
+ logger.Errorf("couldn't resolve %s:", addr, err)
+ return err
+ }
+
+ self.server.SuggestPeer(netaddr.IP, netaddr.Port, nil)
+ return nil
+}
+
+func (s *Ethereum) Stop() {
+ // Close the database
+ defer s.db.Close()
+
+ close(s.quit)
+
+ s.txSub.Unsubscribe() // quits txBroadcastLoop
+ s.blockSub.Unsubscribe() // quits blockBroadcastLoop
+
+ if s.RpcServer != nil {
+ s.RpcServer.Stop()
+ }
+ s.txPool.Stop()
+ s.eventMux.Stop()
+ s.blockPool.Stop()
+ s.whisper.Stop()
+
+ logger.Infoln("Server stopped")
+ close(s.shutdownChan)
+}
+
+// This function will wait for a shutdown and resumes main thread execution
+func (s *Ethereum) WaitForShutdown() {
+ <-s.shutdownChan
+}
+
+// now tx broadcasting is taken out of txPool
+// handled here via subscription, efficiency?
+func (self *Ethereum) txBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.TxPreEvent)
+ self.server.Broadcast("eth", TxMsg, []interface{}{event.Tx.RlpData()})
+ }
+}
+
+func (self *Ethereum) blockBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.NewMinedBlockEvent)
+ self.server.Broadcast("eth", NewBlockMsg, event.Block.Value().Val)
+ }
+}
+
+func saveProtocolVersion(db ethutil.Database) {
+ d, _ := db.Get([]byte("ProtocolVersion"))
+ protocolVersion := ethutil.NewValue(d).Uint()
+
+ if protocolVersion == 0 {
+ db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
+ }
+}
diff --git a/eth/block_pool.go b/eth/block_pool.go
new file mode 100644
index 000000000..7cfbc63f8
--- /dev/null
+++ b/eth/block_pool.go
@@ -0,0 +1,1015 @@
+package eth
+
+import (
+ "math"
+ "math/big"
+ "math/rand"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethutil"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/pow"
+)
+
+var poolLogger = ethlogger.NewLogger("Blockpool")
+
+const (
+ blockHashesBatchSize = 256
+ blockBatchSize = 64
+ blocksRequestInterval = 10 // seconds
+ blocksRequestRepetition = 1
+ blockHashesRequestInterval = 10 // seconds
+ blocksRequestMaxIdleRounds = 10
+ cacheTimeout = 3 // minutes
+ blockTimeout = 5 // minutes
+)
+
+type poolNode struct {
+ lock sync.RWMutex
+ hash []byte
+ block *types.Block
+ child *poolNode
+ parent *poolNode
+ section *section
+ knownParent bool
+ peer string
+ source string
+ complete bool
+}
+
+type BlockPool struct {
+ lock sync.RWMutex
+ pool map[string]*poolNode
+
+ peersLock sync.RWMutex
+ peers map[string]*peerInfo
+ peer *peerInfo
+
+ quit chan bool
+ wg sync.WaitGroup
+ running bool
+
+ // the minimal interface with blockchain
+ hasBlock func(hash []byte) bool
+ insertChain func(types.Blocks) error
+ verifyPoW func(pow.Block) bool
+}
+
+type peerInfo struct {
+ lock sync.RWMutex
+
+ td *big.Int
+ currentBlock []byte
+ id string
+
+ requestBlockHashes func([]byte) error
+ requestBlocks func([][]byte) error
+ peerError func(int, string, ...interface{})
+
+ sections map[string]*section
+ roots []*poolNode
+ quitC chan bool
+}
+
+func NewBlockPool(hasBlock func(hash []byte) bool, insertChain func(types.Blocks) error, verifyPoW func(pow.Block) bool,
+) *BlockPool {
+ return &BlockPool{
+ hasBlock: hasBlock,
+ insertChain: insertChain,
+ verifyPoW: verifyPoW,
+ }
+}
+
+// allows restart
+func (self *BlockPool) Start() {
+ self.lock.Lock()
+ if self.running {
+ self.lock.Unlock()
+ return
+ }
+ self.running = true
+ self.quit = make(chan bool)
+ self.pool = make(map[string]*poolNode)
+ self.lock.Unlock()
+
+ self.peersLock.Lock()
+ self.peers = make(map[string]*peerInfo)
+ self.peersLock.Unlock()
+
+ poolLogger.Infoln("Started")
+
+}
+
+func (self *BlockPool) Stop() {
+ self.lock.Lock()
+ if !self.running {
+ self.lock.Unlock()
+ return
+ }
+ self.running = false
+ self.lock.Unlock()
+
+ poolLogger.Infoln("Stopping")
+
+ close(self.quit)
+ self.lock.Lock()
+ self.peersLock.Lock()
+ self.peers = nil
+ self.pool = nil
+ self.peer = nil
+ self.wg.Wait()
+ self.lock.Unlock()
+ self.peersLock.Unlock()
+ poolLogger.Infoln("Stopped")
+
+}
+
+// AddPeer is called by the eth protocol instance running on the peer after
+// the status message has been received with total difficulty and current block hash
+// AddPeer can only be used once, RemovePeer needs to be called when the peer disconnects
+func (self *BlockPool) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) bool {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ if self.peers[peerId] != nil {
+ panic("peer already added")
+ }
+ peer := &peerInfo{
+ td: td,
+ currentBlock: currentBlock,
+ id: peerId, //peer.Identity().Pubkey()
+ requestBlockHashes: requestBlockHashes,
+ requestBlocks: requestBlocks,
+ peerError: peerError,
+ }
+ self.peers[peerId] = peer
+ poolLogger.Debugf("add new peer %v with td %v", peerId, td)
+ currentTD := ethutil.Big0
+ if self.peer != nil {
+ currentTD = self.peer.td
+ }
+ if td.Cmp(currentTD) > 0 {
+ self.peer.stop(peer)
+ peer.start(self.peer)
+ poolLogger.Debugf("peer %v promoted to best peer", peerId)
+ self.peer = peer
+ return true
+ }
+ return false
+}
+
+// RemovePeer is called by the eth protocol when the peer disconnects
+func (self *BlockPool) RemovePeer(peerId string) {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ peer := self.peers[peerId]
+ if peer == nil {
+ return
+ }
+ self.peers[peerId] = nil
+ poolLogger.Debugf("remove peer %v", peerId[0:4])
+
+ // if current best peer is removed, need find a better one
+ if self.peer != nil && peerId == self.peer.id {
+ var newPeer *peerInfo
+ max := ethutil.Big0
+ // peer with the highest self-acclaimed TD is chosen
+ for _, info := range self.peers {
+ if info.td.Cmp(max) > 0 {
+ max = info.td
+ newPeer = info
+ }
+ }
+ self.peer.stop(peer)
+ peer.start(self.peer)
+ if newPeer != nil {
+ poolLogger.Debugf("peer %v with td %v promoted to best peer", newPeer.id[0:4], newPeer.td)
+ } else {
+ poolLogger.Warnln("no peers left")
+ }
+ }
+}
+
+// Entry point for eth protocol to add block hashes received via BlockHashesMsg
+// only hashes from the best peer is handled
+// this method is always responsible to initiate further hash requests until
+// a known parent is reached unless cancelled by a peerChange event
+// this process also launches all request processes on each chain section
+// this function needs to run asynchronously for one peer since the message is discarded???
+func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+
+ // check if this peer is the best
+ peer, best := self.getPeer(peerId)
+ if !best {
+ return
+ }
+ // peer is still the best
+
+ var child *poolNode
+ var depth int
+
+ // iterate using next (rlp stream lazy decoder) feeding hashesC
+ self.wg.Add(1)
+ go func() {
+ for {
+ select {
+ case <-self.quit:
+ return
+ case <-peer.quitC:
+ // if the peer is demoted, no more hashes taken
+ break
+ default:
+ hash, ok := next()
+ if !ok {
+ // message consumed chain skeleton built
+ break
+ }
+ // check if known block connecting the downloaded chain to our blockchain
+ if self.hasBlock(hash) {
+ poolLogger.Infof("known block (%x...)\n", hash[0:4])
+ if child != nil {
+ child.Lock()
+ // mark child as absolute pool root with parent known to blockchain
+ child.knownParent = true
+ child.Unlock()
+ }
+ break
+ }
+ //
+ var parent *poolNode
+ // look up node in pool
+ parent = self.get(hash)
+ if parent != nil {
+ // reached a known chain in the pool
+ // request blocks on the newly added part of the chain
+ if child != nil {
+ self.link(parent, child)
+
+ // activate the current chain
+ self.activateChain(parent, peer, true)
+ poolLogger.Debugf("potential chain of %v blocks added, reached blockpool, activate chain", depth)
+ break
+ }
+ // if this is the first hash, we expect to find it
+ parent.RLock()
+ grandParent := parent.parent
+ parent.RUnlock()
+ if grandParent != nil {
+ // activate the current chain
+ self.activateChain(parent, peer, true)
+ poolLogger.Debugf("block hash found, activate chain")
+ break
+ }
+ // the first node is the root of a chain in the pool, rejoice and continue
+ }
+ // if node does not exist, create it and index in the pool
+ section := &section{}
+ if child == nil {
+ section.top = parent
+ }
+ parent = &poolNode{
+ hash: hash,
+ child: child,
+ section: section,
+ peer: peerId,
+ }
+ self.set(hash, parent)
+ poolLogger.Debugf("create potential block for %x...", hash[0:4])
+
+ depth++
+ child = parent
+ }
+ }
+ if child != nil {
+ poolLogger.Debugf("chain of %v hashes added", depth)
+ // start a processSection on the last node, but switch off asking
+ // hashes and blocks until next peer confirms this chain
+ section := self.processSection(child)
+ peer.addSection(child.hash, section)
+ section.start()
+ }
+ }()
+}
+
+// AddBlock is the entry point for the eth protocol when blockmsg is received upon requests
+// It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error (which can be ignored)
+// block is checked for PoW
+// only the first PoW-valid block for a hash is considered legit
+func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
+ hash := block.Hash()
+ node := self.get(hash)
+ node.RLock()
+ b := node.block
+ node.RUnlock()
+ if b != nil {
+ return
+ }
+ if node == nil && !self.hasBlock(hash) {
+ self.peerError(peerId, ErrUnrequestedBlock, "%x", hash)
+ return
+ }
+ // validate block for PoW
+ if !self.verifyPoW(block) {
+ self.peerError(peerId, ErrInvalidPoW, "%x", hash)
+ }
+ node.Lock()
+ node.block = block
+ node.source = peerId
+ node.Unlock()
+}
+
+// iterates down a known poolchain and activates fetching processes
+// on each chain section for the peer
+// stops if the peer is demoted
+// registers last section root as root for the peer (in case peer is promoted a second time, to remember)
+func (self *BlockPool) activateChain(node *poolNode, peer *peerInfo, on bool) {
+ self.wg.Add(1)
+ go func() {
+ for {
+ node.sectionRLock()
+ bottom := node.section.bottom
+ if bottom == nil { // the chain section is being created or killed
+ break
+ }
+ // register this section with the peer
+ if peer != nil {
+ peer.addSection(bottom.hash, bottom.section)
+ if on {
+ bottom.section.start()
+ } else {
+ bottom.section.start()
+ }
+ }
+ if bottom.parent == nil {
+ node = bottom
+ break
+ }
+ // if peer demoted stop activation
+ select {
+ case <-peer.quitC:
+ break
+ default:
+ }
+
+ node = bottom.parent
+ bottom.sectionRUnlock()
+ }
+ // remember root for this peer
+ peer.addRoot(node)
+ self.wg.Done()
+ }()
+}
+
+// main worker thread on each section in the poolchain
+// - kills the section if there are blocks missing after an absolute time
+// - kills the section if there are maxIdleRounds of idle rounds of block requests with no response
+// - periodically polls the chain section for missing blocks which are then requested from peers
+// - registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they expire and killed ()
+// - when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
+// - when turned back on it recursively calls itself on the root of the next chain section
+// - when exits, signals to
+func (self *BlockPool) processSection(node *poolNode) *section {
+ // absolute time after which sub-chain is killed if not complete (some blocks are missing)
+ suicideTimer := time.After(blockTimeout * time.Minute)
+ var blocksRequestTimer, blockHashesRequestTimer <-chan time.Time
+ var nodeC, missingC, processC chan *poolNode
+ controlC := make(chan bool)
+ resetC := make(chan bool)
+ var hashes [][]byte
+ var i, total, missing, lastMissing, depth int
+ var blockHashesRequests, blocksRequests int
+ var idle int
+ var init, alarm, done, same, running, once bool
+ orignode := node
+ hash := node.hash
+
+ node.sectionLock()
+ defer node.sectionUnlock()
+ section := &section{controlC: controlC, resetC: resetC}
+ node.section = section
+
+ go func() {
+ self.wg.Add(1)
+ for {
+ node.sectionRLock()
+ controlC = node.section.controlC
+ node.sectionRUnlock()
+
+ if init {
+ // missing blocks read from nodeC
+ // initialized section
+ if depth == 0 {
+ break
+ }
+ // enable select case to read missing block when ready
+ processC = missingC
+ missingC = make(chan *poolNode, lastMissing)
+ nodeC = nil
+ // only do once
+ init = false
+ } else {
+ if !once {
+ missingC = nil
+ processC = nil
+ i = 0
+ total = 0
+ lastMissing = 0
+ }
+ }
+
+ // went through all blocks in section
+ if i != 0 && i == lastMissing {
+ if len(hashes) > 0 {
+ // send block requests to peers
+ self.requestBlocks(blocksRequests, hashes)
+ }
+ blocksRequests++
+ poolLogger.Debugf("[%x] block request attempt %v: missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+ if missing == lastMissing {
+ // idle round
+ if same {
+ // more than once
+ idle++
+ // too many idle rounds
+ if idle > blocksRequestMaxIdleRounds {
+ poolLogger.Debugf("[%x] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", hash[0:4], idle, blocksRequests, missing, total, depth)
+ self.killChain(node, nil)
+ break
+ }
+ } else {
+ idle = 0
+ }
+ same = true
+ } else {
+ if missing == 0 {
+ // no missing nodes
+ poolLogger.Debugf("block request process complete on section %x... (%v total blocksRequests): missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
+ node.Lock()
+ orignode.complete = true
+ node.Unlock()
+ blocksRequestTimer = nil
+ if blockHashesRequestTimer == nil {
+ // not waiting for hashes any more
+ poolLogger.Debugf("hash request on root %x... successful (%v total attempts)\nquitting...", hash[0:4], blockHashesRequests)
+ break
+ } // otherwise suicide if no hashes coming
+ }
+ same = false
+ }
+ lastMissing = missing
+ i = 0
+ missing = 0
+ // ready for next round
+ done = true
+ }
+ if done && alarm {
+ poolLogger.Debugf("start checking if new blocks arrived (attempt %v): missing %v/%v/%v", blocksRequests, missing, total, depth)
+ blocksRequestTimer = time.After(blocksRequestInterval * time.Second)
+ alarm = false
+ done = false
+ // processC supposed to be empty and never closed so just swap, no need to allocate
+ tempC := processC
+ processC = missingC
+ missingC = tempC
+ }
+ select {
+ case <-self.quit:
+ break
+ case <-suicideTimer:
+ self.killChain(node, nil)
+ poolLogger.Warnf("[%x] timeout. (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+ break
+ case <-blocksRequestTimer:
+ alarm = true
+ case <-blockHashesRequestTimer:
+ orignode.RLock()
+ parent := orignode.parent
+ orignode.RUnlock()
+ if parent != nil {
+ // if not root of chain, switch off
+ poolLogger.Debugf("[%x] parent found, hash requests deactivated (after %v total attempts)\n", hash[0:4], blockHashesRequests)
+ blockHashesRequestTimer = nil
+ } else {
+ blockHashesRequests++
+ poolLogger.Debugf("[%x] hash request on root (%v total attempts)\n", hash[0:4], blockHashesRequests)
+ self.requestBlockHashes(parent.hash)
+ blockHashesRequestTimer = time.After(blockHashesRequestInterval * time.Second)
+ }
+ case r, ok := <-controlC:
+ if !ok {
+ break
+ }
+ if running && !r {
+ poolLogger.Debugf("process on section %x... (%v total attempts): missing %v/%v/%v", hash[0:4], blocksRequests, missing, total, depth)
+
+ alarm = false
+ blocksRequestTimer = nil
+ blockHashesRequestTimer = nil
+ processC = nil
+ }
+ if !running && r {
+ poolLogger.Debugf("[%x] on", hash[0:4])
+
+ orignode.RLock()
+ parent := orignode.parent
+ complete := orignode.complete
+ knownParent := orignode.knownParent
+ orignode.RUnlock()
+ if !complete {
+ poolLogger.Debugf("[%x] activate block requests", hash[0:4])
+ blocksRequestTimer = time.After(0)
+ }
+ if parent == nil && !knownParent {
+ // if no parent but not connected to blockchain
+ poolLogger.Debugf("[%x] activate block hashes requests", hash[0:4])
+ blockHashesRequestTimer = time.After(0)
+ } else {
+ blockHashesRequestTimer = nil
+ }
+ alarm = true
+ processC = missingC
+ if !once {
+ // if not run at least once fully, launch iterator
+ processC = make(chan *poolNode)
+ missingC = make(chan *poolNode)
+ self.foldUp(orignode, processC)
+ once = true
+ }
+ }
+ total = lastMissing
+ case <-resetC:
+ once = false
+ init = false
+ done = false
+ case node, ok := <-processC:
+ if !ok {
+ // channel closed, first iteration finished
+ init = true
+ once = true
+ continue
+ }
+ i++
+ // if node has no block
+ node.RLock()
+ block := node.block
+ nhash := node.hash
+ knownParent := node.knownParent
+ node.RUnlock()
+ if !init {
+ depth++
+ }
+ if block == nil {
+ missing++
+ if !init {
+ total++
+ }
+ hashes = append(hashes, nhash)
+ if len(hashes) == blockBatchSize {
+ self.requestBlocks(blocksRequests, hashes)
+ hashes = nil
+ }
+ missingC <- node
+ } else {
+ // block is found
+ if knownParent {
+ // connected to the blockchain, insert the longest chain of blocks
+ var blocks types.Blocks
+ child := node
+ parent := node
+ node.sectionRLock()
+ for child != nil && child.block != nil {
+ parent = child
+ blocks = append(blocks, parent.block)
+ child = parent.child
+ }
+ node.sectionRUnlock()
+ poolLogger.Debugf("[%x] insert %v blocks into blockchain", hash[0:4], len(blocks))
+ if err := self.insertChain(blocks); err != nil {
+ // TODO: not clear which peer we need to address
+ // peerError should dispatch to peer if still connected and disconnect
+ self.peerError(node.source, ErrInvalidBlock, "%v", err)
+ poolLogger.Debugf("invalid block %v", node.hash)
+ poolLogger.Debugf("penalise peers %v (hash), %v (block)", node.peer, node.source)
+ // penalise peer in node.source
+ self.killChain(node, nil)
+ // self.disconnect()
+ break
+ }
+ // if suceeded mark the next one (no block yet) as connected to blockchain
+ if child != nil {
+ child.Lock()
+ child.knownParent = true
+ child.Unlock()
+ }
+ // reset starting node to first node with missing block
+ orignode = child
+ // pop the inserted ancestors off the channel
+ for i := 1; i < len(blocks); i++ {
+ <-processC
+ }
+ // delink inserted chain section
+ self.killChain(node, parent)
+ }
+ }
+ }
+ }
+ poolLogger.Debugf("[%x] quit after\n%v block hashes requests\n%v block requests: missing %v/%v/%v", hash[0:4], blockHashesRequests, blocksRequests, missing, total, depth)
+
+ self.wg.Done()
+ node.sectionLock()
+ node.section.controlC = nil
+ node.sectionUnlock()
+ // this signals that controller not available
+ }()
+ return section
+
+}
+
+func (self *BlockPool) peerError(peerId string, code int, format string, params ...interface{}) {
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ peer, ok := self.peers[peerId]
+ if ok {
+ peer.peerError(code, format, params...)
+ }
+}
+
+func (self *BlockPool) requestBlockHashes(hash []byte) {
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ if self.peer != nil {
+ self.peer.requestBlockHashes(hash)
+ }
+}
+
+func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) {
+ // distribute block request among known peers
+ self.peersLock.Lock()
+ defer self.peersLock.Unlock()
+ peerCount := len(self.peers)
+ // on first attempt use the best peer
+ if attempts == 0 {
+ self.peer.requestBlocks(hashes)
+ return
+ }
+ repetitions := int(math.Min(float64(peerCount), float64(blocksRequestRepetition)))
+ poolLogger.Debugf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
+ i := 0
+ indexes := rand.Perm(peerCount)[0:(repetitions - 1)]
+ sort.Ints(indexes)
+ for _, peer := range self.peers {
+ if i == indexes[0] {
+ peer.requestBlocks(hashes)
+ indexes = indexes[1:]
+ if len(indexes) == 0 {
+ break
+ }
+ }
+ i++
+ }
+}
+
+func (self *BlockPool) getPeer(peerId string) (*peerInfo, bool) {
+ self.peersLock.RLock()
+ defer self.peersLock.RUnlock()
+ if self.peer != nil && self.peer.id == peerId {
+ return self.peer, true
+ }
+ info, ok := self.peers[peerId]
+ if !ok {
+ panic("unknown peer")
+ }
+ return info, false
+}
+
+func (self *peerInfo) addSection(hash []byte, section *section) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.sections[string(hash)] = section
+}
+
+func (self *peerInfo) addRoot(node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.roots = append(self.roots, node)
+}
+
+// (re)starts processes registered for this peer (self)
+func (self *peerInfo) start(peer *peerInfo) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.quitC = make(chan bool)
+ for _, root := range self.roots {
+ root.sectionRLock()
+ if root.section.bottom != nil {
+ if root.parent == nil {
+ self.requestBlockHashes(root.hash)
+ }
+ }
+ root.sectionRUnlock()
+ }
+ self.roots = nil
+ self.controlSections(peer, true)
+}
+
+// (re)starts process without requests, only suicide timer
+func (self *peerInfo) stop(peer *peerInfo) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ close(self.quitC)
+ self.controlSections(peer, false)
+}
+
+func (self *peerInfo) controlSections(peer *peerInfo, on bool) {
+ if peer != nil {
+ peer.lock.RLock()
+ defer peer.lock.RUnlock()
+ }
+ for hash, section := range peer.sections {
+ if section.done() {
+ delete(self.sections, hash)
+ }
+ _, exists := peer.sections[hash]
+ if on || peer == nil || exists {
+ if on {
+ // self is best peer
+ section.start()
+ } else {
+ // (re)starts process without requests, only suicide timer
+ section.stop()
+ }
+ }
+ }
+}
+
+// called when parent is found in pool
+// parent and child are guaranteed to be on different sections
+func (self *BlockPool) link(parent, child *poolNode) {
+ var top bool
+ parent.sectionLock()
+ if child != nil {
+ child.sectionLock()
+ }
+ if parent == parent.section.top && parent.section.top != nil {
+ top = true
+ }
+ var bottom bool
+
+ if child == child.section.bottom {
+ bottom = true
+ }
+ if parent.child != child {
+ orphan := parent.child
+ if orphan != nil {
+ // got a fork in the chain
+ if top {
+ orphan.lock.Lock()
+ // make old child orphan
+ orphan.parent = nil
+ orphan.lock.Unlock()
+ } else { // we are under section lock
+ // make old child orphan
+ orphan.parent = nil
+ // reset section objects above the fork
+ nchild := orphan.child
+ node := orphan
+ section := &section{bottom: orphan}
+ for node.section == nchild.section {
+ node = nchild
+ node.section = section
+ nchild = node.child
+ }
+ section.top = node
+ // set up a suicide
+ self.processSection(orphan).stop()
+ }
+ } else {
+ // child is on top of a chain need to close section
+ child.section.bottom = child
+ }
+ // adopt new child
+ parent.child = child
+ if !top {
+ parent.section.top = parent
+ // restart section process so that shorter section is scanned for blocks
+ parent.section.reset()
+ }
+ }
+
+ if child != nil {
+ if child.parent != parent {
+ stepParent := child.parent
+ if stepParent != nil {
+ if bottom {
+ stepParent.Lock()
+ stepParent.child = nil
+ stepParent.Unlock()
+ } else {
+ // we are on the same section
+ // if it is a aberrant reverse fork,
+ stepParent.child = nil
+ node := stepParent
+ nparent := stepParent.child
+ section := &section{top: stepParent}
+ for node.section == nparent.section {
+ node = nparent
+ node.section = section
+ node = node.parent
+ }
+ }
+ } else {
+ // linking to a root node, ie. parent is under the root of a chain
+ parent.section.top = parent
+ }
+ }
+ child.parent = parent
+ child.section.bottom = child
+ }
+ // this needed if someone lied about the parent before
+ child.knownParent = false
+
+ parent.sectionUnlock()
+ if child != nil {
+ child.sectionUnlock()
+ }
+}
+
+// this immediately kills the chain from node to end (inclusive) section by section
+func (self *BlockPool) killChain(node *poolNode, end *poolNode) {
+ poolLogger.Debugf("kill chain section with root node %v", node)
+
+ node.sectionLock()
+ node.section.abort()
+ self.set(node.hash, nil)
+ child := node.child
+ top := node.section.top
+ i := 1
+ self.wg.Add(1)
+ go func() {
+ var quit bool
+ for node != top && node != end && child != nil {
+ node = child
+ select {
+ case <-self.quit:
+ quit = true
+ break
+ default:
+ }
+ self.set(node.hash, nil)
+ child = node.child
+ }
+ poolLogger.Debugf("killed chain section of %v blocks with root node %v", i, node)
+ if !quit {
+ if node == top {
+ if node != end && child != nil && end != nil {
+ //
+ self.killChain(child, end)
+ }
+ } else {
+ if child != nil {
+ // delink rest of this section if ended midsection
+ child.section.bottom = child
+ child.parent = nil
+ }
+ }
+ }
+ node.section.bottom = nil
+ node.sectionUnlock()
+ self.wg.Done()
+ }()
+}
+
+// structure to store long range links on chain to skip along
+type section struct {
+ lock sync.RWMutex
+ bottom *poolNode
+ top *poolNode
+ controlC chan bool
+ resetC chan bool
+}
+
+func (self *section) start() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.controlC <- true
+ }
+}
+
+func (self *section) stop() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.controlC <- false
+ }
+}
+
+func (self *section) reset() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.controlC != nil {
+ self.resetC <- true
+ self.controlC <- false
+ }
+}
+
+func (self *section) abort() {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.controlC != nil {
+ close(self.controlC)
+ self.controlC = nil
+ }
+}
+
+func (self *section) done() bool {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.controlC != nil {
+ return true
+ }
+ return false
+}
+
+func (self *BlockPool) get(hash []byte) (node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ return self.pool[string(hash)]
+}
+
+func (self *BlockPool) set(hash []byte, node *poolNode) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.pool[string(hash)] = node
+}
+
+// first time for block request, this iteration retrieves nodes of the chain
+// from node up to top (all the way if nil) via child links
+// copies the controller
+// and feeds nodeC channel
+// this is performed under section readlock to prevent top from going away
+// when
+func (self *BlockPool) foldUp(node *poolNode, nodeC chan *poolNode) {
+ self.wg.Add(1)
+ go func() {
+ node.sectionRLock()
+ defer node.sectionRUnlock()
+ for node != nil {
+ select {
+ case <-self.quit:
+ break
+ case nodeC <- node:
+ if node == node.section.top {
+ break
+ }
+ node = node.child
+ }
+ }
+ close(nodeC)
+ self.wg.Done()
+ }()
+}
+
+func (self *poolNode) Lock() {
+ self.sectionLock()
+ self.lock.Lock()
+}
+
+func (self *poolNode) Unlock() {
+ self.lock.Unlock()
+ self.sectionUnlock()
+}
+
+func (self *poolNode) RLock() {
+ self.lock.RLock()
+}
+
+func (self *poolNode) RUnlock() {
+ self.lock.RUnlock()
+}
+
+func (self *poolNode) sectionLock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.Lock()
+}
+
+func (self *poolNode) sectionUnlock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.Unlock()
+}
+
+func (self *poolNode) sectionRLock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.RLock()
+}
+
+func (self *poolNode) sectionRUnlock() {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ self.section.lock.RUnlock()
+}
diff --git a/eth/block_pool_test.go b/eth/block_pool_test.go
new file mode 100644
index 000000000..315cc748d
--- /dev/null
+++ b/eth/block_pool_test.go
@@ -0,0 +1,198 @@
+package eth
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "os"
+ "sync"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethutil"
+ ethlogger "github.com/ethereum/go-ethereum/logger"
+)
+
+var sys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel))
+
+type testChainManager struct {
+ knownBlock func(hash []byte) bool
+ addBlock func(*types.Block) error
+ checkPoW func(*types.Block) bool
+}
+
+func (self *testChainManager) KnownBlock(hash []byte) bool {
+ if self.knownBlock != nil {
+ return self.knownBlock(hash)
+ }
+ return false
+}
+
+func (self *testChainManager) AddBlock(block *types.Block) error {
+ if self.addBlock != nil {
+ return self.addBlock(block)
+ }
+ return nil
+}
+
+func (self *testChainManager) CheckPoW(block *types.Block) bool {
+ if self.checkPoW != nil {
+ return self.checkPoW(block)
+ }
+ return false
+}
+
+func knownBlock(hashes ...[]byte) (f func([]byte) bool) {
+ f = func(block []byte) bool {
+ for _, hash := range hashes {
+ if bytes.Compare(block, hash) == 0 {
+ return true
+ }
+ }
+ return false
+ }
+ return
+}
+
+func addBlock(hashes ...[]byte) (f func(*types.Block) error) {
+ f = func(block *types.Block) error {
+ for _, hash := range hashes {
+ if bytes.Compare(block.Hash(), hash) == 0 {
+ return fmt.Errorf("invalid by test")
+ }
+ }
+ return nil
+ }
+ return
+}
+
+func checkPoW(hashes ...[]byte) (f func(*types.Block) bool) {
+ f = func(block *types.Block) bool {
+ for _, hash := range hashes {
+ if bytes.Compare(block.Hash(), hash) == 0 {
+ return false
+ }
+ }
+ return true
+ }
+ return
+}
+
+func newTestChainManager(knownBlocks [][]byte, invalidBlocks [][]byte, invalidPoW [][]byte) *testChainManager {
+ return &testChainManager{
+ knownBlock: knownBlock(knownBlocks...),
+ addBlock: addBlock(invalidBlocks...),
+ checkPoW: checkPoW(invalidPoW...),
+ }
+}
+
+type intToHash map[int][]byte
+
+type hashToInt map[string]int
+
+type testHashPool struct {
+ intToHash
+ hashToInt
+}
+
+func newHash(i int) []byte {
+ return crypto.Sha3([]byte(string(i)))
+}
+
+func newTestBlockPool(knownBlockIndexes []int, invalidBlockIndexes []int, invalidPoWIndexes []int) (hashPool *testHashPool, blockPool *BlockPool) {
+ hashPool = &testHashPool{make(intToHash), make(hashToInt)}
+ knownBlocks := hashPool.indexesToHashes(knownBlockIndexes)
+ invalidBlocks := hashPool.indexesToHashes(invalidBlockIndexes)
+ invalidPoW := hashPool.indexesToHashes(invalidPoWIndexes)
+ blockPool = NewBlockPool(newTestChainManager(knownBlocks, invalidBlocks, invalidPoW))
+ return
+}
+
+func (self *testHashPool) indexesToHashes(indexes []int) (hashes [][]byte) {
+ for _, i := range indexes {
+ hash, found := self.intToHash[i]
+ if !found {
+ hash = newHash(i)
+ self.intToHash[i] = hash
+ self.hashToInt[string(hash)] = i
+ }
+ hashes = append(hashes, hash)
+ }
+ return
+}
+
+func (self *testHashPool) hashesToIndexes(hashes [][]byte) (indexes []int) {
+ for _, hash := range hashes {
+ i, found := self.hashToInt[string(hash)]
+ if !found {
+ i = -1
+ }
+ indexes = append(indexes, i)
+ }
+ return
+}
+
+type protocolChecker struct {
+ blockHashesRequests []int
+ blocksRequests [][]int
+ invalidBlocks []error
+ hashPool *testHashPool
+ lock sync.Mutex
+}
+
+// -1 is special: not found (a hash never seen)
+func (self *protocolChecker) requestBlockHashesCallBack() (requestBlockHashesCallBack func([]byte) error) {
+ requestBlockHashesCallBack = func(hash []byte) error {
+ indexes := self.hashPool.hashesToIndexes([][]byte{hash})
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blockHashesRequests = append(self.blockHashesRequests, indexes[0])
+ return nil
+ }
+ return
+}
+
+func (self *protocolChecker) requestBlocksCallBack() (requestBlocksCallBack func([][]byte) error) {
+ requestBlocksCallBack = func(hashes [][]byte) error {
+ indexes := self.hashPool.hashesToIndexes(hashes)
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blocksRequests = append(self.blocksRequests, indexes)
+ return nil
+ }
+ return
+}
+
+func (self *protocolChecker) invalidBlockCallBack() (invalidBlockCallBack func(error)) {
+ invalidBlockCallBack = func(err error) {
+ self.invalidBlocks = append(self.invalidBlocks, err)
+ }
+ return
+}
+
+func TestAddPeer(t *testing.T) {
+ ethlogger.AddLogSystem(sys)
+ knownBlockIndexes := []int{0, 1}
+ invalidBlockIndexes := []int{2, 3}
+ invalidPoWIndexes := []int{4, 5}
+ hashPool, blockPool := newTestBlockPool(knownBlockIndexes, invalidBlockIndexes, invalidPoWIndexes)
+ // TODO:
+ // hashPool, blockPool, blockChainChecker = newTestBlockPool(knownBlockIndexes, invalidBlockIndexes, invalidPoWIndexes)
+ peer0 := &protocolChecker{
+ // blockHashesRequests: make([]int),
+ // blocksRequests: make([][]int),
+ // invalidBlocks: make([]error),
+ hashPool: hashPool,
+ }
+ best := blockPool.AddPeer(ethutil.Big1, newHash(100), "0",
+ peer0.requestBlockHashesCallBack(),
+ peer0.requestBlocksCallBack(),
+ peer0.invalidBlockCallBack(),
+ )
+ if !best {
+ t.Errorf("peer not accepted as best")
+ }
+ blockPool.Stop()
+
+}
diff --git a/eth/error.go b/eth/error.go
new file mode 100644
index 000000000..d1daad575
--- /dev/null
+++ b/eth/error.go
@@ -0,0 +1,71 @@
+package eth
+
+import (
+ "fmt"
+)
+
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrProtocolVersionMismatch
+ ErrNetworkIdMismatch
+ ErrGenesisBlockMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+ ErrInvalidBlock
+ ErrInvalidPoW
+ ErrUnrequestedBlock
+)
+
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrProtocolVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrGenesisBlockMismatch: "Genesis block mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+ ErrInvalidBlock: "Invalid block",
+ ErrInvalidPoW: "Invalid PoW",
+ ErrUnrequestedBlock: "Unrequested block",
+}
+
+type protocolError struct {
+ Code int
+ fatal bool
+ message string
+ format string
+ params []interface{}
+ // size int
+}
+
+func newProtocolError(code int, format string, params ...interface{}) *protocolError {
+ return &protocolError{Code: code, format: format, params: params}
+}
+
+func ProtocolError(code int, format string, params ...interface{}) (err *protocolError) {
+ err = newProtocolError(code, format, params...)
+ // report(err)
+ return
+}
+
+func (self protocolError) Error() (message string) {
+ message = self.message
+ if message == "" {
+ message, ok := errorToString[self.Code]
+ if !ok {
+ panic("invalid error code")
+ }
+ if self.format != "" {
+ message += ": " + fmt.Sprintf(self.format, self.params...)
+ }
+ self.message = message
+ }
+ return
+}
+
+func (self *protocolError) Fatal() bool {
+ return self.fatal
+}
diff --git a/eth/peer_util.go b/eth/peer_util.go
new file mode 100644
index 000000000..6cf80cde2
--- /dev/null
+++ b/eth/peer_util.go
@@ -0,0 +1,23 @@
+package eth
+
+import (
+ "encoding/json"
+
+ "github.com/ethereum/go-ethereum/ethutil"
+)
+
+func WritePeers(path string, addresses []string) {
+ if len(addresses) > 0 {
+ data, _ := json.MarshalIndent(addresses, "", " ")
+ ethutil.WriteFile(path, data)
+ }
+}
+
+func ReadPeers(path string) (ips []string, err error) {
+ var data string
+ data, err = ethutil.ReadAllFile(path)
+ if err != nil {
+ json.Unmarshal([]byte(data), &ips)
+ }
+ return
+}
diff --git a/eth/protocol.go b/eth/protocol.go
new file mode 100644
index 000000000..3b6f95d44
--- /dev/null
+++ b/eth/protocol.go
@@ -0,0 +1,319 @@
+package eth
+
+import (
+ "bytes"
+ "fmt"
+ "math"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ ProtocolVersion = 49
+ NetworkId = 0
+ ProtocolLength = uint64(8)
+ ProtocolMaxMsgSize = 10 * 1024 * 1024
+)
+
+// eth protocol message codes
+const (
+ StatusMsg = iota
+ GetTxMsg // unused
+ TxMsg
+ GetBlockHashesMsg
+ BlockHashesMsg
+ GetBlocksMsg
+ BlocksMsg
+ NewBlockMsg
+)
+
+// ethProtocol represents the ethereum wire protocol
+// instance is running on each peer
+type ethProtocol struct {
+ txPool txPool
+ chainManager chainManager
+ blockPool blockPool
+ peer *p2p.Peer
+ id string
+ rw p2p.MsgReadWriter
+}
+
+// backend is the interface the ethereum protocol backend should implement
+// used as an argument to EthProtocol
+type txPool interface {
+ AddTransactions([]*types.Transaction)
+}
+
+type chainManager interface {
+ GetBlockHashesFromHash(hash []byte, amount uint64) (hashes [][]byte)
+ GetBlock(hash []byte) (block *types.Block)
+ Status() (td *big.Int, currentBlock []byte, genesisBlock []byte)
+}
+
+type blockPool interface {
+ AddBlockHashes(next func() ([]byte, bool), peerId string)
+ AddBlock(block *types.Block, peerId string)
+ AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, peerError func(int, string, ...interface{})) (best bool)
+ RemovePeer(peerId string)
+}
+
+// message structs used for rlp decoding
+type newBlockMsgData struct {
+ Block *types.Block
+ TD *big.Int
+}
+
+type getBlockHashesMsgData struct {
+ Hash []byte
+ Amount uint64
+}
+
+// main entrypoint, wrappers starting a server running the eth protocol
+// use this constructor to attach the protocol ("class") to server caps
+// the Dev p2p layer then runs the protocol instance on each peer
+func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
+ return p2p.Protocol{
+ Name: "eth",
+ Version: ProtocolVersion,
+ Length: ProtocolLength,
+ Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ return runEthProtocol(txPool, chainManager, blockPool, peer, rw)
+ },
+ }
+}
+
+// the main loop that handles incoming messages
+// note RemovePeer in the post-disconnect hook
+func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
+ self := &ethProtocol{
+ txPool: txPool,
+ chainManager: chainManager,
+ blockPool: blockPool,
+ rw: rw,
+ peer: peer,
+ id: (string)(peer.Identity().Pubkey()),
+ }
+ err = self.handleStatus()
+ if err == nil {
+ for {
+ err = self.handle()
+ if err != nil {
+ fmt.Println(err)
+ self.blockPool.RemovePeer(self.id)
+ break
+ }
+ }
+ }
+ return
+}
+
+func (self *ethProtocol) handle() error {
+ msg, err := self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+
+ case StatusMsg:
+ return ProtocolError(ErrExtraStatusMsg, "")
+
+ case TxMsg:
+ // TODO: rework using lazy RLP stream
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ self.txPool.AddTransactions(txs)
+
+ case GetBlockHashesMsg:
+ var request getBlockHashesMsgData
+ if err := msg.Decode(&request); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
+ return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...)
+
+ case BlockHashesMsg:
+ // TODO: redo using lazy decode , this way very inefficient on known chains
+ msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size))
+ var err error
+ iter := func() (hash []byte, ok bool) {
+ hash, err = msgStream.Bytes()
+ if err == nil {
+ ok = true
+ }
+ return
+ }
+ self.blockPool.AddBlockHashes(iter, self.id)
+ if err != nil && err != rlp.EOL {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+
+ case GetBlocksMsg:
+ var blockHashes [][]byte
+ if err := msg.Decode(&blockHashes); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize))
+ var blocks []interface{}
+ for i, hash := range blockHashes {
+ if i >= max {
+ break
+ }
+ block := self.chainManager.GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block.Value().Raw())
+ }
+ }
+ return self.rw.EncodeMsg(BlocksMsg, blocks...)
+
+ case BlocksMsg:
+ msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size))
+ for {
+ var block *types.Block
+ if err := msgStream.Decode(&block); err != nil {
+ if err == rlp.EOL {
+ break
+ } else {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ }
+ self.blockPool.AddBlock(block, self.id)
+ }
+
+ case NewBlockMsg:
+ var request newBlockMsgData
+ if err := msg.Decode(&request); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+ hash := request.Block.Hash()
+ // to simplify backend interface adding a new block
+ // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer
+ // (or selected as new best peer)
+ if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
+ called := true
+ iter := func() (hash []byte, ok bool) {
+ if called {
+ called = false
+ return hash, true
+ } else {
+ return
+ }
+ }
+ self.blockPool.AddBlockHashes(iter, self.id)
+ self.blockPool.AddBlock(request.Block, self.id)
+ }
+
+ default:
+ return ProtocolError(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+type statusMsgData struct {
+ ProtocolVersion uint
+ NetworkId uint
+ TD *big.Int
+ CurrentBlock []byte
+ GenesisBlock []byte
+}
+
+func (self *ethProtocol) statusMsg() p2p.Msg {
+ td, currentBlock, genesisBlock := self.chainManager.Status()
+
+ return p2p.NewMsg(StatusMsg,
+ uint32(ProtocolVersion),
+ uint32(NetworkId),
+ td,
+ currentBlock,
+ genesisBlock,
+ )
+}
+
+func (self *ethProtocol) handleStatus() error {
+ // send precanned status message
+ if err := self.rw.WriteMsg(self.statusMsg()); err != nil {
+ return err
+ }
+
+ // read and handle remote status
+ msg, err := self.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+
+ if msg.Code != StatusMsg {
+ return ProtocolError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+ }
+
+ if msg.Size > ProtocolMaxMsgSize {
+ return ProtocolError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+
+ var status statusMsgData
+ if err := msg.Decode(&status); err != nil {
+ return ProtocolError(ErrDecode, "%v", err)
+ }
+
+ _, _, genesisBlock := self.chainManager.Status()
+
+ if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 {
+ return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
+ }
+
+ if status.NetworkId != NetworkId {
+ return ProtocolError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, NetworkId)
+ }
+
+ if ProtocolVersion != status.ProtocolVersion {
+ return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion)
+ }
+
+ self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
+
+ //self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+ self.peer.Infoln("AddPeer(IGNORED)")
+
+ return nil
+}
+
+func (self *ethProtocol) requestBlockHashes(from []byte) error {
+ self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4])
+ return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize)
+}
+
+func (self *ethProtocol) requestBlocks(hashes [][]byte) error {
+ self.peer.Debugf("fetching %v blocks", len(hashes))
+ return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes))
+}
+
+func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) {
+ err = ProtocolError(code, format, params...)
+ if err.Fatal() {
+ self.peer.Errorln(err)
+ } else {
+ self.peer.Debugln(err)
+ }
+ return
+}
+
+func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) {
+ err := ProtocolError(code, format, params...)
+ if err.Fatal() {
+ self.peer.Errorln(err)
+ // disconnect
+ } else {
+ self.peer.Debugln(err)
+ }
+
+}
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
new file mode 100644
index 000000000..322aec7b7
--- /dev/null
+++ b/eth/protocol_test.go
@@ -0,0 +1,232 @@
+package eth
+
+import (
+ "io"
+ "math/big"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/p2p"
+)
+
+type testMsgReadWriter struct {
+ in chan p2p.Msg
+ out chan p2p.Msg
+}
+
+func (self *testMsgReadWriter) In(msg p2p.Msg) {
+ self.in <- msg
+}
+
+func (self *testMsgReadWriter) Out(msg p2p.Msg) {
+ self.in <- msg
+}
+
+func (self *testMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+ self.out <- msg
+ return nil
+}
+
+func (self *testMsgReadWriter) EncodeMsg(code uint64, data ...interface{}) error {
+ return self.WriteMsg(p2p.NewMsg(code, data))
+}
+
+func (self *testMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+ msg, ok := <-self.in
+ if !ok {
+ return msg, io.EOF
+ }
+ return msg, nil
+}
+
+func errorCheck(t *testing.T, expCode int, err error) {
+ perr, ok := err.(*protocolError)
+ if ok && perr != nil {
+ if code := perr.Code; code != expCode {
+ ok = false
+ }
+ }
+ if !ok {
+ t.Errorf("expected error code %v, got %v", ErrNoStatusMsg, err)
+ }
+}
+
+type TestBackend struct {
+ getTransactions func() []*types.Transaction
+ addTransactions func(txs []*types.Transaction)
+ getBlockHashes func(hash []byte, amount uint32) (hashes [][]byte)
+ addBlockHashes func(next func() ([]byte, bool), peerId string)
+ getBlock func(hash []byte) *types.Block
+ addBlock func(block *types.Block, peerId string) (err error)
+ addPeer func(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool)
+ removePeer func(peerId string)
+ status func() (td *big.Int, currentBlock []byte, genesisBlock []byte)
+}
+
+func (self *TestBackend) GetTransactions() (txs []*types.Transaction) {
+ if self.getTransactions != nil {
+ txs = self.getTransactions()
+ }
+ return
+}
+
+func (self *TestBackend) AddTransactions(txs []*types.Transaction) {
+ if self.addTransactions != nil {
+ self.addTransactions(txs)
+ }
+}
+
+func (self *TestBackend) GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) {
+ if self.getBlockHashes != nil {
+ hashes = self.getBlockHashes(hash, amount)
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+ if self.addBlockHashes != nil {
+ self.addBlockHashes(next, peerId)
+ }
+}
+
+=======
+func (self *TestBackend) AddHash(hash []byte, peer *p2p.Peer) (more bool) {
+ if self.addHash != nil {
+ more = self.addHash(hash, peer)
+=======
+func (self *TestBackend) AddBlockHashes(next func() ([]byte, bool), peerId string) {
+ if self.addBlockHashes != nil {
+ self.addBlockHashes(next, peerId)
+>>>>>>> eth protocol changes
+ }
+}
+<<<<<<< HEAD
+>>>>>>> initial commit for eth-p2p integration
+=======
+
+>>>>>>> eth protocol changes
+func (self *TestBackend) GetBlock(hash []byte) (block *types.Block) {
+ if self.getBlock != nil {
+ block = self.getBlock(hash)
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) {
+ if self.addBlock != nil {
+ err = self.addBlock(block, peerId)
+=======
+func (self *TestBackend) AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) {
+ if self.addBlock != nil {
+ fetchHashes, err = self.addBlock(td, block, peer)
+>>>>>>> initial commit for eth-p2p integration
+=======
+func (self *TestBackend) AddBlock(block *types.Block, peerId string) (err error) {
+ if self.addBlock != nil {
+ err = self.addBlock(block, peerId)
+>>>>>>> eth protocol changes
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) {
+ if self.addPeer != nil {
+ best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock)
+=======
+func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) {
+ if self.addPeer != nil {
+ fetchHashes = self.addPeer(td, currentBlock, peer)
+>>>>>>> initial commit for eth-p2p integration
+=======
+func (self *TestBackend) AddPeer(td *big.Int, currentBlock []byte, peerId string, requestBlockHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) {
+ if self.addPeer != nil {
+ best = self.addPeer(td, currentBlock, peerId, requestBlockHashes, requestBlocks, invalidBlock)
+>>>>>>> eth protocol changes
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+=======
+>>>>>>> eth protocol changes
+func (self *TestBackend) RemovePeer(peerId string) {
+ if self.removePeer != nil {
+ self.removePeer(peerId)
+ }
+}
+
+<<<<<<< HEAD
+=======
+>>>>>>> initial commit for eth-p2p integration
+=======
+>>>>>>> eth protocol changes
+func (self *TestBackend) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) {
+ if self.status != nil {
+ td, currentBlock, genesisBlock = self.status()
+ }
+ return
+}
+
+<<<<<<< HEAD
+<<<<<<< HEAD
+=======
+>>>>>>> eth protocol changes
+// TODO: refactor this into p2p/client_identity
+type peerId struct {
+ pubkey []byte
+}
+
+func (self *peerId) String() string {
+ return "test peer"
+}
+
+func (self *peerId) Pubkey() (pubkey []byte) {
+ pubkey = self.pubkey
+ if len(pubkey) == 0 {
+ pubkey = crypto.GenerateNewKeyPair().PublicKey
+ self.pubkey = pubkey
+ }
+ return
+}
+
+func testPeer() *p2p.Peer {
+ return p2p.NewPeer(&peerId{}, []p2p.Cap{})
+}
+
+func TestErrNoStatusMsg(t *testing.T) {
+<<<<<<< HEAD
+=======
+func TestEth(t *testing.T) {
+>>>>>>> initial commit for eth-p2p integration
+=======
+>>>>>>> eth protocol changes
+ quit := make(chan bool)
+ rw := &testMsgReadWriter{make(chan p2p.Msg, 10), make(chan p2p.Msg, 10)}
+ testBackend := &TestBackend{}
+ var err error
+ go func() {
+<<<<<<< HEAD
+<<<<<<< HEAD
+ err = runEthProtocol(testBackend, testPeer(), rw)
+=======
+ err = runEthProtocol(testBackend, nil, rw)
+>>>>>>> initial commit for eth-p2p integration
+=======
+ err = runEthProtocol(testBackend, testPeer(), rw)
+>>>>>>> eth protocol changes
+ close(quit)
+ }()
+ statusMsg := p2p.NewMsg(4)
+ rw.In(statusMsg)
+ <-quit
+ errorCheck(t, ErrNoStatusMsg, err)
+ // read(t, remote, []byte("hello, world"), nil)
+}
diff --git a/ethereum.go b/ethereum.go
deleted file mode 100644
index 5d74e28e9..000000000
--- a/ethereum.go
+++ /dev/null
@@ -1,659 +0,0 @@
-package eth
-
-import (
- "container/list"
- "encoding/json"
- "fmt"
- "math/big"
- "math/rand"
- "net"
- "path"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethutil"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/state"
- "github.com/ethereum/go-ethereum/wire"
-)
-
-const (
- seedTextFileUri string = "http://www.ethereum.org/servers.poc3.txt"
- seedNodeAddress = "poc-7.ethdev.com:30303"
-)
-
-var loggerger = logger.NewLogger("SERV")
-
-func eachPeer(peers *list.List, callback func(*Peer, *list.Element)) {
- // Loop thru the peers and close them (if we had them)
- for e := peers.Front(); e != nil; e = e.Next() {
- callback(e.Value.(*Peer), e)
- }
-}
-
-const (
- processReapingTimeout = 60 // TODO increase
-)
-
-type Ethereum struct {
- // Channel for shutting down the ethereum
- shutdownChan chan bool
- quit chan bool
-
- // DB interface
- db ethutil.Database
- // State manager for processing new blocks and managing the over all states
- blockManager *core.BlockManager
- // The transaction pool. Transaction can be pushed on this pool
- // for later including in the blocks
- txPool *core.TxPool
- // The canonical chain
- blockChain *core.ChainManager
- // The block pool
- blockPool *BlockPool
- // Eventer
- eventMux event.TypeMux
- // Peers
- peers *list.List
- // Nonce
- Nonce uint64
-
- Addr net.Addr
- Port string
-
- blacklist [][]byte
-
- peerMut sync.Mutex
-
- // Capabilities for outgoing peers
- serverCaps Caps
-
- nat NAT
-
- // Specifies the desired amount of maximum peers
- MaxPeers int
-
- Mining bool
-
- listening bool
-
- RpcServer *rpc.JsonRpcServer
-
- keyManager *crypto.KeyManager
-
- clientIdentity wire.ClientIdentity
-
- isUpToDate bool
-
- filterMu sync.RWMutex
- filterId int
- filters map[int]*core.Filter
-}
-
-func New(db ethutil.Database, clientIdentity wire.ClientIdentity, keyManager *crypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
- var err error
- var nat NAT
-
- if usePnp {
- nat, err = Discover()
- if err != nil {
- loggerger.Debugln("UPnP failed", err)
- }
- }
-
- bootstrapDb(db)
-
- ethutil.Config.Db = db
-
- nonce, _ := ethutil.RandomUint64()
- ethereum := &Ethereum{
- shutdownChan: make(chan bool),
- quit: make(chan bool),
- db: db,
- peers: list.New(),
- Nonce: nonce,
- serverCaps: caps,
- nat: nat,
- keyManager: keyManager,
- clientIdentity: clientIdentity,
- isUpToDate: true,
- filters: make(map[int]*core.Filter),
- }
-
- ethereum.blockPool = NewBlockPool(ethereum)
- ethereum.blockChain = core.NewChainManager(ethereum.EventMux())
- ethereum.txPool = core.NewTxPool(ethereum.blockChain, ethereum, ethereum.EventMux())
- ethereum.blockManager = core.NewBlockManager(ethereum.txPool, ethereum.blockChain, ethereum.EventMux())
- ethereum.blockChain.SetProcessor(ethereum.blockManager)
-
- // Start the tx pool
- ethereum.txPool.Start()
-
- return ethereum, nil
-}
-
-func (s *Ethereum) KeyManager() *crypto.KeyManager {
- return s.keyManager
-}
-
-func (s *Ethereum) ClientIdentity() wire.ClientIdentity {
- return s.clientIdentity
-}
-
-func (s *Ethereum) ChainManager() *core.ChainManager {
- return s.blockChain
-}
-
-func (s *Ethereum) BlockManager() *core.BlockManager {
- return s.blockManager
-}
-
-func (s *Ethereum) TxPool() *core.TxPool {
- return s.txPool
-}
-func (s *Ethereum) BlockPool() *BlockPool {
- return s.blockPool
-}
-func (s *Ethereum) EventMux() *event.TypeMux {
- return &s.eventMux
-}
-func (self *Ethereum) Db() ethutil.Database {
- return self.db
-}
-
-func (s *Ethereum) ServerCaps() Caps {
- return s.serverCaps
-}
-func (s *Ethereum) IsMining() bool {
- return s.Mining
-}
-func (s *Ethereum) PeerCount() int {
- return s.peers.Len()
-}
-func (s *Ethereum) IsUpToDate() bool {
- upToDate := true
- eachPeer(s.peers, func(peer *Peer, e *list.Element) {
- if atomic.LoadInt32(&peer.connected) == 1 {
- if peer.catchingUp == true && peer.versionKnown {
- upToDate = false
- }
- }
- })
- return upToDate
-}
-func (s *Ethereum) PushPeer(peer *Peer) {
- s.peers.PushBack(peer)
-}
-func (s *Ethereum) IsListening() bool {
- return s.listening
-}
-
-func (s *Ethereum) HighestTDPeer() (td *big.Int) {
- td = big.NewInt(0)
-
- eachPeer(s.peers, func(p *Peer, v *list.Element) {
- if p.td.Cmp(td) > 0 {
- td = p.td
- }
- })
-
- return
-}
-
-func (self *Ethereum) BlacklistPeer(peer *Peer) {
- self.blacklist = append(self.blacklist, peer.pubkey)
-}
-
-func (s *Ethereum) AddPeer(conn net.Conn) {
- peer := NewPeer(conn, s, true)
-
- if peer != nil {
- if s.peers.Len() < s.MaxPeers {
- peer.Start()
- } else {
- loggerger.Debugf("Max connected peers reached. Not adding incoming peer.")
- }
- }
-}
-
-func (s *Ethereum) ProcessPeerList(addrs []string) {
- for _, addr := range addrs {
- // TODO Probably requires some sanity checks
- s.ConnectToPeer(addr)
- }
-}
-
-func (s *Ethereum) ConnectToPeer(addr string) error {
- if s.peers.Len() < s.MaxPeers {
- var alreadyConnected bool
-
- ahost, aport, _ := net.SplitHostPort(addr)
- var chost string
-
- ips, err := net.LookupIP(ahost)
-
- if err != nil {
- return err
- } else {
- // If more then one ip is available try stripping away the ipv6 ones
- if len(ips) > 1 {
- var ipsv4 []net.IP
- // For now remove the ipv6 addresses
- for _, ip := range ips {
- if strings.Contains(ip.String(), "::") {
- continue
- } else {
- ipsv4 = append(ipsv4, ip)
- }
- }
- if len(ipsv4) == 0 {
- return fmt.Errorf("[SERV] No IPV4 addresses available for hostname")
- }
-
- // Pick a random ipv4 address, simulating round-robin DNS.
- rand.Seed(time.Now().UTC().UnixNano())
- i := rand.Intn(len(ipsv4))
- chost = ipsv4[i].String()
- } else {
- if len(ips) == 0 {
- return fmt.Errorf("[SERV] No IPs resolved for the given hostname")
- return nil
- }
- chost = ips[0].String()
- }
- }
-
- eachPeer(s.peers, func(p *Peer, v *list.Element) {
- if p.conn == nil {
- return
- }
- phost, pport, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
-
- if phost == chost && pport == aport {
- alreadyConnected = true
- //loggerger.Debugf("Peer %s already added.\n", chost)
- return
- }
- })
-
- if alreadyConnected {
- return nil
- }
-
- NewOutboundPeer(addr, s, s.serverCaps)
- }
-
- return nil
-}
-
-func (s *Ethereum) OutboundPeers() []*Peer {
- // Create a new peer slice with at least the length of the total peers
- outboundPeers := make([]*Peer, s.peers.Len())
- length := 0
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if !p.inbound && p.conn != nil {
- outboundPeers[length] = p
- length++
- }
- })
-
- return outboundPeers[:length]
-}
-
-func (s *Ethereum) InboundPeers() []*Peer {
- // Create a new peer slice with at least the length of the total peers
- inboundPeers := make([]*Peer, s.peers.Len())
- length := 0
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if p.inbound {
- inboundPeers[length] = p
- length++
- }
- })
-
- return inboundPeers[:length]
-}
-
-func (s *Ethereum) InOutPeers() []*Peer {
- // Reap the dead peers first
- s.reapPeers()
-
- // Create a new peer slice with at least the length of the total peers
- inboundPeers := make([]*Peer, s.peers.Len())
- length := 0
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- // Only return peers with an actual ip
- if len(p.host) > 0 {
- inboundPeers[length] = p
- length++
- }
- })
-
- return inboundPeers[:length]
-}
-
-func (s *Ethereum) Broadcast(msgType wire.MsgType, data []interface{}) {
- msg := wire.NewMessage(msgType, data)
- s.BroadcastMsg(msg)
-}
-
-func (s *Ethereum) BroadcastMsg(msg *wire.Msg) {
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- p.QueueMessage(msg)
- })
-}
-
-func (s *Ethereum) Peers() *list.List {
- return s.peers
-}
-
-func (s *Ethereum) reapPeers() {
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- if atomic.LoadInt32(&p.disconnect) == 1 || (p.inbound && (time.Now().Unix()-p.lastPong) > int64(5*time.Minute)) {
- s.removePeerElement(e)
- }
- })
-}
-
-func (s *Ethereum) removePeerElement(e *list.Element) {
- s.peerMut.Lock()
- defer s.peerMut.Unlock()
-
- s.peers.Remove(e)
-
- s.eventMux.Post(PeerListEvent{s.peers})
-}
-
-func (s *Ethereum) RemovePeer(p *Peer) {
- eachPeer(s.peers, func(peer *Peer, e *list.Element) {
- if peer == p {
- s.removePeerElement(e)
- }
- })
-}
-
-func (s *Ethereum) reapDeadPeerHandler() {
- reapTimer := time.NewTicker(processReapingTimeout * time.Second)
-
- for {
- select {
- case <-reapTimer.C:
- s.reapPeers()
- }
- }
-}
-
-// Start the ethereum
-func (s *Ethereum) Start(seed bool) {
- s.blockPool.Start()
-
- // Bind to addr and port
- ln, err := net.Listen("tcp", ":"+s.Port)
- if err != nil {
- loggerger.Warnf("Port %s in use. Connection listening disabled. Acting as client", s.Port)
- s.listening = false
- } else {
- s.listening = true
- // Starting accepting connections
- loggerger.Infoln("Ready and accepting connections")
- // Start the peer handler
- go s.peerHandler(ln)
- }
-
- if s.nat != nil {
- go s.upnpUpdateThread()
- }
-
- // Start the reaping processes
- go s.reapDeadPeerHandler()
- go s.update()
- go s.filterLoop()
-
- if seed {
- s.Seed()
- }
- s.ConnectToPeer("localhost:40404")
- loggerger.Infoln("Server started")
-}
-
-func (s *Ethereum) Seed() {
- // Sorry Py person. I must blacklist. you perform badly
- s.blacklist = append(s.blacklist, ethutil.Hex2Bytes("64656330303561383532336435376331616537643864663236623336313863373537353163636634333530626263396330346237336262623931383064393031"))
- ips := PastPeers()
- if len(ips) > 0 {
- for _, ip := range ips {
- loggerger.Infoln("Connecting to previous peer ", ip)
- s.ConnectToPeer(ip)
- }
- } else {
- loggerger.Debugln("Retrieving seed nodes")
-
- // Eth-Go Bootstrapping
- ips, er := net.LookupIP("seed.bysh.me")
- if er == nil {
- peers := []string{}
- for _, ip := range ips {
- node := fmt.Sprintf("%s:%d", ip.String(), 30303)
- loggerger.Debugln("Found DNS Go Peer:", node)
- peers = append(peers, node)
- }
- s.ProcessPeerList(peers)
- }
-
- // Official DNS Bootstrapping
- _, nodes, err := net.LookupSRV("eth", "tcp", "ethereum.org")
- if err == nil {
- peers := []string{}
- // Iterate SRV nodes
- for _, n := range nodes {
- target := n.Target
- port := strconv.Itoa(int(n.Port))
- // Resolve target to ip (Go returns list, so may resolve to multiple ips?)
- addr, err := net.LookupHost(target)
- if err == nil {
- for _, a := range addr {
- // Build string out of SRV port and Resolved IP
- peer := net.JoinHostPort(a, port)
- loggerger.Debugln("Found DNS Bootstrap Peer:", peer)
- peers = append(peers, peer)
- }
- } else {
- loggerger.Debugln("Couldn't resolve :", target)
- }
- }
- // Connect to Peer list
- s.ProcessPeerList(peers)
- }
-
- s.ConnectToPeer(seedNodeAddress)
- }
-}
-
-func (s *Ethereum) peerHandler(listener net.Listener) {
- for {
- conn, err := listener.Accept()
- if err != nil {
- loggerger.Debugln(err)
-
- continue
- }
-
- go s.AddPeer(conn)
- }
-}
-
-func (s *Ethereum) Stop() {
- // Stop eventMux first, it will close all subscriptions.
- s.eventMux.Stop()
-
- // Close the database
- defer s.db.Close()
-
- var ips []string
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- ips = append(ips, p.conn.RemoteAddr().String())
- })
-
- if len(ips) > 0 {
- d, _ := json.MarshalIndent(ips, "", " ")
- ethutil.WriteFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"), d)
- }
-
- eachPeer(s.peers, func(p *Peer, e *list.Element) {
- p.Stop()
- })
-
- close(s.quit)
-
- if s.RpcServer != nil {
- s.RpcServer.Stop()
- }
- s.txPool.Stop()
- s.blockPool.Stop()
-
- loggerger.Infoln("Server stopped")
- close(s.shutdownChan)
-}
-
-// This function will wait for a shutdown and resumes main thread execution
-func (s *Ethereum) WaitForShutdown() {
- <-s.shutdownChan
-}
-
-func (s *Ethereum) upnpUpdateThread() {
- // Go off immediately to prevent code duplication, thereafter we renew
- // lease every 15 minutes.
- timer := time.NewTimer(5 * time.Minute)
- lport, _ := strconv.ParseInt(s.Port, 10, 16)
- first := true
-out:
- for {
- select {
- case <-timer.C:
- var err error
- _, err = s.nat.AddPortMapping("TCP", int(lport), int(lport), "eth listen port", 20*60)
- if err != nil {
- loggerger.Debugln("can't add UPnP port mapping:", err)
- break out
- }
- if first && err == nil {
- _, err = s.nat.GetExternalAddress()
- if err != nil {
- loggerger.Debugln("UPnP can't get external address:", err)
- continue out
- }
- first = false
- }
- timer.Reset(time.Minute * 15)
- case <-s.quit:
- break out
- }
- }
-
- timer.Stop()
-
- if err := s.nat.DeletePortMapping("TCP", int(lport), int(lport)); err != nil {
- loggerger.Debugln("unable to remove UPnP port mapping:", err)
- } else {
- loggerger.Debugln("succesfully disestablished UPnP port mapping")
- }
-}
-
-func (self *Ethereum) update() {
- upToDateTimer := time.NewTicker(1 * time.Second)
-
-out:
- for {
- select {
- case <-upToDateTimer.C:
- if self.IsUpToDate() && !self.isUpToDate {
- self.eventMux.Post(ChainSyncEvent{false})
- self.isUpToDate = true
- } else if !self.IsUpToDate() && self.isUpToDate {
- self.eventMux.Post(ChainSyncEvent{true})
- self.isUpToDate = false
- }
- case <-self.quit:
- break out
- }
- }
-}
-
-// InstallFilter adds filter for blockchain events.
-// The filter's callbacks will run for matching blocks and messages.
-// The filter should not be modified after it has been installed.
-func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
- self.filterMu.Lock()
- id = self.filterId
- self.filters[id] = filter
- self.filterId++
- self.filterMu.Unlock()
- return id
-}
-
-func (self *Ethereum) UninstallFilter(id int) {
- self.filterMu.Lock()
- delete(self.filters, id)
- self.filterMu.Unlock()
-}
-
-// GetFilter retrieves a filter installed using InstallFilter.
-// The filter may not be modified.
-func (self *Ethereum) GetFilter(id int) *core.Filter {
- self.filterMu.RLock()
- defer self.filterMu.RUnlock()
- return self.filters[id]
-}
-
-func (self *Ethereum) filterLoop() {
- // Subscribe to events
- events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
- for event := range events.Chan() {
- switch event := event.(type) {
- case core.NewBlockEvent:
- self.filterMu.RLock()
- for _, filter := range self.filters {
- if filter.BlockCallback != nil {
- filter.BlockCallback(event.Block)
- }
- }
- self.filterMu.RUnlock()
-
- case state.Messages:
- self.filterMu.RLock()
- for _, filter := range self.filters {
- if filter.MessageCallback != nil {
- msgs := filter.FilterMessages(event)
- if len(msgs) > 0 {
- filter.MessageCallback(msgs)
- }
- }
- }
- self.filterMu.RUnlock()
- }
- }
-}
-
-func bootstrapDb(db ethutil.Database) {
- d, _ := db.Get([]byte("ProtocolVersion"))
- protov := ethutil.NewValue(d).Uint()
-
- if protov == 0 {
- db.Put([]byte("ProtocolVersion"), ethutil.NewValue(ProtocolVersion).Bytes())
- }
-}
-
-func PastPeers() []string {
- var ips []string
- data, _ := ethutil.ReadAllFile(path.Join(ethutil.Config.ExecPath, "known_peers.json"))
- json.Unmarshal([]byte(data), &ips)
-
- return ips
-}
diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go
index b04b4801e..2ce0f0642 100644
--- a/event/filter/generic_filter.go
+++ b/event/filter/generic_filter.go
@@ -2,19 +2,29 @@ package filter
type Generic struct {
Str1, Str2, Str3 string
+ Data map[string]struct{}
Fn func(data interface{})
}
+// self = registered, f = incoming
func (self Generic) Compare(f Filter) bool {
+ var strMatch, dataMatch = true, true
+
filter := f.(Generic)
- if (len(self.Str1) == 0 || filter.Str1 == self.Str1) &&
- (len(self.Str2) == 0 || filter.Str2 == self.Str2) &&
- (len(self.Str3) == 0 || filter.Str3 == self.Str3) {
- return true
+ if (len(self.Str1) > 0 && filter.Str1 != self.Str1) ||
+ (len(self.Str2) > 0 && filter.Str2 != self.Str2) ||
+ (len(self.Str3) > 0 && filter.Str3 != self.Str3) {
+ strMatch = false
+ }
+
+ for k, _ := range self.Data {
+ if _, ok := filter.Data[k]; !ok {
+ return false
+ }
}
- return false
+ return strMatch && dataMatch
}
func (self Generic) Trigger(data interface{}) {
diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go
new file mode 100644
index 000000000..1a9a88173
--- /dev/null
+++ b/event/filter/old_filter.go
@@ -0,0 +1,94 @@
+// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring
+package filter
+
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/state"
+)
+
+type FilterManager struct {
+ eventMux *event.TypeMux
+
+ filterMu sync.RWMutex
+ filterId int
+ filters map[int]*core.Filter
+
+ quit chan struct{}
+}
+
+func NewFilterManager(mux *event.TypeMux) *FilterManager {
+ return &FilterManager{
+ eventMux: mux,
+ filters: make(map[int]*core.Filter),
+ }
+}
+
+func (self *FilterManager) Start() {
+ go self.filterLoop()
+}
+
+func (self *FilterManager) Stop() {
+ close(self.quit)
+}
+
+func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
+ self.filterMu.Lock()
+ id = self.filterId
+ self.filters[id] = filter
+ self.filterId++
+ self.filterMu.Unlock()
+ return id
+}
+
+func (self *FilterManager) UninstallFilter(id int) {
+ self.filterMu.Lock()
+ delete(self.filters, id)
+ self.filterMu.Unlock()
+}
+
+// GetFilter retrieves a filter installed using InstallFilter.
+// The filter may not be modified.
+func (self *FilterManager) GetFilter(id int) *core.Filter {
+ self.filterMu.RLock()
+ defer self.filterMu.RUnlock()
+ return self.filters[id]
+}
+
+func (self *FilterManager) filterLoop() {
+ // Subscribe to events
+ events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
+
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case event := <-events.Chan():
+ switch event := event.(type) {
+ case core.NewBlockEvent:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.BlockCallback != nil {
+ filter.BlockCallback(event.Block)
+ }
+ }
+ self.filterMu.RUnlock()
+
+ case state.Messages:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.MessageCallback != nil {
+ msgs := filter.FilterMessages(event)
+ if len(msgs) > 0 {
+ filter.MessageCallback(msgs)
+ }
+ }
+ }
+ self.filterMu.RUnlock()
+ }
+ }
+ }
+}
diff --git a/events.go b/events.go
deleted file mode 100644
index 5fff1d831..000000000
--- a/events.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package eth
-
-import "container/list"
-
-type PeerListEvent struct {
- Peers *list.List
-}
-
-type ChainSyncEvent struct {
- InSync bool
-}
diff --git a/javascript/javascript_runtime.go b/javascript/javascript_runtime.go
index a26f0154e..af1405049 100644
--- a/javascript/javascript_runtime.go
+++ b/javascript/javascript_runtime.go
@@ -7,10 +7,10 @@ import (
"path"
"path/filepath"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
@@ -203,7 +203,7 @@ func (self *JSRE) addPeer(call otto.FunctionCall) otto.Value {
if err != nil {
return otto.FalseValue()
}
- self.ethereum.ConnectToPeer(host)
+ self.ethereum.SuggestPeer(host)
return otto.TrueValue()
}
diff --git a/javascript/types.go b/javascript/types.go
index d5acaecce..cf5a6677b 100644
--- a/javascript/types.go
+++ b/javascript/types.go
@@ -3,7 +3,7 @@ package javascript
import (
"fmt"
- "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/state"
"github.com/ethereum/go-ethereum/ui"
diff --git a/miner/miner.go b/miner/miner.go
index f63096b63..d909c228b 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -27,7 +27,7 @@ import (
"math/big"
"sort"
- "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/pow/ezp"
@@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/wire"
)
type LocalTx struct {
@@ -217,7 +216,7 @@ func (self *Miner) mine() {
if err != nil {
minerlogger.Infoln(err)
} else {
- self.eth.Broadcast(wire.MsgBlockTy, []interface{}{block.Value().Val})
+ self.eth.EventMux().Post(core.NewMinedBlockEvent{block})
minerlogger.Infof("🔨 Mined block %x\n", block.Hash())
minerlogger.Infoln(block)
@@ -246,7 +245,7 @@ func (self *Miner) finiliseTxs() types.Transactions {
}
// Faster than append
- for _, tx := range self.eth.TxPool().CurrentTransactions() {
+ for _, tx := range self.eth.TxPool().GetTransactions() {
if tx.GasPrice().Cmp(self.MinAcceptedGasPrice) >= 0 {
txs[actualSize] = tx
actualSize++
diff --git a/nat.go b/nat.go
deleted file mode 100644
index 999308eb2..000000000
--- a/nat.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package eth
-
-import (
- "net"
-)
-
-// protocol is either "udp" or "tcp"
-type NAT interface {
- GetExternalAddress() (addr net.IP, err error)
- AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error)
- DeletePortMapping(protocol string, externalPort, internalPort int) (err error)
-}
diff --git a/natpmp.go b/natpmp.go
deleted file mode 100644
index 489342a4b..000000000
--- a/natpmp.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package eth
-
-import (
- "fmt"
- "net"
-
- natpmp "github.com/jackpal/go-nat-pmp"
-)
-
-// Adapt the NAT-PMP protocol to the NAT interface
-
-// TODO:
-// + Register for changes to the external address.
-// + Re-register port mapping when router reboots.
-// + A mechanism for keeping a port mapping registered.
-
-type natPMPClient struct {
- client *natpmp.Client
-}
-
-func NewNatPMP(gateway net.IP) (nat NAT) {
- return &natPMPClient{natpmp.NewClient(gateway)}
-}
-
-func (n *natPMPClient) GetExternalAddress() (addr net.IP, err error) {
- response, err := n.client.GetExternalAddress()
- if err != nil {
- return
- }
- ip := response.ExternalIPAddress
- addr = net.IPv4(ip[0], ip[1], ip[2], ip[3])
- return
-}
-
-func (n *natPMPClient) AddPortMapping(protocol string, externalPort, internalPort int,
- description string, timeout int) (mappedExternalPort int, err error) {
- if timeout <= 0 {
- err = fmt.Errorf("timeout must not be <= 0")
- return
- }
- // Note order of port arguments is switched between our AddPortMapping and the client's AddPortMapping.
- response, err := n.client.AddPortMapping(protocol, internalPort, externalPort, timeout)
- if err != nil {
- return
- }
- mappedExternalPort = int(response.MappedExternalPort)
- return
-}
-
-func (n *natPMPClient) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {
- // To destroy a mapping, send an add-port with
- // an internalPort of the internal port to destroy, an external port of zero and a time of zero.
- _, err = n.client.AddPortMapping(protocol, internalPort, 0, 0)
- return
-}
diff --git a/natupnp.go b/natupnp.go
deleted file mode 100644
index c7f9eeb62..000000000
--- a/natupnp.go
+++ /dev/null
@@ -1,338 +0,0 @@
-package eth
-
-// Just enough UPnP to be able to forward ports
-//
-
-import (
- "bytes"
- "encoding/xml"
- "errors"
- "net"
- "net/http"
- "os"
- "strconv"
- "strings"
- "time"
-)
-
-type upnpNAT struct {
- serviceURL string
- ourIP string
-}
-
-func Discover() (nat NAT, err error) {
- ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900")
- if err != nil {
- return
- }
- conn, err := net.ListenPacket("udp4", ":0")
- if err != nil {
- return
- }
- socket := conn.(*net.UDPConn)
- defer socket.Close()
-
- err = socket.SetDeadline(time.Now().Add(10 * time.Second))
- if err != nil {
- return
- }
-
- st := "ST: urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n"
- buf := bytes.NewBufferString(
- "M-SEARCH * HTTP/1.1\r\n" +
- "HOST: 239.255.255.250:1900\r\n" +
- st +
- "MAN: \"ssdp:discover\"\r\n" +
- "MX: 2\r\n\r\n")
- message := buf.Bytes()
- answerBytes := make([]byte, 1024)
- for i := 0; i < 3; i++ {
- _, err = socket.WriteToUDP(message, ssdp)
- if err != nil {
- return
- }
- var n int
- n, _, err = socket.ReadFromUDP(answerBytes)
- if err != nil {
- continue
- // socket.Close()
- // return
- }
- answer := string(answerBytes[0:n])
- if strings.Index(answer, "\r\n"+st) < 0 {
- continue
- }
- // HTTP header field names are case-insensitive.
- // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
- locString := "\r\nlocation: "
- answer = strings.ToLower(answer)
- locIndex := strings.Index(answer, locString)
- if locIndex < 0 {
- continue
- }
- loc := answer[locIndex+len(locString):]
- endIndex := strings.Index(loc, "\r\n")
- if endIndex < 0 {
- continue
- }
- locURL := loc[0:endIndex]
- var serviceURL string
- serviceURL, err = getServiceURL(locURL)
- if err != nil {
- return
- }
- var ourIP string
- ourIP, err = getOurIP()
- if err != nil {
- return
- }
- nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP}
- return
- }
- err = errors.New("UPnP port discovery failed.")
- return
-}
-
-// service represents the Service type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type service struct {
- ServiceType string `xml:"serviceType"`
- ControlURL string `xml:"controlURL"`
-}
-
-// deviceList represents the deviceList type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type deviceList struct {
- XMLName xml.Name `xml:"deviceList"`
- Device []device `xml:"device"`
-}
-
-// serviceList represents the serviceList type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type serviceList struct {
- XMLName xml.Name `xml:"serviceList"`
- Service []service `xml:"service"`
-}
-
-// device represents the device type in an UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type device struct {
- XMLName xml.Name `xml:"device"`
- DeviceType string `xml:"deviceType"`
- DeviceList deviceList `xml:"deviceList"`
- ServiceList serviceList `xml:"serviceList"`
-}
-
-// specVersion represents the specVersion in a UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type specVersion struct {
- XMLName xml.Name `xml:"specVersion"`
- Major int `xml:"major"`
- Minor int `xml:"minor"`
-}
-
-// root represents the Root document for a UPnP xml description.
-// Only the parts we care about are present and thus the xml may have more
-// fields than present in the structure.
-type root struct {
- XMLName xml.Name `xml:"root"`
- SpecVersion specVersion
- Device device
-}
-
-func getChildDevice(d *device, deviceType string) *device {
- dl := d.DeviceList.Device
- for i := 0; i < len(dl); i++ {
- if dl[i].DeviceType == deviceType {
- return &dl[i]
- }
- }
- return nil
-}
-
-func getChildService(d *device, serviceType string) *service {
- sl := d.ServiceList.Service
- for i := 0; i < len(sl); i++ {
- if sl[i].ServiceType == serviceType {
- return &sl[i]
- }
- }
- return nil
-}
-
-func getOurIP() (ip string, err error) {
- hostname, err := os.Hostname()
- if err != nil {
- return
- }
- p, err := net.LookupIP(hostname)
- if err != nil && len(p) > 0 {
- return
- }
- return p[0].String(), nil
-}
-
-func getServiceURL(rootURL string) (url string, err error) {
- r, err := http.Get(rootURL)
- if err != nil {
- return
- }
- defer r.Body.Close()
- if r.StatusCode >= 400 {
- err = errors.New(string(r.StatusCode))
- return
- }
- var root root
- err = xml.NewDecoder(r.Body).Decode(&root)
-
- if err != nil {
- return
- }
- a := &root.Device
- if a.DeviceType != "urn:schemas-upnp-org:device:InternetGatewayDevice:1" {
- err = errors.New("No InternetGatewayDevice")
- return
- }
- b := getChildDevice(a, "urn:schemas-upnp-org:device:WANDevice:1")
- if b == nil {
- err = errors.New("No WANDevice")
- return
- }
- c := getChildDevice(b, "urn:schemas-upnp-org:device:WANConnectionDevice:1")
- if c == nil {
- err = errors.New("No WANConnectionDevice")
- return
- }
- d := getChildService(c, "urn:schemas-upnp-org:service:WANIPConnection:1")
- if d == nil {
- err = errors.New("No WANIPConnection")
- return
- }
- url = combineURL(rootURL, d.ControlURL)
- return
-}
-
-func combineURL(rootURL, subURL string) string {
- protocolEnd := "://"
- protoEndIndex := strings.Index(rootURL, protocolEnd)
- a := rootURL[protoEndIndex+len(protocolEnd):]
- rootIndex := strings.Index(a, "/")
- return rootURL[0:protoEndIndex+len(protocolEnd)+rootIndex] + subURL
-}
-
-func soapRequest(url, function, message string) (r *http.Response, err error) {
- fullMessage := "<?xml version=\"1.0\" ?>" +
- "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\r\n" +
- "<s:Body>" + message + "</s:Body></s:Envelope>"
-
- req, err := http.NewRequest("POST", url, strings.NewReader(fullMessage))
- if err != nil {
- return nil, err
- }
- req.Header.Set("Content-Type", "text/xml ; charset=\"utf-8\"")
- req.Header.Set("User-Agent", "Darwin/10.0.0, UPnP/1.0, MiniUPnPc/1.3")
- //req.Header.Set("Transfer-Encoding", "chunked")
- req.Header.Set("SOAPAction", "\"urn:schemas-upnp-org:service:WANIPConnection:1#"+function+"\"")
- req.Header.Set("Connection", "Close")
- req.Header.Set("Cache-Control", "no-cache")
- req.Header.Set("Pragma", "no-cache")
-
- // log.Stderr("soapRequest ", req)
- //fmt.Println(fullMessage)
-
- r, err = http.DefaultClient.Do(req)
- if err != nil {
- return
- }
-
- if r.Body != nil {
- defer r.Body.Close()
- }
-
- if r.StatusCode >= 400 {
- // log.Stderr(function, r.StatusCode)
- err = errors.New("Error " + strconv.Itoa(r.StatusCode) + " for " + function)
- r = nil
- return
- }
- return
-}
-
-type statusInfo struct {
- externalIpAddress string
-}
-
-func (n *upnpNAT) getStatusInfo() (info statusInfo, err error) {
-
- message := "<u:GetStatusInfo xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" +
- "</u:GetStatusInfo>"
-
- var response *http.Response
- response, err = soapRequest(n.serviceURL, "GetStatusInfo", message)
- if err != nil {
- return
- }
-
- // TODO: Write a soap reply parser. It has to eat the Body and envelope tags...
-
- response.Body.Close()
- return
-}
-
-func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) {
- info, err := n.getStatusInfo()
- if err != nil {
- return
- }
- addr = net.ParseIP(info.externalIpAddress)
- return
-}
-
-func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) {
- // A single concatenation would break ARM compilation.
- message := "<u:AddPortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" +
- "<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort)
- message += "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>"
- message += "<NewInternalPort>" + strconv.Itoa(internalPort) + "</NewInternalPort>" +
- "<NewInternalClient>" + n.ourIP + "</NewInternalClient>" +
- "<NewEnabled>1</NewEnabled><NewPortMappingDescription>"
- message += description +
- "</NewPortMappingDescription><NewLeaseDuration>" + strconv.Itoa(timeout) +
- "</NewLeaseDuration></u:AddPortMapping>"
-
- var response *http.Response
- response, err = soapRequest(n.serviceURL, "AddPortMapping", message)
- if err != nil {
- return
- }
-
- // TODO: check response to see if the port was forwarded
- // log.Println(message, response)
- mappedExternalPort = externalPort
- _ = response
- return
-}
-
-func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {
-
- message := "<u:DeletePortMapping xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n" +
- "<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) +
- "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" +
- "</u:DeletePortMapping>"
-
- var response *http.Response
- response, err = soapRequest(n.serviceURL, "DeletePortMapping", message)
- if err != nil {
- return
- }
-
- // TODO: check response to see if the port was deleted
- // log.Println(message, response)
- _ = response
- return
-}
diff --git a/p2p/server.go b/p2p/server.go
index 8a6087566..326781234 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -246,12 +246,7 @@ func (srv *Server) Stop() {
func (srv *Server) discLoop() {
for peer := range srv.peerDisconnect {
- // peer has just disconnected. free up its slot.
- srvlog.Infof("%v is gone", peer)
- srv.peerSlots <- peer.slot
- srv.lock.Lock()
- srv.peers[peer.slot] = nil
- srv.lock.Unlock()
+ srv.removePeer(peer)
}
}
@@ -384,7 +379,7 @@ func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer {
func (srv *Server) removePeer(peer *Peer) {
srv.lock.Lock()
defer srv.lock.Unlock()
- srvlog.Debugf("Removing peer %v %v (slot %v)\n", peer, peer.slot)
+ srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot)
if srv.peers[peer.slot] != peer {
srvlog.Warnln("Invalid peer to remove:", peer)
return
@@ -416,6 +411,7 @@ func (srv *Server) verifyPeer(addr *peerAddr) error {
return nil
}
+// TODO replace with "Set"
type Blacklist interface {
Get([]byte) (bool, error)
Put([]byte) error
diff --git a/peer.go b/peer.go
deleted file mode 100644
index 13f0239d4..000000000
--- a/peer.go
+++ /dev/null
@@ -1,881 +0,0 @@
-package eth
-
-import (
- "bytes"
- "container/list"
- "fmt"
- "math"
- "math/big"
- "net"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethutil"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/wire"
-)
-
-var peerlogger = logger.NewLogger("PEER")
-
-const (
- // The size of the output buffer for writing messages
- outputBufferSize = 50
- // Current protocol version
- ProtocolVersion = 49
- // Current P2P version
- P2PVersion = 2
- // Ethereum network version
- NetVersion = 0
- // Interval for ping/pong message
- pingPongTimer = 2 * time.Second
-)
-
-type DiscReason byte
-
-const (
- // Values are given explicitly instead of by iota because these values are
- // defined by the wire protocol spec; it is easier for humans to ensure
- // correctness when values are explicit.
- DiscRequested DiscReason = iota
- DiscReTcpSysErr
- DiscBadProto
- DiscBadPeer
- DiscTooManyPeers
- DiscConnDup
- DiscGenesisErr
- DiscProtoErr
- DiscQuitting
-)
-
-var discReasonToString = []string{
- "requested",
- "TCP sys error",
- "bad protocol",
- "useless peer",
- "too many peers",
- "already connected",
- "wrong genesis block",
- "incompatible network",
- "quitting",
-}
-
-func (d DiscReason) String() string {
- if len(discReasonToString) < int(d) {
- return "Unknown"
- }
-
- return discReasonToString[d]
-}
-
-// Peer capabilities
-type Caps byte
-
-const (
- CapPeerDiscTy Caps = 1 << iota
- CapTxTy
- CapChainTy
-
- CapDefault = CapChainTy | CapTxTy | CapPeerDiscTy
-)
-
-var capsToString = map[Caps]string{
- CapPeerDiscTy: "Peer discovery",
- CapTxTy: "Transaction relaying",
- CapChainTy: "Block chain relaying",
-}
-
-func (c Caps) IsCap(cap Caps) bool {
- return c&cap > 0
-}
-
-func (c Caps) String() string {
- var caps []string
- if c.IsCap(CapPeerDiscTy) {
- caps = append(caps, capsToString[CapPeerDiscTy])
- }
- if c.IsCap(CapChainTy) {
- caps = append(caps, capsToString[CapChainTy])
- }
- if c.IsCap(CapTxTy) {
- caps = append(caps, capsToString[CapTxTy])
- }
-
- return strings.Join(caps, " | ")
-}
-
-type Peer struct {
- // Ethereum interface
- ethereum *Ethereum
- // Net connection
- conn net.Conn
- // Output queue which is used to communicate and handle messages
- outputQueue chan *wire.Msg
- // Quit channel
- quit chan bool
- // Determines whether it's an inbound or outbound peer
- inbound bool
- // Flag for checking the peer's connectivity state
- connected int32
- disconnect int32
- // Last known message send
- lastSend time.Time
- // Indicated whether a verack has been send or not
- // This flag is used by writeMessage to check if messages are allowed
- // to be send or not. If no version is known all messages are ignored.
- versionKnown bool
- statusKnown bool
-
- // Last received pong message
- lastPong int64
- lastBlockReceived time.Time
- doneFetchingHashes bool
- lastHashAt time.Time
- lastHashRequestedAt time.Time
-
- host []byte
- port uint16
- caps Caps
- td *big.Int
- bestHash []byte
- lastReceivedHash []byte
- requestedHashes [][]byte
-
- // This peer's public key
- pubkey []byte
-
- // Indicated whether the node is catching up or not
- catchingUp bool
- diverted bool
- blocksRequested int
-
- version string
-
- // We use this to give some kind of pingtime to a node, not very accurate, could be improved.
- pingTime time.Duration
- pingStartTime time.Time
-
- lastRequestedBlock *types.Block
-
- protocolCaps *ethutil.Value
-}
-
-func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
- pubkey := ethereum.KeyManager().PublicKey()[1:]
-
- return &Peer{
- outputQueue: make(chan *wire.Msg, outputBufferSize),
- quit: make(chan bool),
- ethereum: ethereum,
- conn: conn,
- inbound: inbound,
- disconnect: 0,
- connected: 1,
- port: 30303,
- pubkey: pubkey,
- blocksRequested: 10,
- caps: ethereum.ServerCaps(),
- version: ethereum.ClientIdentity().String(),
- protocolCaps: ethutil.NewValue(nil),
- td: big.NewInt(0),
- doneFetchingHashes: true,
- }
-}
-
-func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
- p := &Peer{
- outputQueue: make(chan *wire.Msg, outputBufferSize),
- quit: make(chan bool),
- ethereum: ethereum,
- inbound: false,
- connected: 0,
- disconnect: 0,
- port: 30303,
- caps: caps,
- version: ethereum.ClientIdentity().String(),
- protocolCaps: ethutil.NewValue(nil),
- td: big.NewInt(0),
- doneFetchingHashes: true,
- }
-
- // Set up the connection in another goroutine so we don't block the main thread
- go func() {
- conn, err := p.Connect(addr)
- if err != nil {
- //peerlogger.Debugln("Connection to peer failed. Giving up.", err)
- p.Stop()
- return
- }
- p.conn = conn
-
- // Atomically set the connection state
- atomic.StoreInt32(&p.connected, 1)
- atomic.StoreInt32(&p.disconnect, 0)
-
- p.Start()
- }()
-
- return p
-}
-
-func (self *Peer) Connect(addr string) (conn net.Conn, err error) {
- const maxTries = 3
- for attempts := 0; attempts < maxTries; attempts++ {
- conn, err = net.DialTimeout("tcp", addr, 10*time.Second)
- if err != nil {
- time.Sleep(time.Duration(attempts*20) * time.Second)
- continue
- }
-
- // Success
- return
- }
-
- return
-}
-
-// Getters
-func (p *Peer) PingTime() string {
- return p.pingTime.String()
-}
-func (p *Peer) Inbound() bool {
- return p.inbound
-}
-func (p *Peer) LastSend() time.Time {
- return p.lastSend
-}
-func (p *Peer) LastPong() int64 {
- return p.lastPong
-}
-func (p *Peer) Host() []byte {
- return p.host
-}
-func (p *Peer) Port() uint16 {
- return p.port
-}
-func (p *Peer) Version() string {
- return p.version
-}
-func (p *Peer) Connected() *int32 {
- return &p.connected
-}
-
-// Setters
-func (p *Peer) SetVersion(version string) {
- p.version = version
-}
-
-// Outputs any RLP encoded data to the peer
-func (p *Peer) QueueMessage(msg *wire.Msg) {
- if atomic.LoadInt32(&p.connected) != 1 {
- return
- }
- p.outputQueue <- msg
-}
-
-func (p *Peer) writeMessage(msg *wire.Msg) {
- // Ignore the write if we're not connected
- if atomic.LoadInt32(&p.connected) != 1 {
- return
- }
-
- if !p.versionKnown {
- switch msg.Type {
- case wire.MsgHandshakeTy: // Ok
- default: // Anything but ack is allowed
- return
- }
- } else {
- /*
- if !p.statusKnown {
- switch msg.Type {
- case wire.MsgStatusTy: // Ok
- default: // Anything but ack is allowed
- return
- }
- }
- */
- }
-
- peerlogger.DebugDetailf("(%v) <= %v\n", p.conn.RemoteAddr(), formatMessage(msg))
-
- err := wire.WriteMessage(p.conn, msg)
- if err != nil {
- peerlogger.Debugln(" Can't send message:", err)
- // Stop the client if there was an error writing to it
- p.Stop()
- return
- }
-}
-
-// Outbound message handler. Outbound messages are handled here
-func (p *Peer) HandleOutbound() {
- // The ping timer. Makes sure that every 2 minutes a ping is send to the peer
- pingTimer := time.NewTicker(pingPongTimer)
- serviceTimer := time.NewTicker(10 * time.Second)
-
-out:
- for {
- skip:
- select {
- // Main message queue. All outbound messages are processed through here
- case msg := <-p.outputQueue:
- if !p.statusKnown {
- switch msg.Type {
- case wire.MsgTxTy, wire.MsgGetBlockHashesTy, wire.MsgBlockHashesTy, wire.MsgGetBlocksTy, wire.MsgBlockTy:
- break skip
- }
- }
-
- switch msg.Type {
- case wire.MsgGetBlockHashesTy:
- p.lastHashRequestedAt = time.Now()
- }
-
- p.writeMessage(msg)
- p.lastSend = time.Now()
-
- // Ping timer
- case <-pingTimer.C:
- p.writeMessage(wire.NewMessage(wire.MsgPingTy, ""))
- p.pingStartTime = time.Now()
-
- // Service timer takes care of peer broadcasting, transaction
- // posting or block posting
- case <-serviceTimer.C:
- p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, ""))
-
- case <-p.quit:
- // Break out of the for loop if a quit message is posted
- break out
- }
- }
-
-clean:
- // This loop is for draining the output queue and anybody waiting for us
- for {
- select {
- case <-p.outputQueue:
- // TODO
- default:
- break clean
- }
- }
-}
-
-func formatMessage(msg *wire.Msg) (ret string) {
- ret = fmt.Sprintf("%v %v", msg.Type, msg.Data)
-
- /*
- XXX Commented out because I need the log level here to determine
- if i should or shouldn't generate this message
- */
- /*
- switch msg.Type {
- case wire.MsgPeersTy:
- ret += fmt.Sprintf("(%d entries)", msg.Data.Len())
- case wire.MsgBlockTy:
- b1, b2 := chain.NewBlockFromRlpValue(msg.Data.Get(0)), ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len()-1))
- ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), b1.Hash()[0:4], b2.Hash()[0:4])
- case wire.MsgBlockHashesTy:
- h1, h2 := msg.Data.Get(0).Bytes(), msg.Data.Get(msg.Data.Len()-1).Bytes()
- ret += fmt.Sprintf("(%d entries) %x - %x", msg.Data.Len(), h1, h2)
- }
- */
-
- return
-}
-
-// Inbound handler. Inbound messages are received here and passed to the appropriate methods
-func (p *Peer) HandleInbound() {
- for atomic.LoadInt32(&p.disconnect) == 0 {
-
- // HMM?
- time.Sleep(50 * time.Millisecond)
- // Wait for a message from the peer
- msgs, err := wire.ReadMessages(p.conn)
- if err != nil {
- peerlogger.Debugln(err)
- }
- for _, msg := range msgs {
- peerlogger.DebugDetailf("(%v) => %v\n", p.conn.RemoteAddr(), formatMessage(msg))
-
- switch msg.Type {
- case wire.MsgHandshakeTy:
- // Version message
- p.handleHandshake(msg)
-
- //if p.caps.IsCap(CapPeerDiscTy) {
- p.QueueMessage(wire.NewMessage(wire.MsgGetPeersTy, ""))
- //}
-
- case wire.MsgDiscTy:
- p.Stop()
- peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint()))
- case wire.MsgPingTy:
- // Respond back with pong
- p.QueueMessage(wire.NewMessage(wire.MsgPongTy, ""))
- case wire.MsgPongTy:
- // If we received a pong back from a peer we set the
- // last pong so the peer handler knows this peer is still
- // active.
- p.lastPong = time.Now().Unix()
- p.pingTime = time.Since(p.pingStartTime)
- case wire.MsgTxTy:
- // If the message was a transaction queue the transaction
- // in the TxPool where it will undergo validation and
- // processing when a new block is found
- for i := 0; i < msg.Data.Len(); i++ {
- tx := types.NewTransactionFromValue(msg.Data.Get(i))
- err := p.ethereum.TxPool().Add(tx)
- if err != nil {
- peerlogger.Infoln(err)
- } else {
- peerlogger.Infof("tx OK (%x)\n", tx.Hash()[0:4])
- }
- }
- case wire.MsgGetPeersTy:
- // Peer asked for list of connected peers
- //p.pushPeers()
- case wire.MsgPeersTy:
- // Received a list of peers (probably because MsgGetPeersTy was send)
- data := msg.Data
- // Create new list of possible peers for the ethereum to process
- peers := make([]string, data.Len())
- // Parse each possible peer
- for i := 0; i < data.Len(); i++ {
- value := data.Get(i)
- peers[i] = unpackAddr(value.Get(0), value.Get(1).Uint())
- }
-
- // Connect to the list of peers
- p.ethereum.ProcessPeerList(peers)
-
- case wire.MsgStatusTy:
- // Handle peer's status msg
- p.handleStatus(msg)
- }
-
- // TMP
- if p.statusKnown {
- switch msg.Type {
-
- case wire.MsgGetBlockHashesTy:
- if msg.Data.Len() < 2 {
- peerlogger.Debugln("err: argument length invalid ", msg.Data.Len())
- }
-
- hash := msg.Data.Get(0).Bytes()
- amount := msg.Data.Get(1).Uint()
-
- hashes := p.ethereum.ChainManager().GetChainHashesFromHash(hash, amount)
-
- p.QueueMessage(wire.NewMessage(wire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes)))
-
- case wire.MsgGetBlocksTy:
- // Limit to max 300 blocks
- max := int(math.Min(float64(msg.Data.Len()), 300.0))
- var blocks []interface{}
-
- for i := 0; i < max; i++ {
- hash := msg.Data.Get(i).Bytes()
- block := p.ethereum.ChainManager().GetBlock(hash)
- if block != nil {
- blocks = append(blocks, block.Value().Raw())
- }
- }
-
- p.QueueMessage(wire.NewMessage(wire.MsgBlockTy, blocks))
-
- case wire.MsgBlockHashesTy:
- p.catchingUp = true
-
- blockPool := p.ethereum.blockPool
-
- foundCommonHash := false
- p.lastHashAt = time.Now()
-
- it := msg.Data.NewIterator()
- for it.Next() {
- hash := it.Value().Bytes()
- p.lastReceivedHash = hash
-
- if blockPool.HasCommonHash(hash) {
- foundCommonHash = true
-
- break
- }
-
- blockPool.AddHash(hash, p)
- }
-
- if !foundCommonHash {
- p.FetchHashes()
- } else {
- peerlogger.Infof("Found common hash (%x...)\n", p.lastReceivedHash[0:4])
- p.doneFetchingHashes = true
- }
-
- case wire.MsgBlockTy:
- p.catchingUp = true
-
- blockPool := p.ethereum.blockPool
-
- it := msg.Data.NewIterator()
- for it.Next() {
- block := types.NewBlockFromRlpValue(it.Value())
- blockPool.Add(block, p)
-
- p.lastBlockReceived = time.Now()
- }
- case wire.MsgNewBlockTy:
- var (
- blockPool = p.ethereum.blockPool
- block = types.NewBlockFromRlpValue(msg.Data.Get(0))
- td = msg.Data.Get(1).BigInt()
- )
-
- if td.Cmp(blockPool.td) > 0 {
- p.ethereum.blockPool.AddNew(block, p)
- }
- }
-
- }
- }
- }
-
- p.Stop()
-}
-
-func (self *Peer) FetchBlocks(hashes [][]byte) {
- if len(hashes) > 0 {
- peerlogger.Debugf("Fetching blocks (%d)\n", len(hashes))
-
- self.QueueMessage(wire.NewMessage(wire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes)))
- }
-}
-
-func (self *Peer) FetchHashes() bool {
- blockPool := self.ethereum.blockPool
-
- return blockPool.FetchHashes(self)
-}
-
-func (self *Peer) FetchingHashes() bool {
- return !self.doneFetchingHashes
-}
-
-// General update method
-func (self *Peer) update() {
- serviceTimer := time.NewTicker(100 * time.Millisecond)
-
-out:
- for {
- select {
- case <-serviceTimer.C:
- if self.IsCap("eth") {
- var (
- sinceBlock = time.Since(self.lastBlockReceived)
- )
-
- if sinceBlock > 5*time.Second {
- self.catchingUp = false
- }
- }
- case <-self.quit:
- break out
- }
- }
-
- serviceTimer.Stop()
-}
-
-func (p *Peer) Start() {
- peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String())
- servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
-
- if p.inbound {
- p.host, p.port = packAddr(peerHost, peerPort)
- } else {
- p.host, p.port = packAddr(servHost, servPort)
- }
-
- err := p.pushHandshake()
- if err != nil {
- peerlogger.Debugln("Peer can't send outbound version ack", err)
-
- p.Stop()
-
- return
- }
-
- go p.HandleOutbound()
- // Run the inbound handler in a new goroutine
- go p.HandleInbound()
- // Run the general update handler
- go p.update()
-
- // Wait a few seconds for startup and then ask for an initial ping
- time.Sleep(2 * time.Second)
- p.writeMessage(wire.NewMessage(wire.MsgPingTy, ""))
- p.pingStartTime = time.Now()
-
-}
-
-func (p *Peer) Stop() {
- p.StopWithReason(DiscRequested)
-}
-
-func (p *Peer) StopWithReason(reason DiscReason) {
- if atomic.AddInt32(&p.disconnect, 1) != 1 {
- return
- }
-
- // Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here
- p.ethereum.RemovePeer(p)
-
- close(p.quit)
- if atomic.LoadInt32(&p.connected) != 0 {
- p.writeMessage(wire.NewMessage(wire.MsgDiscTy, reason))
- p.conn.Close()
- }
-}
-
-func (p *Peer) peersMessage() *wire.Msg {
- outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
- // Serialise each peer
- for i, peer := range p.ethereum.InOutPeers() {
- // Don't return localhost as valid peer
- if !net.ParseIP(peer.conn.RemoteAddr().String()).IsLoopback() {
- outPeers[i] = peer.RlpData()
- }
- }
-
- // Return the message to the peer with the known list of connected clients
- return wire.NewMessage(wire.MsgPeersTy, outPeers)
-}
-
-// Pushes the list of outbound peers to the client when requested
-func (p *Peer) pushPeers() {
- p.QueueMessage(p.peersMessage())
-}
-
-func (self *Peer) pushStatus() {
- msg := wire.NewMessage(wire.MsgStatusTy, []interface{}{
- uint32(ProtocolVersion),
- uint32(NetVersion),
- self.ethereum.ChainManager().Td(),
- self.ethereum.ChainManager().CurrentBlock().Hash(),
- self.ethereum.ChainManager().Genesis().Hash(),
- })
-
- self.QueueMessage(msg)
-}
-
-func (self *Peer) handleStatus(msg *wire.Msg) {
- c := msg.Data
-
- var (
- //protoVersion = c.Get(0).Uint()
- netVersion = c.Get(1).Uint()
- td = c.Get(2).BigInt()
- bestHash = c.Get(3).Bytes()
- genesis = c.Get(4).Bytes()
- )
-
- if bytes.Compare(self.ethereum.ChainManager().Genesis().Hash(), genesis) != 0 {
- loggerger.Warnf("Invalid genisis hash %x. Disabling [eth]\n", genesis)
- return
- }
-
- if netVersion != NetVersion {
- loggerger.Warnf("Invalid network version %d. Disabling [eth]\n", netVersion)
- return
- }
-
- /*
- if protoVersion != ProtocolVersion {
- loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", protoVersion)
- return
- }
- */
-
- // Get the td and last hash
- self.td = td
- self.bestHash = bestHash
- self.lastReceivedHash = bestHash
-
- self.statusKnown = true
-
- // Compare the total TD with the blockchain TD. If remote is higher
- // fetch hashes from highest TD node.
- self.FetchHashes()
-
- loggerger.Infof("Peer is [eth] capable. (TD = %v ~ %x)", self.td, self.bestHash)
-
-}
-
-func (p *Peer) pushHandshake() error {
- pubkey := p.ethereum.KeyManager().PublicKey()
- msg := wire.NewMessage(wire.MsgHandshakeTy, []interface{}{
- P2PVersion, []byte(p.version), []interface{}{[]interface{}{"eth", ProtocolVersion}}, p.port, pubkey[1:],
- })
-
- p.QueueMessage(msg)
-
- return nil
-}
-
-func (p *Peer) handleHandshake(msg *wire.Msg) {
- c := msg.Data
-
- var (
- p2pVersion = c.Get(0).Uint()
- clientId = c.Get(1).Str()
- caps = c.Get(2)
- port = c.Get(3).Uint()
- pub = c.Get(4).Bytes()
- )
-
- // Check correctness of p2p protocol version
- if p2pVersion != P2PVersion {
- peerlogger.Debugf("Invalid P2P version. Require protocol %d, received %d\n", P2PVersion, p2pVersion)
- p.Stop()
- return
- }
-
- // Handle the pub key (validation, uniqueness)
- if len(pub) == 0 {
- peerlogger.Warnln("Pubkey required, not supplied in handshake.")
- p.Stop()
- return
- }
-
- // Self connect detection
- pubkey := p.ethereum.KeyManager().PublicKey()
- if bytes.Compare(pubkey[1:], pub) == 0 {
- p.Stop()
-
- return
- }
-
- // Check for blacklisting
- for _, pk := range p.ethereum.blacklist {
- if bytes.Compare(pk, pub) == 0 {
- peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4])
- p.StopWithReason(DiscBadPeer)
-
- return
- }
- }
-
- usedPub := 0
- // This peer is already added to the peerlist so we expect to find a double pubkey at least once
- eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {
- if bytes.Compare(pub, peer.pubkey) == 0 {
- usedPub++
- }
- })
-
- if usedPub > 0 {
- peerlogger.Debugf("Pubkey %x found more then once. Already connected to client.", p.pubkey)
- p.Stop()
- return
- }
- p.pubkey = pub
-
- // If this is an inbound connection send an ack back
- if p.inbound {
- p.port = uint16(port)
- }
-
- p.SetVersion(clientId)
-
- p.versionKnown = true
-
- p.ethereum.PushPeer(p)
- p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()})
-
- p.protocolCaps = caps
-
- it := caps.NewIterator()
- var capsStrs []string
- for it.Next() {
- cap := it.Value().Get(0).Str()
- ver := it.Value().Get(1).Uint()
- switch cap {
- case "eth":
- if ver != ProtocolVersion {
- loggerger.Warnf("Invalid protocol version %d. Disabling [eth]\n", ver)
- continue
- }
- p.pushStatus()
- }
-
- capsStrs = append(capsStrs, fmt.Sprintf("%s/%d", cap, ver))
- }
-
- peerlogger.Infof("Added peer (%s) %d / %d (%v)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, capsStrs)
-
- peerlogger.Debugln(p)
-}
-
-func (self *Peer) IsCap(cap string) bool {
- capsIt := self.protocolCaps.NewIterator()
- for capsIt.Next() {
- if capsIt.Value().Str() == cap {
- return true
- }
- }
-
- return false
-}
-
-func (self *Peer) Caps() *ethutil.Value {
- return self.protocolCaps
-}
-
-func (p *Peer) String() string {
- var strBoundType string
- if p.inbound {
- strBoundType = "inbound"
- } else {
- strBoundType = "outbound"
- }
- var strConnectType string
- if atomic.LoadInt32(&p.disconnect) == 0 {
- strConnectType = "connected"
- } else {
- strConnectType = "disconnected"
- }
-
- return fmt.Sprintf("[%s] (%s) %v %s", strConnectType, strBoundType, p.conn.RemoteAddr(), p.version)
-
-}
-
-func (p *Peer) RlpData() []interface{} {
- return []interface{}{p.host, p.port, p.pubkey}
-}
-
-func packAddr(address, _port string) (host []byte, port uint16) {
- p, _ := strconv.Atoi(_port)
- port = uint16(p)
-
- h := net.ParseIP(address)
- if ip := h.To4(); ip != nil {
- host = []byte(ip)
- } else {
- host = []byte(h)
- }
-
- return
-}
-
-func unpackAddr(value *ethutil.Value, p uint64) string {
- host, _ := net.IP(value.Bytes()).MarshalText()
- prt := strconv.Itoa(int(p))
-
- return net.JoinHostPort(string(host), prt)
-}
diff --git a/pow/ezp/pow.go b/pow/ezp/pow.go
index bfe3ea098..f669f8aa4 100644
--- a/pow/ezp/pow.go
+++ b/pow/ezp/pow.go
@@ -59,7 +59,7 @@ func (pow *EasyPow) Search(block pow.Block, stop <-chan struct{}) []byte {
}
sha := crypto.Sha3(big.NewInt(r.Int63()).Bytes())
- if pow.verify(hash, diff, sha) {
+ if verify(hash, diff, sha) {
return sha
}
}
@@ -72,7 +72,11 @@ func (pow *EasyPow) Search(block pow.Block, stop <-chan struct{}) []byte {
return nil
}
-func (pow *EasyPow) verify(hash []byte, diff *big.Int, nonce []byte) bool {
+func (pow *EasyPow) Verify(block pow.Block) bool {
+ return Verify(block)
+}
+
+func verify(hash []byte, diff *big.Int, nonce []byte) bool {
sha := sha3.NewKeccak256()
d := append(hash, nonce...)
@@ -84,6 +88,6 @@ func (pow *EasyPow) verify(hash []byte, diff *big.Int, nonce []byte) bool {
return res.Cmp(verification) <= 0
}
-func (pow *EasyPow) Verify(block pow.Block) bool {
- return pow.verify(block.HashNoNonce(), block.Diff(), block.N())
+func Verify(block pow.Block) bool {
+ return verify(block.HashNoNonce(), block.Diff(), block.N())
}
diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go
index bed23c8a7..8f05c0695 100644
--- a/ui/qt/qwhisper/whisper.go
+++ b/ui/qt/qwhisper/whisper.go
@@ -1,11 +1,14 @@
package qwhisper
import (
+ "fmt"
"time"
+ "unsafe"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/whisper"
+ "gopkg.in/qml.v1"
)
func fromHex(s string) []byte {
@@ -16,27 +19,44 @@ func fromHex(s string) []byte {
}
func toHex(b []byte) string { return "0x" + ethutil.Bytes2Hex(b) }
+type Watch struct {
+}
+
+func (self *Watch) Arrived(v unsafe.Pointer) {
+ fmt.Println(v)
+}
+
type Whisper struct {
*whisper.Whisper
+ view qml.Object
+
+ watches map[int]*Watch
}
func New(w *whisper.Whisper) *Whisper {
- return &Whisper{w}
+ return &Whisper{w, nil, make(map[int]*Watch)}
}
-func (self *Whisper) Post(data string, pow, ttl uint32, to, from string) {
+func (self *Whisper) SetView(view qml.Object) {
+ self.view = view
+}
+
+func (self *Whisper) Post(data string, to, from string, topics []string, pow, ttl uint32) {
msg := whisper.NewMessage(fromHex(data))
envelope, err := msg.Seal(time.Duration(pow), whisper.Opts{
- Ttl: time.Duration(ttl),
- To: crypto.ToECDSAPub(fromHex(to)),
- From: crypto.ToECDSA(fromHex(from)),
+ Ttl: time.Duration(ttl),
+ To: crypto.ToECDSAPub(fromHex(to)),
+ From: crypto.ToECDSA(fromHex(from)),
+ Topics: whisper.TopicsFromString(topics...),
})
if err != nil {
+ fmt.Println(err)
// handle error
return
}
if err := self.Whisper.Send(envelope); err != nil {
+ fmt.Println(err)
// handle error
return
}
@@ -46,16 +66,19 @@ func (self *Whisper) NewIdentity() string {
return toHex(self.Whisper.NewIdentity().D.Bytes())
}
-func (self *Whisper) HasIdentify(key string) bool {
+func (self *Whisper) HasIdentity(key string) bool {
return self.Whisper.HasIdentity(crypto.ToECDSA(fromHex(key)))
}
-func (self *Whisper) Watch(opts map[string]interface{}) {
+func (self *Whisper) Watch(opts map[string]interface{}) *Watch {
filter := filterFromMap(opts)
filter.Fn = func(msg *whisper.Message) {
- // TODO POST TO QT WINDOW
+ fmt.Println(msg)
}
- self.Whisper.Watch(filter)
+ i := self.Whisper.Watch(filter)
+ self.watches[i] = &Watch{}
+
+ return self.watches[i]
}
func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
@@ -65,6 +88,11 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
if from, ok := opts["from"].(string); ok {
f.From = crypto.ToECDSAPub(fromHex(from))
}
+ if topicList, ok := opts["topics"].(*qml.List); ok {
+ var topics []string
+ topicList.Convert(&topics)
+ f.Topics = whisper.TopicsFromString(topics...)
+ }
return
}
diff --git a/ui/qt/qwhisper/whisper_test.go b/ui/qt/qwhisper/whisper_test.go
new file mode 100644
index 000000000..efa4e6238
--- /dev/null
+++ b/ui/qt/qwhisper/whisper_test.go
@@ -0,0 +1,15 @@
+package qwhisper
+
+import (
+ "testing"
+
+ "github.com/ethereum/go-ethereum/whisper"
+)
+
+func TestHasIdentity(t *testing.T) {
+ qw := New(whisper.New())
+ id := qw.NewIdentity()
+ if !qw.HasIdentity(id) {
+ t.Error("expected to have identity")
+ }
+}
diff --git a/whisper/doc.go b/whisper/doc.go
new file mode 100644
index 000000000..986df8fb9
--- /dev/null
+++ b/whisper/doc.go
@@ -0,0 +1,16 @@
+/*
+Package whisper implements the Whisper PoC-1.
+
+(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec)
+
+Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP).
+As such it may be likened and compared to both, not dissimilar to the
+matter/energy duality (apologies to physicists for the blatant abuse of a
+fundamental and beautiful natural principle).
+
+Whisper is a pure identity-based messaging system. Whisper provides a low-level
+(non-application-specific) but easily-accessible API without being based upon
+or prejudiced by the low-level hardware attributes and characteristics,
+particularly the notion of singular endpoints.
+*/
+package whisper
diff --git a/whisper/envelope.go b/whisper/envelope.go
index 683e88128..066e20f6a 100644
--- a/whisper/envelope.go
+++ b/whisper/envelope.go
@@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/obscuren/ecies"
)
const (
@@ -73,10 +74,15 @@ func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) {
message.Flags = data[0]
message.Signature = data[1:66]
}
- message.Payload = data[dataStart:]
+
+ payload := data[dataStart:]
if prv != nil {
- message.Payload, err = crypto.Decrypt(prv, message.Payload)
- if err != nil {
+ message.Payload, err = crypto.Decrypt(prv, payload)
+ switch err {
+ case ecies.ErrInvalidPublicKey: // Payload isn't encrypted
+ message.Payload = payload
+ return &message, err
+ default:
return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err)
}
}
diff --git a/whisper/main.go b/whisper/main.go
index 2ee2f3ff1..edd5f7004 100644
--- a/whisper/main.go
+++ b/whisper/main.go
@@ -5,10 +5,8 @@ package main
import (
"fmt"
"log"
- "net"
"os"
- "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/whisper"
@@ -20,12 +18,12 @@ func main() {
pub, _ := secp256k1.GenerateKeyPair()
- whisper := whisper.New(&event.TypeMux{})
+ whisper := whisper.New()
srv := p2p.Server{
MaxPeers: 10,
Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)),
- ListenAddr: ":30303",
+ ListenAddr: ":30300",
NAT: p2p.UPNP(),
Protocols: []p2p.Protocol{whisper.Protocol()},
@@ -35,13 +33,5 @@ func main() {
os.Exit(1)
}
- // add seed peers
- seed, err := net.ResolveTCPAddr("tcp", "poc-7.ethdev.com:30300")
- if err != nil {
- fmt.Println("couldn't resolve:", err)
- os.Exit(1)
- }
- srv.SuggestPeer(seed.IP, seed.Port, nil)
-
select {}
}
diff --git a/whisper/util.go b/whisper/util.go
index abef1d667..7a222395f 100644
--- a/whisper/util.go
+++ b/whisper/util.go
@@ -18,10 +18,19 @@ func Topics(data [][]byte) [][]byte {
return d
}
-func TopicsFromString(data []string) [][]byte {
+func TopicsFromString(data ...string) [][]byte {
d := make([][]byte, len(data))
for i, str := range data {
d[i] = hashTopic([]byte(str))
}
return d
}
+
+func bytesToMap(s [][]byte) map[string]struct{} {
+ m := make(map[string]struct{})
+ for _, topic := range s {
+ m[string(topic)] = struct{}{}
+ }
+
+ return m
+}
diff --git a/whisper/whisper.go b/whisper/whisper.go
index 356debd1c..9721ca9f9 100644
--- a/whisper/whisper.go
+++ b/whisper/whisper.go
@@ -4,13 +4,14 @@ import (
"bytes"
"crypto/ecdsa"
"errors"
- "fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event/filter"
+ "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/obscuren/ecies"
"gopkg.in/fatih/set.v0"
)
@@ -47,6 +48,8 @@ type MessageEvent struct {
const DefaultTtl = 50 * time.Second
+var wlogger = logger.NewLogger("SHH")
+
type Whisper struct {
protocol p2p.Protocol
filters *filter.Filters
@@ -68,17 +71,6 @@ func New() *Whisper {
quit: make(chan struct{}),
}
whisper.filters.Start()
- go whisper.update()
-
- // XXX TODO REMOVE TESTING CODE
- msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now())))
- envelope, _ := msg.Seal(DefaultPow, Opts{
- Ttl: DefaultTtl,
- })
- if err := whisper.Send(envelope); err != nil {
- fmt.Println(err)
- }
- // XXX TODO REMOVE TESTING CODE
// p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{
@@ -91,6 +83,11 @@ func New() *Whisper {
return whisper
}
+func (self *Whisper) Start() {
+ wlogger.Infoln("Whisper started")
+ go self.update()
+}
+
func (self *Whisper) Stop() {
close(self.quit)
}
@@ -122,6 +119,7 @@ func (self *Whisper) Watch(opts Filter) int {
return self.filters.Install(filter.Generic{
Str1: string(crypto.FromECDSA(opts.To)),
Str2: string(crypto.FromECDSAPub(opts.From)),
+ Data: bytesToMap(opts.Topics),
Fn: func(data interface{}) {
opts.Fn(data.(*Message))
},
@@ -230,13 +228,14 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) {
func (self *Whisper) postEvent(envelope *Envelope) {
for _, key := range self.keys {
- if message, err := envelope.Open(key); err == nil {
+ if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) {
// Create a custom filter?
self.filters.Notify(filter.Generic{
Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())),
+ Data: bytesToMap(envelope.Topics),
}, message)
} else {
- fmt.Println(err)
+ wlogger.Infoln(err)
}
}
}
diff --git a/wire/.gitignore b/wire/.gitignore
deleted file mode 100644
index f725d58d1..000000000
--- a/wire/.gitignore
+++ /dev/null
@@ -1,12 +0,0 @@
-# See http://help.github.com/ignore-files/ for more about ignoring files.
-#
-# If you find yourself ignoring temporary files generated by your text editor
-# or operating system, you probably want to add a global ignore instead:
-# git config --global core.excludesfile ~/.gitignore_global
-
-/tmp
-*/**/*un~
-*un~
-.DS_Store
-*/**/.DS_Store
-
diff --git a/wire/README.md b/wire/README.md
deleted file mode 100644
index 7f63688b3..000000000
--- a/wire/README.md
+++ /dev/null
@@ -1,36 +0,0 @@
-# ethwire
-
-The ethwire package contains the ethereum wire protocol. The ethwire
-package is required to write and read from the ethereum network.
-
-# Installation
-
-`go get github.com/ethereum/ethwire-go`
-
-# Messaging overview
-
-The Ethereum Wire protocol defines the communication between the nodes
-running Ethereum. Further reader reading can be done on the
-[Wiki](http://wiki.ethereum.org/index.php/Wire_Protocol).
-
-# Reading Messages
-
-```go
-// Read and validate the next eth message from the provided connection.
-// returns a error message with the details.
-msg, err := ethwire.ReadMessage(conn)
-if err != nil {
- // Handle error
-}
-```
-
-# Writing Messages
-
-```go
-// Constructs a message which can be interpreted by the eth network.
-// Write the inventory to network
-err := ethwire.WriteMessage(conn, &Msg{
- Type: ethwire.MsgInvTy,
- Data : []interface{}{...},
-})
-```
diff --git a/wire/client_identity.go b/wire/client_identity.go
deleted file mode 100644
index 0a268024a..000000000
--- a/wire/client_identity.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package wire
-
-import (
- "fmt"
- "runtime"
-)
-
-// should be used in Peer handleHandshake, incorporate Caps, ProtocolVersion, Pubkey etc.
-type ClientIdentity interface {
- String() string
-}
-
-type SimpleClientIdentity struct {
- clientIdentifier string
- version string
- customIdentifier string
- os string
- implementation string
-}
-
-func NewSimpleClientIdentity(clientIdentifier string, version string, customIdentifier string) *SimpleClientIdentity {
- clientIdentity := &SimpleClientIdentity{
- clientIdentifier: clientIdentifier,
- version: version,
- customIdentifier: customIdentifier,
- os: runtime.GOOS,
- implementation: runtime.Version(),
- }
-
- return clientIdentity
-}
-
-func (c *SimpleClientIdentity) init() {
-}
-
-func (c *SimpleClientIdentity) String() string {
- var id string
- if len(c.customIdentifier) > 0 {
- id = "/" + c.customIdentifier
- }
-
- return fmt.Sprintf("%s/v%s%s/%s/%s",
- c.clientIdentifier,
- c.version,
- id,
- c.os,
- c.implementation)
-}
-
-func (c *SimpleClientIdentity) SetCustomIdentifier(customIdentifier string) {
- c.customIdentifier = customIdentifier
-}
-
-func (c *SimpleClientIdentity) GetCustomIdentifier() string {
- return c.customIdentifier
-}
diff --git a/wire/client_identity_test.go b/wire/client_identity_test.go
deleted file mode 100644
index c0e7a0159..000000000
--- a/wire/client_identity_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package wire
-
-import (
- "fmt"
- "runtime"
- "testing"
-)
-
-func TestClientIdentity(t *testing.T) {
- clientIdentity := NewSimpleClientIdentity("Ethereum(G)", "0.5.16", "test")
- clientString := clientIdentity.String()
- expected := fmt.Sprintf("Ethereum(G)/v0.5.16/test/%s/%s", runtime.GOOS, runtime.Version())
- if clientString != expected {
- t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString)
- }
- customIdentifier := clientIdentity.GetCustomIdentifier()
- if customIdentifier != "test" {
- t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test', got %q", customIdentifier)
- }
- clientIdentity.SetCustomIdentifier("test2")
- customIdentifier = clientIdentity.GetCustomIdentifier()
- if customIdentifier != "test2" {
- t.Errorf("Expected clientIdentity.GetCustomIdentifier() to be 'test2', got %q", customIdentifier)
- }
- clientString = clientIdentity.String()
- expected = fmt.Sprintf("Ethereum(G)/v0.5.16/test2/%s/%s", runtime.GOOS, runtime.Version())
- if clientString != expected {
- t.Errorf("Expected clientIdentity to be %q, got %q", expected, clientString)
- }
-}
diff --git a/wire/messages2.go b/wire/messages2.go
deleted file mode 100644
index acbd9e0d5..000000000
--- a/wire/messages2.go
+++ /dev/null
@@ -1,199 +0,0 @@
-package wire
-
-import (
- "bytes"
- "errors"
- "fmt"
- "net"
- "time"
-
- "github.com/ethereum/go-ethereum/ethutil"
-)
-
-// The connection object allows you to set up a connection to the Ethereum network.
-// The Connection object takes care of all encoding and sending objects properly over
-// the network.
-type Connection struct {
- conn net.Conn
- nTimeout time.Duration
- pendingMessages Messages
-}
-
-// Create a new connection to the Ethereum network
-func New(conn net.Conn) *Connection {
- return &Connection{conn: conn, nTimeout: 500}
-}
-
-// Read, reads from the network. It will block until the next message is received.
-func (self *Connection) Read() *Msg {
- if len(self.pendingMessages) == 0 {
- self.readMessages()
- }
-
- ret := self.pendingMessages[0]
- self.pendingMessages = self.pendingMessages[1:]
-
- return ret
-
-}
-
-// Write to the Ethereum network specifying the type of the message and
-// the data. Data can be of type RlpEncodable or []interface{}. Returns
-// nil or if something went wrong an error.
-func (self *Connection) Write(typ MsgType, v ...interface{}) error {
- var pack []byte
-
- slice := [][]interface{}{[]interface{}{byte(typ)}}
- for _, value := range v {
- if encodable, ok := value.(ethutil.RlpEncodeDecode); ok {
- slice = append(slice, encodable.RlpValue())
- } else if raw, ok := value.([]interface{}); ok {
- slice = append(slice, raw)
- } else {
- panic(fmt.Sprintf("Unable to 'write' object of type %T", value))
- }
- }
-
- // Encode the type and the (RLP encoded) data for sending over the wire
- encoded := ethutil.NewValue(slice).Encode()
- payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32)
-
- // Write magic token and payload length (first 8 bytes)
- pack = append(MagicToken, payloadLength...)
- pack = append(pack, encoded...)
-
- // Write to the connection
- _, err := self.conn.Write(pack)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (self *Connection) readMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) {
- if len(data) == 0 {
- return nil, nil, true, nil
- }
-
- if len(data) <= 8 {
- return nil, remaining, false, errors.New("Invalid message")
- }
-
- // Check if the received 4 first bytes are the magic token
- if bytes.Compare(MagicToken, data[:4]) != 0 {
- return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4])
- }
-
- messageLength := ethutil.BytesToNumber(data[4:8])
- remaining = data[8+messageLength:]
- if int(messageLength) > len(data[8:]) {
- return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength)
- }
-
- message := data[8 : 8+messageLength]
- decoder := ethutil.NewValueFromBytes(message)
- // Type of message
- t := decoder.Get(0).Uint()
- // Actual data
- d := decoder.SliceFrom(1)
-
- msg = &Msg{
- Type: MsgType(t),
- Data: d,
- }
-
- return
-}
-
-// The basic message reader waits for data on the given connection, decoding
-// and doing a few sanity checks such as if there's a data type and
-// unmarhals the given data
-func (self *Connection) readMessages() (err error) {
- // The recovering function in case anything goes horribly wrong
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("wire.ReadMessage error: %v", r)
- }
- }()
-
- // Buff for writing network message to
- //buff := make([]byte, 1440)
- var buff []byte
- var totalBytes int
- for {
- // Give buffering some time
- self.conn.SetReadDeadline(time.Now().Add(self.nTimeout * time.Millisecond))
- // Create a new temporarily buffer
- b := make([]byte, 1440)
- // Wait for a message from this peer
- n, _ := self.conn.Read(b)
- if err != nil && n == 0 {
- if err.Error() != "EOF" {
- fmt.Println("err now", err)
- return err
- } else {
- break
- }
-
- // Messages can't be empty
- } else if n == 0 {
- break
- }
-
- buff = append(buff, b[:n]...)
- totalBytes += n
- }
-
- // Reslice buffer
- buff = buff[:totalBytes]
- msg, remaining, done, err := self.readMessage(buff)
- for ; done != true; msg, remaining, done, err = self.readMessage(remaining) {
- //log.Println("rx", msg)
-
- if msg != nil {
- self.pendingMessages = append(self.pendingMessages, msg)
- }
- }
-
- return
-}
-
-func ReadMessage(data []byte) (msg *Msg, remaining []byte, done bool, err error) {
- if len(data) == 0 {
- return nil, nil, true, nil
- }
-
- if len(data) <= 8 {
- return nil, remaining, false, errors.New("Invalid message")
- }
-
- // Check if the received 4 first bytes are the magic token
- if bytes.Compare(MagicToken, data[:4]) != 0 {
- return nil, nil, false, fmt.Errorf("MagicToken mismatch. Received %v", data[:4])
- }
-
- messageLength := ethutil.BytesToNumber(data[4:8])
- remaining = data[8+messageLength:]
- if int(messageLength) > len(data[8:]) {
- return nil, nil, false, fmt.Errorf("message length %d, expected %d", len(data[8:]), messageLength)
- }
-
- message := data[8 : 8+messageLength]
- decoder := ethutil.NewValueFromBytes(message)
- // Type of message
- t := decoder.Get(0).Uint()
- // Actual data
- d := decoder.SliceFrom(1)
-
- msg = &Msg{
- Type: MsgType(t),
- Data: d,
- }
-
- return
-}
-
-func bufferedRead(conn net.Conn) ([]byte, error) {
- return nil, nil
-}
diff --git a/wire/messaging.go b/wire/messaging.go
deleted file mode 100644
index 9c6cb5944..000000000
--- a/wire/messaging.go
+++ /dev/null
@@ -1,178 +0,0 @@
-// Package wire provides low level access to the Ethereum network and allows
-// you to broadcast data over the network.
-package wire
-
-import (
- "bytes"
- "fmt"
- "net"
- "time"
-
- "github.com/ethereum/go-ethereum/ethutil"
-)
-
-// Connection interface describing the methods required to implement the wire protocol.
-type Conn interface {
- Write(typ MsgType, v ...interface{}) error
- Read() *Msg
-}
-
-// The magic token which should be the first 4 bytes of every message and can be used as separator between messages.
-var MagicToken = []byte{34, 64, 8, 145}
-
-type MsgType byte
-
-const (
- // Values are given explicitly instead of by iota because these values are
- // defined by the wire protocol spec; it is easier for humans to ensure
- // correctness when values are explicit.
- MsgHandshakeTy = 0x00
- MsgDiscTy = 0x01
- MsgPingTy = 0x02
- MsgPongTy = 0x03
- MsgGetPeersTy = 0x04
- MsgPeersTy = 0x05
-
- MsgStatusTy = 0x10
- MsgTxTy = 0x12
- MsgGetBlockHashesTy = 0x13
- MsgBlockHashesTy = 0x14
- MsgGetBlocksTy = 0x15
- MsgBlockTy = 0x16
- MsgNewBlockTy = 0x17
-)
-
-var msgTypeToString = map[MsgType]string{
- MsgHandshakeTy: "Handshake",
- MsgDiscTy: "Disconnect",
- MsgPingTy: "Ping",
- MsgPongTy: "Pong",
- MsgGetPeersTy: "Get peers",
- MsgStatusTy: "Status",
- MsgPeersTy: "Peers",
- MsgTxTy: "Transactions",
- MsgBlockTy: "Blocks",
- //MsgGetTxsTy: "Get Txs",
- MsgGetBlockHashesTy: "Get block hashes",
- MsgBlockHashesTy: "Block hashes",
- MsgGetBlocksTy: "Get blocks",
-}
-
-func (mt MsgType) String() string {
- return msgTypeToString[mt]
-}
-
-type Msg struct {
- Type MsgType // Specifies how the encoded data should be interpreted
- //Data []byte
- Data *ethutil.Value
-}
-
-func NewMessage(msgType MsgType, data interface{}) *Msg {
- return &Msg{
- Type: msgType,
- Data: ethutil.NewValue(data),
- }
-}
-
-type Messages []*Msg
-
-// The basic message reader waits for data on the given connection, decoding
-// and doing a few sanity checks such as if there's a data type and
-// unmarhals the given data
-func ReadMessages(conn net.Conn) (msgs []*Msg, err error) {
- // The recovering function in case anything goes horribly wrong
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("wire.ReadMessage error: %v", r)
- }
- }()
-
- var (
- buff []byte
- messages [][]byte
- msgLength int
- )
-
- for {
- // Give buffering some time
- conn.SetReadDeadline(time.Now().Add(5 * time.Millisecond))
- // Create a new temporarily buffer
- b := make([]byte, 1440)
- n, _ := conn.Read(b)
- if err != nil && n == 0 {
- if err.Error() != "EOF" {
- fmt.Println("err now", err)
- return nil, err
- } else {
- break
- }
- }
-
- if n == 0 && len(buff) == 0 {
- // If there's nothing on the wire wait for a bit
- time.Sleep(200 * time.Millisecond)
-
- continue
- }
-
- buff = append(buff, b[:n]...)
- if msgLength == 0 {
- // Check if the received 4 first bytes are the magic token
- if bytes.Compare(MagicToken, buff[:4]) != 0 {
- return nil, fmt.Errorf("MagicToken mismatch. Received %v", buff[:4])
- }
-
- // Read the length of the message
- msgLength = int(ethutil.BytesToNumber(buff[4:8]))
-
- // Remove the token and length
- buff = buff[8:]
- }
-
- if len(buff) >= msgLength {
- messages = append(messages, buff[:msgLength])
- buff = buff[msgLength:]
- msgLength = 0
-
- if len(buff) == 0 {
- break
- }
- }
- }
-
- for _, m := range messages {
- decoder := ethutil.NewValueFromBytes(m)
- // Type of message
- t := decoder.Get(0).Uint()
- // Actual data
- d := decoder.SliceFrom(1)
-
- msgs = append(msgs, &Msg{Type: MsgType(t), Data: d})
- }
-
- return
-}
-
-// The basic message writer takes care of writing data over the given
-// connection and does some basic error checking
-func WriteMessage(conn net.Conn, msg *Msg) error {
- var pack []byte
-
- // Encode the type and the (RLP encoded) data for sending over the wire
- encoded := ethutil.NewValue(append([]interface{}{byte(msg.Type)}, msg.Data.Slice()...)).Encode()
- payloadLength := ethutil.NumberToBytes(uint32(len(encoded)), 32)
-
- // Write magic token and payload length (first 8 bytes)
- pack = append(MagicToken, payloadLength...)
- pack = append(pack, encoded...)
- //fmt.Printf("payload %v (%v) %q\n", msg.Type, conn.RemoteAddr(), encoded)
-
- // Write to the connection
- _, err := conn.Write(pack)
- if err != nil {
- return err
- }
-
- return nil
-}
diff --git a/xeth/hexface.go b/xeth/hexface.go
index bfd2dddd9..6c084f947 100644
--- a/xeth/hexface.go
+++ b/xeth/hexface.go
@@ -3,7 +3,6 @@ package xeth
import (
"bytes"
"encoding/json"
- "sync/atomic"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -63,12 +62,8 @@ func (self *JSXEth) PeerCount() int {
func (self *JSXEth) Peers() []JSPeer {
var peers []JSPeer
- for peer := self.obj.Peers().Front(); peer != nil; peer = peer.Next() {
- p := peer.Value.(core.Peer)
- // we only want connected peers
- if atomic.LoadInt32(p.Connected()) != 0 {
- peers = append(peers, *NewJSPeer(p))
- }
+ for _, peer := range self.obj.Peers() {
+ peers = append(peers, *NewJSPeer(peer))
}
return peers
diff --git a/xeth/js_types.go b/xeth/js_types.go
index 62867d6a9..987edce37 100644
--- a/xeth/js_types.go
+++ b/xeth/js_types.go
@@ -1,14 +1,13 @@
package xeth
import (
- "fmt"
- "strconv"
"strings"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/state"
)
@@ -155,38 +154,36 @@ func NewPReciept(contractCreation bool, creationAddress, hash, address []byte) *
// Peer interface exposed to QML
type JSPeer struct {
- ref *core.Peer
- Inbound bool `json:"isInbound"`
- LastSend int64 `json:"lastSend"`
- LastPong int64 `json:"lastPong"`
- Ip string `json:"ip"`
- Port int `json:"port"`
- Version string `json:"version"`
- LastResponse string `json:"lastResponse"`
- Latency string `json:"latency"`
- Caps string `json:"caps"`
-}
-
-func NewJSPeer(peer core.Peer) *JSPeer {
- if peer == nil {
- return nil
- }
-
- var ip []string
- for _, i := range peer.Host() {
- ip = append(ip, strconv.Itoa(int(i)))
- }
- ipAddress := strings.Join(ip, ".")
-
- var caps []string
- capsIt := peer.Caps().NewIterator()
- for capsIt.Next() {
- cap := capsIt.Value().Get(0).Str()
- ver := capsIt.Value().Get(1).Uint()
- caps = append(caps, fmt.Sprintf("%s/%d", cap, ver))
- }
-
- return &JSPeer{ref: &peer, Inbound: peer.Inbound(), LastSend: peer.LastSend().Unix(), LastPong: peer.LastPong(), Version: peer.Version(), Ip: ipAddress, Port: int(peer.Port()), Latency: peer.PingTime(), Caps: "[" + strings.Join(caps, ", ") + "]"}
+ ref *p2p.Peer
+ // Inbound bool `json:"isInbound"`
+ // LastSend int64 `json:"lastSend"`
+ // LastPong int64 `json:"lastPong"`
+ // Ip string `json:"ip"`
+ // Port int `json:"port"`
+ // Version string `json:"version"`
+ // LastResponse string `json:"lastResponse"`
+ // Latency string `json:"latency"`
+ // Caps string `json:"caps"`
+}
+
+func NewJSPeer(peer *p2p.Peer) *JSPeer {
+
+ // var ip []string
+ // for _, i := range peer.Host() {
+ // ip = append(ip, strconv.Itoa(int(i)))
+ // }
+ // ipAddress := strings.Join(ip, ".")
+
+ // var caps []string
+ // capsIt := peer.Caps().NewIterator()
+ // for capsIt.Next() {
+ // cap := capsIt.Value().Get(0).Str()
+ // ver := capsIt.Value().Get(1).Uint()
+ // caps = append(caps, fmt.Sprintf("%s/%d", cap, ver))
+ // }
+
+ return &JSPeer{ref: peer}
+ // return &JSPeer{ref: &peer, Inbound: peer.Inbound(), LastSend: peer.LastSend().Unix(), LastPong: peer.LastPong(), Version: peer.Version(), Ip: ipAddress, Port: int(peer.Port()), Latency: peer.PingTime(), Caps: "[" + strings.Join(caps, ", ") + "]"}
}
type JSReceipt struct {
diff --git a/xeth/world.go b/xeth/world.go
index 956ef1e15..008a08423 100644
--- a/xeth/world.go
+++ b/xeth/world.go
@@ -1,8 +1,7 @@
package xeth
import (
- "container/list"
-
+ "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/state"
)
@@ -55,7 +54,7 @@ func (self *World) IsListening() bool {
return self.pipe.obj.IsListening()
}
-func (self *World) Peers() *list.List {
+func (self *World) Peers() []*p2p.Peer {
return self.pipe.obj.Peers()
}