From ebe2d9d872c5482e02508f1d3e9c3a56e8a41d44 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 8 Dec 2014 12:43:33 +0100 Subject: First draft of Whisper messages relaying --- whisper/envelope.go | 96 +++++++++++++++++++++++++++++++ whisper/main.go | 46 +++++++++++++++ whisper/message.go | 15 +++++ whisper/peer.go | 114 +++++++++++++++++++++++++++++++++++++ whisper/sort.go | 25 ++++++++ whisper/sort_test.go | 19 +++++++ whisper/whisper.go | 157 +++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 472 insertions(+) create mode 100644 whisper/envelope.go create mode 100644 whisper/main.go create mode 100644 whisper/message.go create mode 100644 whisper/peer.go create mode 100644 whisper/sort.go create mode 100644 whisper/sort_test.go create mode 100644 whisper/whisper.go (limited to 'whisper') diff --git a/whisper/envelope.go b/whisper/envelope.go new file mode 100644 index 000000000..f9254843c --- /dev/null +++ b/whisper/envelope.go @@ -0,0 +1,96 @@ +package whisper + +import ( + "bytes" + "encoding/binary" + "io" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + DefaultTtl = 50 * time.Second +) + +type Envelope struct { + Expiry int32 // Whisper protocol specifies int32, really should be int64 + Ttl int32 // ^^^^^^ + Topics [][]byte + Data []byte + Nonce uint32 + + hash Hash +} + +func NewEnvelopeFromReader(reader io.Reader) (*Envelope, error) { + var envelope Envelope + + buf := new(bytes.Buffer) + buf.ReadFrom(reader) + + h := H(crypto.Sha3(buf.Bytes())) + if err := rlp.Decode(buf, &envelope); err != nil { + return nil, err + } + + envelope.hash = h + + return &envelope, nil +} + +func (self *Envelope) Hash() Hash { + if self.hash == EmptyHash { + self.hash = H(crypto.Sha3(ethutil.Encode(self))) + } + + return self.hash +} + +func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope { + exp := time.Now().Add(ttl) + + return &Envelope{int32(exp.Unix()), int32(ttl.Seconds()), topics, data.Bytes(), 0, Hash{}} +} + +func (self *Envelope) Seal() { + self.proveWork(DefaultTtl) +} + +func (self *Envelope) proveWork(dura time.Duration) { + var bestBit int + d := make([]byte, 64) + copy(d[:32], ethutil.Encode(self.withoutNonce())) + + then := time.Now().Add(dura).UnixNano() + for n := uint32(0); time.Now().UnixNano() < then; { + for i := 0; i < 1024; i++ { + binary.BigEndian.PutUint32(d[60:], n) + + fbs := ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) + if fbs > bestBit { + bestBit = fbs + self.Nonce = n + } + + n++ + } + } +} + +func (self *Envelope) valid() bool { + d := make([]byte, 64) + copy(d[:32], ethutil.Encode(self.withoutNonce())) + binary.BigEndian.PutUint32(d[60:], self.Nonce) + return ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) > 0 +} + +func (self *Envelope) withoutNonce() interface{} { + return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data} +} + +func (self *Envelope) RlpData() interface{} { + return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce} +} diff --git a/whisper/main.go b/whisper/main.go new file mode 100644 index 000000000..3868f604f --- /dev/null +++ b/whisper/main.go @@ -0,0 +1,46 @@ +// +build none + +package main + +import ( + "fmt" + "log" + "net" + "os" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/whisper" + "github.com/obscuren/secp256k1-go" +) + +func main() { + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) + + pub, sec := secp256k1.GenerateKeyPair() + + whisper := whisper.New(pub, sec) + + srv := p2p.Server{ + MaxPeers: 10, + Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)), + ListenAddr: ":30303", + NAT: p2p.UPNP(), + + Protocols: []p2p.Protocol{whisper.Protocol()}, + } + if err := srv.Start(); err != nil { + fmt.Println("could not start server:", err) + os.Exit(1) + } + + // add seed peers + seed, err := net.ResolveTCPAddr("tcp", "poc-7.ethdev.com:30300") + if err != nil { + fmt.Println("couldn't resolve:", err) + os.Exit(1) + } + srv.SuggestPeer(seed.IP, seed.Port, nil) + + select {} +} diff --git a/whisper/message.go b/whisper/message.go new file mode 100644 index 000000000..21cf163e6 --- /dev/null +++ b/whisper/message.go @@ -0,0 +1,15 @@ +package whisper + +type Message struct { + Flags byte + Signature []byte + Payload []byte +} + +func NewMessage(payload []byte) *Message { + return &Message{Flags: 0, Payload: payload} +} + +func (self *Message) Bytes() []byte { + return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...) +} diff --git a/whisper/peer.go b/whisper/peer.go new file mode 100644 index 000000000..5fe50ba59 --- /dev/null +++ b/whisper/peer.go @@ -0,0 +1,114 @@ +package whisper + +import ( + "fmt" + "io/ioutil" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +const ( + protocolVersion = 0x02 +) + +type peer struct { + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + // XXX Eventually this is going to reach exceptional large space. We need an expiry here + known *set.Set + + quit chan struct{} +} + +func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { + return &peer{host, p, ws, set.New(), make(chan struct{})} +} + +func (self *peer) init() error { + if err := self.handleStatus(); err != nil { + return err + } + + return nil +} + +func (self *peer) start() { + go self.update() +} + +func (self *peer) update() { + relay := time.NewTicker(300 * time.Millisecond) +out: + for { + select { + case <-relay.C: + err := self.broadcast(self.host.envelopes()) + if err != nil { + self.peer.Infoln(err) + break out + } + + case <-self.quit: + break out + } + } +} + +func (self *peer) broadcast(envelopes []*Envelope) error { + envs := make([]interface{}, len(envelopes)) + i := 0 + for _, envelope := range envelopes { + if !self.known.Has(envelope.Hash()) { + envs[i] = envelope + self.known.Add(envelope.Hash()) + i++ + } + } + + msg := p2p.NewMsg(envelopesMsg, envs[:i]...) + if err := self.ws.WriteMsg(msg); err != nil { + return err + } + + return nil +} + +func (self *peer) handleStatus() error { + ws := self.ws + + if err := ws.WriteMsg(self.statusMsg()); err != nil { + return err + } + + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + if msg.Code != statusMsg { + return fmt.Errorf("peer send %x before status msg", msg.Code) + } + + data, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return err + } + + if len(data) == 0 { + return fmt.Errorf("malformed status. data len = 0") + } + + if pv := data[0]; pv != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + } + + return nil +} + +func (self *peer) statusMsg() p2p.Msg { + return p2p.NewMsg(statusMsg, protocolVersion) +} diff --git a/whisper/sort.go b/whisper/sort.go new file mode 100644 index 000000000..8c5b46e9e --- /dev/null +++ b/whisper/sort.go @@ -0,0 +1,25 @@ +package whisper + +import "sort" + +type sortedKeys struct { + k []int32 +} + +func (self *sortedKeys) Len() int { return len(self.k) } +func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } +func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } + +func sortKeys(m map[int32]Hash) []int32 { + sorted := new(sortedKeys) + sorted.k = make([]int32, len(m)) + i := 0 + for key, _ := range m { + sorted.k[i] = key + i++ + } + + sort.Sort(sorted) + + return sorted.k +} diff --git a/whisper/sort_test.go b/whisper/sort_test.go new file mode 100644 index 000000000..5d8177d41 --- /dev/null +++ b/whisper/sort_test.go @@ -0,0 +1,19 @@ +package whisper + +import "testing" + +func TestSorting(t *testing.T) { + m := map[int32]Hash{ + 1: HS("1"), + 3: HS("3"), + 2: HS("2"), + 5: HS("5"), + } + exp := []int32{1, 2, 3, 5} + res := sortKeys(m) + for i, k := range res { + if k != exp[i] { + t.Error(k, "failed. Expected", exp[i]) + } + } +} diff --git a/whisper/whisper.go b/whisper/whisper.go new file mode 100644 index 000000000..692e6bc2c --- /dev/null +++ b/whisper/whisper.go @@ -0,0 +1,157 @@ +package whisper + +import ( + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +// MOVE ME +type Hash struct { + hash string +} + +var EmptyHash Hash + +func H(hash []byte) Hash { + return Hash{string(hash)} +} +func HS(hash string) Hash { + return Hash{hash} +} + +// MOVE ME END + +const ( + statusMsg = 0x0 + envelopesMsg = 0x01 +) + +type Whisper struct { + pub, sec []byte + protocol p2p.Protocol + + mmu sync.RWMutex + messages map[Hash]*Envelope + expiry map[int32]*set.SetNonTS + + quit chan struct{} +} + +func New(pub, sec []byte) *Whisper { + whisper := &Whisper{ + pub: pub, + sec: sec, + messages: make(map[Hash]*Envelope), + expiry: make(map[int32]*set.SetNonTS), + quit: make(chan struct{}), + } + go whisper.update() + + // p2p whisper sub protocol handler + whisper.protocol = p2p.Protocol{ + Name: "shh", + Version: 2, + Length: 2, + Run: whisper.msgHandler, + } + + return whisper +} + +func (self *Whisper) Stop() { + close(self.quit) +} + +func (self *Whisper) Send(ttl time.Duration, topics [][]byte, data *Message) { + envelope := NewEnvelope(ttl, topics, data) + envelope.Seal() + + self.add(envelope) +} + +func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { + wpeer := NewPeer(self, peer, ws) + if err := wpeer.init(); err != nil { + return err + } + go wpeer.start() + + for { + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + envelope, err := NewEnvelopeFromReader(msg.Payload) + if err != nil { + peer.Infoln(err) + continue + } + + self.add(envelope) + } +} + +func (self *Whisper) add(envelope *Envelope) { + self.mmu.Lock() + defer self.mmu.Unlock() + + fmt.Println("received envelope", envelope) + self.messages[envelope.Hash()] = envelope + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + self.expiry[envelope.Expiry].Add(envelope.Hash()) +} + +func (self *Whisper) update() { + expire := time.NewTicker(800 * time.Millisecond) +out: + for { + select { + case <-expire.C: + self.expire() + case <-self.quit: + break out + } + } +} +func (self *Whisper) expire() { + self.mmu.Lock() + defer self.mmu.Unlock() + + now := int32(time.Now().Unix()) + for then, hashSet := range self.expiry { + if then > now { + continue + } + + hashSet.Each(func(v interface{}) bool { + delete(self.messages, v.(Hash)) + return true + }) + self.expiry[then].Clear() + } +} + +func (self *Whisper) envelopes() (envelopes []*Envelope) { + self.mmu.RLock() + defer self.mmu.RUnlock() + + envelopes = make([]*Envelope, len(self.messages)) + i := 0 + for _, envelope := range self.messages { + envelopes[i] = envelope + i++ + } + + return +} + +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol +} -- cgit v1.2.3 From e3a8412df3fe75fe498a3fce64fd2fd691a18183 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 8 Dec 2014 13:16:50 +0100 Subject: Proper start/stoping wpeers --- whisper/peer.go | 15 ++++++++++++--- whisper/whisper.go | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 5 deletions(-) (limited to 'whisper') diff --git a/whisper/peer.go b/whisper/peer.go index 5fe50ba59..3471ddb2f 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -38,6 +38,13 @@ func (self *peer) init() error { func (self *peer) start() { go self.update() + self.peer.Infoln("whisper started") +} + +func (self *peer) stop() { + self.peer.Infoln("whisper stopped") + + close(self.quit) } func (self *peer) update() { @@ -69,9 +76,11 @@ func (self *peer) broadcast(envelopes []*Envelope) error { } } - msg := p2p.NewMsg(envelopesMsg, envs[:i]...) - if err := self.ws.WriteMsg(msg); err != nil { - return err + if i > 0 { + msg := p2p.NewMsg(envelopesMsg, envs[:i]...) + if err := self.ws.WriteMsg(msg); err != nil { + return err + } } return nil diff --git a/whisper/whisper.go b/whisper/whisper.go index 692e6bc2c..255bd2152 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -1,7 +1,7 @@ package whisper import ( - "fmt" + "bytes" "sync" "time" @@ -23,6 +23,10 @@ func HS(hash string) Hash { return Hash{hash} } +func (self Hash) Compare(other Hash) int { + return bytes.Compare([]byte(self.hash), []byte(other.hash)) +} + // MOVE ME END const ( @@ -73,13 +77,18 @@ func (self *Whisper) Send(ttl time.Duration, topics [][]byte, data *Message) { self.add(envelope) } +// Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) + // init whisper peer (handshake/status) if err := wpeer.init(); err != nil { return err } + // kick of the main handler for broadcasting/managing envelopes go wpeer.start() + defer wpeer.stop() + // Main *read* loop. Writing is done by the peer it self. for { msg, err := ws.ReadMsg() if err != nil { @@ -96,11 +105,11 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { } } +// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. func (self *Whisper) add(envelope *Envelope) { self.mmu.Lock() defer self.mmu.Unlock() - fmt.Println("received envelope", envelope) self.messages[envelope.Hash()] = envelope if self.expiry[envelope.Expiry] == nil { self.expiry[envelope.Expiry] = set.NewNonTS() @@ -120,6 +129,7 @@ out: } } } + func (self *Whisper) expire() { self.mmu.Lock() defer self.mmu.Unlock() -- cgit v1.2.3 From 76842b0df8b5605682362bd57fbd6eb315bcaf1f Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 8 Dec 2014 14:25:52 +0100 Subject: Additional checks and debug output --- whisper/envelope.go | 4 ++-- whisper/peer.go | 5 +++++ whisper/whisper.go | 7 +++++++ 3 files changed, 14 insertions(+), 2 deletions(-) (limited to 'whisper') diff --git a/whisper/envelope.go b/whisper/envelope.go index f9254843c..8e66a7bbb 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -12,7 +12,7 @@ import ( ) const ( - DefaultTtl = 50 * time.Second + DefaultPow = 50 * time.Millisecond ) type Envelope struct { @@ -56,7 +56,7 @@ func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope { } func (self *Envelope) Seal() { - self.proveWork(DefaultTtl) + self.proveWork(DefaultPow) } func (self *Envelope) proveWork(dura time.Duration) { diff --git a/whisper/peer.go b/whisper/peer.go index 3471ddb2f..d42b374b5 100644 --- a/whisper/peer.go +++ b/whisper/peer.go @@ -81,11 +81,16 @@ func (self *peer) broadcast(envelopes []*Envelope) error { if err := self.ws.WriteMsg(msg); err != nil { return err } + self.peer.Infoln("broadcasted", i, "message(s)") } return nil } +func (self *peer) addKnown(envelope *Envelope) { + self.known.Add(envelope.Hash()) +} + func (self *peer) handleStatus() error { ws := self.ws diff --git a/whisper/whisper.go b/whisper/whisper.go index 255bd2152..78e4d4848 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -2,6 +2,7 @@ package whisper import ( "bytes" + "fmt" "sync" "time" @@ -34,6 +35,8 @@ const ( envelopesMsg = 0x01 ) +const defaultTtl = 50 * time.Second + type Whisper struct { pub, sec []byte protocol p2p.Protocol @@ -55,6 +58,8 @@ func New(pub, sec []byte) *Whisper { } go whisper.update() + whisper.Send(defaultTtl, nil, NewMessage([]byte("Hello world. This is whisper-go"))) + // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ Name: "shh", @@ -102,6 +107,7 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { } self.add(envelope) + wpeer.addKnown(envelope) } } @@ -110,6 +116,7 @@ func (self *Whisper) add(envelope *Envelope) { self.mmu.Lock() defer self.mmu.Unlock() + fmt.Println("add", envelope) self.messages[envelope.Hash()] = envelope if self.expiry[envelope.Expiry] == nil { self.expiry[envelope.Expiry] = set.NewNonTS() -- cgit v1.2.3 From 984c7e6689f720e0b60a462e00793364de397909 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 10 Dec 2014 00:03:50 +0100 Subject: Added encryption for messages better API for sealing messages --- whisper/envelope.go | 10 +++++----- whisper/message.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ whisper/whisper.go | 51 +++++++++++++++++++++++++++++++++++---------------- 3 files changed, 89 insertions(+), 21 deletions(-) (limited to 'whisper') diff --git a/whisper/envelope.go b/whisper/envelope.go index 8e66a7bbb..eb80098ad 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -16,8 +16,8 @@ const ( ) type Envelope struct { - Expiry int32 // Whisper protocol specifies int32, really should be int64 - Ttl int32 // ^^^^^^ + Expiry uint32 // Whisper protocol specifies int32, really should be int64 + Ttl uint32 // ^^^^^^ Topics [][]byte Data []byte Nonce uint32 @@ -52,11 +52,11 @@ func (self *Envelope) Hash() Hash { func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope { exp := time.Now().Add(ttl) - return &Envelope{int32(exp.Unix()), int32(ttl.Seconds()), topics, data.Bytes(), 0, Hash{}} + return &Envelope{uint32(exp.Unix()), uint32(ttl.Seconds()), topics, data.Bytes(), 0, Hash{}} } -func (self *Envelope) Seal() { - self.proveWork(DefaultPow) +func (self *Envelope) Seal(pow time.Duration) { + self.proveWork(pow) } func (self *Envelope) proveWork(dura time.Duration) { diff --git a/whisper/message.go b/whisper/message.go index 21cf163e6..408b9f7df 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -1,5 +1,11 @@ package whisper +import ( + "time" + + "github.com/ethereum/go-ethereum/crypto" +) + type Message struct { Flags byte Signature []byte @@ -10,6 +16,49 @@ func NewMessage(payload []byte) *Message { return &Message{Flags: 0, Payload: payload} } +func (self *Message) hash() []byte { + return crypto.Sha3(append([]byte{self.Flags}, self.Payload...)) +} + +func (self *Message) sign(key []byte) (err error) { + self.Flags = 1 + self.Signature, err = crypto.Sign(self.hash(), key) + return +} + +func (self *Message) Encrypt(from, to []byte) (err error) { + err = self.sign(from) + if err != nil { + return err + } + + self.Payload, err = crypto.Encrypt(to, self.Payload) + if err != nil { + return err + } + + return nil +} + func (self *Message) Bytes() []byte { return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...) } + +type Opts struct { + From, To []byte // private(sender), public(receiver) key + Ttl time.Duration + Topics [][]byte +} + +func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { + if len(opts.To) > 0 && len(opts.From) > 0 { + if err := self.Encrypt(opts.From, opts.To); err != nil { + return nil, err + } + } + + envelope := NewEnvelope(DefaultTtl, opts.Topics, self) + envelope.Seal(pow) + + return envelope, nil +} diff --git a/whisper/whisper.go b/whisper/whisper.go index 78e4d4848..b4e37b959 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -2,6 +2,7 @@ package whisper import ( "bytes" + "errors" "fmt" "sync" "time" @@ -35,7 +36,7 @@ const ( envelopesMsg = 0x01 ) -const defaultTtl = 50 * time.Second +const DefaultTtl = 50 * time.Second type Whisper struct { pub, sec []byte @@ -43,7 +44,7 @@ type Whisper struct { mmu sync.RWMutex messages map[Hash]*Envelope - expiry map[int32]*set.SetNonTS + expiry map[uint32]*set.SetNonTS quit chan struct{} } @@ -53,12 +54,18 @@ func New(pub, sec []byte) *Whisper { pub: pub, sec: sec, messages: make(map[Hash]*Envelope), - expiry: make(map[int32]*set.SetNonTS), + expiry: make(map[uint32]*set.SetNonTS), quit: make(chan struct{}), } go whisper.update() - whisper.Send(defaultTtl, nil, NewMessage([]byte("Hello world. This is whisper-go"))) + msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + envelope, _ := msg.Seal(DefaultPow, Opts{ + Ttl: DefaultTtl, + }) + if err := whisper.Send(envelope); err != nil { + fmt.Println(err) + } // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ @@ -75,17 +82,14 @@ func (self *Whisper) Stop() { close(self.quit) } -func (self *Whisper) Send(ttl time.Duration, topics [][]byte, data *Message) { - envelope := NewEnvelope(ttl, topics, data) - envelope.Seal() - - self.add(envelope) +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) } // Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) - // init whisper peer (handshake/status) + // initialise whisper peer (handshake/status) if err := wpeer.init(); err != nil { return err } @@ -106,22 +110,37 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { continue } - self.add(envelope) + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Infoln(err) + } wpeer.addKnown(envelope) } } // takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. -func (self *Whisper) add(envelope *Envelope) { +func (self *Whisper) add(envelope *Envelope) error { + if !envelope.valid() { + return errors.New("invalid pow for envelope") + } + self.mmu.Lock() defer self.mmu.Unlock() - fmt.Println("add", envelope) - self.messages[envelope.Hash()] = envelope + hash := envelope.Hash() + self.messages[hash] = envelope if self.expiry[envelope.Expiry] == nil { self.expiry[envelope.Expiry] = set.NewNonTS() } - self.expiry[envelope.Expiry].Add(envelope.Hash()) + + if !self.expiry[envelope.Expiry].Has(hash) { + self.expiry[envelope.Expiry].Add(hash) + // TODO notify listeners (given that we had any ...) + } + + fmt.Println("add", envelope) + + return nil } func (self *Whisper) update() { @@ -141,7 +160,7 @@ func (self *Whisper) expire() { self.mmu.Lock() defer self.mmu.Unlock() - now := int32(time.Now().Unix()) + now := uint32(time.Now().Unix()) for then, hashSet := range self.expiry { if then > now { continue -- cgit v1.2.3 From dda778eda7ad9b94acf14c3c91c1c29e711e170f Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 10 Dec 2014 14:17:32 +0100 Subject: Updated whisper messages to new crypto api + added tests --- whisper/envelope.go | 20 +++++++++++++++++++ whisper/main.go | 2 +- whisper/message.go | 18 +++++++++++------ whisper/messages_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ whisper/whisper.go | 9 +++++---- 5 files changed, 89 insertions(+), 11 deletions(-) create mode 100644 whisper/messages_test.go (limited to 'whisper') diff --git a/whisper/envelope.go b/whisper/envelope.go index eb80098ad..359fa1568 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -2,7 +2,9 @@ package whisper import ( "bytes" + "crypto/ecdsa" "encoding/binary" + "fmt" "io" "time" @@ -59,6 +61,24 @@ func (self *Envelope) Seal(pow time.Duration) { self.proveWork(pow) } +func (self *Envelope) Open(prv *ecdsa.PrivateKey) (*Message, error) { + data := self.Data + if data[0] > 0 && len(data) < 66 { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") + } + + if data[0] > 0 { + payload, err := crypto.Decrypt(prv, data[66:]) + if err != nil { + return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) + } + + return NewMessage(payload), nil + } + + return NewMessage(data[1:]), nil +} + func (self *Envelope) proveWork(dura time.Duration) { var bestBit int d := make([]byte, 64) diff --git a/whisper/main.go b/whisper/main.go index 3868f604f..80050d899 100644 --- a/whisper/main.go +++ b/whisper/main.go @@ -19,7 +19,7 @@ func main() { pub, sec := secp256k1.GenerateKeyPair() - whisper := whisper.New(pub, sec) + whisper := whisper.New(sec) srv := p2p.Server{ MaxPeers: 10, diff --git a/whisper/message.go b/whisper/message.go index 408b9f7df..8ce5d880b 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -1,6 +1,7 @@ package whisper import ( + "crypto/ecdsa" "time" "github.com/ethereum/go-ethereum/crypto" @@ -20,13 +21,17 @@ func (self *Message) hash() []byte { return crypto.Sha3(append([]byte{self.Flags}, self.Payload...)) } -func (self *Message) sign(key []byte) (err error) { +func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { self.Flags = 1 self.Signature, err = crypto.Sign(self.hash(), key) return } -func (self *Message) Encrypt(from, to []byte) (err error) { +func (self *Message) Recover() *ecdsa.PublicKey { + return crypto.SigToPub(self.hash(), self.Signature) +} + +func (self *Message) Encrypt(from *ecdsa.PrivateKey, to *ecdsa.PublicKey) (err error) { err = self.sign(from) if err != nil { return err @@ -45,13 +50,14 @@ func (self *Message) Bytes() []byte { } type Opts struct { - From, To []byte // private(sender), public(receiver) key - Ttl time.Duration - Topics [][]byte + From *ecdsa.PrivateKey + To *ecdsa.PublicKey + Ttl time.Duration + Topics [][]byte } func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { - if len(opts.To) > 0 && len(opts.From) > 0 { + if opts.To != nil && opts.From != nil { if err := self.Encrypt(opts.From, opts.To); err != nil { return nil, err } diff --git a/whisper/messages_test.go b/whisper/messages_test.go new file mode 100644 index 000000000..cba103011 --- /dev/null +++ b/whisper/messages_test.go @@ -0,0 +1,51 @@ +package whisper + +import ( + "bytes" + "crypto/elliptic" + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/crypto" +) + +func TestSign(t *testing.T) { + prv, _ := crypto.GenerateKey() + msg := NewMessage([]byte("hello world")) + msg.sign(prv) + + pubKey := msg.Recover() + p1 := elliptic.Marshal(crypto.S256(), prv.PublicKey.X, prv.PublicKey.Y) + p2 := elliptic.Marshal(crypto.S256(), pubKey.X, pubKey.Y) + + if !bytes.Equal(p1, p2) { + t.Error("recovered pub key did not match") + } +} + +func TestMessageEncryptDecrypt(t *testing.T) { + prv1, _ := crypto.GenerateKey() + prv2, _ := crypto.GenerateKey() + + data := []byte("hello world") + msg := NewMessage(data) + envelope, err := msg.Seal(DefaultPow, Opts{ + From: prv1, + To: &prv2.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + msg1, err := envelope.Open(prv2) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + if !bytes.Equal(msg1.Payload, data) { + fmt.Println("encryption error. data did not match") + t.FailNow() + } +} diff --git a/whisper/whisper.go b/whisper/whisper.go index b4e37b959..4d7a2a23e 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -2,11 +2,13 @@ package whisper import ( "bytes" + "crypto/ecdsa" "errors" "fmt" "sync" "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) @@ -39,7 +41,7 @@ const ( const DefaultTtl = 50 * time.Second type Whisper struct { - pub, sec []byte + key *ecdsa.PrivateKey protocol p2p.Protocol mmu sync.RWMutex @@ -49,10 +51,9 @@ type Whisper struct { quit chan struct{} } -func New(pub, sec []byte) *Whisper { +func New(sec []byte) *Whisper { whisper := &Whisper{ - pub: pub, - sec: sec, + key: crypto.ToECDSA(sec), messages: make(map[Hash]*Envelope), expiry: make(map[uint32]*set.SetNonTS), quit: make(chan struct{}), -- cgit v1.2.3 From a17a1f9208f858601f6660dbd7f1b77dd9a3f3d9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 12 Dec 2014 22:23:42 +0100 Subject: Implemented watching using filter package * Added filters / watches * Removed event dep --- whisper/envelope.go | 23 ++++++++++------- whisper/filter.go | 10 ++++++++ whisper/main.go | 5 ++-- whisper/message.go | 20 +++++++++------ whisper/whisper.go | 66 +++++++++++++++++++++++++++++++++++++++++++------ whisper/whisper_test.go | 47 +++++++++++++++++++++++++++++++++++ 6 files changed, 145 insertions(+), 26 deletions(-) create mode 100644 whisper/filter.go create mode 100644 whisper/whisper_test.go (limited to 'whisper') diff --git a/whisper/envelope.go b/whisper/envelope.go index 359fa1568..683e88128 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -61,22 +61,27 @@ func (self *Envelope) Seal(pow time.Duration) { self.proveWork(pow) } -func (self *Envelope) Open(prv *ecdsa.PrivateKey) (*Message, error) { +func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) { data := self.Data - if data[0] > 0 && len(data) < 66 { - return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") - } - + var message Message + dataStart := 1 if data[0] > 0 { - payload, err := crypto.Decrypt(prv, data[66:]) + if len(data) < 66 { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") + } + dataStart = 66 + message.Flags = data[0] + message.Signature = data[1:66] + } + message.Payload = data[dataStart:] + if prv != nil { + message.Payload, err = crypto.Decrypt(prv, message.Payload) if err != nil { return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) } - - return NewMessage(payload), nil } - return NewMessage(data[1:]), nil + return &message, nil } func (self *Envelope) proveWork(dura time.Duration) { diff --git a/whisper/filter.go b/whisper/filter.go new file mode 100644 index 000000000..4315aa556 --- /dev/null +++ b/whisper/filter.go @@ -0,0 +1,10 @@ +package whisper + +import "crypto/ecdsa" + +type Filter struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Topics [][]byte + Fn func(*Message) +} diff --git a/whisper/main.go b/whisper/main.go index 80050d899..2ee2f3ff1 100644 --- a/whisper/main.go +++ b/whisper/main.go @@ -8,6 +8,7 @@ import ( "net" "os" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/whisper" @@ -17,9 +18,9 @@ import ( func main() { logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) - pub, sec := secp256k1.GenerateKeyPair() + pub, _ := secp256k1.GenerateKeyPair() - whisper := whisper.New(sec) + whisper := whisper.New(&event.TypeMux{}) srv := p2p.Server{ MaxPeers: 10, diff --git a/whisper/message.go b/whisper/message.go index 8ce5d880b..db0110b4a 100644 --- a/whisper/message.go +++ b/whisper/message.go @@ -28,15 +28,11 @@ func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { } func (self *Message) Recover() *ecdsa.PublicKey { + defer func() { recover() }() // in case of invalid sig return crypto.SigToPub(self.hash(), self.Signature) } -func (self *Message) Encrypt(from *ecdsa.PrivateKey, to *ecdsa.PublicKey) (err error) { - err = self.sign(from) - if err != nil { - return err - } - +func (self *Message) Encrypt(to *ecdsa.PublicKey) (err error) { self.Payload, err = crypto.Encrypt(to, self.Payload) if err != nil { return err @@ -57,8 +53,16 @@ type Opts struct { } func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { - if opts.To != nil && opts.From != nil { - if err := self.Encrypt(opts.From, opts.To); err != nil { + if opts.From != nil { + err := self.sign(opts.From) + if err != nil { + return nil, err + } + } + + if opts.To != nil { + err := self.Encrypt(opts.To) + if err != nil { return nil, err } } diff --git a/whisper/whisper.go b/whisper/whisper.go index 4d7a2a23e..356debd1c 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) @@ -38,28 +39,38 @@ const ( envelopesMsg = 0x01 ) +type MessageEvent struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Message *Message +} + const DefaultTtl = 50 * time.Second type Whisper struct { - key *ecdsa.PrivateKey protocol p2p.Protocol + filters *filter.Filters mmu sync.RWMutex messages map[Hash]*Envelope expiry map[uint32]*set.SetNonTS quit chan struct{} + + keys []*ecdsa.PrivateKey } -func New(sec []byte) *Whisper { +func New() *Whisper { whisper := &Whisper{ - key: crypto.ToECDSA(sec), messages: make(map[Hash]*Envelope), + filters: filter.New(), expiry: make(map[uint32]*set.SetNonTS), quit: make(chan struct{}), } + whisper.filters.Start() go whisper.update() + // XXX TODO REMOVE TESTING CODE msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) envelope, _ := msg.Seal(DefaultPow, Opts{ Ttl: DefaultTtl, @@ -67,6 +78,7 @@ func New(sec []byte) *Whisper { if err := whisper.Send(envelope); err != nil { fmt.Println(err) } + // XXX TODO REMOVE TESTING CODE // p2p whisper sub protocol handler whisper.protocol = p2p.Protocol{ @@ -87,6 +99,35 @@ func (self *Whisper) Send(envelope *Envelope) error { return self.add(envelope) } +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + self.keys = append(self.keys, key) + + return key +} + +func (self *Whisper) HasIdentity(key *ecdsa.PrivateKey) bool { + for _, key := range self.keys { + if key.D.Cmp(key.D) == 0 { + return true + } + } + return false +} + +func (self *Whisper) Watch(opts Filter) int { + return self.filters.Install(filter.Generic{ + Str1: string(crypto.FromECDSA(opts.To)), + Str2: string(crypto.FromECDSAPub(opts.From)), + Fn: func(data interface{}) { + opts.Fn(data.(*Message)) + }, + }) +} + // Main handler for passing whisper messages to whisper peer objects func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { wpeer := NewPeer(self, peer, ws) @@ -122,7 +163,7 @@ func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { // takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. func (self *Whisper) add(envelope *Envelope) error { if !envelope.valid() { - return errors.New("invalid pow for envelope") + return errors.New("invalid pow provided for envelope") } self.mmu.Lock() @@ -136,11 +177,9 @@ func (self *Whisper) add(envelope *Envelope) error { if !self.expiry[envelope.Expiry].Has(hash) { self.expiry[envelope.Expiry].Add(hash) - // TODO notify listeners (given that we had any ...) + self.postEvent(envelope) } - fmt.Println("add", envelope) - return nil } @@ -189,6 +228,19 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) { return } +func (self *Whisper) postEvent(envelope *Envelope) { + for _, key := range self.keys { + if message, err := envelope.Open(key); err == nil { + // Create a custom filter? + self.filters.Notify(filter.Generic{ + Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())), + }, message) + } else { + fmt.Println(err) + } + } +} + func (self *Whisper) Protocol() p2p.Protocol { return self.protocol } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go new file mode 100644 index 000000000..107cb8c97 --- /dev/null +++ b/whisper/whisper_test.go @@ -0,0 +1,47 @@ +package whisper + +import ( + "fmt" + "testing" + "time" +) + +func TestKeyManagement(t *testing.T) { + whisper := New() + + key := whisper.NewIdentity() + if !whisper.HasIdentity(key) { + t.Error("expected whisper to have identify") + } +} + +func TestEvent(t *testing.T) { + res := make(chan *Message, 1) + whisper := New() + id := whisper.NewIdentity() + whisper.Watch(Filter{ + To: id, + Fn: func(msg *Message) { + res <- msg + }, + }) + + msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + envelope, err := msg.Seal(DefaultPow, Opts{ + Ttl: DefaultTtl, + From: id, + To: &id.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + tick := time.NewTicker(time.Second) + whisper.postEvent(envelope) + select { + case <-res: + case <-tick.C: + t.Error("did not receive message") + } +} -- cgit v1.2.3 From ef4135eabe5cb25f8972371c5681e1611ce0cde9 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Dec 2014 16:12:34 +0100 Subject: Added topic utility functions to whisper --- whisper/util.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 whisper/util.go (limited to 'whisper') diff --git a/whisper/util.go b/whisper/util.go new file mode 100644 index 000000000..abef1d667 --- /dev/null +++ b/whisper/util.go @@ -0,0 +1,27 @@ +package whisper + +import "github.com/ethereum/go-ethereum/crypto" + +func hashTopic(topic []byte) []byte { + return crypto.Sha3(topic)[:4] +} + +// NOTE this isn't DRY, but I don't want to iterate twice. + +// Returns a formatted topics byte slice. +// data: unformatted data (e.g., no hashes needed) +func Topics(data [][]byte) [][]byte { + d := make([][]byte, len(data)) + for i, byts := range data { + d[i] = hashTopic(byts) + } + return d +} + +func TopicsFromString(data []string) [][]byte { + d := make([][]byte, len(data)) + for i, str := range data { + d[i] = hashTopic([]byte(str)) + } + return d +} -- cgit v1.2.3