diff options
Diffstat (limited to 'whisper')
-rw-r--r-- | whisper/doc.go | 16 | ||||
-rw-r--r-- | whisper/envelope.go | 129 | ||||
-rw-r--r-- | whisper/filter.go | 10 | ||||
-rw-r--r-- | whisper/main.go | 37 | ||||
-rw-r--r-- | whisper/message.go | 81 | ||||
-rw-r--r-- | whisper/messages_test.go | 50 | ||||
-rw-r--r-- | whisper/peer.go | 113 | ||||
-rw-r--r-- | whisper/sort.go | 29 | ||||
-rw-r--r-- | whisper/sort_test.go | 23 | ||||
-rw-r--r-- | whisper/util.go | 36 | ||||
-rw-r--r-- | whisper/whisper.go | 275 | ||||
-rw-r--r-- | whisper/whisper_test.go | 38 |
12 files changed, 837 insertions, 0 deletions
diff --git a/whisper/doc.go b/whisper/doc.go new file mode 100644 index 000000000..986df8fb9 --- /dev/null +++ b/whisper/doc.go @@ -0,0 +1,16 @@ +/* +Package whisper implements the Whisper PoC-1. + +(https://github.com/ethereum/wiki/wiki/Whisper-PoC-1-Protocol-Spec) + +Whisper combines aspects of both DHTs and datagram messaging systems (e.g. UDP). +As such it may be likened and compared to both, not dissimilar to the +matter/energy duality (apologies to physicists for the blatant abuse of a +fundamental and beautiful natural principle). + +Whisper is a pure identity-based messaging system. Whisper provides a low-level +(non-application-specific) but easily-accessible API without being based upon +or prejudiced by the low-level hardware attributes and characteristics, +particularly the notion of singular endpoints. +*/ +package whisper diff --git a/whisper/envelope.go b/whisper/envelope.go new file mode 100644 index 000000000..20e3e6d39 --- /dev/null +++ b/whisper/envelope.go @@ -0,0 +1,129 @@ +package whisper + +import ( + "crypto/ecdsa" + "encoding/binary" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + DefaultPow = 50 * time.Millisecond +) + +type Envelope struct { + Expiry uint32 // Whisper protocol specifies int32, really should be int64 + TTL uint32 // ^^^^^^ + Topics [][]byte + Data []byte + Nonce uint32 + + hash common.Hash +} + +func (self *Envelope) Hash() common.Hash { + if (self.hash == common.Hash{}) { + enc, _ := rlp.EncodeToBytes(self) + self.hash = crypto.Sha3Hash(enc) + } + return self.hash +} + +func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope { + exp := time.Now().Add(ttl) + return &Envelope{ + Expiry: uint32(exp.Unix()), + TTL: uint32(ttl.Seconds()), + Topics: topics, + Data: data.Bytes(), + Nonce: 0, + } +} + +func (self *Envelope) Seal(pow time.Duration) { + self.proveWork(pow) +} + +func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) { + data := self.Data + var message Message + dataStart := 1 + if data[0] > 0 { + if len(data) < 66 { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") + } + dataStart = 66 + message.Flags = data[0] + message.Signature = data[1:66] + } + + payload := data[dataStart:] + if prv != nil { + message.Payload, err = crypto.Decrypt(prv, payload) + switch err { + case nil: // OK + case ecies.ErrInvalidPublicKey: // Payload isn't encrypted + message.Payload = payload + return &message, err + default: + return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) + } + } + + return &message, nil +} + +func (self *Envelope) proveWork(dura time.Duration) { + var bestBit int + d := make([]byte, 64) + enc, _ := rlp.EncodeToBytes(self.withoutNonce()) + copy(d[:32], enc) + + then := time.Now().Add(dura).UnixNano() + for n := uint32(0); time.Now().UnixNano() < then; { + for i := 0; i < 1024; i++ { + binary.BigEndian.PutUint32(d[60:], n) + + fbs := common.FirstBitSet(common.BigD(crypto.Sha3(d))) + if fbs > bestBit { + bestBit = fbs + self.Nonce = n + } + + n++ + } + } +} + +func (self *Envelope) valid() bool { + d := make([]byte, 64) + enc, _ := rlp.EncodeToBytes(self.withoutNonce()) + copy(d[:32], enc) + binary.BigEndian.PutUint32(d[60:], self.Nonce) + return common.FirstBitSet(common.BigD(crypto.Sha3(d))) > 0 +} + +func (self *Envelope) withoutNonce() interface{} { + return []interface{}{self.Expiry, self.TTL, self.Topics, self.Data} +} + +// rlpenv is an Envelope but is not an rlp.Decoder. +// It is used for decoding because we need to +type rlpenv Envelope + +func (self *Envelope) DecodeRLP(s *rlp.Stream) error { + raw, err := s.Raw() + if err != nil { + return err + } + if err := rlp.DecodeBytes(raw, (*rlpenv)(self)); err != nil { + return err + } + self.hash = crypto.Sha3Hash(raw) + return nil +} diff --git a/whisper/filter.go b/whisper/filter.go new file mode 100644 index 000000000..b33f2c1a2 --- /dev/null +++ b/whisper/filter.go @@ -0,0 +1,10 @@ +package whisper + +import "crypto/ecdsa" + +type Filter struct { + To *ecdsa.PublicKey + From *ecdsa.PublicKey + Topics [][]byte + Fn func(*Message) +} diff --git a/whisper/main.go b/whisper/main.go new file mode 100644 index 000000000..9f35dbb8d --- /dev/null +++ b/whisper/main.go @@ -0,0 +1,37 @@ +// +build none + +package main + +import ( + "fmt" + "log" + "os" + + "github.com/ethereum/go-ethereum/crypto/secp256k1" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/whisper" +) + +func main() { + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) + + pub, _ := secp256k1.GenerateKeyPair() + + whisper := whisper.New() + + srv := p2p.Server{ + MaxPeers: 10, + Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)), + ListenAddr: ":30300", + NAT: p2p.UPNP(), + + Protocols: []p2p.Protocol{whisper.Protocol()}, + } + if err := srv.Start(); err != nil { + fmt.Println("could not start server:", err) + os.Exit(1) + } + + select {} +} diff --git a/whisper/message.go b/whisper/message.go new file mode 100644 index 000000000..5d9e5b5c1 --- /dev/null +++ b/whisper/message.go @@ -0,0 +1,81 @@ +package whisper + +import ( + "crypto/ecdsa" + "time" + + "github.com/ethereum/go-ethereum/crypto" +) + +type Message struct { + Flags byte + Signature []byte + Payload []byte + Sent int64 + + To *ecdsa.PublicKey +} + +func NewMessage(payload []byte) *Message { + return &Message{Flags: 0, Payload: payload, Sent: time.Now().Unix()} +} + +func (self *Message) hash() []byte { + return crypto.Sha3(append([]byte{self.Flags}, self.Payload...)) +} + +func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { + self.Flags = 1 + self.Signature, err = crypto.Sign(self.hash(), key) + return +} + +func (self *Message) Recover() *ecdsa.PublicKey { + defer func() { recover() }() // in case of invalid sig + return crypto.SigToPub(self.hash(), self.Signature) +} + +func (self *Message) Encrypt(to *ecdsa.PublicKey) (err error) { + self.Payload, err = crypto.Encrypt(to, self.Payload) + if err != nil { + return err + } + + return nil +} + +func (self *Message) Bytes() []byte { + return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...) +} + +type Opts struct { + From *ecdsa.PrivateKey + To *ecdsa.PublicKey + Ttl time.Duration + Topics [][]byte +} + +func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { + if opts.From != nil { + err := self.sign(opts.From) + if err != nil { + return nil, err + } + } + + if opts.To != nil { + err := self.Encrypt(opts.To) + if err != nil { + return nil, err + } + } + + if opts.Ttl == 0 { + opts.Ttl = DefaultTtl + } + + envelope := NewEnvelope(opts.Ttl, opts.Topics, self) + envelope.Seal(pow) + + return envelope, nil +} diff --git a/whisper/messages_test.go b/whisper/messages_test.go new file mode 100644 index 000000000..93caa31b3 --- /dev/null +++ b/whisper/messages_test.go @@ -0,0 +1,50 @@ +package whisper + +import ( + "bytes" + "crypto/elliptic" + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/crypto" +) + +func TestSign(t *testing.T) { + prv, _ := crypto.GenerateKey() + msg := NewMessage([]byte("hello world")) + msg.sign(prv) + + pubKey := msg.Recover() + p1 := elliptic.Marshal(crypto.S256(), prv.PublicKey.X, prv.PublicKey.Y) + p2 := elliptic.Marshal(crypto.S256(), pubKey.X, pubKey.Y) + + if !bytes.Equal(p1, p2) { + t.Error("recovered pub key did not match") + } +} + +func TestMessageEncryptDecrypt(t *testing.T) { + prv1, _ := crypto.GenerateKey() + prv2, _ := crypto.GenerateKey() + + data := []byte("hello world") + msg := NewMessage(data) + envelope, err := msg.Seal(DefaultPow, Opts{ + From: prv1, + To: &prv2.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + msg1, err := envelope.Open(prv2) + if err != nil { + t.Error(err) + t.FailNow() + } + + if !bytes.Equal(msg1.Payload, data) { + t.Error("encryption error. data did not match") + } +} diff --git a/whisper/peer.go b/whisper/peer.go new file mode 100644 index 000000000..338166c25 --- /dev/null +++ b/whisper/peer.go @@ -0,0 +1,113 @@ +package whisper + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" + "gopkg.in/fatih/set.v0" +) + +const ( + protocolVersion uint64 = 0x02 +) + +type peer struct { + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + // XXX Eventually this is going to reach exceptional large space. We need an expiry here + known *set.Set + + quit chan struct{} +} + +func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { + return &peer{host, p, ws, set.New(), make(chan struct{})} +} + +func (self *peer) init() error { + if err := self.handleStatus(); err != nil { + return err + } + + return nil +} + +func (self *peer) start() { + go self.update() + self.peer.Debugln("whisper started") +} + +func (self *peer) stop() { + self.peer.Debugln("whisper stopped") + + close(self.quit) +} + +func (self *peer) update() { + relay := time.NewTicker(300 * time.Millisecond) +out: + for { + select { + case <-relay.C: + err := self.broadcast(self.host.envelopes()) + if err != nil { + self.peer.Infoln("broadcast err:", err) + break out + } + + case <-self.quit: + break out + } + } +} + +func (self *peer) broadcast(envelopes []*Envelope) error { + envs := make([]*Envelope, 0, len(envelopes)) + for _, env := range envelopes { + if !self.known.Has(env.Hash()) { + envs = append(envs, env) + self.known.Add(env.Hash()) + } + } + if len(envs) > 0 { + if err := p2p.Send(self.ws, envelopesMsg, envs); err != nil { + return err + } + self.peer.DebugDetailln("broadcasted", len(envs), "message(s)") + } + return nil +} + +func (self *peer) addKnown(envelope *Envelope) { + self.known.Add(envelope.Hash()) +} + +func (self *peer) handleStatus() error { + ws := self.ws + if err := p2p.SendItems(ws, statusMsg, protocolVersion); err != nil { + return err + } + msg, err := ws.ReadMsg() + if err != nil { + return err + } + if msg.Code != statusMsg { + return fmt.Errorf("peer send %x before status msg", msg.Code) + } + s := rlp.NewStream(msg.Payload) + if _, err := s.List(); err != nil { + return fmt.Errorf("bad status message: %v", err) + } + pv, err := s.Uint() + if err != nil { + return fmt.Errorf("bad status message: %v", err) + } + if pv != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + } + return msg.Discard() // ignore anything after protocol version +} diff --git a/whisper/sort.go b/whisper/sort.go new file mode 100644 index 000000000..313ba5ac0 --- /dev/null +++ b/whisper/sort.go @@ -0,0 +1,29 @@ +package whisper + +import ( + "sort" + + "github.com/ethereum/go-ethereum/common" +) + +type sortedKeys struct { + k []int32 +} + +func (self *sortedKeys) Len() int { return len(self.k) } +func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } +func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } + +func sortKeys(m map[int32]common.Hash) []int32 { + sorted := new(sortedKeys) + sorted.k = make([]int32, len(m)) + i := 0 + for key, _ := range m { + sorted.k[i] = key + i++ + } + + sort.Sort(sorted) + + return sorted.k +} diff --git a/whisper/sort_test.go b/whisper/sort_test.go new file mode 100644 index 000000000..a61fde4c2 --- /dev/null +++ b/whisper/sort_test.go @@ -0,0 +1,23 @@ +package whisper + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +func TestSorting(t *testing.T) { + m := map[int32]common.Hash{ + 1: {1}, + 3: {3}, + 2: {2}, + 5: {5}, + } + exp := []int32{1, 2, 3, 5} + res := sortKeys(m) + for i, k := range res { + if k != exp[i] { + t.Error(k, "failed. Expected", exp[i]) + } + } +} diff --git a/whisper/util.go b/whisper/util.go new file mode 100644 index 000000000..7a222395f --- /dev/null +++ b/whisper/util.go @@ -0,0 +1,36 @@ +package whisper + +import "github.com/ethereum/go-ethereum/crypto" + +func hashTopic(topic []byte) []byte { + return crypto.Sha3(topic)[:4] +} + +// NOTE this isn't DRY, but I don't want to iterate twice. + +// Returns a formatted topics byte slice. +// data: unformatted data (e.g., no hashes needed) +func Topics(data [][]byte) [][]byte { + d := make([][]byte, len(data)) + for i, byts := range data { + d[i] = hashTopic(byts) + } + return d +} + +func TopicsFromString(data ...string) [][]byte { + d := make([][]byte, len(data)) + for i, str := range data { + d[i] = hashTopic([]byte(str)) + } + return d +} + +func bytesToMap(s [][]byte) map[string]struct{} { + m := make(map[string]struct{}) + for _, topic := range s { + m[string(topic)] = struct{}{} + } + + return m +} diff --git a/whisper/whisper.go b/whisper/whisper.go new file mode 100644 index 000000000..1d019aea5 --- /dev/null +++ b/whisper/whisper.go @@ -0,0 +1,275 @@ +package whisper + +import ( + "crypto/ecdsa" + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +const ( + statusMsg = 0x0 + envelopesMsg = 0x01 + whisperVersion = 0x02 +) + +type MessageEvent struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Message *Message +} + +const DefaultTtl = 50 * time.Second + +var wlogger = logger.NewLogger("SHH") + +type Whisper struct { + protocol p2p.Protocol + filters *filter.Filters + + mmu sync.RWMutex + messages map[common.Hash]*Envelope + expiry map[uint32]*set.SetNonTS + + quit chan struct{} + + keys map[string]*ecdsa.PrivateKey +} + +func New() *Whisper { + whisper := &Whisper{ + messages: make(map[common.Hash]*Envelope), + filters: filter.New(), + expiry: make(map[uint32]*set.SetNonTS), + quit: make(chan struct{}), + keys: make(map[string]*ecdsa.PrivateKey), + } + whisper.filters.Start() + + // p2p whisper sub protocol handler + whisper.protocol = p2p.Protocol{ + Name: "shh", + Version: uint(whisperVersion), + Length: 2, + Run: whisper.msgHandler, + } + + return whisper +} + +func (self *Whisper) Version() uint { + return self.protocol.Version +} + +func (self *Whisper) Start() { + wlogger.Infoln("Whisper started") + go self.update() +} + +func (self *Whisper) Stop() { + close(self.quit) +} + +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + + self.keys[string(crypto.FromECDSAPub(&key.PublicKey))] = key + + return key +} + +func (self *Whisper) HasIdentity(key *ecdsa.PublicKey) bool { + return self.keys[string(crypto.FromECDSAPub(key))] != nil +} + +func (self *Whisper) GetIdentity(key *ecdsa.PublicKey) *ecdsa.PrivateKey { + return self.keys[string(crypto.FromECDSAPub(key))] +} + +// func (self *Whisper) RemoveIdentity(key *ecdsa.PublicKey) bool { +// k := string(crypto.FromECDSAPub(key)) +// if _, ok := self.keys[k]; ok { +// delete(self.keys, k) +// return true +// } +// return false +// } + +func (self *Whisper) Watch(opts Filter) int { + return self.filters.Install(filter.Generic{ + Str1: string(crypto.FromECDSAPub(opts.To)), + Str2: string(crypto.FromECDSAPub(opts.From)), + Data: bytesToMap(opts.Topics), + Fn: func(data interface{}) { + opts.Fn(data.(*Message)) + }, + }) +} + +func (self *Whisper) Unwatch(id int) { + self.filters.Uninstall(id) +} + +func (self *Whisper) Messages(id int) (messages []*Message) { + filter := self.filters.Get(id) + if filter != nil { + for _, e := range self.messages { + if msg, key := self.open(e); msg != nil { + f := createFilter(msg, e.Topics, key) + if self.filters.Match(filter, f) { + messages = append(messages, msg) + } + } + } + } + + return +} + +// Main handler for passing whisper messages to whisper peer objects +func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { + wpeer := NewPeer(self, peer, ws) + // initialise whisper peer (handshake/status) + if err := wpeer.init(); err != nil { + return err + } + // kick of the main handler for broadcasting/managing envelopes + go wpeer.start() + defer wpeer.stop() + + // Main *read* loop. Writing is done by the peer it self. + for { + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + var envelopes []*Envelope + if err := msg.Decode(&envelopes); err != nil { + peer.Infoln(err) + continue + } + + for _, envelope := range envelopes { + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Infoln(err) + } + wpeer.addKnown(envelope) + } + } +} + +// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. +func (self *Whisper) add(envelope *Envelope) error { + if !envelope.valid() { + return errors.New("invalid pow provided for envelope") + } + + self.mmu.Lock() + defer self.mmu.Unlock() + + hash := envelope.Hash() + self.messages[hash] = envelope + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + + if !self.expiry[envelope.Expiry].Has(hash) { + self.expiry[envelope.Expiry].Add(hash) + go self.postEvent(envelope) + } + + wlogger.DebugDetailf("added whisper envelope %x\n", envelope) + + return nil +} + +func (self *Whisper) update() { + expire := time.NewTicker(800 * time.Millisecond) +out: + for { + select { + case <-expire.C: + self.expire() + case <-self.quit: + break out + } + } +} + +func (self *Whisper) expire() { + self.mmu.Lock() + defer self.mmu.Unlock() + + now := uint32(time.Now().Unix()) + for then, hashSet := range self.expiry { + if then > now { + continue + } + + hashSet.Each(func(v interface{}) bool { + delete(self.messages, v.(common.Hash)) + return true + }) + self.expiry[then].Clear() + } +} + +func (self *Whisper) envelopes() (envelopes []*Envelope) { + self.mmu.RLock() + defer self.mmu.RUnlock() + + envelopes = make([]*Envelope, len(self.messages)) + i := 0 + for _, envelope := range self.messages { + envelopes[i] = envelope + i++ + } + + return +} + +func (self *Whisper) postEvent(envelope *Envelope) { + if message, key := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics, key), message) + } +} + +func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) { + for _, key := range self.keys { + if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) { + message.To = &key.PublicKey + + return message, key + } + } + + return nil, nil +} + +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol +} + +func createFilter(message *Message, topics [][]byte, key *ecdsa.PrivateKey) filter.Filter { + return filter.Generic{ + Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())), + Data: bytesToMap(topics), + } +} diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go new file mode 100644 index 000000000..3e3945a0a --- /dev/null +++ b/whisper/whisper_test.go @@ -0,0 +1,38 @@ +package whisper + +import ( + "fmt" + "testing" + "time" +) + +func TestEvent(t *testing.T) { + res := make(chan *Message, 1) + whisper := New() + id := whisper.NewIdentity() + whisper.Watch(Filter{ + To: &id.PublicKey, + Fn: func(msg *Message) { + res <- msg + }, + }) + + msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + envelope, err := msg.Seal(DefaultPow, Opts{ + Ttl: DefaultTtl, + From: id, + To: &id.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + tick := time.NewTicker(time.Second) + whisper.postEvent(envelope) + select { + case <-res: + case <-tick.C: + t.Error("did not receive message") + } +} |