aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-04-18 21:14:56 +0800
committerobscuren <geffobscura@gmail.com>2015-04-18 21:14:56 +0800
commitff67fbf96448b83b778960a6c20ea8dfd854c825 (patch)
tree557b494fb21eb3bad72121e82320cb158b00f93a
parent8244825bbf9ca7342c052508f50a56b16c979a1e (diff)
parent525cefa37aafbc42de8911344c9853d950c06ded (diff)
downloadgo-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.tar
go-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.tar.gz
go-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.tar.bz2
go-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.tar.lz
go-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.tar.xz
go-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.tar.zst
go-tangerine-ff67fbf96448b83b778960a6c20ea8dfd854c825.zip
Merge branch 'develop' into downloader-proto
-rw-r--r--Godeps/Godeps.json4
-rw-r--r--Godeps/_workspace/src/github.com/ethereum/ethash/ethash.go4
-rw-r--r--miner/worker.go2
-rw-r--r--p2p/server.go11
-rw-r--r--rpc/api.go4
-rw-r--r--rpc/args.go2
-rw-r--r--ui/qt/qwhisper/whisper.go4
-rw-r--r--whisper/envelope.go29
-rw-r--r--whisper/filter.go11
-rw-r--r--whisper/main.go4
-rw-r--r--whisper/message.go26
-rw-r--r--whisper/message_test.go24
-rw-r--r--whisper/peer.go172
-rw-r--r--whisper/peer_test.go242
-rw-r--r--whisper/sort.go29
-rw-r--r--whisper/sort_test.go23
-rw-r--r--whisper/topic.go61
-rw-r--r--whisper/topic_test.go67
-rw-r--r--whisper/util.go36
-rw-r--r--whisper/whisper.go334
-rw-r--r--whisper/whisper_test.go189
-rw-r--r--xeth/whisper.go4
22 files changed, 916 insertions, 366 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index c51a2312b..bc5e3144a 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -22,8 +22,8 @@
},
{
"ImportPath": "github.com/ethereum/ethash",
- "Comment": "v23.1-81-g4039fd0",
- "Rev": "4039fd095084679fb0bf3feae91d02506b5d67aa"
+ "Comment": "v23.1-82-g908aad3",
+ "Rev": "908aad345c9fbf3ab9bbb94031dc02d0d90df1b8"
},
{
"ImportPath": "github.com/ethereum/serpent-go",
diff --git a/Godeps/_workspace/src/github.com/ethereum/ethash/ethash.go b/Godeps/_workspace/src/github.com/ethereum/ethash/ethash.go
index c33bfccc4..74285a33c 100644
--- a/Godeps/_workspace/src/github.com/ethereum/ethash/ethash.go
+++ b/Godeps/_workspace/src/github.com/ethereum/ethash/ethash.go
@@ -91,7 +91,7 @@ func makeParamsAndCache(chainManager pow.ChainManager, blockNum uint64) (*Params
return nil, err
}
- glog.V(logger.Info).Infoln("Making cache")
+ glog.V(logger.Info).Infof("Making cache for epoch: %d (%v) (%x)\n", paramsAndCache.Epoch, blockNum, seedHash)
start := time.Now()
C.ethash_mkcache(paramsAndCache.cache, paramsAndCache.params, (*C.ethash_blockhash_t)(unsafe.Pointer(&seedHash[0])))
@@ -387,7 +387,7 @@ func (pow *Ethash) verify(hash common.Hash, mixDigest common.Hash, difficulty *b
if blockNum/epochLength < pow.paramsAndCache.Epoch {
var err error
// If we can't make the params for some reason, this block is invalid
- pAc, err = makeParamsAndCache(pow.chainManager, blockNum+1)
+ pAc, err = makeParamsAndCache(pow.chainManager, blockNum)
if err != nil {
glog.V(logger.Info).Infoln("big fucking eror", err)
return false
diff --git a/miner/worker.go b/miner/worker.go
index 9fb248efa..daabd3db5 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -201,7 +201,7 @@ func (self *worker) wait() {
}
self.mux.Post(core.NewMinedBlockEvent{block})
- glog.V(logger.Info).Infof("🔨 Mined block #%v", block.Number())
+ glog.V(logger.Info).Infof("🔨 Mined block #%v", block.Number())
jsonlogger.LogJson(&logger.EthMinerNewBlock{
BlockHash: block.Hash().Hex(),
diff --git a/p2p/server.go b/p2p/server.go
index eaffc9d13..b5c4a1f59 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -359,9 +359,11 @@ func (srv *Server) dialLoop() {
rand.Read(target[:])
findresults <- srv.ntab.Lookup(target)
}()
- refresh.Stop()
+ } else {
+ // Make sure we check again if the peer count falls
+ // below MaxPeers.
+ refresh.Reset(refreshPeersInterval)
}
-
case dest := <-srv.peerConnect:
dial(dest)
case dests := <-findresults:
@@ -371,7 +373,10 @@ func (srv *Server) dialLoop() {
refresh.Reset(refreshPeersInterval)
case dest := <-dialed:
delete(dialing, dest.ID)
-
+ if len(dialing) == 0 {
+ // Check again immediately after dialing all current candidates.
+ refresh.Reset(0)
+ }
case <-srv.quit:
// TODO: maybe wait for active dials
return
diff --git a/rpc/api.go b/rpc/api.go
index 4b61fa3a5..bf5066f9a 100644
--- a/rpc/api.go
+++ b/rpc/api.go
@@ -182,8 +182,8 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
if err != nil {
return err
}
-
- *reply = v
+ // TODO unwrap the parent method's ToHex call
+ *reply = newHexData(common.FromHex(v))
case "eth_flush":
return NewNotImplementedError(req.Method)
case "eth_getBlockByHash":
diff --git a/rpc/args.go b/rpc/args.go
index 4bc36f5d9..4b3840285 100644
--- a/rpc/args.go
+++ b/rpc/args.go
@@ -279,6 +279,8 @@ func (args *CallArgs) UnmarshalJSON(b []byte) (err error) {
return NewDecodeParamError(err.Error())
}
+ args.From = ext.From
+
if len(ext.To) == 0 {
return NewValidationError("to", "is required")
}
diff --git a/ui/qt/qwhisper/whisper.go b/ui/qt/qwhisper/whisper.go
index 3c2d0a4b9..50b0626f5 100644
--- a/ui/qt/qwhisper/whisper.go
+++ b/ui/qt/qwhisper/whisper.go
@@ -41,7 +41,7 @@ func (self *Whisper) Post(payload []string, to, from string, topics []string, pr
TTL: time.Duration(ttl) * time.Second,
To: crypto.ToECDSAPub(common.FromHex(to)),
From: key,
- Topics: whisper.TopicsFromString(topics...),
+ Topics: whisper.NewTopicsFromStrings(topics...),
})
if err != nil {
@@ -106,7 +106,7 @@ func filterFromMap(opts map[string]interface{}) (f whisper.Filter) {
if topicList, ok := opts["topics"].(*qml.List); ok {
var topics []string
topicList.Convert(&topics)
- f.Topics = whisper.TopicsFromString(topics...)
+ f.Topics = whisper.NewTopicsFromStrings(topics...)
}
return
diff --git a/whisper/envelope.go b/whisper/envelope.go
index f35a40a42..0a817e26e 100644
--- a/whisper/envelope.go
+++ b/whisper/envelope.go
@@ -20,16 +20,16 @@ import (
type Envelope struct {
Expiry uint32 // Whisper protocol specifies int32, really should be int64
TTL uint32 // ^^^^^^
- Topics [][]byte
+ Topics []Topic
Data []byte
Nonce uint32
- hash common.Hash
+ hash common.Hash // Cached hash of the envelope to avoid rehashing every time
}
// NewEnvelope wraps a Whisper message with expiration and destination data
// included into an envelope for network forwarding.
-func NewEnvelope(ttl time.Duration, topics [][]byte, msg *Message) *Envelope {
+func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope {
return &Envelope{
Expiry: uint32(time.Now().Add(ttl).Unix()),
TTL: uint32(ttl.Seconds()),
@@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) {
}
}
-// valid checks whether the claimed proof of work was indeed executed.
-// TODO: Is this really useful? Isn't this always true?
-func (self *Envelope) valid() bool {
- d := make([]byte, 64)
- copy(d[:32], self.rlpWithoutNonce())
- binary.BigEndian.PutUint32(d[60:], self.Nonce)
-
- return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0
-}
-
// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
func (self *Envelope) rlpWithoutNonce() []byte {
enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data})
@@ -85,20 +75,19 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
}
data = data[1:]
- if message.Flags&128 == 128 {
- if len(data) < 65 {
- return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 65")
+ if message.Flags&signatureFlag == signatureFlag {
+ if len(data) < signatureLength {
+ return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)")
}
- message.Signature, data = data[:65], data[65:]
+ message.Signature, data = data[:signatureLength], data[signatureLength:]
}
message.Payload = data
- // Short circuit if the encryption was requested
+ // Decrypt the message, if requested
if key == nil {
return message, nil
}
- // Otherwise try to decrypt the message
- message.Payload, err = crypto.Decrypt(key, message.Payload)
+ err = message.decrypt(key)
switch err {
case nil:
return message, nil
diff --git a/whisper/filter.go b/whisper/filter.go
index b33f2c1a2..8fcc45afd 100644
--- a/whisper/filter.go
+++ b/whisper/filter.go
@@ -1,10 +1,13 @@
+// Contains the message filter for fine grained subscriptions.
+
package whisper
import "crypto/ecdsa"
+// Filter is used to subscribe to specific types of whisper messages.
type Filter struct {
- To *ecdsa.PublicKey
- From *ecdsa.PublicKey
- Topics [][]byte
- Fn func(*Message)
+ To *ecdsa.PublicKey // Recipient of the message
+ From *ecdsa.PublicKey // Sender of the message
+ Topics []Topic // Topics to watch messages on
+ Fn func(*Message) // Handler in case of a match
}
diff --git a/whisper/main.go b/whisper/main.go
index 422f0fa3b..3c8c3801f 100644
--- a/whisper/main.go
+++ b/whisper/main.go
@@ -69,10 +69,10 @@ func selfSend(shh *whisper.Whisper, payload []byte) error {
})
// Wrap the payload and encrypt it
msg := whisper.NewMessage(payload)
- envelope, err := msg.Wrap(whisper.DefaultProofOfWork, whisper.Options{
+ envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{
From: id,
To: &id.PublicKey,
- TTL: whisper.DefaultTimeToLive,
+ TTL: whisper.DefaultTTL,
})
if err != nil {
return fmt.Errorf("failed to seal message: %v", err)
diff --git a/whisper/message.go b/whisper/message.go
index 2666ee6e0..07c673567 100644
--- a/whisper/message.go
+++ b/whisper/message.go
@@ -30,13 +30,14 @@ type Options struct {
From *ecdsa.PrivateKey
To *ecdsa.PublicKey
TTL time.Duration
- Topics [][]byte
+ Topics []Topic
}
// NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
func NewMessage(payload []byte) *Message {
- // Construct an initial flag set: bit #1 = 0 (no signature), rest random
- flags := byte(rand.Intn(128))
+ // Construct an initial flag set: no signature, rest random
+ flags := byte(rand.Intn(256))
+ flags &= ^signatureFlag
// Assemble and return the message
return &Message{
@@ -61,7 +62,7 @@ func NewMessage(payload []byte) *Message {
func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) {
// Use the default TTL if non was specified
if options.TTL == 0 {
- options.TTL = DefaultTimeToLive
+ options.TTL = DefaultTTL
}
// Sign and encrypt the message if requested
if options.From != nil {
@@ -84,7 +85,7 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error)
// sign calculates and sets the cryptographic signature for the message , also
// setting the sign flag.
func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
- self.Flags |= 1 << 7
+ self.Flags |= signatureFlag
self.Signature, err = crypto.Sign(self.hash(), key)
return
}
@@ -93,6 +94,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
func (self *Message) Recover() *ecdsa.PublicKey {
defer func() { recover() }() // in case of invalid signature
+ // Short circuit if no signature is present
+ if self.Signature == nil {
+ return nil
+ }
+ // Otherwise try and recover the signature
pub, err := crypto.SigToPub(self.hash(), self.Signature)
if err != nil {
glog.V(logger.Error).Infof("Could not get public key from signature: %v", err)
@@ -102,8 +108,14 @@ func (self *Message) Recover() *ecdsa.PublicKey {
}
// encrypt encrypts a message payload with a public key.
-func (self *Message) encrypt(to *ecdsa.PublicKey) (err error) {
- self.Payload, err = crypto.Encrypt(to, self.Payload)
+func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) {
+ self.Payload, err = crypto.Encrypt(key, self.Payload)
+ return
+}
+
+// decrypt decrypts an encrypted payload with a private key.
+func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) {
+ self.Payload, err = crypto.Decrypt(key, self.Payload)
return
}
diff --git a/whisper/message_test.go b/whisper/message_test.go
index 8d4c5e990..18a254e5c 100644
--- a/whisper/message_test.go
+++ b/whisper/message_test.go
@@ -13,11 +13,11 @@ func TestMessageSimpleWrap(t *testing.T) {
payload := []byte("hello world")
msg := NewMessage(payload)
- if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil {
+ if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil {
t.Fatalf("failed to wrap message: %v", err)
}
- if msg.Flags&128 != 0 {
- t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0)
+ if msg.Flags&signatureFlag != 0 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
}
if len(msg.Signature) != 0 {
t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature)
@@ -36,13 +36,13 @@ func TestMessageCleartextSignRecover(t *testing.T) {
payload := []byte("hello world")
msg := NewMessage(payload)
- if _, err := msg.Wrap(DefaultProofOfWork, Options{
+ if _, err := msg.Wrap(DefaultPoW, Options{
From: key,
}); err != nil {
t.Fatalf("failed to sign message: %v", err)
}
- if msg.Flags&128 != 128 {
- t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1)
+ if msg.Flags&signatureFlag != signatureFlag {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
}
if bytes.Compare(msg.Payload, payload) != 0 {
t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload)
@@ -69,14 +69,14 @@ func TestMessageAnonymousEncryptDecrypt(t *testing.T) {
payload := []byte("hello world")
msg := NewMessage(payload)
- envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+ envelope, err := msg.Wrap(DefaultPoW, Options{
To: &key.PublicKey,
})
if err != nil {
t.Fatalf("failed to encrypt message: %v", err)
}
- if msg.Flags&128 != 0 {
- t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0)
+ if msg.Flags&signatureFlag != 0 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
}
if len(msg.Signature) != 0 {
t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature)
@@ -104,15 +104,15 @@ func TestMessageFullCrypto(t *testing.T) {
payload := []byte("hello world")
msg := NewMessage(payload)
- envelope, err := msg.Wrap(DefaultProofOfWork, Options{
+ envelope, err := msg.Wrap(DefaultPoW, Options{
From: fromKey,
To: &toKey.PublicKey,
})
if err != nil {
t.Fatalf("failed to encrypt message: %v", err)
}
- if msg.Flags&128 != 128 {
- t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1)
+ if msg.Flags&signatureFlag != signatureFlag {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
}
if len(msg.Signature) == 0 {
t.Fatalf("no signature found for signed message")
diff --git a/whisper/peer.go b/whisper/peer.go
index 338166c25..e4301f37c 100644
--- a/whisper/peer.go
+++ b/whisper/peer.go
@@ -4,110 +4,160 @@ import (
"fmt"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
)
-const (
- protocolVersion uint64 = 0x02
-)
-
+// peer represents a whisper protocol peer connection.
type peer struct {
host *Whisper
peer *p2p.Peer
ws p2p.MsgReadWriter
- // XXX Eventually this is going to reach exceptional large space. We need an expiry here
- known *set.Set
+ known *set.Set // Messages already known by the peer to avoid wasting bandwidth
quit chan struct{}
}
-func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer {
- return &peer{host, p, ws, set.New(), make(chan struct{})}
-}
-
-func (self *peer) init() error {
- if err := self.handleStatus(); err != nil {
- return err
+// newPeer creates and initializes a new whisper peer connection, returning either
+// the newly constructed link or a failure reason.
+func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) {
+ p := &peer{
+ host: host,
+ peer: remote,
+ ws: rw,
+ known: set.New(),
+ quit: make(chan struct{}),
}
-
- return nil
+ if err := p.handshake(); err != nil {
+ return nil, err
+ }
+ return p, nil
}
+// start initiates the peer updater, periodically broadcasting the whisper packets
+// into the network.
func (self *peer) start() {
go self.update()
self.peer.Debugln("whisper started")
}
+// stop terminates the peer updater, stopping message forwarding to it.
func (self *peer) stop() {
+ close(self.quit)
self.peer.Debugln("whisper stopped")
+}
- close(self.quit)
+// handshake sends the protocol initiation status message to the remote peer and
+// verifies the remote status too.
+func (self *peer) handshake() error {
+ // Send the handshake status message asynchronously
+ errc := make(chan error, 1)
+ go func() {
+ errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
+ }()
+ // Fetch the remote status packet and verify protocol match
+ packet, err := self.ws.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if packet.Code != statusCode {
+ return fmt.Errorf("peer sent %x before status packet", packet.Code)
+ }
+ s := rlp.NewStream(packet.Payload)
+ if _, err := s.List(); err != nil {
+ return fmt.Errorf("bad status message: %v", err)
+ }
+ peerVersion, err := s.Uint()
+ if err != nil {
+ return fmt.Errorf("bad status message: %v", err)
+ }
+ if peerVersion != protocolVersion {
+ return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
+ }
+ // Wait until out own status is consumed too
+ if err := <-errc; err != nil {
+ return fmt.Errorf("failed to send status packet: %v", err)
+ }
+ return nil
}
+// update executes periodic operations on the peer, including message transmission
+// and expiration.
func (self *peer) update() {
- relay := time.NewTicker(300 * time.Millisecond)
-out:
+ // Start the tickers for the updates
+ expire := time.NewTicker(expirationCycle)
+ transmit := time.NewTicker(transmissionCycle)
+
+ // Loop and transmit until termination is requested
for {
select {
- case <-relay.C:
- err := self.broadcast(self.host.envelopes())
- if err != nil {
- self.peer.Infoln("broadcast err:", err)
- break out
+ case <-expire.C:
+ self.expire()
+
+ case <-transmit.C:
+ if err := self.broadcast(); err != nil {
+ self.peer.Infoln("broadcast failed:", err)
+ return
}
case <-self.quit:
- break out
+ return
}
}
}
-func (self *peer) broadcast(envelopes []*Envelope) error {
- envs := make([]*Envelope, 0, len(envelopes))
- for _, env := range envelopes {
- if !self.known.Has(env.Hash()) {
- envs = append(envs, env)
- self.known.Add(env.Hash())
- }
- }
- if len(envs) > 0 {
- if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil {
- return err
- }
- self.peer.DebugDetailln("broadcasted", len(envs), "message(s)")
- }
- return nil
+// mark marks an envelope known to the peer so that it won't be sent back.
+func (self *peer) mark(envelope *Envelope) {
+ self.known.Add(envelope.Hash())
}
-func (self *peer) addKnown(envelope *Envelope) {
- self.known.Add(envelope.Hash())
+// marked checks if an envelope is already known to the remote peer.
+func (self *peer) marked(envelope *Envelope) bool {
+ return self.known.Has(envelope.Hash())
}
-func (self *peer) handleStatus() error {
- ws := self.ws
- if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil {
- return err
- }
- msg, err := ws.ReadMsg()
- if err != nil {
- return err
+// expire iterates over all the known envelopes in the host and removes all
+// expired (unknown) ones from the known list.
+func (self *peer) expire() {
+ // Assemble the list of available envelopes
+ available := set.NewNonTS()
+ for _, envelope := range self.host.envelopes() {
+ available.Add(envelope.Hash())
}
- if msg.Code != statusMsg {
- return fmt.Errorf("peer send %x before status msg", msg.Code)
- }
- s := rlp.NewStream(msg.Payload)
- if _, err := s.List(); err != nil {
- return fmt.Errorf("bad status message: %v", err)
+ // Cross reference availability with known status
+ unmark := make(map[common.Hash]struct{})
+ self.known.Each(func(v interface{}) bool {
+ if !available.Has(v.(common.Hash)) {
+ unmark[v.(common.Hash)] = struct{}{}
+ }
+ return true
+ })
+ // Dump all known but unavailable
+ for hash, _ := range unmark {
+ self.known.Remove(hash)
}
- pv, err := s.Uint()
- if err != nil {
- return fmt.Errorf("bad status message: %v", err)
+}
+
+// broadcast iterates over the collection of envelopes and transmits yet unknown
+// ones over the network.
+func (self *peer) broadcast() error {
+ // Fetch the envelopes and collect the unknown ones
+ envelopes := self.host.envelopes()
+ transmit := make([]*Envelope, 0, len(envelopes))
+ for _, envelope := range envelopes {
+ if !self.marked(envelope) {
+ transmit = append(transmit, envelope)
+ self.mark(envelope)
+ }
}
- if pv != protocolVersion {
- return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion)
+ // Transmit the unknown batch (potentially empty)
+ if err := p2p.Send(self.ws, messagesCode, transmit); err != nil {
+ return err
}
- return msg.Discard() // ignore anything after protocol version
+ self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)")
+
+ return nil
}
diff --git a/whisper/peer_test.go b/whisper/peer_test.go
new file mode 100644
index 000000000..9008cdc59
--- /dev/null
+++ b/whisper/peer_test.go
@@ -0,0 +1,242 @@
+package whisper
+
+import (
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+type testPeer struct {
+ client *Whisper
+ stream *p2p.MsgPipeRW
+ termed chan struct{}
+}
+
+func startTestPeer() *testPeer {
+ // Create a simulated P2P remote peer and data streams to it
+ remote := p2p.NewPeer(discover.NodeID{}, "", nil)
+ tester, tested := p2p.MsgPipe()
+
+ // Create a whisper client and connect with it to the tester peer
+ client := New()
+ client.Start()
+
+ termed := make(chan struct{})
+ go func() {
+ defer client.Stop()
+ defer close(termed)
+ defer tested.Close()
+
+ client.handlePeer(remote, tested)
+ }()
+
+ return &testPeer{
+ client: client,
+ stream: tester,
+ termed: termed,
+ }
+}
+
+func startTestPeerInited() (*testPeer, error) {
+ peer := startTestPeer()
+
+ if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ peer.stream.Close()
+ return nil, err
+ }
+ if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
+ peer.stream.Close()
+ return nil, err
+ }
+ return peer, nil
+}
+
+func TestPeerStatusMessage(t *testing.T) {
+ tester := startTestPeer()
+
+ // Wait for the handshake status message and check it
+ if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ t.Fatalf("status message mismatch: %v", err)
+ }
+ // Terminate the node
+ tester.stream.Close()
+
+ select {
+ case <-tester.termed:
+ case <-time.After(time.Second):
+ t.Fatalf("local close timed out")
+ }
+}
+
+func TestPeerHandshakeFail(t *testing.T) {
+ tester := startTestPeer()
+
+ // Wait for and check the handshake
+ if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ t.Fatalf("status message mismatch: %v", err)
+ }
+ // Send an invalid handshake status and verify disconnect
+ if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
+ t.Fatalf("failed to send malformed status: %v", err)
+ }
+ select {
+ case <-tester.termed:
+ case <-time.After(time.Second):
+ t.Fatalf("remote close timed out")
+ }
+}
+
+func TestPeerHandshakeSuccess(t *testing.T) {
+ tester := startTestPeer()
+
+ // Wait for and check the handshake
+ if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ t.Fatalf("status message mismatch: %v", err)
+ }
+ // Send a valid handshake status and make sure connection stays live
+ if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
+ t.Fatalf("failed to send status: %v", err)
+ }
+ select {
+ case <-tester.termed:
+ t.Fatalf("valid handshake disconnected")
+
+ case <-time.After(100 * time.Millisecond):
+ }
+ // Clean up the test
+ tester.stream.Close()
+
+ select {
+ case <-tester.termed:
+ case <-time.After(time.Second):
+ t.Fatalf("local close timed out")
+ }
+}
+
+func TestPeerSend(t *testing.T) {
+ // Start a tester and execute the handshake
+ tester, err := startTestPeerInited()
+ if err != nil {
+ t.Fatalf("failed to start initialized peer: %v", err)
+ }
+ defer tester.stream.Close()
+
+ // Construct a message and inject into the tester
+ message := NewMessage([]byte("peer broadcast test message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := tester.client.Send(envelope); err != nil {
+ t.Fatalf("failed to send message: %v", err)
+ }
+ // Check that the message is eventually forwarded
+ payload := []interface{}{envelope}
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+ // Make sure that even with a re-insert, an empty batch is received
+ if err := tester.client.Send(envelope); err != nil {
+ t.Fatalf("failed to send message: %v", err)
+ }
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+}
+
+func TestPeerDeliver(t *testing.T) {
+ // Start a tester and execute the handshake
+ tester, err := startTestPeerInited()
+ if err != nil {
+ t.Fatalf("failed to start initialized peer: %v", err)
+ }
+ defer tester.stream.Close()
+
+ // Watch for all inbound messages
+ arrived := make(chan struct{}, 1)
+ tester.client.Watch(Filter{
+ Fn: func(message *Message) {
+ arrived <- struct{}{}
+ },
+ })
+ // Construct a message and deliver it to the tester peer
+ message := NewMessage([]byte("peer broadcast test message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
+ t.Fatalf("failed to transfer message: %v", err)
+ }
+ // Check that the message is delivered upstream
+ select {
+ case <-arrived:
+ case <-time.After(time.Second):
+ t.Fatalf("message delivery timeout")
+ }
+ // Check that a resend is not delivered
+ if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
+ t.Fatalf("failed to transfer message: %v", err)
+ }
+ select {
+ case <-time.After(2 * transmissionCycle):
+ case <-arrived:
+ t.Fatalf("repeating message arrived")
+ }
+}
+
+func TestPeerMessageExpiration(t *testing.T) {
+ // Start a tester and execute the handshake
+ tester, err := startTestPeerInited()
+ if err != nil {
+ t.Fatalf("failed to start initialized peer: %v", err)
+ }
+ defer tester.stream.Close()
+
+ // Fetch the peer instance for later inspection
+ tester.client.peerMu.RLock()
+ if peers := len(tester.client.peers); peers != 1 {
+ t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
+ }
+ var peer *peer
+ for peer, _ = range tester.client.peers {
+ break
+ }
+ tester.client.peerMu.RUnlock()
+
+ // Construct a message and pass it through the tester
+ message := NewMessage([]byte("peer test message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: time.Second,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := tester.client.Send(envelope); err != nil {
+ t.Fatalf("failed to send message: %v", err)
+ }
+ payload := []interface{}{envelope}
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+ // Check that the message is inside the cache
+ if !peer.known.Has(envelope.Hash()) {
+ t.Fatalf("message not found in cache")
+ }
+ // Discard messages until expiration and check cache again
+ exp := time.Now().Add(time.Second + expirationCycle)
+ for time.Now().Before(exp) {
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+ }
+ if peer.known.Has(envelope.Hash()) {
+ t.Fatalf("message not expired from cache")
+ }
+}
diff --git a/whisper/sort.go b/whisper/sort.go
deleted file mode 100644
index 313ba5ac0..000000000
--- a/whisper/sort.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package whisper
-
-import (
- "sort"
-
- "github.com/ethereum/go-ethereum/common"
-)
-
-type sortedKeys struct {
- k []int32
-}
-
-func (self *sortedKeys) Len() int { return len(self.k) }
-func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] }
-func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] }
-
-func sortKeys(m map[int32]common.Hash) []int32 {
- sorted := new(sortedKeys)
- sorted.k = make([]int32, len(m))
- i := 0
- for key, _ := range m {
- sorted.k[i] = key
- i++
- }
-
- sort.Sort(sorted)
-
- return sorted.k
-}
diff --git a/whisper/sort_test.go b/whisper/sort_test.go
deleted file mode 100644
index a61fde4c2..000000000
--- a/whisper/sort_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package whisper
-
-import (
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
-)
-
-func TestSorting(t *testing.T) {
- m := map[int32]common.Hash{
- 1: {1},
- 3: {3},
- 2: {2},
- 5: {5},
- }
- exp := []int32{1, 2, 3, 5}
- res := sortKeys(m)
- for i, k := range res {
- if k != exp[i] {
- t.Error(k, "failed. Expected", exp[i])
- }
- }
-}
diff --git a/whisper/topic.go b/whisper/topic.go
new file mode 100644
index 000000000..a965c7cc2
--- /dev/null
+++ b/whisper/topic.go
@@ -0,0 +1,61 @@
+// Contains the Whisper protocol Topic element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics.
+
+package whisper
+
+import "github.com/ethereum/go-ethereum/crypto"
+
+// Topic represents a cryptographically secure, probabilistic partial
+// classifications of a message, determined as the first (left) 4 bytes of the
+// SHA3 hash of some arbitrary data given by the original author of the message.
+type Topic [4]byte
+
+// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data.
+func NewTopic(data []byte) Topic {
+ prefix := [4]byte{}
+ copy(prefix[:], crypto.Sha3(data)[:4])
+ return Topic(prefix)
+}
+
+// NewTopics creates a list of topics from a list of binary data elements, by
+// iteratively calling NewTopic on each of them.
+func NewTopics(data ...[]byte) []Topic {
+ topics := make([]Topic, len(data))
+ for i, element := range data {
+ topics[i] = NewTopic(element)
+ }
+ return topics
+}
+
+// NewTopicFromString creates a topic using the binary data contents of the
+// specified string.
+func NewTopicFromString(data string) Topic {
+ return NewTopic([]byte(data))
+}
+
+// NewTopicsFromStrings creates a list of topics from a list of textual data
+// elements, by iteratively calling NewTopicFromString on each of them.
+func NewTopicsFromStrings(data ...string) []Topic {
+ topics := make([]Topic, len(data))
+ for i, element := range data {
+ topics[i] = NewTopicFromString(element)
+ }
+ return topics
+}
+
+// String converts a topic byte array to a string representation.
+func (self *Topic) String() string {
+ return string(self[:])
+}
+
+// TopicSet represents a hash set to check if a topic exists or not.
+type topicSet map[string]struct{}
+
+// NewTopicSet creates a topic hash set from a slice of topics.
+func newTopicSet(topics []Topic) topicSet {
+ set := make(map[string]struct{})
+ for _, topic := range topics {
+ set[topic.String()] = struct{}{}
+ }
+ return topicSet(set)
+}
diff --git a/whisper/topic_test.go b/whisper/topic_test.go
new file mode 100644
index 000000000..4015079dc
--- /dev/null
+++ b/whisper/topic_test.go
@@ -0,0 +1,67 @@
+package whisper
+
+import (
+ "bytes"
+ "testing"
+)
+
+var topicCreationTests = []struct {
+ data []byte
+ hash [4]byte
+}{
+ {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil},
+ {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}},
+ {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")},
+}
+
+func TestTopicCreation(t *testing.T) {
+ // Create the topics individually
+ for i, tt := range topicCreationTests {
+ topic := NewTopic(tt.data)
+ if bytes.Compare(topic[:], tt.hash[:]) != 0 {
+ t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
+ }
+ }
+ for i, tt := range topicCreationTests {
+ topic := NewTopicFromString(string(tt.data))
+ if bytes.Compare(topic[:], tt.hash[:]) != 0 {
+ t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
+ }
+ }
+ // Create the topics in batches
+ binaryData := make([][]byte, len(topicCreationTests))
+ for i, tt := range topicCreationTests {
+ binaryData[i] = tt.data
+ }
+ textualData := make([]string, len(topicCreationTests))
+ for i, tt := range topicCreationTests {
+ textualData[i] = string(tt.data)
+ }
+
+ topics := NewTopics(binaryData...)
+ for i, tt := range topicCreationTests {
+ if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
+ t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
+ }
+ }
+ topics = NewTopicsFromStrings(textualData...)
+ for i, tt := range topicCreationTests {
+ if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
+ t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
+ }
+ }
+}
+
+func TestTopicSetCreation(t *testing.T) {
+ topics := make([]Topic, len(topicCreationTests))
+ for i, tt := range topicCreationTests {
+ topics[i] = NewTopic(tt.data)
+ }
+ set := newTopicSet(topics)
+ for i, tt := range topicCreationTests {
+ topic := NewTopic(tt.data)
+ if _, ok := set[topic.String()]; !ok {
+ t.Errorf("topic %d: not found in set", i)
+ }
+ }
+}
diff --git a/whisper/util.go b/whisper/util.go
deleted file mode 100644
index 7a222395f..000000000
--- a/whisper/util.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package whisper
-
-import "github.com/ethereum/go-ethereum/crypto"
-
-func hashTopic(topic []byte) []byte {
- return crypto.Sha3(topic)[:4]
-}
-
-// NOTE this isn't DRY, but I don't want to iterate twice.
-
-// Returns a formatted topics byte slice.
-// data: unformatted data (e.g., no hashes needed)
-func Topics(data [][]byte) [][]byte {
- d := make([][]byte, len(data))
- for i, byts := range data {
- d[i] = hashTopic(byts)
- }
- return d
-}
-
-func TopicsFromString(data ...string) [][]byte {
- d := make([][]byte, len(data))
- for i, str := range data {
- d[i] = hashTopic([]byte(str))
- }
- return d
-}
-
-func bytesToMap(s [][]byte) map[string]struct{} {
- m := make(map[string]struct{})
- for _, topic := range s {
- m[string(topic)] = struct{}{}
- }
-
- return m
-}
diff --git a/whisper/whisper.go b/whisper/whisper.go
index d803e27d4..9317fad50 100644
--- a/whisper/whisper.go
+++ b/whisper/whisper.go
@@ -2,7 +2,6 @@ package whisper
import (
"crypto/ecdsa"
- "errors"
"sync"
"time"
@@ -17,9 +16,22 @@ import (
)
const (
- statusMsg = 0x0
- envelopesMsg = 0x01
- whisperVersion = 0x02
+ statusCode = 0x00
+ messagesCode = 0x01
+
+ protocolVersion uint64 = 0x02
+ protocolName = "shh"
+
+ signatureFlag = byte(1 << 7)
+ signatureLength = 65
+
+ expirationCycle = 800 * time.Millisecond
+ transmissionCycle = 300 * time.Millisecond
+)
+
+const (
+ DefaultTTL = 50 * time.Second
+ DefaultPoW = 50 * time.Millisecond
)
type MessageEvent struct {
@@ -28,250 +40,298 @@ type MessageEvent struct {
Message *Message
}
-const (
- DefaultTimeToLive = 50 * time.Second
- DefaultProofOfWork = 50 * time.Millisecond
-)
-
+// Whisper represents a dark communication interface through the Ethereum
+// network, using its very own P2P communication layer.
type Whisper struct {
protocol p2p.Protocol
filters *filter.Filters
- mmu sync.RWMutex
- messages map[common.Hash]*Envelope
- expiry map[uint32]*set.SetNonTS
+ keys map[string]*ecdsa.PrivateKey
- quit chan struct{}
+ messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node
+ expirations map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter)
+ poolMu sync.RWMutex // Mutex to sync the message and expiration pools
- keys map[string]*ecdsa.PrivateKey
+ peers map[*peer]struct{} // Set of currently active peers
+ peerMu sync.RWMutex // Mutex to sync the active peer set
+
+ quit chan struct{}
}
func New() *Whisper {
whisper := &Whisper{
- messages: make(map[common.Hash]*Envelope),
- filters: filter.New(),
- expiry: make(map[uint32]*set.SetNonTS),
- quit: make(chan struct{}),
- keys: make(map[string]*ecdsa.PrivateKey),
+ filters: filter.New(),
+ keys: make(map[string]*ecdsa.PrivateKey),
+ messages: make(map[common.Hash]*Envelope),
+ expirations: make(map[uint32]*set.SetNonTS),
+ peers: make(map[*peer]struct{}),
+ quit: make(chan struct{}),
}
whisper.filters.Start()
// p2p whisper sub protocol handler
whisper.protocol = p2p.Protocol{
- Name: "shh",
- Version: uint(whisperVersion),
+ Name: protocolName,
+ Version: uint(protocolVersion),
Length: 2,
- Run: whisper.msgHandler,
+ Run: whisper.handlePeer,
}
return whisper
}
-func (self *Whisper) Version() uint {
- return self.protocol.Version
-}
-
-func (self *Whisper) Start() {
- glog.V(logger.Info).Infoln("Whisper started")
- go self.update()
-}
-
-func (self *Whisper) Stop() {
- close(self.quit)
+// Protocol returns the whisper sub-protocol handler for this particular client.
+func (self *Whisper) Protocol() p2p.Protocol {
+ return self.protocol
}
-func (self *Whisper) Send(envelope *Envelope) error {
- return self.add(envelope)
+// Version returns the whisper sub-protocols version number.
+func (self *Whisper) Version() uint {
+ return self.protocol.Version
}
+// NewIdentity generates a new cryptographic identity for the client, and injects
+// it into the known identities for message decryption.
func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey()
if err != nil {
panic(err)
}
-
self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
return key
}
+// HasIdentity checks if the the whisper node is configured with the private key
+// of the specified public pair.
func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
return self.keys[string(crypto.FromECDSAPub(key))] != nil
}
+// GetIdentity retrieves the private key of the specified public identity.
func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
return self.keys[string(crypto.FromECDSAPub(key))]
}
-// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
-// k := string(crypto.FromECDSAPub(key))
-// if _, ok := self.keys[k]; ok {
-// delete(self.keys, k)
-// return true
-// }
-// return false
-// }
-
-func (self *Whisper) Watch(opts Filter) int {
- return self.filters.Install(filter.Generic{
- Str1: string(crypto.FromECDSAPub(opts.To)),
- Str2: string(crypto.FromECDSAPub(opts.From)),
- Data: bytesToMap(opts.Topics),
+// Watch installs a new message handler to run in case a matching packet arrives
+// from the whisper network.
+func (self *Whisper) Watch(options Filter) int {
+ filter := filter.Generic{
+ Str1: string(crypto.FromECDSAPub(options.To)),
+ Str2: string(crypto.FromECDSAPub(options.From)),
+ Data: newTopicSet(options.Topics),
Fn: func(data interface{}) {
- opts.Fn(data.(*Message))
+ options.Fn(data.(*Message))
},
- })
+ }
+ return self.filters.Install(filter)
}
+// Unwatch removes an installed message handler.
func (self *Whisper) Unwatch(id int) {
self.filters.Uninstall(id)
}
-func (self *Whisper) Messages(id int) (messages []*Message) {
- filter := self.filters.Get(id)
- if filter != nil {
- for _, e := range self.messages {
- if msg, key := self.open(e); msg != nil {
- f := createFilter(msg, e.Topics, key)
- if self.filters.Match(filter, f) {
- messages = append(messages, msg)
+// Send injects a message into the whisper send queue, to be distributed in the
+// network in the coming cycles.
+func (self *Whisper) Send(envelope *Envelope) error {
+ return self.add(envelope)
+}
+
+func (self *Whisper) Start() {
+ glog.V(logger.Info).Infoln("Whisper started")
+ go self.update()
+}
+
+func (self *Whisper) Stop() {
+ close(self.quit)
+ glog.V(logger.Info).Infoln("Whisper stopped")
+}
+
+// Messages retrieves the currently pooled messages matching a filter id.
+func (self *Whisper) Messages(id int) []*Message {
+ messages := make([]*Message, 0)
+ if filter := self.filters.Get(id); filter != nil {
+ for _, envelope := range self.messages {
+ if message := self.open(envelope); message != nil {
+ if self.filters.Match(filter, createFilter(message, envelope.Topics)) {
+ messages = append(messages, message)
}
}
}
}
-
- return
+ return messages
}
-// Main handler for passing whisper messages to whisper peer objects
-func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error {
- wpeer := NewPeer(self, peer, ws)
- // initialise whisper peer (handshake/status)
- if err := wpeer.init(); err != nil {
+// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool {
+// k := string(crypto.FromECDSAPub(key))
+// if _, ok := self.keys[k]; ok {
+// delete(self.keys, k)
+// return true
+// }
+// return false
+// }
+
+// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
+// connection is negotiated.
+func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ // Create, initialize and start the whisper peer
+ whisperPeer, err := newPeer(self, peer, rw)
+ if err != nil {
return err
}
- // kick of the main handler for broadcasting/managing envelopes
- go wpeer.start()
- defer wpeer.stop()
-
- // Main *read* loop. Writing is done by the peer it self.
+ whisperPeer.start()
+ defer whisperPeer.stop()
+
+ // Start tracking the active peer
+ self.peerMu.Lock()
+ self.peers[whisperPeer] = struct{}{}
+ self.peerMu.Unlock()
+
+ defer func() {
+ self.peerMu.Lock()
+ delete(self.peers, whisperPeer)
+ self.peerMu.Unlock()
+ }()
+ // Read and process inbound messages directly to merge into client-global state
for {
- msg, err := ws.ReadMsg()
+ // Fetch the next packet and decode the contained envelopes
+ packet, err := rw.ReadMsg()
if err != nil {
return err
}
-
var envelopes []*Envelope
- if err := msg.Decode(&envelopes); err != nil {
- peer.Infoln(err)
+ if err := packet.Decode(&envelopes); err != nil {
+ peer.Infof("failed to decode enveloped: %v", err)
continue
}
-
+ // Inject all envelopes into the internal pool
for _, envelope := range envelopes {
if err := self.add(envelope); err != nil {
// TODO Punish peer here. Invalid envelope.
- peer.Debugln(err)
+ peer.Debugf("failed to pool envelope: %f", err)
}
- wpeer.addKnown(envelope)
+ whisperPeer.mark(envelope)
}
}
}
-// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed.
+// add inserts a new envelope into the message pool to be distributed within the
+// whisper network. It also inserts the envelope into the expiration pool at the
+// appropriate time-stamp.
func (self *Whisper) add(envelope *Envelope) error {
- if !envelope.valid() {
- return errors.New("invalid pow provided for envelope")
- }
-
- self.mmu.Lock()
- defer self.mmu.Unlock()
+ self.poolMu.Lock()
+ defer self.poolMu.Unlock()
+ // Insert the message into the tracked pool
hash := envelope.Hash()
+ if _, ok := self.messages[hash]; ok {
+ glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
+ return nil
+ }
self.messages[hash] = envelope
- if self.expiry[envelope.Expiry] == nil {
- self.expiry[envelope.Expiry] = set.NewNonTS()
+
+ // Insert the message into the expiration pool for later removal
+ if self.expirations[envelope.Expiry] == nil {
+ self.expirations[envelope.Expiry] = set.NewNonTS()
}
+ if !self.expirations[envelope.Expiry].Has(hash) {
+ self.expirations[envelope.Expiry].Add(hash)
- if !self.expiry[envelope.Expiry].Has(hash) {
- self.expiry[envelope.Expiry].Add(hash)
+ // Notify the local node of a message arrival
go self.postEvent(envelope)
}
+ glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope)
- glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope)
+ return nil
+}
+
+// postEvent opens an envelope with the configured identities and delivers the
+// message upstream from application processing.
+func (self *Whisper) postEvent(envelope *Envelope) {
+ if message := self.open(envelope); message != nil {
+ self.filters.Notify(createFilter(message, envelope.Topics), message)
+ }
+}
+// open tries to decrypt a whisper envelope with all the configured identities,
+// returning the decrypted message and the key used to achieve it. If not keys
+// are configured, open will return the payload as if non encrypted.
+func (self *Whisper) open(envelope *Envelope) *Message {
+ // Short circuit if no identity is set, and assume clear-text
+ if len(self.keys) == 0 {
+ if message, err := envelope.Open(nil); err == nil {
+ return message
+ }
+ }
+ // Iterate over the keys and try to decrypt the message
+ for _, key := range self.keys {
+ message, err := envelope.Open(key)
+ if err == nil || err == ecies.ErrInvalidPublicKey {
+ message.To = &key.PublicKey
+ return message
+ }
+ }
+ // Failed to decrypt, don't return anything
return nil
}
+// createFilter creates a message filter to check against installed handlers.
+func createFilter(message *Message, topics []Topic) filter.Filter {
+ return filter.Generic{
+ Str1: string(crypto.FromECDSAPub(message.To)),
+ Str2: string(crypto.FromECDSAPub(message.Recover())),
+ Data: newTopicSet(topics),
+ }
+}
+
+// update loops until the lifetime of the whisper node, updating its internal
+// state by expiring stale messages from the pool.
func (self *Whisper) update() {
- expire := time.NewTicker(800 * time.Millisecond)
-out:
+ // Start a ticker to check for expirations
+ expire := time.NewTicker(expirationCycle)
+
+ // Repeat updates until termination is requested
for {
select {
case <-expire.C:
self.expire()
+
case <-self.quit:
- break out
+ return
}
}
}
+// expire iterates over all the expiration timestamps, removing all stale
+// messages from the pools.
func (self *Whisper) expire() {
- self.mmu.Lock()
- defer self.mmu.Unlock()
+ self.poolMu.Lock()
+ defer self.poolMu.Unlock()
now := uint32(time.Now().Unix())
- for then, hashSet := range self.expiry {
+ for then, hashSet := range self.expirations {
+ // Short circuit if a future time
if then > now {
continue
}
-
+ // Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool {
delete(self.messages, v.(common.Hash))
return true
})
- self.expiry[then].Clear()
+ self.expirations[then].Clear()
}
}
-func (self *Whisper) envelopes() (envelopes []*Envelope) {
- self.mmu.RLock()
- defer self.mmu.RUnlock()
+// envelopes retrieves all the messages currently pooled by the node.
+func (self *Whisper) envelopes() []*Envelope {
+ self.poolMu.RLock()
+ defer self.poolMu.RUnlock()
- envelopes = make([]*Envelope, len(self.messages))
- i := 0
+ envelopes := make([]*Envelope, 0, len(self.messages))
for _, envelope := range self.messages {
- envelopes[i] = envelope
- i++
- }
-
- return
-}
-
-func (self *Whisper) postEvent(envelope *Envelope) {
- if message, key := self.open(envelope); message != nil {
- self.filters.Notify(createFilter(message, envelope.Topics, key), message)
- }
-}
-
-func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) {
- for _, key := range self.keys {
- if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) {
- message.To = &key.PublicKey
-
- return message, key
- }
- }
-
- return nil, nil
-}
-
-func (self *Whisper) Protocol() p2p.Protocol {
- return self.protocol
-}
-
-func createFilter(message *Message, topics [][]byte, key *ecdsa.PrivateKey) filter.Filter {
- return filter.Generic{
- Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())),
- Data: bytesToMap(topics),
+ envelopes = append(envelopes, envelope)
}
+ return envelopes
}
diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go
index b29e34a5e..def8e68d8 100644
--- a/whisper/whisper_test.go
+++ b/whisper/whisper_test.go
@@ -1,38 +1,185 @@
package whisper
import (
- "fmt"
"testing"
"time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
)
-func TestEvent(t *testing.T) {
- res := make(chan *Message, 1)
- whisper := New()
- id := whisper.NewIdentity()
- whisper.Watch(Filter{
- To: &id.PublicKey,
+func startTestCluster(n int) []*Whisper {
+ // Create the batch of simulated peers
+ nodes := make([]*p2p.Peer, n)
+ for i := 0; i < n; i++ {
+ nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil)
+ }
+ whispers := make([]*Whisper, n)
+ for i := 0; i < n; i++ {
+ whispers[i] = New()
+ whispers[i].Start()
+ }
+ // Wire all the peers to the root one
+ for i := 1; i < n; i++ {
+ src, dst := p2p.MsgPipe()
+
+ go whispers[0].handlePeer(nodes[i], src)
+ go whispers[i].handlePeer(nodes[0], dst)
+ }
+ return whispers
+}
+
+func TestSelfMessage(t *testing.T) {
+ // Start the single node cluster
+ client := startTestCluster(1)[0]
+
+ // Start watching for self messages, signal any arrivals
+ self := client.NewIdentity()
+ done := make(chan struct{})
+
+ client.Watch(Filter{
+ To: &self.PublicKey,
Fn: func(msg *Message) {
- res <- msg
+ close(done)
},
})
-
- msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now())))
- envelope, err := msg.Wrap(DefaultProofOfWork, Options{
- TTL: DefaultTimeToLive,
- From: id,
- To: &id.PublicKey,
+ // Send a dummy message to oneself
+ msg := NewMessage([]byte("self whisper"))
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ From: self,
+ To: &self.PublicKey,
+ TTL: DefaultTTL,
})
if err != nil {
- fmt.Println(err)
- t.FailNow()
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ // Dump the message into the system and wait for it to pop back out
+ if err := client.Send(envelope); err != nil {
+ t.Fatalf("failed to send self-message: %v", err)
+ }
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatalf("self-message receive timeout")
}
+}
+
+func TestDirectMessage(t *testing.T) {
+ // Start the sender-recipient cluster
+ cluster := startTestCluster(2)
- tick := time.NewTicker(time.Second)
- whisper.postEvent(envelope)
+ sender := cluster[0]
+ senderId := sender.NewIdentity()
+
+ recipient := cluster[1]
+ recipientId := recipient.NewIdentity()
+
+ // Watch for arriving messages on the recipient
+ done := make(chan struct{})
+ recipient.Watch(Filter{
+ To: &recipientId.PublicKey,
+ Fn: func(msg *Message) {
+ close(done)
+ },
+ })
+ // Send a dummy message from the sender
+ msg := NewMessage([]byte("direct whisper"))
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ From: senderId,
+ To: &recipientId.PublicKey,
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := sender.Send(envelope); err != nil {
+ t.Fatalf("failed to send direct message: %v", err)
+ }
+ // Wait for an arrival or a timeout
select {
- case <-res:
- case <-tick.C:
- t.Error("did not receive message")
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatalf("direct message receive timeout")
+ }
+}
+
+func TestAnonymousBroadcast(t *testing.T) {
+ testBroadcast(true, t)
+}
+
+func TestIdentifiedBroadcast(t *testing.T) {
+ testBroadcast(false, t)
+}
+
+func testBroadcast(anonymous bool, t *testing.T) {
+ // Start the single sender multi recipient cluster
+ cluster := startTestCluster(3)
+
+ sender := cluster[1]
+ targets := cluster[1:]
+ for _, target := range targets {
+ if !anonymous {
+ target.NewIdentity()
+ }
+ }
+ // Watch for arriving messages on the recipients
+ dones := make([]chan struct{}, len(targets))
+ for i := 0; i < len(targets); i++ {
+ done := make(chan struct{}) // need for the closure
+ dones[i] = done
+
+ targets[i].Watch(Filter{
+ Topics: NewTopicsFromStrings("broadcast topic"),
+ Fn: func(msg *Message) {
+ close(done)
+ },
+ })
+ }
+ // Send a dummy message from the sender
+ msg := NewMessage([]byte("broadcast whisper"))
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ Topics: NewTopicsFromStrings("broadcast topic"),
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := sender.Send(envelope); err != nil {
+ t.Fatalf("failed to send broadcast message: %v", err)
+ }
+ // Wait for an arrival on each recipient, or timeouts
+ timeout := time.After(time.Second)
+ for _, done := range dones {
+ select {
+ case <-done:
+ case <-timeout:
+ t.Fatalf("broadcast message receive timeout")
+ }
+ }
+}
+
+func TestMessageExpiration(t *testing.T) {
+ // Start the single node cluster and inject a dummy message
+ node := startTestCluster(1)[0]
+
+ message := NewMessage([]byte("expiring message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: time.Second,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := node.Send(envelope); err != nil {
+ t.Fatalf("failed to inject message: %v", err)
+ }
+ // Check that the message is inside the cache
+ if _, ok := node.messages[envelope.Hash()]; !ok {
+ t.Fatalf("message not found in cache")
+ }
+ // Wait for expiration and check cache again
+ time.Sleep(time.Second) // wait for expiration
+ time.Sleep(expirationCycle) // wait for cleanup cycle
+ if _, ok := node.messages[envelope.Hash()]; ok {
+ t.Fatalf("message not expired from cache")
}
}
diff --git a/xeth/whisper.go b/xeth/whisper.go
index 51caec8d6..342910b5c 100644
--- a/xeth/whisper.go
+++ b/xeth/whisper.go
@@ -36,7 +36,7 @@ func (self *Whisper) Post(payload string, to, from string, topics []string, prio
TTL: time.Duration(ttl) * time.Second,
To: crypto.ToECDSAPub(common.FromHex(to)),
From: key,
- Topics: whisper.TopicsFromString(topics...),
+ Topics: whisper.NewTopicsFromStrings(topics...),
})
if err != nil {
@@ -71,7 +71,7 @@ func (self *Whisper) Watch(opts *Options) int {
filter := whisper.Filter{
To: crypto.ToECDSAPub(common.FromHex(opts.To)),
From: crypto.ToECDSAPub(common.FromHex(opts.From)),
- Topics: whisper.TopicsFromString(opts.Topics...),
+ Topics: whisper.NewTopicsFromStrings(opts.Topics...),
}
var i int