diff options
Diffstat (limited to 'whisper')
-rw-r--r-- | whisper/envelope.go | 29 | ||||
-rw-r--r-- | whisper/filter.go | 11 | ||||
-rw-r--r-- | whisper/main.go | 4 | ||||
-rw-r--r-- | whisper/message.go | 26 | ||||
-rw-r--r-- | whisper/message_test.go | 24 | ||||
-rw-r--r-- | whisper/peer.go | 172 | ||||
-rw-r--r-- | whisper/peer_test.go | 242 | ||||
-rw-r--r-- | whisper/sort.go | 29 | ||||
-rw-r--r-- | whisper/sort_test.go | 23 | ||||
-rw-r--r-- | whisper/topic.go | 61 | ||||
-rw-r--r-- | whisper/topic_test.go | 67 | ||||
-rw-r--r-- | whisper/util.go | 36 | ||||
-rw-r--r-- | whisper/whisper.go | 334 | ||||
-rw-r--r-- | whisper/whisper_test.go | 189 |
14 files changed, 895 insertions, 352 deletions
diff --git a/whisper/envelope.go b/whisper/envelope.go index f35a40a42..0a817e26e 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -20,16 +20,16 @@ import ( type Envelope struct { Expiry uint32 // Whisper protocol specifies int32, really should be int64 TTL uint32 // ^^^^^^ - Topics [][]byte + Topics []Topic Data []byte Nonce uint32 - hash common.Hash + hash common.Hash // Cached hash of the envelope to avoid rehashing every time } // NewEnvelope wraps a Whisper message with expiration and destination data // included into an envelope for network forwarding. -func NewEnvelope(ttl time.Duration, topics [][]byte, msg *Message) *Envelope { +func NewEnvelope(ttl time.Duration, topics []Topic, msg *Message) *Envelope { return &Envelope{ Expiry: uint32(time.Now().Add(ttl).Unix()), TTL: uint32(ttl.Seconds()), @@ -59,16 +59,6 @@ func (self *Envelope) Seal(pow time.Duration) { } } -// valid checks whether the claimed proof of work was indeed executed. -// TODO: Is this really useful? Isn't this always true? -func (self *Envelope) valid() bool { - d := make([]byte, 64) - copy(d[:32], self.rlpWithoutNonce()) - binary.BigEndian.PutUint32(d[60:], self.Nonce) - - return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0 -} - // rlpWithoutNonce returns the RLP encoded envelope contents, except the nonce. func (self *Envelope) rlpWithoutNonce() []byte { enc, _ := rlp.EncodeToBytes([]interface{}{self.Expiry, self.TTL, self.Topics, self.Data}) @@ -85,20 +75,19 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { } data = data[1:] - if message.Flags&128 == 128 { - if len(data) < 65 { - return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 65") + if message.Flags&signatureFlag == signatureFlag { + if len(data) < signatureLength { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < len(signature)") } - message.Signature, data = data[:65], data[65:] + message.Signature, data = data[:signatureLength], data[signatureLength:] } message.Payload = data - // Short circuit if the encryption was requested + // Decrypt the message, if requested if key == nil { return message, nil } - // Otherwise try to decrypt the message - message.Payload, err = crypto.Decrypt(key, message.Payload) + err = message.decrypt(key) switch err { case nil: return message, nil diff --git a/whisper/filter.go b/whisper/filter.go index b33f2c1a2..8fcc45afd 100644 --- a/whisper/filter.go +++ b/whisper/filter.go @@ -1,10 +1,13 @@ +// Contains the message filter for fine grained subscriptions. + package whisper import "crypto/ecdsa" +// Filter is used to subscribe to specific types of whisper messages. type Filter struct { - To *ecdsa.PublicKey - From *ecdsa.PublicKey - Topics [][]byte - Fn func(*Message) + To *ecdsa.PublicKey // Recipient of the message + From *ecdsa.PublicKey // Sender of the message + Topics []Topic // Topics to watch messages on + Fn func(*Message) // Handler in case of a match } diff --git a/whisper/main.go b/whisper/main.go index 422f0fa3b..3c8c3801f 100644 --- a/whisper/main.go +++ b/whisper/main.go @@ -69,10 +69,10 @@ func selfSend(shh *whisper.Whisper, payload []byte) error { }) // Wrap the payload and encrypt it msg := whisper.NewMessage(payload) - envelope, err := msg.Wrap(whisper.DefaultProofOfWork, whisper.Options{ + envelope, err := msg.Wrap(whisper.DefaultPoW, whisper.Options{ From: id, To: &id.PublicKey, - TTL: whisper.DefaultTimeToLive, + TTL: whisper.DefaultTTL, }) if err != nil { return fmt.Errorf("failed to seal message: %v", err) diff --git a/whisper/message.go b/whisper/message.go index 2666ee6e0..07c673567 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -30,13 +30,14 @@ type Options struct { From *ecdsa.PrivateKey To *ecdsa.PublicKey TTL time.Duration - Topics [][]byte + Topics []Topic } // NewMessage creates and initializes a non-signed, non-encrypted Whisper message. func NewMessage(payload []byte) *Message { - // Construct an initial flag set: bit #1 = 0 (no signature), rest random - flags := byte(rand.Intn(128)) + // Construct an initial flag set: no signature, rest random + flags := byte(rand.Intn(256)) + flags &= ^signatureFlag // Assemble and return the message return &Message{ @@ -61,7 +62,7 @@ func NewMessage(payload []byte) *Message { func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) { // Use the default TTL if non was specified if options.TTL == 0 { - options.TTL = DefaultTimeToLive + options.TTL = DefaultTTL } // Sign and encrypt the message if requested if options.From != nil { @@ -84,7 +85,7 @@ func (self *Message) Wrap(pow time.Duration, options Options) (*Envelope, error) // sign calculates and sets the cryptographic signature for the message , also // setting the sign flag. func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { - self.Flags |= 1 << 7 + self.Flags |= signatureFlag self.Signature, err = crypto.Sign(self.hash(), key) return } @@ -93,6 +94,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { func (self *Message) Recover() *ecdsa.PublicKey { defer func() { recover() }() // in case of invalid signature + // Short circuit if no signature is present + if self.Signature == nil { + return nil + } + // Otherwise try and recover the signature pub, err := crypto.SigToPub(self.hash(), self.Signature) if err != nil { glog.V(logger.Error).Infof("Could not get public key from signature: %v", err) @@ -102,8 +108,14 @@ func (self *Message) Recover() *ecdsa.PublicKey { } // encrypt encrypts a message payload with a public key. -func (self *Message) encrypt(to *ecdsa.PublicKey) (err error) { - self.Payload, err = crypto.Encrypt(to, self.Payload) +func (self *Message) encrypt(key *ecdsa.PublicKey) (err error) { + self.Payload, err = crypto.Encrypt(key, self.Payload) + return +} + +// decrypt decrypts an encrypted payload with a private key. +func (self *Message) decrypt(key *ecdsa.PrivateKey) (err error) { + self.Payload, err = crypto.Decrypt(key, self.Payload) return } diff --git a/whisper/message_test.go b/whisper/message_test.go index 8d4c5e990..18a254e5c 100644 --- a/whisper/message_test.go +++ b/whisper/message_test.go @@ -13,11 +13,11 @@ func TestMessageSimpleWrap(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - if _, err := msg.Wrap(DefaultProofOfWork, Options{}); err != nil { + if _, err := msg.Wrap(DefaultPoW, Options{}); err != nil { t.Fatalf("failed to wrap message: %v", err) } - if msg.Flags&128 != 0 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0) + if msg.Flags&signatureFlag != 0 { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0) } if len(msg.Signature) != 0 { t.Fatalf("signature found for simple wrapping: 0x%x", msg.Signature) @@ -36,13 +36,13 @@ func TestMessageCleartextSignRecover(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - if _, err := msg.Wrap(DefaultProofOfWork, Options{ + if _, err := msg.Wrap(DefaultPoW, Options{ From: key, }); err != nil { t.Fatalf("failed to sign message: %v", err) } - if msg.Flags&128 != 128 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1) + if msg.Flags&signatureFlag != signatureFlag { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag) } if bytes.Compare(msg.Payload, payload) != 0 { t.Fatalf("payload mismatch after signing: have 0x%x, want 0x%x", msg.Payload, payload) @@ -69,14 +69,14 @@ func TestMessageAnonymousEncryptDecrypt(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ To: &key.PublicKey, }) if err != nil { t.Fatalf("failed to encrypt message: %v", err) } - if msg.Flags&128 != 0 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 0) + if msg.Flags&signatureFlag != 0 { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, 0) } if len(msg.Signature) != 0 { t.Fatalf("signature found for anonymous message: 0x%x", msg.Signature) @@ -104,15 +104,15 @@ func TestMessageFullCrypto(t *testing.T) { payload := []byte("hello world") msg := NewMessage(payload) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + envelope, err := msg.Wrap(DefaultPoW, Options{ From: fromKey, To: &toKey.PublicKey, }) if err != nil { t.Fatalf("failed to encrypt message: %v", err) } - if msg.Flags&128 != 128 { - t.Fatalf("signature flag mismatch: have %d, want %d", (msg.Flags&128)>>7, 1) + if msg.Flags&signatureFlag != signatureFlag { + t.Fatalf("signature flag mismatch: have %d, want %d", msg.Flags&signatureFlag, signatureFlag) } if len(msg.Signature) == 0 { t.Fatalf("no signature found for signed message") diff --git a/whisper/peer.go b/whisper/peer.go index 338166c25..e4301f37c 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -4,110 +4,160 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" ) -const ( - protocolVersion uint64 = 0x02 -) - +// peer represents a whisper protocol peer connection. type peer struct { host *Whisper peer *p2p.Peer ws p2p.MsgReadWriter - // XXX Eventually this is going to reach exceptional large space. We need an expiry here - known *set.Set + known *set.Set // Messages already known by the peer to avoid wasting bandwidth quit chan struct{} } -func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { - return &peer{host, p, ws, set.New(), make(chan struct{})} -} - -func (self *peer) init() error { - if err := self.handleStatus(); err != nil { - return err +// newPeer creates and initializes a new whisper peer connection, returning either +// the newly constructed link or a failure reason. +func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) (*peer, error) { + p := &peer{ + host: host, + peer: remote, + ws: rw, + known: set.New(), + quit: make(chan struct{}), } - - return nil + if err := p.handshake(); err != nil { + return nil, err + } + return p, nil } +// start initiates the peer updater, periodically broadcasting the whisper packets +// into the network. func (self *peer) start() { go self.update() self.peer.Debugln("whisper started") } +// stop terminates the peer updater, stopping message forwarding to it. func (self *peer) stop() { + close(self.quit) self.peer.Debugln("whisper stopped") +} - close(self.quit) +// handshake sends the protocol initiation status message to the remote peer and +// verifies the remote status too. +func (self *peer) handshake() error { + // Send the handshake status message asynchronously + errc := make(chan error, 1) + go func() { + errc <- p2p.SendItems(self.ws, statusCode, protocolVersion) + }() + // Fetch the remote status packet and verify protocol match + packet, err := self.ws.ReadMsg() + if err != nil { + return err + } + if packet.Code != statusCode { + return fmt.Errorf("peer sent %x before status packet", packet.Code) + } + s := rlp.NewStream(packet.Payload) + if _, err := s.List(); err != nil { + return fmt.Errorf("bad status message: %v", err) + } + peerVersion, err := s.Uint() + if err != nil { + return fmt.Errorf("bad status message: %v", err) + } + if peerVersion != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", peerVersion, protocolVersion) + } + // Wait until out own status is consumed too + if err := <-errc; err != nil { + return fmt.Errorf("failed to send status packet: %v", err) + } + return nil } +// update executes periodic operations on the peer, including message transmission +// and expiration. func (self *peer) update() { - relay := time.NewTicker(300 * time.Millisecond) -out: + // Start the tickers for the updates + expire := time.NewTicker(expirationCycle) + transmit := time.NewTicker(transmissionCycle) + + // Loop and transmit until termination is requested for { select { - case <-relay.C: - err := self.broadcast(self.host.envelopes()) - if err != nil { - self.peer.Infoln("broadcast err:", err) - break out + case <-expire.C: + self.expire() + + case <-transmit.C: + if err := self.broadcast(); err != nil { + self.peer.Infoln("broadcast failed:", err) + return } case <-self.quit: - break out + return } } } -func (self *peer) broadcast(envelopes []*Envelope) error { - envs := make([]*Envelope, 0, len(envelopes)) - for _, env := range envelopes { - if !self.known.Has(env.Hash()) { - envs = append(envs, env) - self.known.Add(env.Hash()) - } - } - if len(envs) > 0 { - if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { - return err - } - self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") - } - return nil +// mark marks an envelope known to the peer so that it won't be sent back. +func (self *peer) mark(envelope *Envelope) { + self.known.Add(envelope.Hash()) } -func (self *peer) addKnown(envelope *Envelope) { - self.known.Add(envelope.Hash()) +// marked checks if an envelope is already known to the remote peer. +func (self *peer) marked(envelope *Envelope) bool { + return self.known.Has(envelope.Hash()) } -func (self *peer) handleStatus() error { - ws := self.ws - if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { - return err - } - msg, err := ws.ReadMsg() - if err != nil { - return err +// expire iterates over all the known envelopes in the host and removes all +// expired (unknown) ones from the known list. +func (self *peer) expire() { + // Assemble the list of available envelopes + available := set.NewNonTS() + for _, envelope := range self.host.envelopes() { + available.Add(envelope.Hash()) } - if msg.Code != statusMsg { - return fmt.Errorf("peer send %x before status msg", msg.Code) - } - s := rlp.NewStream(msg.Payload) - if _, err := s.List(); err != nil { - return fmt.Errorf("bad status message: %v", err) + // Cross reference availability with known status + unmark := make(map[common.Hash]struct{}) + self.known.Each(func(v interface{}) bool { + if !available.Has(v.(common.Hash)) { + unmark[v.(common.Hash)] = struct{}{} + } + return true + }) + // Dump all known but unavailable + for hash, _ := range unmark { + self.known.Remove(hash) } - pv, err := s.Uint() - if err != nil { - return fmt.Errorf("bad status message: %v", err) +} + +// broadcast iterates over the collection of envelopes and transmits yet unknown +// ones over the network. +func (self *peer) broadcast() error { + // Fetch the envelopes and collect the unknown ones + envelopes := self.host.envelopes() + transmit := make([]*Envelope, 0, len(envelopes)) + for _, envelope := range envelopes { + if !self.marked(envelope) { + transmit = append(transmit, envelope) + self.mark(envelope) + } } - if pv != protocolVersion { - return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + // Transmit the unknown batch (potentially empty) + if err := p2p.Send(self.ws, messagesCode, transmit); err != nil { + return err } - return msg.Discard() // ignore anything after protocol version + self.peer.DebugDetailln("broadcasted", len(transmit), "message(s)") + + return nil } diff --git a/whisper/peer_test.go b/whisper/peer_test.go new file mode 100644 index 000000000..9008cdc59 --- /dev/null +++ b/whisper/peer_test.go @@ -0,0 +1,242 @@ +package whisper + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +type testPeer struct { + client *Whisper + stream *p2p.MsgPipeRW + termed chan struct{} +} + +func startTestPeer() *testPeer { + // Create a simulated P2P remote peer and data streams to it + remote := p2p.NewPeer(discover.NodeID{}, "", nil) + tester, tested := p2p.MsgPipe() + + // Create a whisper client and connect with it to the tester peer + client := New() + client.Start() + + termed := make(chan struct{}) + go func() { + defer client.Stop() + defer close(termed) + defer tested.Close() + + client.handlePeer(remote, tested) + }() + + return &testPeer{ + client: client, + stream: tester, + termed: termed, + } +} + +func startTestPeerInited() (*testPeer, error) { + peer := startTestPeer() + + if err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion}); err != nil { + peer.stream.Close() + return nil, err + } + if err := p2p.SendItems(peer.stream, statusCode, protocolVersion); err != nil { + peer.stream.Close() + return nil, err + } + return peer, nil +} + +func TestPeerStatusMessage(t *testing.T) { + tester := startTestPeer() + + // Wait for the handshake status message and check it + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Terminate the node + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerHandshakeFail(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send an invalid handshake status and verify disconnect + if err := p2p.SendItems(tester.stream, messagesCode); err != nil { + t.Fatalf("failed to send malformed status: %v", err) + } + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("remote close timed out") + } +} + +func TestPeerHandshakeSuccess(t *testing.T) { + tester := startTestPeer() + + // Wait for and check the handshake + if err := p2p.ExpectMsg(tester.stream, statusCode, []uint64{protocolVersion}); err != nil { + t.Fatalf("status message mismatch: %v", err) + } + // Send a valid handshake status and make sure connection stays live + if err := p2p.SendItems(tester.stream, statusCode, protocolVersion); err != nil { + t.Fatalf("failed to send status: %v", err) + } + select { + case <-tester.termed: + t.Fatalf("valid handshake disconnected") + + case <-time.After(100 * time.Millisecond): + } + // Clean up the test + tester.stream.Close() + + select { + case <-tester.termed: + case <-time.After(time.Second): + t.Fatalf("local close timed out") + } +} + +func TestPeerSend(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Construct a message and inject into the tester + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + // Check that the message is eventually forwarded + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Make sure that even with a re-insert, an empty batch is received + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } +} + +func TestPeerDeliver(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Watch for all inbound messages + arrived := make(chan struct{}, 1) + tester.client.Watch(Filter{ + Fn: func(message *Message) { + arrived <- struct{}{} + }, + }) + // Construct a message and deliver it to the tester peer + message := NewMessage([]byte("peer broadcast test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + // Check that the message is delivered upstream + select { + case <-arrived: + case <-time.After(time.Second): + t.Fatalf("message delivery timeout") + } + // Check that a resend is not delivered + if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil { + t.Fatalf("failed to transfer message: %v", err) + } + select { + case <-time.After(2 * transmissionCycle): + case <-arrived: + t.Fatalf("repeating message arrived") + } +} + +func TestPeerMessageExpiration(t *testing.T) { + // Start a tester and execute the handshake + tester, err := startTestPeerInited() + if err != nil { + t.Fatalf("failed to start initialized peer: %v", err) + } + defer tester.stream.Close() + + // Fetch the peer instance for later inspection + tester.client.peerMu.RLock() + if peers := len(tester.client.peers); peers != 1 { + t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1) + } + var peer *peer + for peer, _ = range tester.client.peers { + break + } + tester.client.peerMu.RUnlock() + + // Construct a message and pass it through the tester + message := NewMessage([]byte("peer test message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: time.Second, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := tester.client.Send(envelope); err != nil { + t.Fatalf("failed to send message: %v", err) + } + payload := []interface{}{envelope} + if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil { + t.Fatalf("message mismatch: %v", err) + } + // Check that the message is inside the cache + if !peer.known.Has(envelope.Hash()) { + t.Fatalf("message not found in cache") + } + // Discard messages until expiration and check cache again + exp := time.Now().Add(time.Second + expirationCycle) + for time.Now().Before(exp) { + if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil { + t.Fatalf("message mismatch: %v", err) + } + } + if peer.known.Has(envelope.Hash()) { + t.Fatalf("message not expired from cache") + } +} diff --git a/whisper/sort.go b/whisper/sort.go deleted file mode 100644 index 313ba5ac0..000000000 --- a/whisper/sort.go +++ /dev/null @@ -1,29 +0,0 @@ -package whisper - -import ( - "sort" - - "github.com/ethereum/go-ethereum/common" -) - -type sortedKeys struct { - k []int32 -} - -func (self *sortedKeys) Len() int { return len(self.k) } -func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } -func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } - -func sortKeys(m map[int32]common.Hash) []int32 { - sorted := new(sortedKeys) - sorted.k = make([]int32, len(m)) - i := 0 - for key, _ := range m { - sorted.k[i] = key - i++ - } - - sort.Sort(sorted) - - return sorted.k -} diff --git a/whisper/sort_test.go b/whisper/sort_test.go deleted file mode 100644 index a61fde4c2..000000000 --- a/whisper/sort_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package whisper - -import ( - "testing" - - "github.com/ethereum/go-ethereum/common" -) - -func TestSorting(t *testing.T) { - m := map[int32]common.Hash{ - 1: {1}, - 3: {3}, - 2: {2}, - 5: {5}, - } - exp := []int32{1, 2, 3, 5} - res := sortKeys(m) - for i, k := range res { - if k != exp[i] { - t.Error(k, "failed. Expected", exp[i]) - } - } -} diff --git a/whisper/topic.go b/whisper/topic.go new file mode 100644 index 000000000..a965c7cc2 --- /dev/null +++ b/whisper/topic.go @@ -0,0 +1,61 @@ +// Contains the Whisper protocol Topic element. For formal details please see +// the specs at https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec#topics. + +package whisper + +import "github.com/ethereum/go-ethereum/crypto" + +// Topic represents a cryptographically secure, probabilistic partial +// classifications of a message, determined as the first (left) 4 bytes of the +// SHA3 hash of some arbitrary data given by the original author of the message. +type Topic [4]byte + +// NewTopic creates a topic from the 4 byte prefix of the SHA3 hash of the data. +func NewTopic(data []byte) Topic { + prefix := [4]byte{} + copy(prefix[:], crypto.Sha3(data)[:4]) + return Topic(prefix) +} + +// NewTopics creates a list of topics from a list of binary data elements, by +// iteratively calling NewTopic on each of them. +func NewTopics(data ...[]byte) []Topic { + topics := make([]Topic, len(data)) + for i, element := range data { + topics[i] = NewTopic(element) + } + return topics +} + +// NewTopicFromString creates a topic using the binary data contents of the +// specified string. +func NewTopicFromString(data string) Topic { + return NewTopic([]byte(data)) +} + +// NewTopicsFromStrings creates a list of topics from a list of textual data +// elements, by iteratively calling NewTopicFromString on each of them. +func NewTopicsFromStrings(data ...string) []Topic { + topics := make([]Topic, len(data)) + for i, element := range data { + topics[i] = NewTopicFromString(element) + } + return topics +} + +// String converts a topic byte array to a string representation. +func (self *Topic) String() string { + return string(self[:]) +} + +// TopicSet represents a hash set to check if a topic exists or not. +type topicSet map[string]struct{} + +// NewTopicSet creates a topic hash set from a slice of topics. +func newTopicSet(topics []Topic) topicSet { + set := make(map[string]struct{}) + for _, topic := range topics { + set[topic.String()] = struct{}{} + } + return topicSet(set) +} diff --git a/whisper/topic_test.go b/whisper/topic_test.go new file mode 100644 index 000000000..4015079dc --- /dev/null +++ b/whisper/topic_test.go @@ -0,0 +1,67 @@ +package whisper + +import ( + "bytes" + "testing" +) + +var topicCreationTests = []struct { + data []byte + hash [4]byte +}{ + {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: nil}, + {hash: [4]byte{0xc5, 0xd2, 0x46, 0x01}, data: []byte{}}, + {hash: [4]byte{0x8f, 0x9a, 0x2b, 0x7d}, data: []byte("test name")}, +} + +func TestTopicCreation(t *testing.T) { + // Create the topics individually + for i, tt := range topicCreationTests { + topic := NewTopic(tt.data) + if bytes.Compare(topic[:], tt.hash[:]) != 0 { + t.Errorf("binary test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } + for i, tt := range topicCreationTests { + topic := NewTopicFromString(string(tt.data)) + if bytes.Compare(topic[:], tt.hash[:]) != 0 { + t.Errorf("textual test %d: hash mismatch: have %v, want %v.", i, topic, tt.hash) + } + } + // Create the topics in batches + binaryData := make([][]byte, len(topicCreationTests)) + for i, tt := range topicCreationTests { + binaryData[i] = tt.data + } + textualData := make([]string, len(topicCreationTests)) + for i, tt := range topicCreationTests { + textualData[i] = string(tt.data) + } + + topics := NewTopics(binaryData...) + for i, tt := range topicCreationTests { + if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { + t.Errorf("binary batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) + } + } + topics = NewTopicsFromStrings(textualData...) + for i, tt := range topicCreationTests { + if bytes.Compare(topics[i][:], tt.hash[:]) != 0 { + t.Errorf("textual batch test %d: hash mismatch: have %v, want %v.", i, topics[i], tt.hash) + } + } +} + +func TestTopicSetCreation(t *testing.T) { + topics := make([]Topic, len(topicCreationTests)) + for i, tt := range topicCreationTests { + topics[i] = NewTopic(tt.data) + } + set := newTopicSet(topics) + for i, tt := range topicCreationTests { + topic := NewTopic(tt.data) + if _, ok := set[topic.String()]; !ok { + t.Errorf("topic %d: not found in set", i) + } + } +} diff --git a/whisper/util.go b/whisper/util.go deleted file mode 100644 index 7a222395f..000000000 --- a/whisper/util.go +++ /dev/null @@ -1,36 +0,0 @@ -package whisper - -import "github.com/ethereum/go-ethereum/crypto" - -func hashTopic(topic []byte) []byte { - return crypto.Sha3(topic)[:4] -} - -// NOTE this isn't DRY, but I don't want to iterate twice. - -// Returns a formatted topics byte slice. -// data: unformatted data (e.g., no hashes needed) -func Topics(data [][]byte) [][]byte { - d := make([][]byte, len(data)) - for i, byts := range data { - d[i] = hashTopic(byts) - } - return d -} - -func TopicsFromString(data ...string) [][]byte { - d := make([][]byte, len(data)) - for i, str := range data { - d[i] = hashTopic([]byte(str)) - } - return d -} - -func bytesToMap(s [][]byte) map[string]struct{} { - m := make(map[string]struct{}) - for _, topic := range s { - m[string(topic)] = struct{}{} - } - - return m -} diff --git a/whisper/whisper.go b/whisper/whisper.go index d803e27d4..9317fad50 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -2,7 +2,6 @@ package whisper import ( "crypto/ecdsa" - "errors" "sync" "time" @@ -17,9 +16,22 @@ import ( ) const ( - statusMsg = 0x0 - envelopesMsg = 0x01 - whisperVersion = 0x02 + statusCode = 0x00 + messagesCode = 0x01 + + protocolVersion uint64 = 0x02 + protocolName = "shh" + + signatureFlag = byte(1 << 7) + signatureLength = 65 + + expirationCycle = 800 * time.Millisecond + transmissionCycle = 300 * time.Millisecond +) + +const ( + DefaultTTL = 50 * time.Second + DefaultPoW = 50 * time.Millisecond ) type MessageEvent struct { @@ -28,250 +40,298 @@ type MessageEvent struct { Message *Message } -const ( - DefaultTimeToLive = 50 * time.Second - DefaultProofOfWork = 50 * time.Millisecond -) - +// Whisper represents a dark communication interface through the Ethereum +// network, using its very own P2P communication layer. type Whisper struct { protocol p2p.Protocol filters *filter.Filters - mmu sync.RWMutex - messages map[common.Hash]*Envelope - expiry map[uint32]*set.SetNonTS + keys map[string]*ecdsa.PrivateKey - quit chan struct{} + messages map[common.Hash]*Envelope // Pool of messages currently tracked by this node + expirations map[uint32]*set.SetNonTS // Message expiration pool (TODO: something lighter) + poolMu sync.RWMutex // Mutex to sync the message and expiration pools - keys map[string]*ecdsa.PrivateKey + peers map[*peer]struct{} // Set of currently active peers + peerMu sync.RWMutex // Mutex to sync the active peer set + + quit chan struct{} } func New() *Whisper { whisper := &Whisper{ - messages: make(map[common.Hash]*Envelope), - filters: filter.New(), - expiry: make(map[uint32]*set.SetNonTS), - quit: make(chan struct{}), - keys: make(map[string]*ecdsa.PrivateKey), + filters: filter.New(), + keys: make(map[string]*ecdsa.PrivateKey), + messages: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]*set.SetNonTS), + peers: make(map[*peer]struct{}), + quit: make(chan struct{}), } whisper.filters.Start() // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ - Name: "shh", - Version: uint(whisperVersion), + Name: protocolName, + Version: uint(protocolVersion), Length: 2, - Run: whisper.msgHandler, + Run: whisper.handlePeer, } return whisper } -func (self *Whisper) Version() uint { - return self.protocol.Version -} - -func (self *Whisper) Start() { - glog.V(logger.Info).Infoln("Whisper started") - go self.update() -} - -func (self *Whisper) Stop() { - close(self.quit) +// Protocol returns the whisper sub-protocol handler for this particular client. +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol } -func (self *Whisper) Send(envelope *Envelope) error { - return self.add(envelope) +// Version returns the whisper sub-protocols version number. +func (self *Whisper) Version() uint { + return self.protocol.Version } +// NewIdentity generates a new cryptographic identity for the client, and injects +// it into the known identities for message decryption. func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { key, err := crypto.GenerateKey() if err != nil { panic(err) } - self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key return key } +// HasIdentity checks if the the whisper node is configured with the private key +// of the specified public pair. func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { return self.keys[string(crypto.FromECDSAPub(key))] != nil } +// GetIdentity retrieves the private key of the specified public identity. func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { return self.keys[string(crypto.FromECDSAPub(key))] } -// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { -// k := string(crypto.FromECDSAPub(key)) -// if _, ok := self.keys[k]; ok { -// delete(self.keys, k) -// return true -// } -// return false -// } - -func (self *Whisper) Watch(opts Filter) int { - return self.filters.Install(filter.Generic{ - Str1: string(crypto.FromECDSAPub(opts.To)), - Str2: string(crypto.FromECDSAPub(opts.From)), - Data: bytesToMap(opts.Topics), +// Watch installs a new message handler to run in case a matching packet arrives +// from the whisper network. +func (self *Whisper) Watch(options Filter) int { + filter := filter.Generic{ + Str1: string(crypto.FromECDSAPub(options.To)), + Str2: string(crypto.FromECDSAPub(options.From)), + Data: newTopicSet(options.Topics), Fn: func(data interface{}) { - opts.Fn(data.(*Message)) + options.Fn(data.(*Message)) }, - }) + } + return self.filters.Install(filter) } +// Unwatch removes an installed message handler. func (self *Whisper) Unwatch(id int) { self.filters.Uninstall(id) } -func (self *Whisper) Messages(id int) (messages []*Message) { - filter := self.filters.Get(id) - if filter != nil { - for _, e := range self.messages { - if msg, key := self.open(e); msg != nil { - f := createFilter(msg, e.Topics, key) - if self.filters.Match(filter, f) { - messages = append(messages, msg) +// Send injects a message into the whisper send queue, to be distributed in the +// network in the coming cycles. +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + +func (self *Whisper) Start() { + glog.V(logger.Info).Infoln("Whisper started") + go self.update() +} + +func (self *Whisper) Stop() { + close(self.quit) + glog.V(logger.Info).Infoln("Whisper stopped") +} + +// Messages retrieves the currently pooled messages matching a filter id. +func (self *Whisper) Messages(id int) []*Message { + messages := make([]*Message, 0) + if filter := self.filters.Get(id); filter != nil { + for _, envelope := range self.messages { + if message := self.open(envelope); message != nil { + if self.filters.Match(filter, createFilter(message, envelope.Topics)) { + messages = append(messages, message) } } } } - - return + return messages } -// Main handler for passing whisper messages to whisper peer objects -func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { - wpeer := NewPeer(self, peer, ws) - // initialise whisper peer (handshake/status) - if err := wpeer.init(); err != nil { +// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { +// k := string(crypto.FromECDSAPub(key)) +// if _, ok := self.keys[k]; ok { +// delete(self.keys, k) +// return true +// } +// return false +// } + +// handlePeer is called by the underlying P2P layer when the whisper sub-protocol +// connection is negotiated. +func (self *Whisper) handlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + // Create, initialize and start the whisper peer + whisperPeer, err := newPeer(self, peer, rw) + if err != nil { return err } - // kick of the main handler for broadcasting/managing envelopes - go wpeer.start() - defer wpeer.stop() - - // Main *read* loop. Writing is done by the peer it self. + whisperPeer.start() + defer whisperPeer.stop() + + // Start tracking the active peer + self.peerMu.Lock() + self.peers[whisperPeer] = struct{}{} + self.peerMu.Unlock() + + defer func() { + self.peerMu.Lock() + delete(self.peers, whisperPeer) + self.peerMu.Unlock() + }() + // Read and process inbound messages directly to merge into client-global state for { - msg, err := ws.ReadMsg() + // Fetch the next packet and decode the contained envelopes + packet, err := rw.ReadMsg() if err != nil { return err } - var envelopes []*Envelope - if err := msg.Decode(&envelopes); err != nil { - peer.Infoln(err) + if err := packet.Decode(&envelopes); err != nil { + peer.Infof("failed to decode enveloped: %v", err) continue } - + // Inject all envelopes into the internal pool for _, envelope := range envelopes { if err := self.add(envelope); err != nil { // TODO Punish peer here. Invalid envelope. - peer.Debugln(err) + peer.Debugf("failed to pool envelope: %f", err) } - wpeer.addKnown(envelope) + whisperPeer.mark(envelope) } } } -// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. +// add inserts a new envelope into the message pool to be distributed within the +// whisper network. It also inserts the envelope into the expiration pool at the +// appropriate time-stamp. func (self *Whisper) add(envelope *Envelope) error { - if !envelope.valid() { - return errors.New("invalid pow provided for envelope") - } - - self.mmu.Lock() - defer self.mmu.Unlock() + self.poolMu.Lock() + defer self.poolMu.Unlock() + // Insert the message into the tracked pool hash := envelope.Hash() + if _, ok := self.messages[hash]; ok { + glog.V(logger.Detail).Infof("whisper envelope already cached: %x\n", envelope) + return nil + } self.messages[hash] = envelope - if self.expiry[envelope.Expiry] == nil { - self.expiry[envelope.Expiry] = set.NewNonTS() + + // Insert the message into the expiration pool for later removal + if self.expirations[envelope.Expiry] == nil { + self.expirations[envelope.Expiry] = set.NewNonTS() } + if !self.expirations[envelope.Expiry].Has(hash) { + self.expirations[envelope.Expiry].Add(hash) - if !self.expiry[envelope.Expiry].Has(hash) { - self.expiry[envelope.Expiry].Add(hash) + // Notify the local node of a message arrival go self.postEvent(envelope) } + glog.V(logger.Detail).Infof("cached whisper envelope %x\n", envelope) - glog.V(logger.Detail).Infof("added whisper envelope %x\n", envelope) + return nil +} + +// postEvent opens an envelope with the configured identities and delivers the +// message upstream from application processing. +func (self *Whisper) postEvent(envelope *Envelope) { + if message := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics), message) + } +} +// open tries to decrypt a whisper envelope with all the configured identities, +// returning the decrypted message and the key used to achieve it. If not keys +// are configured, open will return the payload as if non encrypted. +func (self *Whisper) open(envelope *Envelope) *Message { + // Short circuit if no identity is set, and assume clear-text + if len(self.keys) == 0 { + if message, err := envelope.Open(nil); err == nil { + return message + } + } + // Iterate over the keys and try to decrypt the message + for _, key := range self.keys { + message, err := envelope.Open(key) + if err == nil || err == ecies.ErrInvalidPublicKey { + message.To = &key.PublicKey + return message + } + } + // Failed to decrypt, don't return anything return nil } +// createFilter creates a message filter to check against installed handlers. +func createFilter(message *Message, topics []Topic) filter.Filter { + return filter.Generic{ + Str1: string(crypto.FromECDSAPub(message.To)), + Str2: string(crypto.FromECDSAPub(message.Recover())), + Data: newTopicSet(topics), + } +} + +// update loops until the lifetime of the whisper node, updating its internal +// state by expiring stale messages from the pool. func (self *Whisper) update() { - expire := time.NewTicker(800 * time.Millisecond) -out: + // Start a ticker to check for expirations + expire := time.NewTicker(expirationCycle) + + // Repeat updates until termination is requested for { select { case <-expire.C: self.expire() + case <-self.quit: - break out + return } } } +// expire iterates over all the expiration timestamps, removing all stale +// messages from the pools. func (self *Whisper) expire() { - self.mmu.Lock() - defer self.mmu.Unlock() + self.poolMu.Lock() + defer self.poolMu.Unlock() now := uint32(time.Now().Unix()) - for then, hashSet := range self.expiry { + for then, hashSet := range self.expirations { + // Short circuit if a future time if then > now { continue } - + // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { delete(self.messages, v.(common.Hash)) return true }) - self.expiry[then].Clear() + self.expirations[then].Clear() } } -func (self *Whisper) envelopes() (envelopes []*Envelope) { - self.mmu.RLock() - defer self.mmu.RUnlock() +// envelopes retrieves all the messages currently pooled by the node. +func (self *Whisper) envelopes() []*Envelope { + self.poolMu.RLock() + defer self.poolMu.RUnlock() - envelopes = make([]*Envelope, len(self.messages)) - i := 0 + envelopes := make([]*Envelope, 0, len(self.messages)) for _, envelope := range self.messages { - envelopes[i] = envelope - i++ - } - - return -} - -func (self *Whisper) postEvent(envelope *Envelope) { - if message, key := self.open(envelope); message != nil { - self.filters.Notify(createFilter(message, envelope.Topics, key), message) - } -} - -func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) { - for _, key := range self.keys { - if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) { - message.To = &key.PublicKey - - return message, key - } - } - - return nil, nil -} - -func (self *Whisper) Protocol() p2p.Protocol { - return self.protocol -} - -func createFilter(message *Message, topics [][]byte, key *ecdsa.PrivateKey) filter.Filter { - return filter.Generic{ - Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())), - Data: bytesToMap(topics), + envelopes = append(envelopes, envelope) } + return envelopes } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index b29e34a5e..def8e68d8 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -1,38 +1,185 @@ package whisper import ( - "fmt" "testing" "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" ) -func TestEvent(t *testing.T) { - res := make(chan *Message, 1) - whisper := New() - id := whisper.NewIdentity() - whisper.Watch(Filter{ - To: &id.PublicKey, +func startTestCluster(n int) []*Whisper { + // Create the batch of simulated peers + nodes := make([]*p2p.Peer, n) + for i := 0; i < n; i++ { + nodes[i] = p2p.NewPeer(discover.NodeID{}, "", nil) + } + whispers := make([]*Whisper, n) + for i := 0; i < n; i++ { + whispers[i] = New() + whispers[i].Start() + } + // Wire all the peers to the root one + for i := 1; i < n; i++ { + src, dst := p2p.MsgPipe() + + go whispers[0].handlePeer(nodes[i], src) + go whispers[i].handlePeer(nodes[0], dst) + } + return whispers +} + +func TestSelfMessage(t *testing.T) { + // Start the single node cluster + client := startTestCluster(1)[0] + + // Start watching for self messages, signal any arrivals + self := client.NewIdentity() + done := make(chan struct{}) + + client.Watch(Filter{ + To: &self.PublicKey, Fn: func(msg *Message) { - res <- msg + close(done) }, }) - - msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) - envelope, err := msg.Wrap(DefaultProofOfWork, Options{ - TTL: DefaultTimeToLive, - From: id, - To: &id.PublicKey, + // Send a dummy message to oneself + msg := NewMessage([]byte("self whisper")) + envelope, err := msg.Wrap(DefaultPoW, Options{ + From: self, + To: &self.PublicKey, + TTL: DefaultTTL, }) if err != nil { - fmt.Println(err) - t.FailNow() + t.Fatalf("failed to wrap message: %v", err) + } + // Dump the message into the system and wait for it to pop back out + if err := client.Send(envelope); err != nil { + t.Fatalf("failed to send self-message: %v", err) + } + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("self-message receive timeout") } +} + +func TestDirectMessage(t *testing.T) { + // Start the sender-recipient cluster + cluster := startTestCluster(2) - tick := time.NewTicker(time.Second) - whisper.postEvent(envelope) + sender := cluster[0] + senderId := sender.NewIdentity() + + recipient := cluster[1] + recipientId := recipient.NewIdentity() + + // Watch for arriving messages on the recipient + done := make(chan struct{}) + recipient.Watch(Filter{ + To: &recipientId.PublicKey, + Fn: func(msg *Message) { + close(done) + }, + }) + // Send a dummy message from the sender + msg := NewMessage([]byte("direct whisper")) + envelope, err := msg.Wrap(DefaultPoW, Options{ + From: senderId, + To: &recipientId.PublicKey, + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send direct message: %v", err) + } + // Wait for an arrival or a timeout select { - case <-res: - case <-tick.C: - t.Error("did not receive message") + case <-done: + case <-time.After(time.Second): + t.Fatalf("direct message receive timeout") + } +} + +func TestAnonymousBroadcast(t *testing.T) { + testBroadcast(true, t) +} + +func TestIdentifiedBroadcast(t *testing.T) { + testBroadcast(false, t) +} + +func testBroadcast(anonymous bool, t *testing.T) { + // Start the single sender multi recipient cluster + cluster := startTestCluster(3) + + sender := cluster[1] + targets := cluster[1:] + for _, target := range targets { + if !anonymous { + target.NewIdentity() + } + } + // Watch for arriving messages on the recipients + dones := make([]chan struct{}, len(targets)) + for i := 0; i < len(targets); i++ { + done := make(chan struct{}) // need for the closure + dones[i] = done + + targets[i].Watch(Filter{ + Topics: NewTopicsFromStrings("broadcast topic"), + Fn: func(msg *Message) { + close(done) + }, + }) + } + // Send a dummy message from the sender + msg := NewMessage([]byte("broadcast whisper")) + envelope, err := msg.Wrap(DefaultPoW, Options{ + Topics: NewTopicsFromStrings("broadcast topic"), + TTL: DefaultTTL, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send broadcast message: %v", err) + } + // Wait for an arrival on each recipient, or timeouts + timeout := time.After(time.Second) + for _, done := range dones { + select { + case <-done: + case <-timeout: + t.Fatalf("broadcast message receive timeout") + } + } +} + +func TestMessageExpiration(t *testing.T) { + // Start the single node cluster and inject a dummy message + node := startTestCluster(1)[0] + + message := NewMessage([]byte("expiring message")) + envelope, err := message.Wrap(DefaultPoW, Options{ + TTL: time.Second, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := node.Send(envelope); err != nil { + t.Fatalf("failed to inject message: %v", err) + } + // Check that the message is inside the cache + if _, ok := node.messages[envelope.Hash()]; !ok { + t.Fatalf("message not found in cache") + } + // Wait for expiration and check cache again + time.Sleep(time.Second) // wait for expiration + time.Sleep(expirationCycle) // wait for cleanup cycle + if _, ok := node.messages[envelope.Hash()]; ok { + t.Fatalf("message not expired from cache") } } |