aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--blockpool/blockpool.go189
-rw-r--r--blockpool/errors_test.go2
-rw-r--r--blockpool/peers.go93
-rw-r--r--blockpool/peers_test.go11
-rw-r--r--cmd/ethtest/main.go26
-rw-r--r--cmd/geth/main.go10
-rw-r--r--core/execution.go5
-rw-r--r--p2p/discover/udp.go15
-rw-r--r--p2p/handshake.go68
-rw-r--r--p2p/handshake_test.go4
-rw-r--r--p2p/peer.go132
-rw-r--r--p2p/peer_error.go10
-rw-r--r--p2p/peer_test.go74
-rw-r--r--p2p/server.go176
-rw-r--r--p2p/server_test.go58
-rw-r--r--ui/qt/qwhisper/whisper.go4
-rw-r--r--whisper/envelope.go139
-rw-r--r--whisper/main.go77
-rw-r--r--whisper/message.go121
-rw-r--r--whisper/message_test.go138
-rw-r--r--whisper/messages_test.go50
-rw-r--r--whisper/whisper.go5
-rw-r--r--whisper/whisper_test.go4
-rw-r--r--xeth/whisper.go4
24 files changed, 901 insertions, 514 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go
index f442259e0..3ed2e92c7 100644
--- a/blockpool/blockpool.go
+++ b/blockpool/blockpool.go
@@ -169,6 +169,9 @@ type BlockPool struct {
// alloc-easy pool of hash slices
hashSlicePool chan []common.Hash
+ nodeCache map[common.Hash]*node
+ nodeCacheLock sync.RWMutex
+
// waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status
wg sync.WaitGroup //
@@ -210,6 +213,7 @@ func (self *BlockPool) Start() {
self.Config.init()
self.hashSlicePool = make(chan []common.Hash, 150)
+ self.nodeCache = make(map[common.Hash]*node)
self.status = newStatus()
self.quit = make(chan bool)
self.pool = make(map[common.Hash]*entry)
@@ -615,127 +619,104 @@ LOOP:
If the block received is the head block of the current best peer, signal it to the head section process
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
- hash := block.Hash()
-
- sender, _ := self.peers.getPeer(peerId)
- if sender == nil {
- return
- }
self.status.lock.Lock()
self.status.activePeers[peerId]++
self.status.lock.Unlock()
- entry := self.get(hash)
- blockIsCurrentHead := false
- sender.lock.RLock()
- currentBlockHash := sender.currentBlockHash
- currentBlock := sender.currentBlock
- currentBlockC := sender.currentBlockC
- switchC := sender.switchC
- sender.lock.RUnlock()
-
- // a peer's current head block is appearing the first time
- if hash == currentBlockHash {
- // this happens when block came in a newblock message but
- // also if sent in a blockmsg (for instance, if we requested, only if we
- // dont apply on blockrequests the restriction of flood control)
- blockIsCurrentHead = true
- if currentBlock == nil {
- sender.lock.Lock()
- sender.setChainInfoFromBlock(block)
- sender.lock.Unlock()
-
- self.status.lock.Lock()
- self.status.values.BlockHashes++
- self.status.values.Blocks++
- self.status.values.BlocksInPool++
- self.status.lock.Unlock()
- // signal to head section process
- select {
- case currentBlockC <- block:
- case <-switchC:
- }
- } else {
- plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash))
- }
- } else {
-
- plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash))
-
- /* @zelig !!!
- requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
- delayed B sends you block ... UNREQUESTED. Blocked
- if entry == nil {
- plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
- sender.addError(ErrUnrequestedBlock, "%x", hash)
-
- self.status.lock.Lock()
- self.status.badPeers[peerId]++
- self.status.lock.Unlock()
- return
- }
- */
- }
+ hash := block.Hash()
- if entry == nil {
- // FIXME: here check the cache find or create node -
- // put peer as blockBy!
+ // check if block is already inserted in the blockchain
+ if self.hasBlock(hash) {
return
}
- node := entry.node
- node.lock.Lock()
- defer node.lock.Unlock()
-
- // register peer on node as source
- if node.peers == nil {
- node.peers = make(map[string]bool)
- }
- FoundBlockCurrentHead, found := node.peers[sender.id]
- if !found || FoundBlockCurrentHead {
- // if found but not FoundBlockCurrentHead, then no update
- // necessary (||)
- node.peers[sender.id] = blockIsCurrentHead
- // for those that are false, TD will update their head
- // for those that are true, TD is checked !
- // this is checked at the time of TD calculation in checkTD
- }
- // check if block already received
- if node.block != nil {
- plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
- }
-
- // check if block is already inserted in the blockchain
- if self.hasBlock(hash) {
- plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash))
+ sender, _ := self.peers.getPeer(peerId)
+ if sender == nil {
return
}
+ tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block)
- /*
- @zelig needs discussing
- Viktor: pow check can be delayed in a go routine and therefore cache
- creation is not blocking
- // validate block for PoW
- if !self.verifyPoW(block) {
- plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
- sender.addError(ErrInvalidPoW, "%x", hash)
+ entry := self.get(hash)
- self.status.lock.Lock()
- self.status.badPeers[peerId]++
- self.status.lock.Unlock()
+ /* @zelig !!!
+ requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
+ delayed B sends you block ... UNREQUESTED. Blocked
+ if entry == nil {
+ plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
+ sender.addError(ErrUnrequestedBlock, "%x", hash)
+
+ self.status.lock.Lock()
+ self.status.badPeers[peerId]++
+ self.status.lock.Unlock()
+ return
+ }
+ */
- return
+ var bnode *node
+ if entry == nil {
+ self.nodeCacheLock.Lock()
+ bnode, _ = self.nodeCache[hash]
+ if bnode == nil {
+ bnode = &node{
+ hash: currentBlockHash,
+ block: block,
+ hashBy: peerId,
+ blockBy: peerId,
+ td: tdFromCurrentHead,
}
- */
+ self.nodeCache[hash] = bnode
+ }
+ self.nodeCacheLock.Unlock()
+ } else {
+ bnode = entry.node
+ }
- node.block = block
- node.blockBy = peerId
+ bnode.lock.Lock()
+ defer bnode.lock.Unlock()
- self.status.lock.Lock()
- self.status.values.Blocks++
- self.status.values.BlocksInPool++
- self.status.lock.Unlock()
+ // check if block already received
+ if bnode.block != nil {
+ plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy)
+ // register peer on node as source
+ if bnode.peers == nil {
+ bnode.peers = make(map[string]bool)
+ }
+ foundBlockCurrentHead, found := bnode.peers[sender.id]
+ if !found || foundBlockCurrentHead {
+ // if found but not FoundBlockCurrentHead, then no update
+ // necessary (||)
+ bnode.peers[sender.id] = (currentBlockHash == hash)
+ // for those that are false, TD will update their head
+ // for those that are true, TD is checked !
+ // this is checked at the time of TD calculation in checkTD
+ }
+ sender.setChainInfoFromNode(bnode)
+ } else {
+ /*
+ @zelig needs discussing
+ Viktor: pow check can be delayed in a go routine and therefore cache
+ creation is not blocking
+ // validate block for PoW
+ if !self.verifyPoW(block) {
+ plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
+ sender.addError(ErrInvalidPoW, "%x", hash)
+
+ self.status.lock.Lock()
+ self.status.badPeers[peerId]++
+ self.status.lock.Unlock()
+
+ return
+ }
+ */
+ bnode.block = block
+ bnode.blockBy = peerId
+ bnode.td = tdFromCurrentHead
+ self.status.lock.Lock()
+ self.status.values.Blocks++
+ self.status.values.BlocksInPool++
+ self.status.lock.Unlock()
+ }
}
diff --git a/blockpool/errors_test.go b/blockpool/errors_test.go
index c9bf79a23..0fbf94d7d 100644
--- a/blockpool/errors_test.go
+++ b/blockpool/errors_test.go
@@ -128,7 +128,7 @@ func TestErrInsufficientChainInfo(t *testing.T) {
}
func TestIncorrectTD(t *testing.T) {
- t.Skip() // td not tested atm
+ t.Skip("skipping TD check until network is healthy")
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
diff --git a/blockpool/peers.go b/blockpool/peers.go
index e5d6a16d6..c6cade460 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -18,6 +18,7 @@ type peer struct {
// last known blockchain status
td *big.Int
+ tdAdvertised bool
currentBlockHash common.Hash
currentBlock *types.Block
parentHash common.Hash
@@ -88,10 +89,12 @@ func (self *peers) newPeer(
peerError: peerError,
currentBlockC: make(chan *types.Block),
headSectionC: make(chan *section),
+ switchC: make(chan bool),
bp: self.bp,
idle: true,
addToBlacklist: self.addToBlacklist,
}
+ close(p.switchC) //! hack :((((
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
@@ -135,21 +138,58 @@ func (self *peer) addError(code int, format string, params ...interface{}) {
}
// caller must hold peer lock
-func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
- self.td = td
- self.currentBlockHash = c
- self.currentBlock = nil
- self.parentHash = common.Hash{}
- self.headSection = nil
+func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if self.currentBlockHash != currentBlockHash {
+ previousBlockHash := self.currentBlockHash
+ plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash))
+ self.td = td
+ self.currentBlockHash = currentBlockHash
+ self.currentBlock = nil
+ self.parentHash = common.Hash{}
+ self.headSection = nil
+ }
+ self.tdAdvertised = true
}
-// caller must hold peer lock
-func (self *peer) setChainInfoFromBlock(block *types.Block) {
- // use the optional TD to update peer td, this helps second best peer selection
+func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
+ self.lock.Lock()
+ currentBlockC := self.currentBlockC
+ switchC := self.switchC
+ hash := block.Hash()
+ // this happens when block came in a newblock message but
+ // also if sent in a blockmsg (for instance, if we requested, only if we
+ // dont apply on blockrequests the restriction of flood control)
+ currentBlockHash = self.currentBlockHash
+ if currentBlockHash == hash && self.currentBlock == nil {
+ // signal to head section process
+ plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
+ td = self.td
+ } else {
+ plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
+ }
+ self.lock.Unlock()
+ // this must be called without peerlock.
+ // peerlock held can halt the loop and block on select forever
+ if td != nil {
+ select {
+ case currentBlockC <- block:
+ case <-switchC: // peer is not best peer
+ }
+ }
+ return
+}
+
+// this will use the TD given by the first peer to update peer td, this helps second best peer selection
+// :FIXME: node
+func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost
- if block.Td != nil && block.Td.Cmp(self.td) > 0 {
- plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
- self.td = block.Td
+ block := n.block
+ hash := block.Hash()
+ if n.td != nil && n.td.Cmp(self.td) > 0 {
+ plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
+ self.td = n.td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
self.currentBlock = block
@@ -218,17 +258,11 @@ func (self *peers) addPeer(
if found {
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
- p.lock.Lock()
- if p.currentBlockHash != currentBlockHash {
- previousBlockHash = p.currentBlockHash
- plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
- p.setChainInfo(td, currentBlockHash)
-
- self.status.lock.Lock()
- self.status.values.NewBlocks++
- self.status.lock.Unlock()
- }
- p.lock.Unlock()
+ p.setChainInfo(td, currentBlockHash)
+ // FIXME: only count the same block once
+ self.status.lock.Lock()
+ self.status.values.NewBlocks++
+ self.status.lock.Unlock()
} else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
@@ -333,8 +367,8 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
close(oldp.switchC)
}
if newp != nil {
- newp.idleC = make(chan bool)
- newp.switchC = make(chan bool)
+ // newp.idleC = make(chan bool)
+ // newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
if newp.headSection == nil {
@@ -354,6 +388,9 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
}()
+ } else {
+ newp.idleC = make(chan bool)
+ newp.switchC = make(chan bool)
}
var connected = make(map[common.Hash]*section)
@@ -528,10 +565,12 @@ func (self *peer) getBlockHashes() bool {
// main loop for head section process
func (self *peer) run() {
- self.lock.RLock()
+ self.lock.Lock()
+ self.switchC = make(chan bool)
+ self.idleC = make(chan bool)
switchC := self.switchC
plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
- self.lock.RUnlock()
+ self.lock.Unlock()
self.blockHashesRequestTimer = nil
diff --git a/blockpool/peers_test.go b/blockpool/peers_test.go
index e32bb6fc8..e5788e379 100644
--- a/blockpool/peers_test.go
+++ b/blockpool/peers_test.go
@@ -145,7 +145,6 @@ func TestAddPeer(t *testing.T) {
}
func TestPeerPromotionByTdOnBlock(t *testing.T) {
- t.Skip()
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
@@ -155,28 +154,26 @@ func TestPeerPromotionByTdOnBlock(t *testing.T) {
peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start()
- blockPoolTester.tds = make(map[int]int)
- blockPoolTester.tds[3] = 3
- // pool
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
+ return
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
- // hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
- peer1.serveBlocks(2, 3)
+ peer1.sendBlocks(3, 4)
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
+ return
}
- peer1.serveBlocks(0, 1, 2)
+ peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
diff --git a/cmd/ethtest/main.go b/cmd/ethtest/main.go
index 3c5b2cedf..c2c94d6c4 100644
--- a/cmd/ethtest/main.go
+++ b/cmd/ethtest/main.go
@@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/tests/helper"
)
@@ -110,6 +111,9 @@ func RunVmTest(r io.Reader) (failed int) {
log.Fatalln(err)
}
+ vm.Debug = true
+ glog.SetV(4)
+ glog.SetToStderr(true)
for name, test := range tests {
db, _ := ethdb.NewMemDatabase()
statedb := state.New(common.Hash{}, db)
@@ -135,7 +139,7 @@ func RunVmTest(r io.Reader) (failed int) {
rexp := helper.FromHex(test.Out)
if bytes.Compare(rexp, ret) != 0 {
- helper.Log.Infof("%s's return failed. Expected %x, got %x\n", name, rexp, ret)
+ glog.V(logger.Info).Infof("%s's return failed. Expected %x, got %x\n", name, rexp, ret)
failed = 1
}
@@ -147,7 +151,7 @@ func RunVmTest(r io.Reader) (failed int) {
if len(test.Exec) == 0 {
if obj.Balance().Cmp(common.Big(account.Balance)) != 0 {
- helper.Log.Infof("%s's : (%x) balance failed. Expected %v, got %v => %v\n", name, obj.Address().Bytes()[:4], account.Balance, obj.Balance(), new(big.Int).Sub(common.Big(account.Balance), obj.Balance()))
+ glog.V(logger.Info).Infof("%s's : (%x) balance failed. Expected %v, got %v => %v\n", name, obj.Address().Bytes()[:4], account.Balance, obj.Balance(), new(big.Int).Sub(common.Big(account.Balance), obj.Balance()))
failed = 1
}
}
@@ -157,7 +161,7 @@ func RunVmTest(r io.Reader) (failed int) {
vexp := helper.FromHex(value)
if bytes.Compare(v, vexp) != 0 {
- helper.Log.Infof("%s's : (%x: %s) storage failed. Expected %x, got %x (%v %v)\n", name, obj.Address().Bytes()[0:4], addr, vexp, v, common.BigD(vexp), common.BigD(v))
+ glog.V(logger.Info).Infof("%s's : (%x: %s) storage failed. Expected %x, got %x (%v %v)\n", name, obj.Address().Bytes()[0:4], addr, vexp, v, common.BigD(vexp), common.BigD(v))
failed = 1
}
}
@@ -166,33 +170,33 @@ func RunVmTest(r io.Reader) (failed int) {
statedb.Sync()
//if !bytes.Equal(common.Hex2Bytes(test.PostStateRoot), statedb.Root()) {
if common.HexToHash(test.PostStateRoot) != statedb.Root() {
- helper.Log.Infof("%s's : Post state root failed. Expected %s, got %x", name, test.PostStateRoot, statedb.Root())
+ glog.V(logger.Info).Infof("%s's : Post state root failed. Expected %s, got %x", name, test.PostStateRoot, statedb.Root())
failed = 1
}
if len(test.Logs) > 0 {
if len(test.Logs) != len(logs) {
- helper.Log.Infof("log length failed. Expected %d, got %d", len(test.Logs), len(logs))
+ glog.V(logger.Info).Infof("log length failed. Expected %d, got %d", len(test.Logs), len(logs))
failed = 1
} else {
for i, log := range test.Logs {
if common.HexToAddress(log.AddressF) != logs[i].Address {
- helper.Log.Infof("'%s' log address failed. Expected %v got %x", name, log.AddressF, logs[i].Address)
+ glog.V(logger.Info).Infof("'%s' log address failed. Expected %v got %x", name, log.AddressF, logs[i].Address)
failed = 1
}
if !bytes.Equal(logs[i].Data, helper.FromHex(log.DataF)) {
- helper.Log.Infof("'%s' log data failed. Expected %v got %x", name, log.DataF, logs[i].Data)
+ glog.V(logger.Info).Infof("'%s' log data failed. Expected %v got %x", name, log.DataF, logs[i].Data)
failed = 1
}
if len(log.TopicsF) != len(logs[i].Topics) {
- helper.Log.Infof("'%s' log topics length failed. Expected %d got %d", name, len(log.TopicsF), logs[i].Topics)
+ glog.V(logger.Info).Infof("'%s' log topics length failed. Expected %d got %d", name, len(log.TopicsF), logs[i].Topics)
failed = 1
} else {
for j, topic := range log.TopicsF {
if common.HexToHash(topic) != logs[i].Topics[j] {
- helper.Log.Infof("'%s' log topic[%d] failed. Expected %v got %x", name, j, topic, logs[i].Topics[j])
+ glog.V(logger.Info).Infof("'%s' log topic[%d] failed. Expected %v got %x", name, j, topic, logs[i].Topics[j])
failed = 1
}
}
@@ -200,7 +204,7 @@ func RunVmTest(r io.Reader) (failed int) {
genBloom := common.LeftPadBytes(types.LogsBloom(state.Logs{logs[i]}).Bytes(), 256)
if !bytes.Equal(genBloom, common.Hex2Bytes(log.BloomF)) {
- helper.Log.Infof("'%s' bloom failed.", name)
+ glog.V(logger.Info).Infof("'%s' bloom failed.", name)
failed = 1
}
}
@@ -208,7 +212,7 @@ func RunVmTest(r io.Reader) (failed int) {
}
if failed == 1 {
- helper.Log.Infoln(string(statedb.Dump()))
+ glog.V(logger.Info).Infoln(string(statedb.Dump()))
}
logger.Flush()
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index 4853a16fc..9e5ae5857 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -24,6 +24,8 @@ import (
"bufio"
"fmt"
"io/ioutil"
+ "log"
+ "net/http"
"os"
"runtime"
"strconv"
@@ -40,10 +42,11 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/peterh/liner"
)
+import _ "net/http/pprof"
const (
ClientIdentifier = "Geth"
- Version = "0.9.8"
+ Version = "0.9.9"
)
var app = utils.NewApp(Version, "the go-ethereum command line interface")
@@ -247,6 +250,11 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
}
func main() {
+ // Start up the default http server for pprof
+ go func() {
+ log.Println(http.ListenAndServe("localhost:6060", nil))
+ }()
+
fmt.Printf("Welcome to the FRONTIER\n")
runtime.GOMAXPROCS(runtime.NumCPU())
defer logger.Flush()
diff --git a/core/execution.go b/core/execution.go
index 8134471d1..9adf98032 100644
--- a/core/execution.go
+++ b/core/execution.go
@@ -34,7 +34,10 @@ func (self *Execution) Call(codeAddr common.Address, caller vm.ContextRef) ([]by
}
func (self *Execution) Create(caller vm.ContextRef) (ret []byte, err error, account *state.StateObject) {
- ret, err = self.exec(nil, self.input, caller)
+ // Input must be nil for create
+ code := self.input
+ self.input = nil
+ ret, err = self.exec(nil, code, caller)
account = self.env.State().GetStateObject(*self.address)
return
}
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index a638a8f35..d37260e7d 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -335,7 +335,7 @@ func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error {
if err != nil {
return err
}
- glog.V(logger.Detail).Infof(">>> %v %T %v\n", toaddr, req, req)
+ glog.V(logger.Detail).Infof(">>> %v %T\n", toaddr, req)
if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
glog.V(logger.Detail).Infoln("UDP send failed:", err)
}
@@ -378,12 +378,11 @@ func (t *udp) readLoop() {
glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
continue
}
- glog.V(logger.Detail).Infof("<<< %v %T %v\n", from, packet, packet)
- go func() {
- if err := packet.handle(t, from, fromID, hash); err != nil {
- glog.V(logger.Debug).Infof("error handling %T from %v: %v", packet, from, err)
- }
- }()
+ status := "ok"
+ if err := packet.handle(t, from, fromID, hash); err != nil {
+ status = err.Error()
+ }
+ glog.V(logger.Detail).Infof("<<< %v %T: %s\n", from, packet, status)
}
}
@@ -430,7 +429,7 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
})
if !t.handleReply(fromID, pingPacket, req) {
// Note: we're ignoring the provided IP address right now
- t.bond(true, fromID, from, req.Port)
+ go t.bond(true, fromID, from, req.Port)
}
return nil
}
diff --git a/p2p/handshake.go b/p2p/handshake.go
index 031064407..43361364f 100644
--- a/p2p/handshake.go
+++ b/p2p/handshake.go
@@ -68,50 +68,61 @@ type protoHandshake struct {
// setupConn starts a protocol session on the given connection.
// It runs the encryption handshake and the protocol handshake.
// If dial is non-nil, the connection the local node is the initiator.
-func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) {
+// If atcap is true, the connection will be disconnected with DiscTooManyPeers
+// after the key exchange.
+func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool) (*conn, error) {
if dial == nil {
- return setupInboundConn(fd, prv, our)
+ return setupInboundConn(fd, prv, our, atcap)
} else {
- return setupOutboundConn(fd, prv, our, dial)
+ return setupOutboundConn(fd, prv, our, dial, atcap)
}
}
-func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (*conn, error) {
+func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, atcap bool) (*conn, error) {
secrets, err := receiverEncHandshake(fd, prv, nil)
if err != nil {
return nil, fmt.Errorf("encryption handshake failed: %v", err)
}
-
- // Run the protocol handshake using authenticated messages.
rw := newRlpxFrameRW(fd, secrets)
- rhs, err := readProtocolHandshake(rw, our)
+ if atcap {
+ SendItems(rw, discMsg, DiscTooManyPeers)
+ return nil, errors.New("we have too many peers")
+ }
+ // Run the protocol handshake using authenticated messages.
+ rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our)
if err != nil {
return nil, err
}
- if rhs.ID != secrets.RemoteID {
- return nil, errors.New("node ID in protocol handshake does not match encryption handshake")
- }
- // TODO: validate that handshake node ID matches
if err := Send(rw, handshakeMsg, our); err != nil {
- return nil, fmt.Errorf("protocol write error: %v", err)
+ return nil, fmt.Errorf("protocol handshake write error: %v", err)
}
return &conn{rw, rhs}, nil
}
-func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) {
+func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool) (*conn, error) {
secrets, err := initiatorEncHandshake(fd, prv, dial.ID, nil)
if err != nil {
return nil, fmt.Errorf("encryption handshake failed: %v", err)
}
-
- // Run the protocol handshake using authenticated messages.
rw := newRlpxFrameRW(fd, secrets)
- if err := Send(rw, handshakeMsg, our); err != nil {
- return nil, fmt.Errorf("protocol write error: %v", err)
+ if atcap {
+ SendItems(rw, discMsg, DiscTooManyPeers)
+ return nil, errors.New("we have too many peers")
}
- rhs, err := readProtocolHandshake(rw, our)
+ // Run the protocol handshake using authenticated messages.
+ //
+ // Note that even though writing the handshake is first, we prefer
+ // returning the handshake read error. If the remote side
+ // disconnects us early with a valid reason, we should return it
+ // as the error so it can be tracked elsewhere.
+ werr := make(chan error)
+ go func() { werr <- Send(rw, handshakeMsg, our) }()
+ rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our)
if err != nil {
- return nil, fmt.Errorf("protocol handshake read error: %v", err)
+ return nil, err
+ }
+ if err := <-werr; err != nil {
+ return nil, fmt.Errorf("protocol handshake write error: %v", err)
}
if rhs.ID != dial.ID {
return nil, errors.New("dialed node id mismatch")
@@ -398,18 +409,17 @@ func xor(one, other []byte) (xor []byte) {
return xor
}
-func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, error) {
- // read and handle remote handshake
- msg, err := r.ReadMsg()
+func readProtocolHandshake(rw MsgReadWriter, wantID discover.NodeID, our *protoHandshake) (*protoHandshake, error) {
+ msg, err := rw.ReadMsg()
if err != nil {
return nil, err
}
if msg.Code == discMsg {
// disconnect before protocol handshake is valid according to the
// spec and we send it ourself if Server.addPeer fails.
- var reason DiscReason
+ var reason [1]DiscReason
rlp.Decode(msg.Payload, &reason)
- return nil, discRequestedError(reason)
+ return nil, reason[0]
}
if msg.Code != handshakeMsg {
return nil, fmt.Errorf("expected handshake, got %x", msg.Code)
@@ -423,10 +433,16 @@ func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, e
}
// validate handshake info
if hs.Version != our.Version {
- return nil, newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", baseProtocolVersion, hs.Version)
+ SendItems(rw, discMsg, DiscIncompatibleVersion)
+ return nil, fmt.Errorf("required version %d, received %d\n", baseProtocolVersion, hs.Version)
}
if (hs.ID == discover.NodeID{}) {
- return nil, newPeerError(errPubkeyInvalid, "missing")
+ SendItems(rw, discMsg, DiscInvalidIdentity)
+ return nil, errors.New("invalid public key in handshake")
+ }
+ if hs.ID != wantID {
+ SendItems(rw, discMsg, DiscUnexpectedIdentity)
+ return nil, errors.New("handshake node ID does not match encryption handshake")
}
return &hs, nil
}
diff --git a/p2p/handshake_test.go b/p2p/handshake_test.go
index 19423bb82..c22af7a9c 100644
--- a/p2p/handshake_test.go
+++ b/p2p/handshake_test.go
@@ -143,7 +143,7 @@ func TestSetupConn(t *testing.T) {
done := make(chan struct{})
go func() {
defer close(done)
- conn0, err := setupConn(fd0, prv0, hs0, node1)
+ conn0, err := setupConn(fd0, prv0, hs0, node1, false)
if err != nil {
t.Errorf("outbound side error: %v", err)
return
@@ -156,7 +156,7 @@ func TestSetupConn(t *testing.T) {
}
}()
- conn1, err := setupConn(fd1, prv1, hs1, nil)
+ conn1, err := setupConn(fd1, prv1, hs1, nil, false)
if err != nil {
t.Fatalf("inbound side error: %v", err)
}
diff --git a/p2p/peer.go b/p2p/peer.go
index 6b97ea58d..7bc4f9cf6 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -44,7 +44,7 @@ type Peer struct {
rw *conn
running map[string]*protoRW
- protoWG sync.WaitGroup
+ wg sync.WaitGroup
protoErr chan error
closed chan struct{}
disc chan DiscReason
@@ -102,58 +102,50 @@ func (p *Peer) String() string {
func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer {
logtag := fmt.Sprintf("Peer %.8x %v", conn.ID[:], fd.RemoteAddr())
+ protomap := matchProtocols(protocols, conn.Caps, conn)
p := &Peer{
Logger: logger.NewLogger(logtag),
conn: fd,
rw: conn,
- running: matchProtocols(protocols, conn.Caps, conn),
+ running: protomap,
disc: make(chan DiscReason),
- protoErr: make(chan error),
+ protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
closed: make(chan struct{}),
}
return p
}
func (p *Peer) run() DiscReason {
- var readErr = make(chan error, 1)
- defer p.closeProtocols()
- defer close(p.closed)
+ readErr := make(chan error, 1)
+ p.wg.Add(2)
+ go p.readLoop(readErr)
+ go p.pingLoop()
p.startProtocols()
- go func() { readErr <- p.readLoop() }()
-
- ping := time.NewTicker(pingInterval)
- defer ping.Stop()
// Wait for an error or disconnect.
var reason DiscReason
-loop:
- for {
- select {
- case <-ping.C:
- go func() {
- if err := SendItems(p.rw, pingMsg); err != nil {
- p.protoErr <- err
- return
- }
- }()
- case err := <-readErr:
- // We rely on protocols to abort if there is a write error. It
- // might be more robust to handle them here as well.
- p.DebugDetailf("Read error: %v\n", err)
- p.conn.Close()
- return DiscNetworkError
- case err := <-p.protoErr:
- reason = discReasonForError(err)
- break loop
- case reason = <-p.disc:
- break loop
+ select {
+ case err := <-readErr:
+ if r, ok := err.(DiscReason); ok {
+ reason = r
+ break
}
+ // Note: We rely on protocols to abort if there is a write
+ // error. It might be more robust to handle them here as well.
+ p.DebugDetailf("Read error: %v\n", err)
+ p.conn.Close()
+ reason = DiscNetworkError
+ case err := <-p.protoErr:
+ reason = discReasonForError(err)
+ case reason = <-p.disc:
}
- p.politeDisconnect(reason)
- // Wait for readLoop. It will end because conn is now closed.
- <-readErr
+ close(p.closed)
+ p.wg.Wait()
+ if reason != DiscNetworkError {
+ p.politeDisconnect(reason)
+ }
p.Debugf("Disconnected: %v\n", reason)
return reason
}
@@ -174,18 +166,36 @@ func (p *Peer) politeDisconnect(reason DiscReason) {
p.conn.Close()
}
-func (p *Peer) readLoop() error {
+func (p *Peer) pingLoop() {
+ ping := time.NewTicker(pingInterval)
+ defer p.wg.Done()
+ defer ping.Stop()
+ for {
+ select {
+ case <-ping.C:
+ if err := SendItems(p.rw, pingMsg); err != nil {
+ p.protoErr <- err
+ return
+ }
+ case <-p.closed:
+ return
+ }
+ }
+}
+
+func (p *Peer) readLoop(errc chan<- error) {
+ defer p.wg.Done()
for {
- p.conn.SetDeadline(time.Now().Add(frameReadTimeout))
msg, err := p.rw.ReadMsg()
if err != nil {
- return err
+ errc <- err
+ return
}
if err = p.handle(msg); err != nil {
- return err
+ errc <- err
+ return
}
}
- return nil
}
func (p *Peer) handle(msg Msg) error {
@@ -195,12 +205,11 @@ func (p *Peer) handle(msg Msg) error {
go SendItems(p.rw, pongMsg)
case msg.Code == discMsg:
var reason [1]DiscReason
- // no need to discard or for error checking, we'll close the
- // connection after this.
+ // This is the last message. We don't need to discard or
+ // check errors because, the connection will be closed after it.
rlp.Decode(msg.Payload, &reason)
p.Debugf("Disconnect requested: %v\n", reason[0])
- p.Disconnect(DiscRequested)
- return discRequestedError(reason[0])
+ return DiscRequested
case msg.Code < baseProtocolLength:
// ignore other base protocol messages
return msg.Discard()
@@ -210,7 +219,12 @@ func (p *Peer) handle(msg Msg) error {
if err != nil {
return fmt.Errorf("msg code out of range: %v", msg.Code)
}
- proto.in <- msg
+ select {
+ case proto.in <- msg:
+ return nil
+ case <-p.closed:
+ return io.EOF
+ }
}
return nil
}
@@ -234,10 +248,11 @@ outer:
}
func (p *Peer) startProtocols() {
+ p.wg.Add(len(p.running))
for _, proto := range p.running {
proto := proto
+ proto.closed = p.closed
p.DebugDetailf("Starting protocol %s/%d\n", proto.Name, proto.Version)
- p.protoWG.Add(1)
go func() {
err := proto.Run(p, proto)
if err == nil {
@@ -246,11 +261,8 @@ func (p *Peer) startProtocols() {
} else {
p.DebugDetailf("Protocol %s/%d error: %v\n", proto.Name, proto.Version, err)
}
- select {
- case p.protoErr <- err:
- case <-p.closed:
- }
- p.protoWG.Done()
+ p.protoErr <- err
+ p.wg.Done()
}()
}
}
@@ -266,13 +278,6 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) {
return nil, newPeerError(errInvalidMsgCode, "%d", code)
}
-func (p *Peer) closeProtocols() {
- for _, p := range p.running {
- close(p.in)
- }
- p.protoWG.Wait()
-}
-
// writeProtoMsg sends the given message on behalf of the given named protocol.
// this exists because of Server.Broadcast.
func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
@@ -289,8 +294,8 @@ func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
type protoRW struct {
Protocol
-
in chan Msg
+ closed <-chan struct{}
offset uint64
w MsgWriter
}
@@ -304,10 +309,11 @@ func (rw *protoRW) WriteMsg(msg Msg) error {
}
func (rw *protoRW) ReadMsg() (Msg, error) {
- msg, ok := <-rw.in
- if !ok {
- return msg, io.EOF
+ select {
+ case msg := <-rw.in:
+ msg.Code -= rw.offset
+ return msg, nil
+ case <-rw.closed:
+ return Msg{}, io.EOF
}
- msg.Code -= rw.offset
- return msg, nil
}
diff --git a/p2p/peer_error.go b/p2p/peer_error.go
index 0ff4f4b43..402131630 100644
--- a/p2p/peer_error.go
+++ b/p2p/peer_error.go
@@ -98,15 +98,13 @@ func (d DiscReason) String() string {
return discReasonToString[d]
}
-type discRequestedError DiscReason
-
-func (err discRequestedError) Error() string {
- return fmt.Sprintf("disconnect requested: %v", DiscReason(err))
+func (d DiscReason) Error() string {
+ return d.String()
}
func discReasonForError(err error) DiscReason {
- if reason, ok := err.(discRequestedError); ok {
- return DiscReason(reason)
+ if reason, ok := err.(DiscReason); ok {
+ return reason
}
peerError, ok := err.(*peerError)
if !ok {
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index 3c4c71c0c..fb76818a0 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -2,8 +2,9 @@ package p2p
import (
"bytes"
+ "errors"
"fmt"
- "io"
+ "math/rand"
"net"
"reflect"
"testing"
@@ -27,7 +28,7 @@ var discard = Protocol{
},
}
-func testPeer(protos []Protocol) (io.Closer, *conn, *Peer, <-chan DiscReason) {
+func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) {
fd1, _ := net.Pipe()
hs1 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion}
hs2 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion}
@@ -41,7 +42,11 @@ func testPeer(protos []Protocol) (io.Closer, *conn, *Peer, <-chan DiscReason) {
errc := make(chan DiscReason, 1)
go func() { errc <- peer.run() }()
- return p1, &conn{p2, hs2}, peer, errc
+ closer := func() {
+ p1.Close()
+ fd1.Close()
+ }
+ return closer, &conn{p2, hs2}, peer, errc
}
func TestPeerProtoReadMsg(t *testing.T) {
@@ -67,7 +72,7 @@ func TestPeerProtoReadMsg(t *testing.T) {
}
closer, rw, _, errc := testPeer([]Protocol{proto})
- defer closer.Close()
+ defer closer()
Send(rw, baseProtocolLength+2, []uint{1})
Send(rw, baseProtocolLength+3, []uint{2})
@@ -99,7 +104,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) {
},
}
closer, rw, _, _ := testPeer([]Protocol{proto})
- defer closer.Close()
+ defer closer()
if err := ExpectMsg(rw, 17, []string{"foo", "bar"}); err != nil {
t.Error(err)
@@ -110,7 +115,7 @@ func TestPeerWriteForBroadcast(t *testing.T) {
defer testlog(t).detach()
closer, rw, peer, peerErr := testPeer([]Protocol{discard})
- defer closer.Close()
+ defer closer()
emptymsg := func(code uint64) Msg {
return Msg{Code: code, Size: 0, Payload: bytes.NewReader(nil)}
@@ -150,7 +155,7 @@ func TestPeerPing(t *testing.T) {
defer testlog(t).detach()
closer, rw, _, _ := testPeer(nil)
- defer closer.Close()
+ defer closer()
if err := SendItems(rw, pingMsg); err != nil {
t.Fatal(err)
}
@@ -163,19 +168,70 @@ func TestPeerDisconnect(t *testing.T) {
defer testlog(t).detach()
closer, rw, _, disc := testPeer(nil)
- defer closer.Close()
+ defer closer()
if err := SendItems(rw, discMsg, DiscQuitting); err != nil {
t.Fatal(err)
}
if err := ExpectMsg(rw, discMsg, []interface{}{DiscRequested}); err != nil {
t.Error(err)
}
- closer.Close() // make test end faster
+ closer()
if reason := <-disc; reason != DiscRequested {
t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested)
}
}
+// This test is supposed to verify that Peer can reliably handle
+// multiple causes of disconnection occurring at the same time.
+func TestPeerDisconnectRace(t *testing.T) {
+ defer testlog(t).detach()
+ maybe := func() bool { return rand.Intn(1) == 1 }
+
+ for i := 0; i < 1000; i++ {
+ protoclose := make(chan error)
+ protodisc := make(chan DiscReason)
+ closer, rw, p, disc := testPeer([]Protocol{
+ {
+ Name: "closereq",
+ Run: func(p *Peer, rw MsgReadWriter) error { return <-protoclose },
+ Length: 1,
+ },
+ {
+ Name: "disconnect",
+ Run: func(p *Peer, rw MsgReadWriter) error { p.Disconnect(<-protodisc); return nil },
+ Length: 1,
+ },
+ })
+
+ // Simulate incoming messages.
+ go SendItems(rw, baseProtocolLength+1)
+ go SendItems(rw, baseProtocolLength+2)
+ // Close the network connection.
+ go closer()
+ // Make protocol "closereq" return.
+ protoclose <- errors.New("protocol closed")
+ // Make protocol "disconnect" call peer.Disconnect
+ protodisc <- DiscAlreadyConnected
+ // In some cases, simulate something else calling peer.Disconnect.
+ if maybe() {
+ go p.Disconnect(DiscInvalidIdentity)
+ }
+ // In some cases, simulate remote requesting a disconnect.
+ if maybe() {
+ go SendItems(rw, discMsg, DiscQuitting)
+ }
+
+ select {
+ case <-disc:
+ case <-time.After(2 * time.Second):
+ // Peer.run should return quickly. If it doesn't the Peer
+ // goroutines are probably deadlocked. Call panic in order to
+ // show the stacks.
+ panic("Peer.run took to long to return.")
+ }
+ }
+}
+
func TestNewPeer(t *testing.T) {
name := "nodename"
caps := []Cap{{"foo", 2}, {"bar", 3}}
diff --git a/p2p/server.go b/p2p/server.go
index 0a2621aa8..5cd3dc2ad 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -3,6 +3,7 @@ package p2p
import (
"bytes"
"crypto/ecdsa"
+ "crypto/rand"
"errors"
"fmt"
"net"
@@ -20,6 +21,11 @@ const (
defaultDialTimeout = 10 * time.Second
refreshPeersInterval = 30 * time.Second
+ // This is the maximum number of inbound connection
+ // that are allowed to linger between 'accepted' and
+ // 'added as peer'.
+ maxAcceptConns = 50
+
// total timeout for encryption handshake and protocol
// handshake in both directions.
handshakeTimeout = 5 * time.Second
@@ -85,12 +91,12 @@ type Server struct {
ourHandshake *protoHandshake
- lock sync.RWMutex
- running bool
- listener net.Listener
- peers map[discover.NodeID]*Peer
+ lock sync.RWMutex // protects running and peers
+ running bool
+ peers map[discover.NodeID]*Peer
- ntab *discover.Table
+ ntab *discover.Table
+ listener net.Listener
quit chan struct{}
loopWG sync.WaitGroup // {dial,listen,nat}Loop
@@ -98,7 +104,7 @@ type Server struct {
peerConnect chan *discover.Node
}
-type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
+type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool) (*conn, error)
type newPeerHook func(*Peer)
// Peers returns all connected peers.
@@ -260,62 +266,94 @@ func (srv *Server) Stop() {
srv.peerWG.Wait()
}
+// Self returns the local node's endpoint information.
+func (srv *Server) Self() *discover.Node {
+ return srv.ntab.Self()
+}
+
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
+
+ // This channel acts as a semaphore limiting
+ // active inbound connections that are lingering pre-handshake.
+ // If all slots are taken, no further connections are accepted.
+ slots := make(chan struct{}, maxAcceptConns)
+ for i := 0; i < maxAcceptConns; i++ {
+ slots <- struct{}{}
+ }
+
glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
for {
+ <-slots
conn, err := srv.listener.Accept()
if err != nil {
return
}
glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
srv.peerWG.Add(1)
- go srv.startPeer(conn, nil)
+ go func() {
+ srv.startPeer(conn, nil)
+ slots <- struct{}{}
+ }()
}
}
func (srv *Server) dialLoop() {
+ var (
+ dialed = make(chan *discover.Node)
+ dialing = make(map[discover.NodeID]bool)
+ findresults = make(chan []*discover.Node)
+ refresh = time.NewTimer(0)
+ )
defer srv.loopWG.Done()
- refresh := time.NewTicker(refreshPeersInterval)
defer refresh.Stop()
- srv.ntab.Bootstrap(srv.BootstrapNodes)
- go srv.findPeers()
-
- dialed := make(chan *discover.Node)
- dialing := make(map[discover.NodeID]bool)
+ // TODO: maybe limit number of active dials
+ dial := func(dest *discover.Node) {
+ // Don't dial nodes that would fail the checks in addPeer.
+ // This is important because the connection handshake is a lot
+ // of work and we'd rather avoid doing that work for peers
+ // that can't be added.
+ srv.lock.RLock()
+ ok, _ := srv.checkPeer(dest.ID)
+ srv.lock.RUnlock()
+ if !ok || dialing[dest.ID] {
+ return
+ }
- // TODO: limit number of active dials
- // TODO: ensure only one findPeers goroutine is running
- // TODO: pause findPeers when we're at capacity
+ dialing[dest.ID] = true
+ srv.peerWG.Add(1)
+ go func() {
+ srv.dialNode(dest)
+ dialed <- dest
+ }()
+ }
+ srv.ntab.Bootstrap(srv.BootstrapNodes)
for {
select {
case <-refresh.C:
-
- go srv.findPeers()
+ // Grab some nodes to connect to if we're not at capacity.
+ srv.lock.RLock()
+ needpeers := len(srv.peers) < srv.MaxPeers
+ srv.lock.RUnlock()
+ if needpeers {
+ go func() {
+ var target discover.NodeID
+ rand.Read(target[:])
+ findresults <- srv.ntab.Lookup(target)
+ }()
+ refresh.Stop()
+ }
case dest := <-srv.peerConnect:
- // avoid dialing nodes that are already connected.
- // there is another check for this in addPeer,
- // which runs after the handshake.
- srv.lock.Lock()
- _, isconnected := srv.peers[dest.ID]
- srv.lock.Unlock()
- if isconnected || dialing[dest.ID] || dest.ID == srv.Self().ID {
- continue
+ dial(dest)
+ case dests := <-findresults:
+ for _, dest := range dests {
+ dial(dest)
}
-
- dialing[dest.ID] = true
- srv.peerWG.Add(1)
- go func() {
- srv.dialNode(dest)
- // at this point, the peer has been added
- // or discarded. either way, we're not dialing it anymore.
- dialed <- dest
- }()
-
+ refresh.Reset(refreshPeersInterval)
case dest := <-dialed:
delete(dialing, dest.ID)
@@ -331,44 +369,34 @@ func (srv *Server) dialNode(dest *discover.Node) {
glog.V(logger.Debug).Infof("Dialing %v\n", dest)
conn, err := srv.Dialer.Dial("tcp", addr.String())
if err != nil {
+ // dialLoop adds to the wait group counter when launching
+ // dialNode, so we need to count it down again. startPeer also
+ // does that when an error occurs.
+ srv.peerWG.Done()
glog.V(logger.Detail).Infof("dial error: %v", err)
return
}
srv.startPeer(conn, dest)
}
-func (srv *Server) Self() *discover.Node {
- return srv.ntab.Self()
-}
-
-func (srv *Server) findPeers() {
- far := srv.Self().ID
- for i := range far {
- far[i] = ^far[i]
- }
- closeToSelf := srv.ntab.Lookup(srv.Self().ID)
- farFromSelf := srv.ntab.Lookup(far)
-
- for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
- if i < len(closeToSelf) {
- srv.peerConnect <- closeToSelf[i]
- }
- if i < len(farFromSelf) {
- srv.peerConnect <- farFromSelf[i]
- }
- }
-}
-
func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
// TODO: handle/store session token
+
+ // Run setupFunc, which should create an authenticated connection
+ // and run the capability exchange. Note that any early error
+ // returns during that exchange need to call peerWG.Done because
+ // the callers of startPeer added the peer to the wait group already.
fd.SetDeadline(time.Now().Add(handshakeTimeout))
- conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
+ srv.lock.RLock()
+ atcap := len(srv.peers) == srv.MaxPeers
+ srv.lock.RUnlock()
+ conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap)
if err != nil {
fd.Close()
glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
+ srv.peerWG.Done()
return
}
-
conn.MsgReadWriter = &netWrapper{
wrapped: conn.MsgReadWriter,
conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
@@ -377,26 +405,30 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
if ok, reason := srv.addPeer(conn.ID, p); !ok {
glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
p.politeDisconnect(reason)
+ srv.peerWG.Done()
return
}
+ // The handshakes are done and it passed all checks.
+ // Spawn the Peer loops.
+ go srv.runPeer(p)
+}
+func (srv *Server) runPeer(p *Peer) {
glog.V(logger.Debug).Infof("Added %v\n", p)
srvjslog.LogJson(&logger.P2PConnected{
- RemoteId: fmt.Sprintf("%x", conn.ID[:]),
- RemoteAddress: fd.RemoteAddr().String(),
- RemoteVersionString: conn.Name,
+ RemoteId: p.ID().String(),
+ RemoteAddress: p.RemoteAddr().String(),
+ RemoteVersionString: p.Name(),
NumConnections: srv.PeerCount(),
})
-
if srv.newPeerHook != nil {
srv.newPeerHook(p)
}
discreason := p.run()
srv.removePeer(p)
-
glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
srvjslog.LogJson(&logger.P2PDisconnected{
- RemoteId: fmt.Sprintf("%x", conn.ID[:]),
+ RemoteId: p.ID().String(),
NumConnections: srv.PeerCount(),
})
}
@@ -404,6 +436,14 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
srv.lock.Lock()
defer srv.lock.Unlock()
+ if ok, reason := srv.checkPeer(id); !ok {
+ return false, reason
+ }
+ srv.peers[id] = p
+ return true, 0
+}
+
+func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
switch {
case !srv.running:
return false, DiscQuitting
@@ -413,9 +453,9 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
return false, DiscAlreadyConnected
case id == srv.Self().ID:
return false, DiscSelf
+ default:
+ return true, 0
}
- srv.peers[id] = p
- return true, 0
}
func (srv *Server) removePeer(p *Peer) {
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 14e7c7de2..53cc3c258 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -22,7 +22,7 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server {
ListenAddr: "127.0.0.1:0",
PrivateKey: newkey(),
newPeerHook: pf,
- setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) {
+ setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool) (*conn, error) {
id := randomID()
rw := newRlpxFrameRW(fd, secrets{
MAC: zero16,
@@ -163,6 +163,62 @@ func TestServerBroadcast(t *testing.T) {
}
}
+// This test checks that connections are disconnected
+// just after the encryption handshake when the server is
+// at capacity.
+//
+// It also serves as a light-weight integration test.
+func TestServerDisconnectAtCap(t *testing.T) {
+ defer testlog(t).detach()
+
+ started := make(chan *Peer)
+ srv := &Server{
+ ListenAddr: "127.0.0.1:0",
+ PrivateKey: newkey(),
+ MaxPeers: 10,
+ NoDial: true,
+ // This hook signals that the peer was actually started. We
+ // need to wait for the peer to be started before dialing the
+ // next connection to get a deterministic peer count.
+ newPeerHook: func(p *Peer) { started <- p },
+ }
+ if err := srv.Start(); err != nil {
+ t.Fatal(err)
+ }
+ defer srv.Stop()
+
+ nconns := srv.MaxPeers + 1
+ dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
+ for i := 0; i < nconns; i++ {
+ conn, err := dialer.Dial("tcp", srv.ListenAddr)
+ if err != nil {
+ t.Fatalf("conn %d: dial error: %v", i, err)
+ }
+ // Close the connection when the test ends, before
+ // shutting down the server.
+ defer conn.Close()
+ // Run the handshakes just like a real peer would.
+ key := newkey()
+ hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
+ _, err = setupConn(conn, key, hs, srv.Self(), false)
+ if i == nconns-1 {
+ // When handling the last connection, the server should
+ // disconnect immediately instead of running the protocol
+ // handshake.
+ if err != DiscTooManyPeers {
+ t.Errorf("conn %d: got error %q, expected %q", i, err, DiscTooManyPeers)
+ }
+ } else {
+ // For all earlier connections, the handshake should go through.
+ if err != nil {
+ t.Fatalf("conn %d: unexpected error: %v", i, err)
+ }
+ // Wait for runPeer to be started.
+ <-started
+ }
+ }
+}
+
func newkey() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go
index bf165bbcc..3c2d0a4b9 100644
--- a/ui/qt/qwhisper/whisper.go
+++ b/ui/qt/qwhisper/whisper.go
@@ -37,8 +37,8 @@ func (self *Whisper) Post(payload []string, to, from string, topics []string, pr
pk := crypto.ToECDSAPub(common.FromHex(from))
if key := self.Whisper.GetIdentity(pk); key != nil {
msg := whisper.NewMessage(data)
- envelope, err := msg.Seal(time.Duration(priority*100000), whisper.Opts{
- Ttl: time.Duration(ttl) * time.Second,
+ envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{
+ TTL: time.Duration(ttl) * time.Second,
To: crypto.ToECDSAPub(common.FromHex(to)),
From: key,
Topics: whisper.TopicsFromString(topics...),
diff --git a/whisper/envelope.go b/whisper/envelope.go
index 20e3e6d39..f35a40a42 100644
--- a/whisper/envelope.go
+++ b/whisper/envelope.go
@@ -1,3 +1,6 @@
+// Contains the Whisper protocol Envelope element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#envelopes.
+
package whisper
import (
@@ -12,10 +15,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
-const (
- DefaultPow = 50 * time.Millisecond
-)
-
+// Envelope represents a clear-text data packet to transmit through the Whisper
+// network. Its contents may or may not be encrypted and signed.
type Envelope struct {
Expiry uint32 // Whisper protocol specifies int32, really should be int64
TTL uint32 // ^^^^^^
@@ -26,96 +27,104 @@ type Envelope struct {
hash common.Hash
}
-func (self *Envelope) Hash() common.Hash {
- if (self.hash == common.Hash{}) {
- enc, _ := rlp.EncodeToBytes(self)
- self.hash = crypto.Sha3Hash(enc)
- }
- return self.hash
-}
-
-func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope {
- exp := time.Now().Add(ttl)
+// NewEnvelope wraps a Whisper message with expiration and destination data
+// included into an envelope for network forwarding.
+func NewEnvelope(ttl time.Duration, topics [][]byte, msg *Message) *Envelope {
return &Envelope{
- Expiry: uint32(exp.Unix()),
+ Expiry: uint32(time.Now().Add(ttl).Unix()),
TTL: uint32(ttl.Seconds()),
Topics: topics,
- Data: data.Bytes(),
+ Data: msg.bytes(),
Nonce: 0,
}
}
+// Seal closes the envelope by spending the requested amount of time as a proof
+// of work on hashing the data.
func (self *Envelope) Seal(pow time.Duration) {
- self.proveWork(pow)
-}
-
-func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) {
- data := self.Data
- var message Message
- dataStart := 1
- if data[0] > 0 {
- if len(data) < 66 {
- return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66")
- }
- dataStart = 66
- message.Flags = data[0]
- message.Signature = data[1:66]
- }
-
- payload := data[dataStart:]
- if prv != nil {
- message.Payload, err = crypto.Decrypt(prv, payload)
- switch err {
- case nil: // OK
- case ecies.ErrInvalidPublicKey: // Payload isn't encrypted
- message.Payload = payload
- return &message, err
- default:
- return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err)
- }
- }
-
- return &message, nil
-}
-
-func (self *Envelope) proveWork(dura time.Duration) {
- var bestBit int
d := make([]byte, 64)
- enc, _ := rlp.EncodeToBytes(self.withoutNonce())
- copy(d[:32], enc)
+ copy(d[:32], self.rlpWithoutNonce())
- then := time.Now().Add(dura).UnixNano()
- for n := uint32(0); time.Now().UnixNano() < then; {
+ finish, bestBit := time.Now().Add(pow).UnixNano(), 0
+ for nonce := uint32(0); time.Now().UnixNano() < finish; {
for i := 0; i < 1024; i++ {
- binary.BigEndian.PutUint32(d[60:], n)
+ binary.BigEndian.PutUint32(d[60:], nonce)
- fbs := common.FirstBitSet(common.BigD(crypto.Sha3(d)))
- if fbs > bestBit {
- bestBit = fbs
- self.Nonce = n
+ firstBit := common.FirstBitSet(common.BigD(crypto.Sha3(d)))
+ if firstBit > bestBit {
+ self.Nonce, bestBit = nonce, firstBit
}
-
- n++
+ nonce++
}
}
}
+// valid checks whether the claimed proof of work was indeed executed.
+// TODO: Is this really useful? Isn't this always true?
func (self *Envelope) valid() bool {
d := make([]byte, 64)
- enc, _ := rlp.EncodeToBytes(self.withoutNonce())
- copy(d[:32], enc)
+ copy(d[:32], self.rlpWithoutNonce())
binary.BigEndian.PutUint32(d[60:], self.Nonce)
+
return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0
}
-func (self *Envelope) withoutNonce() interface{} {
- return []interface{}{self.Expiry, self.TTL, self.Topics, self.Data}
+// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
+func (self *Envelope) rlpWithoutNonce() []byte {
+ enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data})
+ return enc
+}
+
+// Open extracts the message contained within a potentially encrypted envelope.
+func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
+ // Split open the payload into a message construct
+ data := self.Data
+
+ message := &Message{
+ Flags: data[0],
+ }
+ data = data[1:]
+
+ if message.Flags&128 == 128 {
+ if len(data) < 65 {
+ return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 65")
+ }
+ message.Signature, data = data[:65], data[65:]
+ }
+ message.Payload = data
+
+ // Short circuit if the encryption was requested
+ if key == nil {
+ return message, nil
+ }
+ // Otherwise try to decrypt the message
+ message.Payload, err = crypto.Decrypt(key, message.Payload)
+ switch err {
+ case nil:
+ return message, nil
+
+ case ecies.ErrInvalidPublicKey: // Payload isn't encrypted
+ return message, err
+
+ default:
+ return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err)
+ }
+}
+
+// Hash returns the SHA3 hash of the envelope, calculating it if not yet done.
+func (self *Envelope) Hash() common.Hash {
+ if (self.hash == common.Hash{}) {
+ enc, _ := rlp.EncodeToBytes(self)
+ self.hash = crypto.Sha3Hash(enc)
+ }
+ return self.hash
}
// rlpenv is an Envelope but is not an rlp.Decoder.
// It is used for decoding because we need to
type rlpenv Envelope
+// DecodeRLP decodes an Envelope from an RLP data stream.
func (self *Envelope) DecodeRLP(s *rlp.Stream) error {
raw, err := s.Raw()
if err != nil {
diff --git a/whisper/main.go b/whisper/main.go
index 9f35dbb8d..422f0fa3b 100644
--- a/whisper/main.go
+++ b/whisper/main.go
@@ -1,37 +1,90 @@
// +build none
+// Contains a simple whisper peer setup and self messaging to allow playing
+// around with the protocol and API without a fancy client implementation.
+
package main
import (
"fmt"
"log"
"os"
+ "time"
- "github.com/ethereum/go-ethereum/crypto/secp256k1"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/whisper"
)
func main() {
logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel))
- pub, _ := secp256k1.GenerateKeyPair()
-
- whisper := whisper.New()
+ // Generate the peer identity
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ fmt.Printf("Failed to generate peer key: %v.\n", err)
+ os.Exit(-1)
+ }
+ name := common.MakeName("whisper-go", "1.0")
+ shh := whisper.New()
- srv := p2p.Server{
+ // Create an Ethereum peer to communicate through
+ server := p2p.Server{
+ PrivateKey: key,
MaxPeers: 10,
- Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)),
+ Name: name,
+ Protocols: []p2p.Protocol{shh.Protocol()},
ListenAddr: ":30300",
- NAT: p2p.UPNP(),
-
- Protocols: []p2p.Protocol{whisper.Protocol()},
+ NAT: nat.Any(),
}
- if err := srv.Start(); err != nil {
- fmt.Println("could not start server:", err)
+ fmt.Println("Starting Ethereum peer...")
+ if err := server.Start(); err != nil {
+ fmt.Printf("Failed to start Ethereum peer: %v.\n", err)
os.Exit(1)
}
- select {}
+ // Send a message to self to check that something works
+ payload := fmt.Sprintf("Hello world, this is %v. In case you're wondering, the time is %v", name, time.Now())
+ if err := selfSend(shh, []byte(payload)); err != nil {
+ fmt.Printf("Failed to self message: %v.\n", err)
+ os.Exit(-1)
+ }
+}
+
+// SendSelf wraps a payload into a Whisper envelope and forwards it to itself.
+func selfSend(shh *whisper.Whisper, payload []byte) error {
+ ok := make(chan struct{})
+
+ // Start watching for self messages, output any arrivals
+ id := shh.NewIdentity()
+ shh.Watch(whisper.Filter{
+ To: &id.PublicKey,
+ Fn: func(msg *whisper.Message) {
+ fmt.Printf("Message received: %s, signed with 0x%x.\n", string(msg.Payload), msg.Signature)
+ close(ok)
+ },
+ })
+ // Wrap the payload and encrypt it
+ msg := whisper.NewMessage(payload)
+ envelope, err := msg.Wrap(whisper.DefaultProofOfWork, whisper.Options{
+ From: id,
+ To: &id.PublicKey,
+ TTL: whisper.DefaultTimeToLive,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to seal message: %v", err)
+ }
+ // Dump the message into the system and wait for it to pop back out
+ if err := shh.Send(envelope); err != nil {
+ return fmt.Errorf("failed to send self-message: %v", err)
+ }
+ select {
+ case <-ok:
+ case <-time.After(time.Second):
+ return fmt.Errorf("failed to receive message in time")
+ }
+ return nil
}
diff --git a/whisper/message.go b/whisper/message.go
index ad6a1bcff..2666ee6e0 100644
--- a/whisper/message.go
+++ b/whisper/message.go
@@ -1,7 +1,11 @@
+// Contains the Whisper protocol Message element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#messages.
+
package whisper
import (
"crypto/ecdsa"
+ "math/rand"
"time"
"github.com/ethereum/go-ethereum/crypto"
@@ -9,8 +13,11 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
+// Message represents an end-user data packet to trasmit through the Whisper
+// protocol. These are wrapped into Envelopes that need not be understood by
+// intermediate nodes, just forwarded.
type Message struct {
- Flags byte
+ Flags byte // First bit is signature presence, rest reserved and should be random
Signature []byte
Payload []byte
Sent int64
@@ -18,71 +25,95 @@ type Message struct {
To *ecdsa.PublicKey
}
+// Options specifies the exact way a message should be wrapped into an Envelope.
+type Options struct {
+ From *ecdsa.PrivateKey
+ To *ecdsa.PublicKey
+ TTL time.Duration
+ Topics [][]byte
+}
+
+// NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
func NewMessage(payload []byte) *Message {
- return &Message{Flags: 0, Payload: payload, Sent: time.Now().Unix()}
+ // Construct an initial flag set: bit #1 = 0 (no signature), rest random
+ flags := byte(rand.Intn(128))
+
+ // Assemble and return the message
+ return &Message{
+ Flags: flags,
+ Payload: payload,
+ Sent: time.Now().Unix(),
+ }
}
-func (self *Message) hash() []byte {
- return crypto.Sha3(append([]byte{self.Flags}, self.Payload...))
+// Wrap bundles the message into an Envelope to transmit over the network.
+//
+// pow (Proof Of Work) controls how much time to spend on hashing the message,
+// inherently controlling its priority through the network (smaller hash, bigger
+// priority).
+//
+// The user can control the amount of identity, privacy and encryption through
+// the options parameter as follows:
+// - options.From == nil && options.To == nil: anonymous broadcast
+// - options.From != nil && options.To == nil: signed broadcast (known sender)
+// - options.From == nil && options.To != nil: encrypted anonymous message
+// - options.From != nil && options.To != nil: encrypted signed message
+func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) {
+ // Use the default TTL if non was specified
+ if options.TTL == 0 {
+ options.TTL = DefaultTimeToLive
+ }
+ // Sign and encrypt the message if requested
+ if options.From != nil {
+ if err := self.sign(options.From); err != nil {
+ return nil, err
+ }
+ }
+ if options.To != nil {
+ if err := self.encrypt(options.To); err != nil {
+ return nil, err
+ }
+ }
+ // Wrap the processed message, seal it and return
+ envelope := NewEnvelope(options.TTL, options.Topics, self)
+ envelope.Seal(pow)
+
+ return envelope, nil
}
+// sign calculates and sets the cryptographic signature for the message , also
+// setting the sign flag.
func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
- self.Flags = 1
+ self.Flags |= 1 << 7
self.Signature, err = crypto.Sign(self.hash(), key)
return
}
+// Recover retrieves the public key of the message signer.
func (self *Message) Recover() *ecdsa.PublicKey {
- defer func() { recover() }() // in case of invalid sig
+ defer func() { recover() }() // in case of invalid signature
+
pub, err := crypto.SigToPub(self.hash(), self.Signature)
if err != nil {
- glog.V(logger.Error).Infof("Could not get pubkey from signature: ", err)
+ glog.V(logger.Error).Infof("Could not get public key from signature: %v", err)
return nil
}
return pub
}
-func (self *Message) Encrypt(to *ecdsa.PublicKey) (err error) {
+// encrypt encrypts a message payload with a public key.
+func (self *Message) encrypt(to *ecdsa.PublicKey) (err error) {
self.Payload, err = crypto.Encrypt(to, self.Payload)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (self *Message) Bytes() []byte {
- return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...)
+ return
}
-type Opts struct {
- From *ecdsa.PrivateKey
- To *ecdsa.PublicKey
- Ttl time.Duration
- Topics [][]byte
+// hash calculates the SHA3 checksum of the message flags and payload.
+func (self *Message) hash() []byte {
+ return crypto.Sha3(append([]byte{self.Flags}, self.Payload...))
}
-func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) {
- if opts.From != nil {
- err := self.sign(opts.From)
- if err != nil {
- return nil, err
- }
- }
-
- if opts.To != nil {
- err := self.Encrypt(opts.To)
- if err != nil {
- return nil, err
- }
- }
-
- if opts.Ttl == 0 {
- opts.Ttl = DefaultTtl
- }
-
- envelope := NewEnvelope(opts.Ttl, opts.Topics, self)
- envelope.Seal(pow)
-
- return envelope, nil
+// bytes flattens the message contents (flags, signature and payload) into a
+// single binary blob.
+func (self *Message) bytes() []byte {
+ return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...)
}
diff --git a/whisper/message_test.go b/whisper/message_test.go
new file mode 100644
index 000000000..8d4c5e990
--- /dev/null
+++ b/whisper/message_test.go
@@ -0,0 +1,138 @@
+package whisper
+
+import (
+ "bytes"
+ "crypto/elliptic"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/crypto"
+)
+
+// Tests whether a message can be wrapped without any identity or encryption.
+func TestMessageSimpleWrap(t *testing.T) {
+ payload := []byte("hello world")
+
+ msg := NewMessage(payload)
+ if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if msg.Flags&128 != 0 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0)
+ }
+ if len(msg.Signature) != 0 {
+ t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature)
+ }
+ if bytes.Compare(msg.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch after wrapping: have 0x%x, want 0x%x", msg.Payload, payload)
+ }
+}
+
+// Tests whether a message can be signed, and wrapped in plain-text.
+func TestMessageCleartextSignRecover(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create crypto key: %v", err)
+ }
+ payload := []byte("hello world")
+
+ msg := NewMessage(payload)
+ if _, err := msg.Wrap(DefaultProofOfWork, Options{
+ From: key,
+ }); err != nil {
+ t.Fatalf("failed to sign message: %v", err)
+ }
+ if msg.Flags&128 != 128 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1)
+ }
+ if bytes.Compare(msg.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload)
+ }
+
+ pubKey := msg.Recover()
+ if pubKey == nil {
+ t.Fatalf("failed to recover public key")
+ }
+ p1 := elliptic.Marshal(crypto.S256(), key.PublicKey.X, key.PublicKey.Y)
+ p2 := elliptic.Marshal(crypto.S256(), pubKey.X, pubKey.Y)
+ if !bytes.Equal(p1, p2) {
+ t.Fatalf("public key mismatch: have 0x%x, want 0x%x", p2, p1)
+ }
+}
+
+// Tests whether a message can be encrypted and decrypted using an anonymous
+// sender (i.e. no signature).
+func TestMessageAnonymousEncryptDecrypt(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create recipient crypto key: %v", err)
+ }
+ payload := []byte("hello world")
+
+ msg := NewMessage(payload)
+ envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+ To: &key.PublicKey,
+ })
+ if err != nil {
+ t.Fatalf("failed to encrypt message: %v", err)
+ }
+ if msg.Flags&128 != 0 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0)
+ }
+ if len(msg.Signature) != 0 {
+ t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature)
+ }
+
+ out, err := envelope.Open(key)
+ if err != nil {
+ t.Fatalf("failed to open encrypted message: %v", err)
+ }
+ if !bytes.Equal(out.Payload, payload) {
+ t.Error("payload mismatch: have 0x%x, want 0x%x", out.Payload, payload)
+ }
+}
+
+// Tests whether a message can be properly signed and encrypted.
+func TestMessageFullCrypto(t *testing.T) {
+ fromKey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create sender crypto key: %v", err)
+ }
+ toKey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create recipient crypto key: %v", err)
+ }
+
+ payload := []byte("hello world")
+ msg := NewMessage(payload)
+ envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+ From: fromKey,
+ To: &toKey.PublicKey,
+ })
+ if err != nil {
+ t.Fatalf("failed to encrypt message: %v", err)
+ }
+ if msg.Flags&128 != 128 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1)
+ }
+ if len(msg.Signature) == 0 {
+ t.Fatalf("no signature found for signed message")
+ }
+
+ out, err := envelope.Open(toKey)
+ if err != nil {
+ t.Fatalf("failed to open encrypted message: %v", err)
+ }
+ if !bytes.Equal(out.Payload, payload) {
+ t.Error("payload mismatch: have 0x%x, want 0x%x", out.Payload, payload)
+ }
+
+ pubKey := out.Recover()
+ if pubKey == nil {
+ t.Fatalf("failed to recover public key")
+ }
+ p1 := elliptic.Marshal(crypto.S256(), fromKey.PublicKey.X, fromKey.PublicKey.Y)
+ p2 := elliptic.Marshal(crypto.S256(), pubKey.X, pubKey.Y)
+ if !bytes.Equal(p1, p2) {
+ t.Fatalf("public key mismatch: have 0x%x, want 0x%x", p2, p1)
+ }
+}
diff --git a/whisper/messages_test.go b/whisper/messages_test.go
deleted file mode 100644
index 93caa31b3..000000000
--- a/whisper/messages_test.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package whisper
-
-import (
- "bytes"
- "crypto/elliptic"
- "fmt"
- "testing"
-
- "github.com/ethereum/go-ethereum/crypto"
-)
-
-func TestSign(t *testing.T) {
- prv, _ := crypto.GenerateKey()
- msg := NewMessage([]byte("hello world"))
- msg.sign(prv)
-
- pubKey := msg.Recover()
- p1 := elliptic.Marshal(crypto.S256(), prv.PublicKey.X, prv.PublicKey.Y)
- p2 := elliptic.Marshal(crypto.S256(), pubKey.X, pubKey.Y)
-
- if !bytes.Equal(p1, p2) {
- t.Error("recovered pub key did not match")
- }
-}
-
-func TestMessageEncryptDecrypt(t *testing.T) {
- prv1, _ := crypto.GenerateKey()
- prv2, _ := crypto.GenerateKey()
-
- data := []byte("hello world")
- msg := NewMessage(data)
- envelope, err := msg.Seal(DefaultPow, Opts{
- From: prv1,
- To: &prv2.PublicKey,
- })
- if err != nil {
- fmt.Println(err)
- t.FailNow()
- }
-
- msg1, err := envelope.Open(prv2)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
-
- if !bytes.Equal(msg1.Payload, data) {
- t.Error("encryption error. data did not match")
- }
-}
diff --git a/whisper/whisper.go b/whisper/whisper.go
index 00dcb1932..d803e27d4 100644
--- a/whisper/whisper.go
+++ b/whisper/whisper.go
@@ -28,7 +28,10 @@ type MessageEvent struct {
Message *Message
}
-const DefaultTtl = 50 * time.Second
+const (
+ DefaultTimeToLive = 50 * time.Second
+ DefaultProofOfWork = 50 * time.Millisecond
+)
type Whisper struct {
protocol p2p.Protocol
diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go
index 3e3945a0a..b29e34a5e 100644
--- a/whisper/whisper_test.go
+++ b/whisper/whisper_test.go
@@ -18,8 +18,8 @@ func TestEvent(t *testing.T) {
})
msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now())))
- envelope, err := msg.Seal(DefaultPow, Opts{
- Ttl: DefaultTtl,
+ envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+ TTL: DefaultTimeToLive,
From: id,
To: &id.PublicKey,
})
diff --git a/xeth/whisper.go b/xeth/whisper.go
index 72e1ee04f..51caec8d6 100644
--- a/xeth/whisper.go
+++ b/xeth/whisper.go
@@ -32,8 +32,8 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio
pk := crypto.ToECDSAPub(common.FromHex(from))
if key := self.Whisper.GetIdentity(pk); key != nil || len(from) == 0 {
msg := whisper.NewMessage(common.FromHex(payload))
- envelope, err := msg.Seal(time.Duration(priority*100000), whisper.Opts{
- Ttl: time.Duration(ttl) * time.Second,
+ envelope, err := msg.Wrap(time.Duration(priority*100000), whisper.Options{
+ TTL: time.Duration(ttl) * time.Second,
To: crypto.ToECDSAPub(common.FromHex(to)),
From: key,
Topics: whisper.TopicsFromString(topics...),