aboutsummaryrefslogtreecommitdiffstats
path: root/whisper/whisperv2
diff options
context:
space:
mode:
Diffstat (limited to 'whisper/whisperv2')
-rw-r--r--whisper/whisperv2/api.go413
-rw-r--r--whisper/whisperv2/doc.go32
-rw-r--r--whisper/whisperv2/envelope.go147
-rw-r--r--whisper/whisperv2/envelope_test.go158
-rw-r--r--whisper/whisperv2/filter.go132
-rw-r--r--whisper/whisperv2/filter_test.go215
-rw-r--r--whisper/whisperv2/main.go106
-rw-r--r--whisper/whisperv2/message.go156
-rw-r--r--whisper/whisperv2/message_test.go159
-rw-r--r--whisper/whisperv2/peer.go175
-rw-r--r--whisper/whisperv2/peer_test.go261
-rw-r--r--whisper/whisperv2/topic.go140
-rw-r--r--whisper/whisperv2/topic_test.go215
-rw-r--r--whisper/whisperv2/whisper.go378
-rw-r--r--whisper/whisperv2/whisper_test.go216
15 files changed, 2903 insertions, 0 deletions
diff --git a/whisper/whisperv2/api.go b/whisper/whisperv2/api.go
new file mode 100644
index 000000000..9c9c6a84c
--- /dev/null
+++ b/whisper/whisperv2/api.go
@@ -0,0 +1,413 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// PublicWhisperAPI provides the whisper RPC service.
+type PublicWhisperAPI struct {
+ w *Whisper
+
+ messagesMu sync.RWMutex
+ messages map[int]*whisperFilter
+}
+
+type whisperOfflineError struct{}
+
+func (e *whisperOfflineError) Error() string {
+ return "whisper is offline"
+}
+
+// whisperOffLineErr is returned when the node doesn't offer the shh service.
+var whisperOffLineErr = new(whisperOfflineError)
+
+// NewPublicWhisperAPI create a new RPC whisper service.
+func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI {
+ return &PublicWhisperAPI{w: w, messages: make(map[int]*whisperFilter)}
+}
+
+// Version returns the Whisper version this node offers.
+func (s *PublicWhisperAPI) Version() (*rpc.HexNumber, error) {
+ if s.w == nil {
+ return rpc.NewHexNumber(0), whisperOffLineErr
+ }
+ return rpc.NewHexNumber(s.w.Version()), nil
+}
+
+// HasIdentity checks if the the whisper node is configured with the private key
+// of the specified public pair.
+func (s *PublicWhisperAPI) HasIdentity(identity string) (bool, error) {
+ if s.w == nil {
+ return false, whisperOffLineErr
+ }
+ return s.w.HasIdentity(crypto.ToECDSAPub(common.FromHex(identity))), nil
+}
+
+// NewIdentity generates a new cryptographic identity for the client, and injects
+// it into the known identities for message decryption.
+func (s *PublicWhisperAPI) NewIdentity() (string, error) {
+ if s.w == nil {
+ return "", whisperOffLineErr
+ }
+
+ identity := s.w.NewIdentity()
+ return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)), nil
+}
+
+type NewFilterArgs struct {
+ To string
+ From string
+ Topics [][][]byte
+}
+
+// NewWhisperFilter creates and registers a new message filter to watch for inbound whisper messages.
+func (s *PublicWhisperAPI) NewFilter(args NewFilterArgs) (*rpc.HexNumber, error) {
+ if s.w == nil {
+ return nil, whisperOffLineErr
+ }
+
+ var id int
+ filter := Filter{
+ To: crypto.ToECDSAPub(common.FromHex(args.To)),
+ From: crypto.ToECDSAPub(common.FromHex(args.From)),
+ Topics: NewFilterTopics(args.Topics...),
+ Fn: func(message *Message) {
+ wmsg := NewWhisperMessage(message)
+ s.messagesMu.RLock() // Only read lock to the filter pool
+ defer s.messagesMu.RUnlock()
+ if s.messages[id] != nil {
+ s.messages[id].insert(wmsg)
+ }
+ },
+ }
+
+ id = s.w.Watch(filter)
+
+ s.messagesMu.Lock()
+ s.messages[id] = newWhisperFilter(id, s.w)
+ s.messagesMu.Unlock()
+
+ return rpc.NewHexNumber(id), nil
+}
+
+// GetFilterChanges retrieves all the new messages matched by a filter since the last retrieval.
+func (s *PublicWhisperAPI) GetFilterChanges(filterId rpc.HexNumber) []WhisperMessage {
+ s.messagesMu.RLock()
+ defer s.messagesMu.RUnlock()
+
+ if s.messages[filterId.Int()] != nil {
+ if changes := s.messages[filterId.Int()].retrieve(); changes != nil {
+ return changes
+ }
+ }
+ return returnWhisperMessages(nil)
+}
+
+// UninstallFilter disables and removes an existing filter.
+func (s *PublicWhisperAPI) UninstallFilter(filterId rpc.HexNumber) bool {
+ s.messagesMu.Lock()
+ defer s.messagesMu.Unlock()
+
+ if _, ok := s.messages[filterId.Int()]; ok {
+ delete(s.messages, filterId.Int())
+ return true
+ }
+ return false
+}
+
+// GetMessages retrieves all the known messages that match a specific filter.
+func (s *PublicWhisperAPI) GetMessages(filterId rpc.HexNumber) []WhisperMessage {
+ // Retrieve all the cached messages matching a specific, existing filter
+ s.messagesMu.RLock()
+ defer s.messagesMu.RUnlock()
+
+ var messages []*Message
+ if s.messages[filterId.Int()] != nil {
+ messages = s.messages[filterId.Int()].messages()
+ }
+
+ return returnWhisperMessages(messages)
+}
+
+// returnWhisperMessages converts aNhisper message to a RPC whisper message.
+func returnWhisperMessages(messages []*Message) []WhisperMessage {
+ msgs := make([]WhisperMessage, len(messages))
+ for i, msg := range messages {
+ msgs[i] = NewWhisperMessage(msg)
+ }
+ return msgs
+}
+
+type PostArgs struct {
+ From string `json:"from"`
+ To string `json:"to"`
+ Topics [][]byte `json:"topics"`
+ Payload string `json:"payload"`
+ Priority int64 `json:"priority"`
+ TTL int64 `json:"ttl"`
+}
+
+// Post injects a message into the whisper network for distribution.
+func (s *PublicWhisperAPI) Post(args PostArgs) (bool, error) {
+ if s.w == nil {
+ return false, whisperOffLineErr
+ }
+
+ // construct whisper message with transmission options
+ message := NewMessage(common.FromHex(args.Payload))
+ options := Options{
+ To: crypto.ToECDSAPub(common.FromHex(args.To)),
+ TTL: time.Duration(args.TTL) * time.Second,
+ Topics: NewTopics(args.Topics...),
+ }
+
+ // set sender identity
+ if len(args.From) > 0 {
+ if key := s.w.GetIdentity(crypto.ToECDSAPub(common.FromHex(args.From))); key != nil {
+ options.From = key
+ } else {
+ return false, fmt.Errorf("unknown identity to send from: %s", args.From)
+ }
+ }
+
+ // Wrap and send the message
+ pow := time.Duration(args.Priority) * time.Millisecond
+ envelope, err := message.Wrap(pow, options)
+ if err != nil {
+ return false, err
+ }
+
+ return true, s.w.Send(envelope)
+}
+
+// WhisperMessage is the RPC representation of a whisper message.
+type WhisperMessage struct {
+ ref *Message
+
+ Payload string `json:"payload"`
+ To string `json:"to"`
+ From string `json:"from"`
+ Sent int64 `json:"sent"`
+ TTL int64 `json:"ttl"`
+ Hash string `json:"hash"`
+}
+
+func (args *PostArgs) UnmarshalJSON(data []byte) (err error) {
+ var obj struct {
+ From string `json:"from"`
+ To string `json:"to"`
+ Topics []string `json:"topics"`
+ Payload string `json:"payload"`
+ Priority rpc.HexNumber `json:"priority"`
+ TTL rpc.HexNumber `json:"ttl"`
+ }
+
+ if err := json.Unmarshal(data, &obj); err != nil {
+ return err
+ }
+
+ args.From = obj.From
+ args.To = obj.To
+ args.Payload = obj.Payload
+ args.Priority = obj.Priority.Int64()
+ args.TTL = obj.TTL.Int64()
+
+ // decode topic strings
+ args.Topics = make([][]byte, len(obj.Topics))
+ for i, topic := range obj.Topics {
+ args.Topics[i] = common.FromHex(topic)
+ }
+
+ return nil
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a
+// JSON message blob into a WhisperFilterArgs structure.
+func (args *NewFilterArgs) UnmarshalJSON(b []byte) (err error) {
+ // Unmarshal the JSON message and sanity check
+ var obj struct {
+ To interface{} `json:"to"`
+ From interface{} `json:"from"`
+ Topics interface{} `json:"topics"`
+ }
+ if err := json.Unmarshal(b, &obj); err != nil {
+ return err
+ }
+
+ // Retrieve the simple data contents of the filter arguments
+ if obj.To == nil {
+ args.To = ""
+ } else {
+ argstr, ok := obj.To.(string)
+ if !ok {
+ return fmt.Errorf("to is not a string")
+ }
+ args.To = argstr
+ }
+ if obj.From == nil {
+ args.From = ""
+ } else {
+ argstr, ok := obj.From.(string)
+ if !ok {
+ return fmt.Errorf("from is not a string")
+ }
+ args.From = argstr
+ }
+ // Construct the nested topic array
+ if obj.Topics != nil {
+ // Make sure we have an actual topic array
+ list, ok := obj.Topics.([]interface{})
+ if !ok {
+ return fmt.Errorf("topics is not an array")
+ }
+ // Iterate over each topic and handle nil, string or array
+ topics := make([][]string, len(list))
+ for idx, field := range list {
+ switch value := field.(type) {
+ case nil:
+ topics[idx] = []string{}
+
+ case string:
+ topics[idx] = []string{value}
+
+ case []interface{}:
+ topics[idx] = make([]string, len(value))
+ for i, nested := range value {
+ switch value := nested.(type) {
+ case nil:
+ topics[idx][i] = ""
+
+ case string:
+ topics[idx][i] = value
+
+ default:
+ return fmt.Errorf("topic[%d][%d] is not a string", idx, i)
+ }
+ }
+ default:
+ return fmt.Errorf("topic[%d] not a string or array", idx)
+ }
+ }
+
+ topicsDecoded := make([][][]byte, len(topics))
+ for i, condition := range topics {
+ topicsDecoded[i] = make([][]byte, len(condition))
+ for j, topic := range condition {
+ topicsDecoded[i][j] = common.FromHex(topic)
+ }
+ }
+
+ args.Topics = topicsDecoded
+ }
+ return nil
+}
+
+// whisperFilter is the message cache matching a specific filter, accumulating
+// inbound messages until the are requested by the client.
+type whisperFilter struct {
+ id int // Filter identifier for old message retrieval
+ ref *Whisper // Whisper reference for old message retrieval
+
+ cache []WhisperMessage // Cache of messages not yet polled
+ skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication
+ update time.Time // Time of the last message query
+
+ lock sync.RWMutex // Lock protecting the filter internals
+}
+
+// messages retrieves all the cached messages from the entire pool matching the
+// filter, resetting the filter's change buffer.
+func (w *whisperFilter) messages() []*Message {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ w.cache = nil
+ w.update = time.Now()
+
+ w.skip = make(map[common.Hash]struct{})
+ messages := w.ref.Messages(w.id)
+ for _, message := range messages {
+ w.skip[message.Hash] = struct{}{}
+ }
+ return messages
+}
+
+// insert injects a new batch of messages into the filter cache.
+func (w *whisperFilter) insert(messages ...WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ for _, message := range messages {
+ if _, ok := w.skip[message.ref.Hash]; !ok {
+ w.cache = append(w.cache, messages...)
+ }
+ }
+}
+
+// retrieve fetches all the cached messages from the filter.
+func (w *whisperFilter) retrieve() (messages []WhisperMessage) {
+ w.lock.Lock()
+ defer w.lock.Unlock()
+
+ messages, w.cache = w.cache, nil
+ w.update = time.Now()
+
+ return
+}
+
+// activity returns the last time instance when client requests were executed on
+// the filter.
+func (w *whisperFilter) activity() time.Time {
+ w.lock.RLock()
+ defer w.lock.RUnlock()
+
+ return w.update
+}
+
+// newWhisperFilter creates a new serialized, poll based whisper topic filter.
+func newWhisperFilter(id int, ref *Whisper) *whisperFilter {
+ return &whisperFilter{
+ id: id,
+ ref: ref,
+
+ update: time.Now(),
+ skip: make(map[common.Hash]struct{}),
+ }
+}
+
+// NewWhisperMessage converts an internal message into an API version.
+func NewWhisperMessage(message *Message) WhisperMessage {
+ return WhisperMessage{
+ ref: message,
+
+ Payload: common.ToHex(message.Payload),
+ From: common.ToHex(crypto.FromECDSAPub(message.Recover())),
+ To: common.ToHex(crypto.FromECDSAPub(message.To)),
+ Sent: message.Sent.Unix(),
+ TTL: int64(message.TTL / time.Second),
+ Hash: common.ToHex(message.Hash.Bytes()),
+ }
+}
diff --git a/whisper/whisperv2/doc.go b/whisper/whisperv2/doc.go
new file mode 100644
index 000000000..7252f44b1
--- /dev/null
+++ b/whisper/whisperv2/doc.go
@@ -0,0 +1,32 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+/*
+Package whisper implements the Whisper PoC-1.
+
+(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec)
+
+Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP).
+As such it may be likened and compared to both, not dissimilar to the
+matter/energy duality (apologies to physicists for the blatant abuse of a
+fundamental and beautiful natural principle).
+
+Whisper is a pure identity-based messaging system. Whisper provides a low-level
+(non-application-specific) but easily-accessible API without being based upon
+or prejudiced by the low-level hardware attributes and characteristics,
+particularly the notion of singular endpoints.
+*/
+package whisperv2
diff --git a/whisper/whisperv2/envelope.go b/whisper/whisperv2/envelope.go
new file mode 100644
index 000000000..7110ab457
--- /dev/null
+++ b/whisper/whisperv2/envelope.go
@@ -0,0 +1,147 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the Whisper protocol Envelope element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#envelopes.
+
+package whisperv2
+
+import (
+ "crypto/ecdsa"
+ "encoding/binary"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/crypto/ecies"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// Envelope represents a clear-text data packet to transmit through the Whisper
+// network. Its contents may or may not be encrypted and signed.
+type Envelope struct {
+ Expiry uint32 // Whisper protocol specifies int32, really should be int64
+ TTL uint32 // ^^^^^^
+ Topics []Topic
+ Data []byte
+ Nonce uint32
+
+ hash common.Hash // Cached hash of the envelope to avoid rehashing every time
+}
+
+// NewEnvelope wraps a Whisper message with expiration and destination data
+// included into an envelope for network forwarding.
+func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope {
+ return &Envelope{
+ Expiry: uint32(time.Now().Add(ttl).Unix()),
+ TTL: uint32(ttl.Seconds()),
+ Topics: topics,
+ Data: msg.bytes(),
+ Nonce: 0,
+ }
+}
+
+// Seal closes the envelope by spending the requested amount of time as a proof
+// of work on hashing the data.
+func (self *Envelope) Seal(pow time.Duration) {
+ d := make([]byte, 64)
+ copy(d[:32], self.rlpWithoutNonce())
+
+ finish, bestBit := time.Now().Add(pow).UnixNano(), 0
+ for nonce := uint32(0); time.Now().UnixNano() < finish; {
+ for i := 0; i < 1024; i++ {
+ binary.BigEndian.PutUint32(d[60:], nonce)
+
+ firstBit := common.FirstBitSet(common.BigD(crypto.Keccak256(d)))
+ if firstBit > bestBit {
+ self.Nonce, bestBit = nonce, firstBit
+ }
+ nonce++
+ }
+ }
+}
+
+// rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce.
+func (self *Envelope) rlpWithoutNonce() []byte {
+ enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data})
+ return enc
+}
+
+// Open extracts the message contained within a potentially encrypted envelope.
+func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) {
+ // Split open the payload into a message construct
+ data := self.Data
+
+ message := &Message{
+ Flags: data[0],
+ Sent: time.Unix(int64(self.Expiry-self.TTL), 0),
+ TTL: time.Duration(self.TTL) * time.Second,
+ Hash: self.Hash(),
+ }
+ data = data[1:]
+
+ if message.Flags&signatureFlag == signatureFlag {
+ if len(data) < signatureLength {
+ return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)")
+ }
+ message.Signature, data = data[:signatureLength], data[signatureLength:]
+ }
+ message.Payload = data
+
+ // Decrypt the message, if requested
+ if key == nil {
+ return message, nil
+ }
+ err = message.decrypt(key)
+ switch err {
+ case nil:
+ return message, nil
+
+ case ecies.ErrInvalidPublicKey: // Payload isn't encrypted
+ return message, err
+
+ default:
+ return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err)
+ }
+}
+
+// Hash returns the SHA3 hash of the envelope, calculating it if not yet done.
+func (self *Envelope) Hash() common.Hash {
+ if (self.hash == common.Hash{}) {
+ enc, _ := rlp.EncodeToBytes(self)
+ self.hash = crypto.Keccak256Hash(enc)
+ }
+ return self.hash
+}
+
+// DecodeRLP decodes an Envelope from an RLP data stream.
+func (self *Envelope) DecodeRLP(s *rlp.Stream) error {
+ raw, err := s.Raw()
+ if err != nil {
+ return err
+ }
+ // The decoding of Envelope uses the struct fields but also needs
+ // to compute the hash of the whole RLP-encoded envelope. This
+ // type has the same structure as Envelope but is not an
+ // rlp.Decoder so we can reuse the Envelope struct definition.
+ type rlpenv Envelope
+ if err := rlp.DecodeBytes(raw, (*rlpenv)(self)); err != nil {
+ return err
+ }
+ self.hash = crypto.Keccak256Hash(raw)
+ return nil
+}
diff --git a/whisper/whisperv2/envelope_test.go b/whisper/whisperv2/envelope_test.go
new file mode 100644
index 000000000..75e2fbe8a
--- /dev/null
+++ b/whisper/whisperv2/envelope_test.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "bytes"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/crypto/ecies"
+)
+
+func TestEnvelopeOpen(t *testing.T) {
+ payload := []byte("hello world")
+ message := NewMessage(payload)
+
+ envelope, err := message.Wrap(DefaultPoW, Options{})
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ opened, err := envelope.Open(nil)
+ if err != nil {
+ t.Fatalf("failed to open envelope: %v", err)
+ }
+ if opened.Flags != message.Flags {
+ t.Fatalf("flags mismatch: have %d, want %d", opened.Flags, message.Flags)
+ }
+ if bytes.Compare(opened.Signature, message.Signature) != 0 {
+ t.Fatalf("signature mismatch: have 0x%x, want 0x%x", opened.Signature, message.Signature)
+ }
+ if bytes.Compare(opened.Payload, message.Payload) != 0 {
+ t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, message.Payload)
+ }
+ if opened.Sent.Unix() != message.Sent.Unix() {
+ t.Fatalf("send time mismatch: have %d, want %d", opened.Sent, message.Sent)
+ }
+ if opened.TTL/time.Second != DefaultTTL/time.Second {
+ t.Fatalf("message TTL mismatch: have %v, want %v", opened.TTL, DefaultTTL)
+ }
+
+ if opened.Hash != envelope.Hash() {
+ t.Fatalf("message hash mismatch: have 0x%x, want 0x%x", opened.Hash, envelope.Hash())
+ }
+}
+
+func TestEnvelopeAnonymousOpenUntargeted(t *testing.T) {
+ payload := []byte("hello envelope")
+ envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{})
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ opened, err := envelope.Open(nil)
+ if err != nil {
+ t.Fatalf("failed to open envelope: %v", err)
+ }
+ if opened.To != nil {
+ t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To)
+ }
+ if bytes.Compare(opened.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload)
+ }
+}
+
+func TestEnvelopeAnonymousOpenTargeted(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to generate test identity: %v", err)
+ }
+
+ payload := []byte("hello envelope")
+ envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{
+ To: &key.PublicKey,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ opened, err := envelope.Open(nil)
+ if err != nil {
+ t.Fatalf("failed to open envelope: %v", err)
+ }
+ if opened.To != nil {
+ t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To)
+ }
+ if bytes.Compare(opened.Payload, payload) == 0 {
+ t.Fatalf("payload match, should have been encrypted: 0x%x", opened.Payload)
+ }
+}
+
+func TestEnvelopeIdentifiedOpenUntargeted(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to generate test identity: %v", err)
+ }
+
+ payload := []byte("hello envelope")
+ envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{})
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ opened, err := envelope.Open(key)
+ switch err {
+ case nil:
+ t.Fatalf("envelope opened with bad key: %v", opened)
+
+ case ecies.ErrInvalidPublicKey:
+ // Ok, key mismatch but opened
+
+ default:
+ t.Fatalf("failed to open envelope: %v", err)
+ }
+
+ if opened.To != nil {
+ t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To)
+ }
+ if bytes.Compare(opened.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload)
+ }
+}
+
+func TestEnvelopeIdentifiedOpenTargeted(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to generate test identity: %v", err)
+ }
+
+ payload := []byte("hello envelope")
+ envelope, err := NewMessage(payload).Wrap(DefaultPoW, Options{
+ To: &key.PublicKey,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ opened, err := envelope.Open(key)
+ if err != nil {
+ t.Fatalf("failed to open envelope: %v", err)
+ }
+ if opened.To != nil {
+ t.Fatalf("recipient mismatch: have 0x%x, want nil", opened.To)
+ }
+ if bytes.Compare(opened.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch: have 0x%x, want 0x%x", opened.Payload, payload)
+ }
+}
diff --git a/whisper/whisperv2/filter.go b/whisper/whisperv2/filter.go
new file mode 100644
index 000000000..8ce4a54fb
--- /dev/null
+++ b/whisper/whisperv2/filter.go
@@ -0,0 +1,132 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the message filter for fine grained subscriptions.
+
+package whisperv2
+
+import (
+ "crypto/ecdsa"
+
+ "github.com/ethereum/go-ethereum/event/filter"
+)
+
+// Filter is used to subscribe to specific types of whisper messages.
+type Filter struct {
+ To *ecdsa.PublicKey // Recipient of the message
+ From *ecdsa.PublicKey // Sender of the message
+ Topics [][]Topic // Topics to filter messages with
+ Fn func(msg *Message) // Handler in case of a match
+}
+
+// NewFilterTopics creates a 2D topic array used by whisper.Filter from binary
+// data elements.
+func NewFilterTopics(data ...[][]byte) [][]Topic {
+ filter := make([][]Topic, len(data))
+ for i, condition := range data {
+ // Handle the special case of condition == [[]byte{}]
+ if len(condition) == 1 && len(condition[0]) == 0 {
+ filter[i] = []Topic{}
+ continue
+ }
+ // Otherwise flatten normally
+ filter[i] = NewTopics(condition...)
+ }
+ return filter
+}
+
+// NewFilterTopicsFlat creates a 2D topic array used by whisper.Filter from flat
+// binary data elements.
+func NewFilterTopicsFlat(data ...[]byte) [][]Topic {
+ filter := make([][]Topic, len(data))
+ for i, element := range data {
+ // Only add non-wildcard topics
+ filter[i] = make([]Topic, 0, 1)
+ if len(element) > 0 {
+ filter[i] = append(filter[i], NewTopic(element))
+ }
+ }
+ return filter
+}
+
+// NewFilterTopicsFromStrings creates a 2D topic array used by whisper.Filter
+// from textual data elements.
+func NewFilterTopicsFromStrings(data ...[]string) [][]Topic {
+ filter := make([][]Topic, len(data))
+ for i, condition := range data {
+ // Handle the special case of condition == [""]
+ if len(condition) == 1 && condition[0] == "" {
+ filter[i] = []Topic{}
+ continue
+ }
+ // Otherwise flatten normally
+ filter[i] = NewTopicsFromStrings(condition...)
+ }
+ return filter
+}
+
+// NewFilterTopicsFromStringsFlat creates a 2D topic array used by whisper.Filter from flat
+// binary data elements.
+func NewFilterTopicsFromStringsFlat(data ...string) [][]Topic {
+ filter := make([][]Topic, len(data))
+ for i, element := range data {
+ // Only add non-wildcard topics
+ filter[i] = make([]Topic, 0, 1)
+ if element != "" {
+ filter[i] = append(filter[i], NewTopicFromString(element))
+ }
+ }
+ return filter
+}
+
+// filterer is the internal, fully initialized filter ready to match inbound
+// messages to a variety of criteria.
+type filterer struct {
+ to string // Recipient of the message
+ from string // Sender of the message
+ matcher *topicMatcher // Topics to filter messages with
+ fn func(data interface{}) // Handler in case of a match
+}
+
+// Compare checks if the specified filter matches the current one.
+func (self filterer) Compare(f filter.Filter) bool {
+ filter := f.(filterer)
+
+ // Check the message sender and recipient
+ if len(self.to) > 0 && self.to != filter.to {
+ return false
+ }
+ if len(self.from) > 0 && self.from != filter.from {
+ return false
+ }
+ // Check the topic filtering
+ topics := make([]Topic, len(filter.matcher.conditions))
+ for i, group := range filter.matcher.conditions {
+ // Message should contain a single topic entry, extract
+ for topics[i], _ = range group {
+ break
+ }
+ }
+ if !self.matcher.Matches(topics) {
+ return false
+ }
+ return true
+}
+
+// Trigger is called when a filter successfully matches an inbound message.
+func (self filterer) Trigger(data interface{}) {
+ self.fn(data)
+}
diff --git a/whisper/whisperv2/filter_test.go b/whisper/whisperv2/filter_test.go
new file mode 100644
index 000000000..5a14a84bb
--- /dev/null
+++ b/whisper/whisperv2/filter_test.go
@@ -0,0 +1,215 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "bytes"
+
+ "testing"
+)
+
+var filterTopicsCreationTests = []struct {
+ topics [][]string
+ filter [][][4]byte
+}{
+ { // Simple topic filter
+ topics: [][]string{
+ {"abc", "def", "ghi"},
+ {"def"},
+ {"ghi", "abc"},
+ },
+ filter: [][][4]byte{
+ {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}},
+ {{0x34, 0x60, 0x7c, 0x9b}},
+ {{0x21, 0x41, 0x7d, 0xf9}, {0x4e, 0x03, 0x65, 0x7a}},
+ },
+ },
+ { // Wild-carded topic filter
+ topics: [][]string{
+ {"abc", "def", "ghi"},
+ {},
+ {""},
+ {"def"},
+ },
+ filter: [][][4]byte{
+ {{0x4e, 0x03, 0x65, 0x7a}, {0x34, 0x60, 0x7c, 0x9b}, {0x21, 0x41, 0x7d, 0xf9}},
+ {},
+ {},
+ {{0x34, 0x60, 0x7c, 0x9b}},
+ },
+ },
+}
+
+var filterTopicsCreationFlatTests = []struct {
+ topics []string
+ filter [][][4]byte
+}{
+ { // Simple topic list
+ topics: []string{"abc", "def", "ghi"},
+ filter: [][][4]byte{
+ {{0x4e, 0x03, 0x65, 0x7a}},
+ {{0x34, 0x60, 0x7c, 0x9b}},
+ {{0x21, 0x41, 0x7d, 0xf9}},
+ },
+ },
+ { // Wild-carded topic list
+ topics: []string{"abc", "", "ghi"},
+ filter: [][][4]byte{
+ {{0x4e, 0x03, 0x65, 0x7a}},
+ {},
+ {{0x21, 0x41, 0x7d, 0xf9}},
+ },
+ },
+}
+
+func TestFilterTopicsCreation(t *testing.T) {
+ // Check full filter creation
+ for i, tt := range filterTopicsCreationTests {
+ // Check the textual creation
+ filter := NewFilterTopicsFromStrings(tt.topics...)
+ if len(filter) != len(tt.topics) {
+ t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics))
+ continue
+ }
+ for j, condition := range filter {
+ if len(condition) != len(tt.filter[j]) {
+ t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j]))
+ continue
+ }
+ for k := 0; k < len(condition); k++ {
+ if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 {
+ t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k])
+ }
+ }
+ }
+ // Check the binary creation
+ binary := make([][][]byte, len(tt.topics))
+ for j, condition := range tt.topics {
+ binary[j] = make([][]byte, len(condition))
+ for k, segment := range condition {
+ binary[j][k] = []byte(segment)
+ }
+ }
+ filter = NewFilterTopics(binary...)
+ if len(filter) != len(tt.topics) {
+ t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics))
+ continue
+ }
+ for j, condition := range filter {
+ if len(condition) != len(tt.filter[j]) {
+ t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j]))
+ continue
+ }
+ for k := 0; k < len(condition); k++ {
+ if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 {
+ t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k])
+ }
+ }
+ }
+ }
+ // Check flat filter creation
+ for i, tt := range filterTopicsCreationFlatTests {
+ // Check the textual creation
+ filter := NewFilterTopicsFromStringsFlat(tt.topics...)
+ if len(filter) != len(tt.topics) {
+ t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics))
+ continue
+ }
+ for j, condition := range filter {
+ if len(condition) != len(tt.filter[j]) {
+ t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j]))
+ continue
+ }
+ for k := 0; k < len(condition); k++ {
+ if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 {
+ t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k])
+ }
+ }
+ }
+ // Check the binary creation
+ binary := make([][]byte, len(tt.topics))
+ for j, topic := range tt.topics {
+ binary[j] = []byte(topic)
+ }
+ filter = NewFilterTopicsFlat(binary...)
+ if len(filter) != len(tt.topics) {
+ t.Errorf("test %d: condition count mismatch: have %v, want %v", i, len(filter), len(tt.topics))
+ continue
+ }
+ for j, condition := range filter {
+ if len(condition) != len(tt.filter[j]) {
+ t.Errorf("test %d, condition %d: size mismatch: have %v, want %v", i, j, len(condition), len(tt.filter[j]))
+ continue
+ }
+ for k := 0; k < len(condition); k++ {
+ if bytes.Compare(condition[k][:], tt.filter[j][k][:]) != 0 {
+ t.Errorf("test %d, condition %d, segment %d: filter mismatch: have 0x%x, want 0x%x", i, j, k, condition[k], tt.filter[j][k])
+ }
+ }
+ }
+ }
+}
+
+var filterCompareTests = []struct {
+ matcher filterer
+ message filterer
+ match bool
+}{
+ { // Wild-card filter matching anything
+ matcher: filterer{to: "", from: "", matcher: newTopicMatcher()},
+ message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ match: true,
+ },
+ { // Filter matching the to field
+ matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()},
+ message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ match: true,
+ },
+ { // Filter rejecting the to field
+ matcher: filterer{to: "to", from: "", matcher: newTopicMatcher()},
+ message: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ match: false,
+ },
+ { // Filter matching the from field
+ matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()},
+ message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ match: true,
+ },
+ { // Filter rejecting the from field
+ matcher: filterer{to: "", from: "from", matcher: newTopicMatcher()},
+ message: filterer{to: "to", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ match: false,
+ },
+ { // Filter matching the topic field
+ matcher: filterer{to: "", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ message: filterer{to: "to", from: "from", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ match: true,
+ },
+ { // Filter rejecting the topic field
+ matcher: filterer{to: "", from: "", matcher: newTopicMatcher(NewFilterTopicsFromStringsFlat("topic")...)},
+ message: filterer{to: "to", from: "from", matcher: newTopicMatcher()},
+ match: false,
+ },
+}
+
+func TestFilterCompare(t *testing.T) {
+ for i, tt := range filterCompareTests {
+ if match := tt.matcher.Compare(tt.message); match != tt.match {
+ t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match)
+ }
+ }
+}
diff --git a/whisper/whisperv2/main.go b/whisper/whisperv2/main.go
new file mode 100644
index 000000000..be4160489
--- /dev/null
+++ b/whisper/whisperv2/main.go
@@ -0,0 +1,106 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// +build none
+
+// Contains a simple whisper peer setup and self messaging to allow playing
+// around with the protocol and API without a fancy client implementation.
+
+package main
+
+import (
+ "fmt"
+ "log"
+ "os"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/nat"
+ "github.com/ethereum/go-ethereum/whisper"
+)
+
+func main() {
+ logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel))
+
+ // Generate the peer identity
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ fmt.Printf("Failed to generate peer key: %v.\n", err)
+ os.Exit(-1)
+ }
+ name := common.MakeName("whisper-go", "1.0")
+ shh := whisper.New()
+
+ // Create an Ethereum peer to communicate through
+ server := p2p.Server{
+ PrivateKey: key,
+ MaxPeers: 10,
+ Name: name,
+ Protocols: []p2p.Protocol{shh.Protocol()},
+ ListenAddr: ":30300",
+ NAT: nat.Any(),
+ }
+ fmt.Println("Starting Ethereum peer...")
+ if err := server.Start(); err != nil {
+ fmt.Printf("Failed to start Ethereum peer: %v.\n", err)
+ os.Exit(1)
+ }
+
+ // Send a message to self to check that something works
+ payload := fmt.Sprintf("Hello world, this is %v. In case you're wondering, the time is %v", name, time.Now())
+ if err := selfSend(shh, []byte(payload)); err != nil {
+ fmt.Printf("Failed to self message: %v.\n", err)
+ os.Exit(-1)
+ }
+}
+
+// SendSelf wraps a payload into a Whisper envelope and forwards it to itself.
+func selfSend(shh *whisper.Whisper, payload []byte) error {
+ ok := make(chan struct{})
+
+ // Start watching for self messages, output any arrivals
+ id := shh.NewIdentity()
+ shh.Watch(whisper.Filter{
+ To: &id.PublicKey,
+ Fn: func(msg *whisper.Message) {
+ fmt.Printf("Message received: %s, signed with 0x%x.\n", string(msg.Payload), msg.Signature)
+ close(ok)
+ },
+ })
+ // Wrap the payload and encrypt it
+ msg := whisper.NewMessage(payload)
+ envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{
+ From: id,
+ To: &id.PublicKey,
+ TTL: whisper.DefaultTTL,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to seal message: %v", err)
+ }
+ // Dump the message into the system and wait for it to pop back out
+ if err := shh.Send(envelope); err != nil {
+ return fmt.Errorf("failed to send self-message: %v", err)
+ }
+ select {
+ case <-ok:
+ case <-time.After(time.Second):
+ return fmt.Errorf("failed to receive message in time")
+ }
+ return nil
+}
diff --git a/whisper/whisperv2/message.go b/whisper/whisperv2/message.go
new file mode 100644
index 000000000..7ef9d0912
--- /dev/null
+++ b/whisper/whisperv2/message.go
@@ -0,0 +1,156 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the Whisper protocol Message element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#messages.
+
+package whisperv2
+
+import (
+ "crypto/ecdsa"
+ "math/rand"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+// Message represents an end-user data packet to transmit through the Whisper
+// protocol. These are wrapped into Envelopes that need not be understood by
+// intermediate nodes, just forwarded.
+type Message struct {
+ Flags byte // First bit is signature presence, rest reserved and should be random
+ Signature []byte
+ Payload []byte
+
+ Sent time.Time // Time when the message was posted into the network
+ TTL time.Duration // Maximum time to live allowed for the message
+
+ To *ecdsa.PublicKey // Message recipient (identity used to decode the message)
+ Hash common.Hash // Message envelope hash to act as a unique id
+}
+
+// Options specifies the exact way a message should be wrapped into an Envelope.
+type Options struct {
+ From *ecdsa.PrivateKey
+ To *ecdsa.PublicKey
+ TTL time.Duration
+ Topics []Topic
+}
+
+// NewMessage creates and initializes a non-signed, non-encrypted Whisper message.
+func NewMessage(payload []byte) *Message {
+ // Construct an initial flag set: no signature, rest random
+ flags := byte(rand.Intn(256))
+ flags &= ^signatureFlag
+
+ // Assemble and return the message
+ return &Message{
+ Flags: flags,
+ Payload: payload,
+ Sent: time.Now(),
+ }
+}
+
+// Wrap bundles the message into an Envelope to transmit over the network.
+//
+// pow (Proof Of Work) controls how much time to spend on hashing the message,
+// inherently controlling its priority through the network (smaller hash, bigger
+// priority).
+//
+// The user can control the amount of identity, privacy and encryption through
+// the options parameter as follows:
+// - options.From == nil && options.To == nil: anonymous broadcast
+// - options.From != nil && options.To == nil: signed broadcast (known sender)
+// - options.From == nil && options.To != nil: encrypted anonymous message
+// - options.From != nil && options.To != nil: encrypted signed message
+func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) {
+ // Use the default TTL if non was specified
+ if options.TTL == 0 {
+ options.TTL = DefaultTTL
+ }
+ self.TTL = options.TTL
+
+ // Sign and encrypt the message if requested
+ if options.From != nil {
+ if err := self.sign(options.From); err != nil {
+ return nil, err
+ }
+ }
+ if options.To != nil {
+ if err := self.encrypt(options.To); err != nil {
+ return nil, err
+ }
+ }
+ // Wrap the processed message, seal it and return
+ envelope := NewEnvelope(options.TTL, options.Topics, self)
+ envelope.Seal(pow)
+
+ return envelope, nil
+}
+
+// sign calculates and sets the cryptographic signature for the message , also
+// setting the sign flag.
+func (self *Message) sign(key *ecdsa.PrivateKey) (err error) {
+ self.Flags |= signatureFlag
+ self.Signature, err = crypto.Sign(self.hash(), key)
+ return
+}
+
+// Recover retrieves the public key of the message signer.
+func (self *Message) Recover() *ecdsa.PublicKey {
+ defer func() { recover() }() // in case of invalid signature
+
+ // Short circuit if no signature is present
+ if self.Signature == nil {
+ return nil
+ }
+ // Otherwise try and recover the signature
+ pub, err := crypto.SigToPub(self.hash(), self.Signature)
+ if err != nil {
+ glog.V(logger.Error).Infof("Could not get public key from signature: %v", err)
+ return nil
+ }
+ return pub
+}
+
+// encrypt encrypts a message payload with a public key.
+func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) {
+ self.Payload, err = crypto.Encrypt(key, self.Payload)
+ return
+}
+
+// decrypt decrypts an encrypted payload with a private key.
+func (self *Message) decrypt(key *ecdsa.PrivateKey) error {
+ cleartext, err := crypto.Decrypt(key, self.Payload)
+ if err == nil {
+ self.Payload = cleartext
+ }
+ return err
+}
+
+// hash calculates the SHA3 checksum of the message flags and payload.
+func (self *Message) hash() []byte {
+ return crypto.Keccak256(append([]byte{self.Flags}, self.Payload...))
+}
+
+// bytes flattens the message contents (flags, signature and payload) into a
+// single binary blob.
+func (self *Message) bytes() []byte {
+ return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...)
+}
diff --git a/whisper/whisperv2/message_test.go b/whisper/whisperv2/message_test.go
new file mode 100644
index 000000000..efa64e431
--- /dev/null
+++ b/whisper/whisperv2/message_test.go
@@ -0,0 +1,159 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "bytes"
+ "crypto/elliptic"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/crypto/secp256k1"
+)
+
+// Tests whether a message can be wrapped without any identity or encryption.
+func TestMessageSimpleWrap(t *testing.T) {
+ payload := []byte("hello world")
+
+ msg := NewMessage(payload)
+ if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if msg.Flags&signatureFlag != 0 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
+ }
+ if len(msg.Signature) != 0 {
+ t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature)
+ }
+ if bytes.Compare(msg.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch after wrapping: have 0x%x, want 0x%x", msg.Payload, payload)
+ }
+ if msg.TTL/time.Second != DefaultTTL/time.Second {
+ t.Fatalf("message TTL mismatch: have %v, want %v", msg.TTL, DefaultTTL)
+ }
+}
+
+// Tests whether a message can be signed, and wrapped in plain-text.
+func TestMessageCleartextSignRecover(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create crypto key: %v", err)
+ }
+ payload := []byte("hello world")
+
+ msg := NewMessage(payload)
+ if _, err := msg.Wrap(DefaultPoW, Options{
+ From: key,
+ }); err != nil {
+ t.Fatalf("failed to sign message: %v", err)
+ }
+ if msg.Flags&signatureFlag != signatureFlag {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
+ }
+ if bytes.Compare(msg.Payload, payload) != 0 {
+ t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload)
+ }
+
+ pubKey := msg.Recover()
+ if pubKey == nil {
+ t.Fatalf("failed to recover public key")
+ }
+ p1 := elliptic.Marshal(secp256k1.S256(), key.PublicKey.X, key.PublicKey.Y)
+ p2 := elliptic.Marshal(secp256k1.S256(), pubKey.X, pubKey.Y)
+ if !bytes.Equal(p1, p2) {
+ t.Fatalf("public key mismatch: have 0x%x, want 0x%x", p2, p1)
+ }
+}
+
+// Tests whether a message can be encrypted and decrypted using an anonymous
+// sender (i.e. no signature).
+func TestMessageAnonymousEncryptDecrypt(t *testing.T) {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create recipient crypto key: %v", err)
+ }
+ payload := []byte("hello world")
+
+ msg := NewMessage(payload)
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ To: &key.PublicKey,
+ })
+ if err != nil {
+ t.Fatalf("failed to encrypt message: %v", err)
+ }
+ if msg.Flags&signatureFlag != 0 {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0)
+ }
+ if len(msg.Signature) != 0 {
+ t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature)
+ }
+
+ out, err := envelope.Open(key)
+ if err != nil {
+ t.Fatalf("failed to open encrypted message: %v", err)
+ }
+ if !bytes.Equal(out.Payload, payload) {
+ t.Errorf("payload mismatch: have 0x%x, want 0x%x", out.Payload, payload)
+ }
+}
+
+// Tests whether a message can be properly signed and encrypted.
+func TestMessageFullCrypto(t *testing.T) {
+ fromKey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create sender crypto key: %v", err)
+ }
+ toKey, err := crypto.GenerateKey()
+ if err != nil {
+ t.Fatalf("failed to create recipient crypto key: %v", err)
+ }
+
+ payload := []byte("hello world")
+ msg := NewMessage(payload)
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ From: fromKey,
+ To: &toKey.PublicKey,
+ })
+ if err != nil {
+ t.Fatalf("failed to encrypt message: %v", err)
+ }
+ if msg.Flags&signatureFlag != signatureFlag {
+ t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag)
+ }
+ if len(msg.Signature) == 0 {
+ t.Fatalf("no signature found for signed message")
+ }
+
+ out, err := envelope.Open(toKey)
+ if err != nil {
+ t.Fatalf("failed to open encrypted message: %v", err)
+ }
+ if !bytes.Equal(out.Payload, payload) {
+ t.Errorf("payload mismatch: have 0x%x, want 0x%x", out.Payload, payload)
+ }
+
+ pubKey := out.Recover()
+ if pubKey == nil {
+ t.Fatalf("failed to recover public key")
+ }
+ p1 := elliptic.Marshal(secp256k1.S256(), fromKey.PublicKey.X, fromKey.PublicKey.Y)
+ p2 := elliptic.Marshal(secp256k1.S256(), pubKey.X, pubKey.Y)
+ if !bytes.Equal(p1, p2) {
+ t.Fatalf("public key mismatch: have 0x%x, want 0x%x", p2, p1)
+ }
+}
diff --git a/whisper/whisperv2/peer.go b/whisper/whisperv2/peer.go
new file mode 100644
index 000000000..404ebd513
--- /dev/null
+++ b/whisper/whisperv2/peer.go
@@ -0,0 +1,175 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "gopkg.in/fatih/set.v0"
+)
+
+// peer represents a whisper protocol peer connection.
+type peer struct {
+ host *Whisper
+ peer *p2p.Peer
+ ws p2p.MsgReadWriter
+
+ known *set.Set // Messages already known by the peer to avoid wasting bandwidth
+
+ quit chan struct{}
+}
+
+// newPeer creates a new whisper peer object, but does not run the handshake itself.
+func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ return &peer{
+ host: host,
+ peer: remote,
+ ws: rw,
+ known: set.New(),
+ quit: make(chan struct{}),
+ }
+}
+
+// start initiates the peer updater, periodically broadcasting the whisper packets
+// into the network.
+func (self *peer) start() {
+ go self.update()
+ glog.V(logger.Debug).Infof("%v: whisper started", self.peer)
+}
+
+// stop terminates the peer updater, stopping message forwarding to it.
+func (self *peer) stop() {
+ close(self.quit)
+ glog.V(logger.Debug).Infof("%v: whisper stopped", self.peer)
+}
+
+// handshake sends the protocol initiation status message to the remote peer and
+// verifies the remote status too.
+func (self *peer) handshake() error {
+ // Send the handshake status message asynchronously
+ errc := make(chan error, 1)
+ go func() {
+ errc <- p2p.SendItems(self.ws, statusCode, protocolVersion)
+ }()
+ // Fetch the remote status packet and verify protocol match
+ packet, err := self.ws.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if packet.Code != statusCode {
+ return fmt.Errorf("peer sent %x before status packet", packet.Code)
+ }
+ s := rlp.NewStream(packet.Payload, uint64(packet.Size))
+ if _, err := s.List(); err != nil {
+ return fmt.Errorf("bad status message: %v", err)
+ }
+ peerVersion, err := s.Uint()
+ if err != nil {
+ return fmt.Errorf("bad status message: %v", err)
+ }
+ if peerVersion != protocolVersion {
+ return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion)
+ }
+ // Wait until out own status is consumed too
+ if err := <-errc; err != nil {
+ return fmt.Errorf("failed to send status packet: %v", err)
+ }
+ return nil
+}
+
+// update executes periodic operations on the peer, including message transmission
+// and expiration.
+func (self *peer) update() {
+ // Start the tickers for the updates
+ expire := time.NewTicker(expirationCycle)
+ transmit := time.NewTicker(transmissionCycle)
+
+ // Loop and transmit until termination is requested
+ for {
+ select {
+ case <-expire.C:
+ self.expire()
+
+ case <-transmit.C:
+ if err := self.broadcast(); err != nil {
+ glog.V(logger.Info).Infof("%v: broadcast failed: %v", self.peer, err)
+ return
+ }
+
+ case <-self.quit:
+ return
+ }
+ }
+}
+
+// mark marks an envelope known to the peer so that it won't be sent back.
+func (self *peer) mark(envelope *Envelope) {
+ self.known.Add(envelope.Hash())
+}
+
+// marked checks if an envelope is already known to the remote peer.
+func (self *peer) marked(envelope *Envelope) bool {
+ return self.known.Has(envelope.Hash())
+}
+
+// expire iterates over all the known envelopes in the host and removes all
+// expired (unknown) ones from the known list.
+func (self *peer) expire() {
+ // Assemble the list of available envelopes
+ available := set.NewNonTS()
+ for _, envelope := range self.host.envelopes() {
+ available.Add(envelope.Hash())
+ }
+ // Cross reference availability with known status
+ unmark := make(map[common.Hash]struct{})
+ self.known.Each(func(v interface{}) bool {
+ if !available.Has(v.(common.Hash)) {
+ unmark[v.(common.Hash)] = struct{}{}
+ }
+ return true
+ })
+ // Dump all known but unavailable
+ for hash, _ := range unmark {
+ self.known.Remove(hash)
+ }
+}
+
+// broadcast iterates over the collection of envelopes and transmits yet unknown
+// ones over the network.
+func (self *peer) broadcast() error {
+ // Fetch the envelopes and collect the unknown ones
+ envelopes := self.host.envelopes()
+ transmit := make([]*Envelope, 0, len(envelopes))
+ for _, envelope := range envelopes {
+ if !self.marked(envelope) {
+ transmit = append(transmit, envelope)
+ self.mark(envelope)
+ }
+ }
+ // Transmit the unknown batch (potentially empty)
+ if err := p2p.Send(self.ws, messagesCode, transmit); err != nil {
+ return err
+ }
+ glog.V(logger.Detail).Infoln(self.peer, "broadcasted", len(transmit), "message(s)")
+ return nil
+}
diff --git a/whisper/whisperv2/peer_test.go b/whisper/whisperv2/peer_test.go
new file mode 100644
index 000000000..9755e134c
--- /dev/null
+++ b/whisper/whisperv2/peer_test.go
@@ -0,0 +1,261 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+type testPeer struct {
+ client *Whisper
+ stream *p2p.MsgPipeRW
+ termed chan struct{}
+}
+
+func startTestPeer() *testPeer {
+ // Create a simulated P2P remote peer and data streams to it
+ remote := p2p.NewPeer(discover.NodeID{}, "", nil)
+ tester, tested := p2p.MsgPipe()
+
+ // Create a whisper client and connect with it to the tester peer
+ client := New()
+ client.Start(nil)
+
+ termed := make(chan struct{})
+ go func() {
+ defer client.Stop()
+ defer close(termed)
+ defer tested.Close()
+
+ client.handlePeer(remote, tested)
+ }()
+
+ return &testPeer{
+ client: client,
+ stream: tester,
+ termed: termed,
+ }
+}
+
+func startTestPeerInited() (*testPeer, error) {
+ peer := startTestPeer()
+
+ if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ peer.stream.Close()
+ return nil, err
+ }
+ if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil {
+ peer.stream.Close()
+ return nil, err
+ }
+ return peer, nil
+}
+
+func TestPeerStatusMessage(t *testing.T) {
+ tester := startTestPeer()
+
+ // Wait for the handshake status message and check it
+ if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ t.Fatalf("status message mismatch: %v", err)
+ }
+ // Terminate the node
+ tester.stream.Close()
+
+ select {
+ case <-tester.termed:
+ case <-time.After(time.Second):
+ t.Fatalf("local close timed out")
+ }
+}
+
+func TestPeerHandshakeFail(t *testing.T) {
+ tester := startTestPeer()
+
+ // Wait for and check the handshake
+ if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ t.Fatalf("status message mismatch: %v", err)
+ }
+ // Send an invalid handshake status and verify disconnect
+ if err := p2p.SendItems(tester.stream, messagesCode); err != nil {
+ t.Fatalf("failed to send malformed status: %v", err)
+ }
+ select {
+ case <-tester.termed:
+ case <-time.After(time.Second):
+ t.Fatalf("remote close timed out")
+ }
+}
+
+func TestPeerHandshakeSuccess(t *testing.T) {
+ tester := startTestPeer()
+
+ // Wait for and check the handshake
+ if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil {
+ t.Fatalf("status message mismatch: %v", err)
+ }
+ // Send a valid handshake status and make sure connection stays live
+ if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil {
+ t.Fatalf("failed to send status: %v", err)
+ }
+ select {
+ case <-tester.termed:
+ t.Fatalf("valid handshake disconnected")
+
+ case <-time.After(100 * time.Millisecond):
+ }
+ // Clean up the test
+ tester.stream.Close()
+
+ select {
+ case <-tester.termed:
+ case <-time.After(time.Second):
+ t.Fatalf("local close timed out")
+ }
+}
+
+func TestPeerSend(t *testing.T) {
+ // Start a tester and execute the handshake
+ tester, err := startTestPeerInited()
+ if err != nil {
+ t.Fatalf("failed to start initialized peer: %v", err)
+ }
+ defer tester.stream.Close()
+
+ // Construct a message and inject into the tester
+ message := NewMessage([]byte("peer broadcast test message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := tester.client.Send(envelope); err != nil {
+ t.Fatalf("failed to send message: %v", err)
+ }
+ // Check that the message is eventually forwarded
+ payload := []interface{}{envelope}
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+ // Make sure that even with a re-insert, an empty batch is received
+ if err := tester.client.Send(envelope); err != nil {
+ t.Fatalf("failed to send message: %v", err)
+ }
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+}
+
+func TestPeerDeliver(t *testing.T) {
+ // Start a tester and execute the handshake
+ tester, err := startTestPeerInited()
+ if err != nil {
+ t.Fatalf("failed to start initialized peer: %v", err)
+ }
+ defer tester.stream.Close()
+
+ // Watch for all inbound messages
+ arrived := make(chan struct{}, 1)
+ tester.client.Watch(Filter{
+ Fn: func(message *Message) {
+ arrived <- struct{}{}
+ },
+ })
+ // Construct a message and deliver it to the tester peer
+ message := NewMessage([]byte("peer broadcast test message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
+ t.Fatalf("failed to transfer message: %v", err)
+ }
+ // Check that the message is delivered upstream
+ select {
+ case <-arrived:
+ case <-time.After(time.Second):
+ t.Fatalf("message delivery timeout")
+ }
+ // Check that a resend is not delivered
+ if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
+ t.Fatalf("failed to transfer message: %v", err)
+ }
+ select {
+ case <-time.After(2 * transmissionCycle):
+ case <-arrived:
+ t.Fatalf("repeating message arrived")
+ }
+}
+
+func TestPeerMessageExpiration(t *testing.T) {
+ // Start a tester and execute the handshake
+ tester, err := startTestPeerInited()
+ if err != nil {
+ t.Fatalf("failed to start initialized peer: %v", err)
+ }
+ defer tester.stream.Close()
+
+ // Fetch the peer instance for later inspection
+ tester.client.peerMu.RLock()
+ if peers := len(tester.client.peers); peers != 1 {
+ t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
+ }
+ var peer *peer
+ for peer, _ = range tester.client.peers {
+ break
+ }
+ tester.client.peerMu.RUnlock()
+
+ // Construct a message and pass it through the tester
+ message := NewMessage([]byte("peer test message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{
+ TTL: time.Second,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := tester.client.Send(envelope); err != nil {
+ t.Fatalf("failed to send message: %v", err)
+ }
+ payload := []interface{}{envelope}
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+ // A premature empty message may have been broadcast, check the next too
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+ }
+ // Check that the message is inside the cache
+ if !peer.known.Has(envelope.Hash()) {
+ t.Fatalf("message not found in cache")
+ }
+ // Discard messages until expiration and check cache again
+ exp := time.Now().Add(time.Second + 2*expirationCycle + 100*time.Millisecond)
+ for time.Now().Before(exp) {
+ if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
+ t.Fatalf("message mismatch: %v", err)
+ }
+ }
+ if peer.known.Has(envelope.Hash()) {
+ t.Fatalf("message not expired from cache")
+ }
+}
diff --git a/whisper/whisperv2/topic.go b/whisper/whisperv2/topic.go
new file mode 100644
index 000000000..3e2b47bd3
--- /dev/null
+++ b/whisper/whisperv2/topic.go
@@ -0,0 +1,140 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Contains the Whisper protocol Topic element. For formal details please see
+// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics.
+
+package whisperv2
+
+import "github.com/ethereum/go-ethereum/crypto"
+
+// Topic represents a cryptographically secure, probabilistic partial
+// classifications of a message, determined as the first (left) 4 bytes of the
+// SHA3 hash of some arbitrary data given by the original author of the message.
+type Topic [4]byte
+
+// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data.
+//
+// Note, empty topics are considered the wildcard, and cannot be used in messages.
+func NewTopic(data []byte) Topic {
+ prefix := [4]byte{}
+ copy(prefix[:], crypto.Keccak256(data)[:4])
+ return Topic(prefix)
+}
+
+// NewTopics creates a list of topics from a list of binary data elements, by
+// iteratively calling NewTopic on each of them.
+func NewTopics(data ...[]byte) []Topic {
+ topics := make([]Topic, len(data))
+ for i, element := range data {
+ topics[i] = NewTopic(element)
+ }
+ return topics
+}
+
+// NewTopicFromString creates a topic using the binary data contents of the
+// specified string.
+func NewTopicFromString(data string) Topic {
+ return NewTopic([]byte(data))
+}
+
+// NewTopicsFromStrings creates a list of topics from a list of textual data
+// elements, by iteratively calling NewTopicFromString on each of them.
+func NewTopicsFromStrings(data ...string) []Topic {
+ topics := make([]Topic, len(data))
+ for i, element := range data {
+ topics[i] = NewTopicFromString(element)
+ }
+ return topics
+}
+
+// String converts a topic byte array to a string representation.
+func (self *Topic) String() string {
+ return string(self[:])
+}
+
+// topicMatcher is a filter expression to verify if a list of topics contained
+// in an arriving message matches some topic conditions. The topic matcher is
+// built up of a list of conditions, each of which must be satisfied by the
+// corresponding topic in the message. Each condition may require: a) an exact
+// topic match; b) a match from a set of topics; or c) a wild-card matching all.
+//
+// If a message contains more topics than required by the matcher, those beyond
+// the condition count are ignored and assumed to match.
+//
+// Consider the following sample topic matcher:
+// sample := {
+// {TopicA1, TopicA2, TopicA3},
+// {TopicB},
+// nil,
+// {TopicD1, TopicD2}
+// }
+// In order for a message to pass this filter, it should enumerate at least 4
+// topics, the first any of [TopicA1, TopicA2, TopicA3], the second mandatory
+// "TopicB", the third is ignored by the filter and the fourth either "TopicD1"
+// or "TopicD2". If the message contains further topics, the filter will match
+// them too.
+type topicMatcher struct {
+ conditions []map[Topic]struct{}
+}
+
+// newTopicMatcher create a topic matcher from a list of topic conditions.
+func newTopicMatcher(topics ...[]Topic) *topicMatcher {
+ matcher := make([]map[Topic]struct{}, len(topics))
+ for i, condition := range topics {
+ matcher[i] = make(map[Topic]struct{})
+ for _, topic := range condition {
+ matcher[i][topic] = struct{}{}
+ }
+ }
+ return &topicMatcher{conditions: matcher}
+}
+
+// newTopicMatcherFromBinary create a topic matcher from a list of binary conditions.
+func newTopicMatcherFromBinary(data ...[][]byte) *topicMatcher {
+ topics := make([][]Topic, len(data))
+ for i, condition := range data {
+ topics[i] = NewTopics(condition...)
+ }
+ return newTopicMatcher(topics...)
+}
+
+// newTopicMatcherFromStrings creates a topic matcher from a list of textual
+// conditions.
+func newTopicMatcherFromStrings(data ...[]string) *topicMatcher {
+ topics := make([][]Topic, len(data))
+ for i, condition := range data {
+ topics[i] = NewTopicsFromStrings(condition...)
+ }
+ return newTopicMatcher(topics...)
+}
+
+// Matches checks if a list of topics matches this particular condition set.
+func (self *topicMatcher) Matches(topics []Topic) bool {
+ // Mismatch if there aren't enough topics
+ if len(self.conditions) > len(topics) {
+ return false
+ }
+ // Check each topic condition for existence (skip wild-cards)
+ for i := 0; i < len(topics) && i < len(self.conditions); i++ {
+ if len(self.conditions[i]) > 0 {
+ if _, ok := self.conditions[i][topics[i]]; !ok {
+ return false
+ }
+ }
+ }
+ return true
+}
diff --git a/whisper/whisperv2/topic_test.go b/whisper/whisperv2/topic_test.go
new file mode 100644
index 000000000..efd4a2c61
--- /dev/null
+++ b/whisper/whisperv2/topic_test.go
@@ -0,0 +1,215 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "bytes"
+ "testing"
+)
+
+var topicCreationTests = []struct {
+ data []byte
+ hash [4]byte
+}{
+ {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")},
+ {hash: [4]byte{0xf2, 0x6e, 0x77, 0x79}, data: []byte("some other test")},
+}
+
+func TestTopicCreation(t *testing.T) {
+ // Create the topics individually
+ for i, tt := range topicCreationTests {
+ topic := NewTopic(tt.data)
+ if bytes.Compare(topic[:], tt.hash[:]) != 0 {
+ t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
+ }
+ }
+ for i, tt := range topicCreationTests {
+ topic := NewTopicFromString(string(tt.data))
+ if bytes.Compare(topic[:], tt.hash[:]) != 0 {
+ t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash)
+ }
+ }
+ // Create the topics in batches
+ binaryData := make([][]byte, len(topicCreationTests))
+ for i, tt := range topicCreationTests {
+ binaryData[i] = tt.data
+ }
+ textualData := make([]string, len(topicCreationTests))
+ for i, tt := range topicCreationTests {
+ textualData[i] = string(tt.data)
+ }
+
+ topics := NewTopics(binaryData...)
+ for i, tt := range topicCreationTests {
+ if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
+ t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
+ }
+ }
+ topics = NewTopicsFromStrings(textualData...)
+ for i, tt := range topicCreationTests {
+ if bytes.Compare(topics[i][:], tt.hash[:]) != 0 {
+ t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash)
+ }
+ }
+}
+
+var topicMatcherCreationTest = struct {
+ binary [][][]byte
+ textual [][]string
+ matcher []map[[4]byte]struct{}
+}{
+ binary: [][][]byte{
+ [][]byte{},
+ [][]byte{
+ []byte("Topic A"),
+ },
+ [][]byte{
+ []byte("Topic B1"),
+ []byte("Topic B2"),
+ []byte("Topic B3"),
+ },
+ },
+ textual: [][]string{
+ []string{},
+ []string{"Topic A"},
+ []string{"Topic B1", "Topic B2", "Topic B3"},
+ },
+ matcher: []map[[4]byte]struct{}{
+ map[[4]byte]struct{}{},
+ map[[4]byte]struct{}{
+ [4]byte{0x25, 0xfc, 0x95, 0x66}: struct{}{},
+ },
+ map[[4]byte]struct{}{
+ [4]byte{0x93, 0x6d, 0xec, 0x09}: struct{}{},
+ [4]byte{0x25, 0x23, 0x34, 0xd3}: struct{}{},
+ [4]byte{0x6b, 0xc2, 0x73, 0xd1}: struct{}{},
+ },
+ },
+}
+
+func TestTopicMatcherCreation(t *testing.T) {
+ test := topicMatcherCreationTest
+
+ matcher := newTopicMatcherFromBinary(test.binary...)
+ for i, cond := range matcher.conditions {
+ for topic, _ := range cond {
+ if _, ok := test.matcher[i][topic]; !ok {
+ t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:])
+ }
+ }
+ }
+ for i, cond := range test.matcher {
+ for topic, _ := range cond {
+ if _, ok := matcher.conditions[i][topic]; !ok {
+ t.Errorf("condition %d; topic not found: 0x%x", i, topic[:])
+ }
+ }
+ }
+
+ matcher = newTopicMatcherFromStrings(test.textual...)
+ for i, cond := range matcher.conditions {
+ for topic, _ := range cond {
+ if _, ok := test.matcher[i][topic]; !ok {
+ t.Errorf("condition %d; extra topic found: 0x%x", i, topic[:])
+ }
+ }
+ }
+ for i, cond := range test.matcher {
+ for topic, _ := range cond {
+ if _, ok := matcher.conditions[i][topic]; !ok {
+ t.Errorf("condition %d; topic not found: 0x%x", i, topic[:])
+ }
+ }
+ }
+}
+
+var topicMatcherTests = []struct {
+ filter [][]string
+ topics []string
+ match bool
+}{
+ // Empty topic matcher should match everything
+ {
+ filter: [][]string{},
+ topics: []string{},
+ match: true,
+ },
+ {
+ filter: [][]string{},
+ topics: []string{"a", "b", "c"},
+ match: true,
+ },
+ // Fixed topic matcher should match strictly, but only prefix
+ {
+ filter: [][]string{[]string{"a"}, []string{"b"}},
+ topics: []string{"a"},
+ match: false,
+ },
+ {
+ filter: [][]string{[]string{"a"}, []string{"b"}},
+ topics: []string{"a", "b"},
+ match: true,
+ },
+ {
+ filter: [][]string{[]string{"a"}, []string{"b"}},
+ topics: []string{"a", "b", "c"},
+ match: true,
+ },
+ // Multi-matcher should match any from a sub-group
+ {
+ filter: [][]string{[]string{"a1", "a2"}},
+ topics: []string{"a"},
+ match: false,
+ },
+ {
+ filter: [][]string{[]string{"a1", "a2"}},
+ topics: []string{"a1"},
+ match: true,
+ },
+ {
+ filter: [][]string{[]string{"a1", "a2"}},
+ topics: []string{"a2"},
+ match: true,
+ },
+ // Wild-card condition should match anything
+ {
+ filter: [][]string{[]string{}, []string{"b"}},
+ topics: []string{"a"},
+ match: false,
+ },
+ {
+ filter: [][]string{[]string{}, []string{"b"}},
+ topics: []string{"a", "b"},
+ match: true,
+ },
+ {
+ filter: [][]string{[]string{}, []string{"b"}},
+ topics: []string{"b", "b"},
+ match: true,
+ },
+}
+
+func TestTopicMatcher(t *testing.T) {
+ for i, tt := range topicMatcherTests {
+ topics := NewTopicsFromStrings(tt.topics...)
+
+ matcher := newTopicMatcherFromStrings(tt.filter...)
+ if match := matcher.Matches(topics); match != tt.match {
+ t.Errorf("test %d: match mismatch: have %v, want %v", i, match, tt.match)
+ }
+ }
+}
diff --git a/whisper/whisperv2/whisper.go b/whisper/whisperv2/whisper.go
new file mode 100644
index 000000000..d9054959e
--- /dev/null
+++ b/whisper/whisperv2/whisper.go
@@ -0,0 +1,378 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "crypto/ecdsa"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/crypto/ecies"
+ "github.com/ethereum/go-ethereum/event/filter"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rpc"
+
+ "gopkg.in/fatih/set.v0"
+)
+
+const (
+ statusCode = 0x00
+ messagesCode = 0x01
+
+ protocolVersion uint64 = 0x02
+ protocolName = "shh"
+
+ signatureFlag = byte(1 << 7)
+ signatureLength = 65
+
+ expirationCycle = 800 * time.Millisecond
+ transmissionCycle = 300 * time.Millisecond
+)
+
+const (
+ DefaultTTL = 50 * time.Second
+ DefaultPoW = 50 * time.Millisecond
+)
+
+type MessageEvent struct {
+ To *ecdsa.PrivateKey
+ From *ecdsa.PublicKey
+ Message *Message
+}
+
+// Whisper represents a dark communication interface through the Ethereum
+// network, using its very own P2P communication layer.
+type Whisper struct {
+ protocol p2p.Protocol
+ filters *filter.Filters
+
+ keys map[string]*ecdsa.PrivateKey
+
+ messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node
+ expirations map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter)
+ poolMu sync.RWMutex // Mutex to sync the message and expiration pools
+
+ peers map[*peer]struct{} // Set of currently active peers
+ peerMu sync.RWMutex // Mutex to sync the active peer set
+
+ quit chan struct{}
+}
+
+// New creates a Whisper client ready to communicate through the Ethereum P2P
+// network.
+func New() *Whisper {
+ whisper := &Whisper{
+ filters: filter.New(),
+ keys: make(map[string]*ecdsa.PrivateKey),
+ messages: make(map[common.Hash]*Envelope),
+ expirations: make(map[uint32]*set.SetNonTS),
+ peers: make(map[*peer]struct{}),
+ quit: make(chan struct{}),
+ }
+ whisper.filters.Start()
+
+ // p2p whisper sub protocol handler
+ whisper.protocol = p2p.Protocol{
+ Name: protocolName,
+ Version: uint(protocolVersion),
+ Length: 2,
+ Run: whisper.handlePeer,
+ }
+
+ return whisper
+}
+
+// APIs returns the RPC descriptors the Whisper implementation offers
+func (s *Whisper) APIs() []rpc.API {
+ return []rpc.API{
+ {
+ Namespace: "shh",
+ Version: "1.0",
+ Service: NewPublicWhisperAPI(s),
+ Public: true,
+ },
+ }
+}
+
+// Protocols returns the whisper sub-protocols ran by this particular client.
+func (self *Whisper) Protocols() []p2p.Protocol {
+ return []p2p.Protocol{self.protocol}
+}
+
+// Version returns the whisper sub-protocols version number.
+func (self *Whisper) Version() uint {
+ return self.protocol.Version
+}
+
+// NewIdentity generates a new cryptographic identity for the client, and injects
+// it into the known identities for message decryption.
+func (self *Whisper) NewIdentity() *ecdsa.PrivateKey {
+ key, err := crypto.GenerateKey()
+ if err != nil {
+ panic(err)
+ }
+ self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key
+
+ return key
+}
+
+// HasIdentity checks if the the whisper node is configured with the private key
+// of the specified public pair.
+func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool {
+ return self.keys[string(crypto.FromECDSAPub(key))] != nil
+}
+
+// GetIdentity retrieves the private key of the specified public identity.
+func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey {
+ return self.keys[string(crypto.FromECDSAPub(key))]
+}
+
+// Watch installs a new message handler to run in case a matching packet arrives
+// from the whisper network.
+func (self *Whisper) Watch(options Filter) int {
+ filter := filterer{
+ to: string(crypto.FromECDSAPub(options.To)),
+ from: string(crypto.FromECDSAPub(options.From)),
+ matcher: newTopicMatcher(options.Topics...),
+ fn: func(data interface{}) {
+ options.Fn(data.(*Message))
+ },
+ }
+ return self.filters.Install(filter)
+}
+
+// Unwatch removes an installed message handler.
+func (self *Whisper) Unwatch(id int) {
+ self.filters.Uninstall(id)
+}
+
+// Send injects a message into the whisper send queue, to be distributed in the
+// network in the coming cycles.
+func (self *Whisper) Send(envelope *Envelope) error {
+ return self.add(envelope)
+}
+
+// Start implements node.Service, starting the background data propagation thread
+// of the Whisper protocol.
+func (self *Whisper) Start(*p2p.Server) error {
+ glog.V(logger.Info).Infoln("Whisper started")
+ go self.update()
+ return nil
+}
+
+// Stop implements node.Service, stopping the background data propagation thread
+// of the Whisper protocol.
+func (self *Whisper) Stop() error {
+ close(self.quit)
+ glog.V(logger.Info).Infoln("Whisper stopped")
+ return nil
+}
+
+// Messages retrieves all the currently pooled messages matching a filter id.
+func (self *Whisper) Messages(id int) []*Message {
+ messages := make([]*Message, 0)
+ if filter := self.filters.Get(id); filter != nil {
+ for _, envelope := range self.messages {
+ if message := self.open(envelope); message != nil {
+ if self.filters.Match(filter, createFilter(message, envelope.Topics)) {
+ messages = append(messages, message)
+ }
+ }
+ }
+ }
+ return messages
+}
+
+// handlePeer is called by the underlying P2P layer when the whisper sub-protocol
+// connection is negotiated.
+func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
+ // Create the new peer and start tracking it
+ whisperPeer := newPeer(self, peer, rw)
+
+ self.peerMu.Lock()
+ self.peers[whisperPeer] = struct{}{}
+ self.peerMu.Unlock()
+
+ defer func() {
+ self.peerMu.Lock()
+ delete(self.peers, whisperPeer)
+ self.peerMu.Unlock()
+ }()
+
+ // Run the peer handshake and state updates
+ if err := whisperPeer.handshake(); err != nil {
+ return err
+ }
+ whisperPeer.start()
+ defer whisperPeer.stop()
+
+ // Read and process inbound messages directly to merge into client-global state
+ for {
+ // Fetch the next packet and decode the contained envelopes
+ packet, err := rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ var envelopes []*Envelope
+ if err := packet.Decode(&envelopes); err != nil {
+ glog.V(logger.Info).Infof("%v: failed to decode envelope: %v", peer, err)
+ continue
+ }
+ // Inject all envelopes into the internal pool
+ for _, envelope := range envelopes {
+ if err := self.add(envelope); err != nil {
+ // TODO Punish peer here. Invalid envelope.
+ glog.V(logger.Debug).Infof("%v: failed to pool envelope: %v", peer, err)
+ }
+ whisperPeer.mark(envelope)
+ }
+ }
+}
+
+// add inserts a new envelope into the message pool to be distributed within the
+// whisper network. It also inserts the envelope into the expiration pool at the
+// appropriate time-stamp.
+func (self *Whisper) add(envelope *Envelope) error {
+ self.poolMu.Lock()
+ defer self.poolMu.Unlock()
+
+ // short circuit when a received envelope has already expired
+ if envelope.Expiry < uint32(time.Now().Unix()) {
+ return nil
+ }
+
+ // Insert the message into the tracked pool
+ hash := envelope.Hash()
+ if _, ok := self.messages[hash]; ok {
+ glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope)
+ return nil
+ }
+ self.messages[hash] = envelope
+
+ // Insert the message into the expiration pool for later removal
+ if self.expirations[envelope.Expiry] == nil {
+ self.expirations[envelope.Expiry] = set.NewNonTS()
+ }
+ if !self.expirations[envelope.Expiry].Has(hash) {
+ self.expirations[envelope.Expiry].Add(hash)
+
+ // Notify the local node of a message arrival
+ go self.postEvent(envelope)
+ }
+ glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope)
+ return nil
+}
+
+// postEvent opens an envelope with the configured identities and delivers the
+// message upstream from application processing.
+func (self *Whisper) postEvent(envelope *Envelope) {
+ if message := self.open(envelope); message != nil {
+ self.filters.Notify(createFilter(message, envelope.Topics), message)
+ }
+}
+
+// open tries to decrypt a whisper envelope with all the configured identities,
+// returning the decrypted message and the key used to achieve it. If not keys
+// are configured, open will return the payload as if non encrypted.
+func (self *Whisper) open(envelope *Envelope) *Message {
+ // Short circuit if no identity is set, and assume clear-text
+ if len(self.keys) == 0 {
+ if message, err := envelope.Open(nil); err == nil {
+ return message
+ }
+ }
+ // Iterate over the keys and try to decrypt the message
+ for _, key := range self.keys {
+ message, err := envelope.Open(key)
+ if err == nil {
+ message.To = &key.PublicKey
+ return message
+ } else if err == ecies.ErrInvalidPublicKey {
+ return message
+ }
+ }
+ // Failed to decrypt, don't return anything
+ return nil
+}
+
+// createFilter creates a message filter to check against installed handlers.
+func createFilter(message *Message, topics []Topic) filter.Filter {
+ matcher := make([][]Topic, len(topics))
+ for i, topic := range topics {
+ matcher[i] = []Topic{topic}
+ }
+ return filterer{
+ to: string(crypto.FromECDSAPub(message.To)),
+ from: string(crypto.FromECDSAPub(message.Recover())),
+ matcher: newTopicMatcher(matcher...),
+ }
+}
+
+// update loops until the lifetime of the whisper node, updating its internal
+// state by expiring stale messages from the pool.
+func (self *Whisper) update() {
+ // Start a ticker to check for expirations
+ expire := time.NewTicker(expirationCycle)
+
+ // Repeat updates until termination is requested
+ for {
+ select {
+ case <-expire.C:
+ self.expire()
+
+ case <-self.quit:
+ return
+ }
+ }
+}
+
+// expire iterates over all the expiration timestamps, removing all stale
+// messages from the pools.
+func (self *Whisper) expire() {
+ self.poolMu.Lock()
+ defer self.poolMu.Unlock()
+
+ now := uint32(time.Now().Unix())
+ for then, hashSet := range self.expirations {
+ // Short circuit if a future time
+ if then > now {
+ continue
+ }
+ // Dump all expired messages and remove timestamp
+ hashSet.Each(func(v interface{}) bool {
+ delete(self.messages, v.(common.Hash))
+ return true
+ })
+ self.expirations[then].Clear()
+ }
+}
+
+// envelopes retrieves all the messages currently pooled by the node.
+func (self *Whisper) envelopes() []*Envelope {
+ self.poolMu.RLock()
+ defer self.poolMu.RUnlock()
+
+ envelopes := make([]*Envelope, 0, len(self.messages))
+ for _, envelope := range self.messages {
+ envelopes = append(envelopes, envelope)
+ }
+ return envelopes
+}
diff --git a/whisper/whisperv2/whisper_test.go b/whisper/whisperv2/whisper_test.go
new file mode 100644
index 000000000..1e0d3f85d
--- /dev/null
+++ b/whisper/whisperv2/whisper_test.go
@@ -0,0 +1,216 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package whisperv2
+
+import (
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+func startTestCluster(n int) []*Whisper {
+ // Create the batch of simulated peers
+ nodes := make([]*p2p.Peer, n)
+ for i := 0; i < n; i++ {
+ nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil)
+ }
+ whispers := make([]*Whisper, n)
+ for i := 0; i < n; i++ {
+ whispers[i] = New()
+ whispers[i].Start(nil)
+ }
+ // Wire all the peers to the root one
+ for i := 1; i < n; i++ {
+ src, dst := p2p.MsgPipe()
+
+ go whispers[0].handlePeer(nodes[i], src)
+ go whispers[i].handlePeer(nodes[0], dst)
+ }
+ return whispers
+}
+
+func TestSelfMessage(t *testing.T) {
+ // Start the single node cluster
+ client := startTestCluster(1)[0]
+
+ // Start watching for self messages, signal any arrivals
+ self := client.NewIdentity()
+ done := make(chan struct{})
+
+ client.Watch(Filter{
+ To: &self.PublicKey,
+ Fn: func(msg *Message) {
+ close(done)
+ },
+ })
+ // Send a dummy message to oneself
+ msg := NewMessage([]byte("self whisper"))
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ From: self,
+ To: &self.PublicKey,
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ // Dump the message into the system and wait for it to pop back out
+ if err := client.Send(envelope); err != nil {
+ t.Fatalf("failed to send self-message: %v", err)
+ }
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatalf("self-message receive timeout")
+ }
+}
+
+func TestDirectMessage(t *testing.T) {
+ // Start the sender-recipient cluster
+ cluster := startTestCluster(2)
+
+ sender := cluster[0]
+ senderId := sender.NewIdentity()
+
+ recipient := cluster[1]
+ recipientId := recipient.NewIdentity()
+
+ // Watch for arriving messages on the recipient
+ done := make(chan struct{})
+ recipient.Watch(Filter{
+ To: &recipientId.PublicKey,
+ Fn: func(msg *Message) {
+ close(done)
+ },
+ })
+ // Send a dummy message from the sender
+ msg := NewMessage([]byte("direct whisper"))
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ From: senderId,
+ To: &recipientId.PublicKey,
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := sender.Send(envelope); err != nil {
+ t.Fatalf("failed to send direct message: %v", err)
+ }
+ // Wait for an arrival or a timeout
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatalf("direct message receive timeout")
+ }
+}
+
+func TestAnonymousBroadcast(t *testing.T) {
+ testBroadcast(true, t)
+}
+
+func TestIdentifiedBroadcast(t *testing.T) {
+ testBroadcast(false, t)
+}
+
+func testBroadcast(anonymous bool, t *testing.T) {
+ // Start the single sender multi recipient cluster
+ cluster := startTestCluster(3)
+
+ sender := cluster[1]
+ targets := cluster[1:]
+ for _, target := range targets {
+ if !anonymous {
+ target.NewIdentity()
+ }
+ }
+ // Watch for arriving messages on the recipients
+ dones := make([]chan struct{}, len(targets))
+ for i := 0; i < len(targets); i++ {
+ done := make(chan struct{}) // need for the closure
+ dones[i] = done
+
+ targets[i].Watch(Filter{
+ Topics: NewFilterTopicsFromStringsFlat("broadcast topic"),
+ Fn: func(msg *Message) {
+ close(done)
+ },
+ })
+ }
+ // Send a dummy message from the sender
+ msg := NewMessage([]byte("broadcast whisper"))
+ envelope, err := msg.Wrap(DefaultPoW, Options{
+ Topics: NewTopicsFromStrings("broadcast topic"),
+ TTL: DefaultTTL,
+ })
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := sender.Send(envelope); err != nil {
+ t.Fatalf("failed to send broadcast message: %v", err)
+ }
+ // Wait for an arrival on each recipient, or timeouts
+ timeout := time.After(time.Second)
+ for _, done := range dones {
+ select {
+ case <-done:
+ case <-timeout:
+ t.Fatalf("broadcast message receive timeout")
+ }
+ }
+}
+
+func TestMessageExpiration(t *testing.T) {
+ // Start the single node cluster and inject a dummy message
+ node := startTestCluster(1)[0]
+
+ message := NewMessage([]byte("expiring message"))
+ envelope, err := message.Wrap(DefaultPoW, Options{TTL: time.Second})
+ if err != nil {
+ t.Fatalf("failed to wrap message: %v", err)
+ }
+ if err := node.Send(envelope); err != nil {
+ t.Fatalf("failed to inject message: %v", err)
+ }
+ // Check that the message is inside the cache
+ node.poolMu.RLock()
+ _, found := node.messages[envelope.Hash()]
+ node.poolMu.RUnlock()
+
+ if !found {
+ t.Fatalf("message not found in cache")
+ }
+ // Wait for expiration and check cache again
+ time.Sleep(time.Second) // wait for expiration
+ time.Sleep(2 * expirationCycle) // wait for cleanup cycle
+
+ node.poolMu.RLock()
+ _, found = node.messages[envelope.Hash()]
+ node.poolMu.RUnlock()
+ if found {
+ t.Fatalf("message not expired from cache")
+ }
+
+ // Check that adding an expired envelope doesn't do anything.
+ node.add(envelope)
+ node.poolMu.RLock()
+ _, found = node.messages[envelope.Hash()]
+ node.poolMu.RUnlock()
+ if found {
+ t.Fatalf("message was added to cache")
+ }
+}