diff options
Diffstat (limited to 'whisper')
-rw-r--r-- | whisper/envelope.go | 121 | ||||
-rw-r--r-- | whisper/filter.go | 10 | ||||
-rw-r--r-- | whisper/main.go | 47 | ||||
-rw-r--r-- | whisper/message.go | 74 | ||||
-rw-r--r-- | whisper/messages_test.go | 51 | ||||
-rw-r--r-- | whisper/peer.go | 128 | ||||
-rw-r--r-- | whisper/sort.go | 25 | ||||
-rw-r--r-- | whisper/sort_test.go | 19 | ||||
-rw-r--r-- | whisper/util.go | 27 | ||||
-rw-r--r-- | whisper/whisper.go | 246 | ||||
-rw-r--r-- | whisper/whisper_test.go | 47 |
11 files changed, 795 insertions, 0 deletions
diff --git a/whisper/envelope.go b/whisper/envelope.go new file mode 100644 index 000000000..683e88128 --- /dev/null +++ b/whisper/envelope.go @@ -0,0 +1,121 @@ +package whisper + +import ( + "bytes" + "crypto/ecdsa" + "encoding/binary" + "fmt" + "io" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + DefaultPow = 50 * time.Millisecond +) + +type Envelope struct { + Expiry uint32 // Whisper protocol specifies int32, really should be int64 + Ttl uint32 // ^^^^^^ + Topics [][]byte + Data []byte + Nonce uint32 + + hash Hash +} + +func NewEnvelopeFromReader(reader io.Reader) (*Envelope, error) { + var envelope Envelope + + buf := new(bytes.Buffer) + buf.ReadFrom(reader) + + h := H(crypto.Sha3(buf.Bytes())) + if err := rlp.Decode(buf, &envelope); err != nil { + return nil, err + } + + envelope.hash = h + + return &envelope, nil +} + +func (self *Envelope) Hash() Hash { + if self.hash == EmptyHash { + self.hash = H(crypto.Sha3(ethutil.Encode(self))) + } + + return self.hash +} + +func NewEnvelope(ttl time.Duration, topics [][]byte, data *Message) *Envelope { + exp := time.Now().Add(ttl) + + return &Envelope{uint32(exp.Unix()), uint32(ttl.Seconds()), topics, data.Bytes(), 0, Hash{}} +} + +func (self *Envelope) Seal(pow time.Duration) { + self.proveWork(pow) +} + +func (self *Envelope) Open(prv *ecdsa.PrivateKey) (msg *Message, err error) { + data := self.Data + var message Message + dataStart := 1 + if data[0] > 0 { + if len(data) < 66 { + return nil, fmt.Errorf("unable to open envelope. First bit set but len(data) < 66") + } + dataStart = 66 + message.Flags = data[0] + message.Signature = data[1:66] + } + message.Payload = data[dataStart:] + if prv != nil { + message.Payload, err = crypto.Decrypt(prv, message.Payload) + if err != nil { + return nil, fmt.Errorf("unable to open envelope. Decrypt failed: %v", err) + } + } + + return &message, nil +} + +func (self *Envelope) proveWork(dura time.Duration) { + var bestBit int + d := make([]byte, 64) + copy(d[:32], ethutil.Encode(self.withoutNonce())) + + then := time.Now().Add(dura).UnixNano() + for n := uint32(0); time.Now().UnixNano() < then; { + for i := 0; i < 1024; i++ { + binary.BigEndian.PutUint32(d[60:], n) + + fbs := ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) + if fbs > bestBit { + bestBit = fbs + self.Nonce = n + } + + n++ + } + } +} + +func (self *Envelope) valid() bool { + d := make([]byte, 64) + copy(d[:32], ethutil.Encode(self.withoutNonce())) + binary.BigEndian.PutUint32(d[60:], self.Nonce) + return ethutil.FirstBitSet(ethutil.BigD(crypto.Sha3(d))) > 0 +} + +func (self *Envelope) withoutNonce() interface{} { + return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data} +} + +func (self *Envelope) RlpData() interface{} { + return []interface{}{self.Expiry, self.Ttl, ethutil.ByteSliceToInterface(self.Topics), self.Data, self.Nonce} +} diff --git a/whisper/filter.go b/whisper/filter.go new file mode 100644 index 000000000..4315aa556 --- /dev/null +++ b/whisper/filter.go @@ -0,0 +1,10 @@ +package whisper + +import "crypto/ecdsa" + +type Filter struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Topics [][]byte + Fn func(*Message) +} diff --git a/whisper/main.go b/whisper/main.go new file mode 100644 index 000000000..2ee2f3ff1 --- /dev/null +++ b/whisper/main.go @@ -0,0 +1,47 @@ +// +build none + +package main + +import ( + "fmt" + "log" + "net" + "os" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/whisper" + "github.com/obscuren/secp256k1-go" +) + +func main() { + logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.InfoLevel)) + + pub, _ := secp256k1.GenerateKeyPair() + + whisper := whisper.New(&event.TypeMux{}) + + srv := p2p.Server{ + MaxPeers: 10, + Identity: p2p.NewSimpleClientIdentity("whisper-go", "1.0", "", string(pub)), + ListenAddr: ":30303", + NAT: p2p.UPNP(), + + Protocols: []p2p.Protocol{whisper.Protocol()}, + } + if err := srv.Start(); err != nil { + fmt.Println("could not start server:", err) + os.Exit(1) + } + + // add seed peers + seed, err := net.ResolveTCPAddr("tcp", "poc-7.ethdev.com:30300") + if err != nil { + fmt.Println("couldn't resolve:", err) + os.Exit(1) + } + srv.SuggestPeer(seed.IP, seed.Port, nil) + + select {} +} diff --git a/whisper/message.go b/whisper/message.go new file mode 100644 index 000000000..db0110b4a --- /dev/null +++ b/whisper/message.go @@ -0,0 +1,74 @@ +package whisper + +import ( + "crypto/ecdsa" + "time" + + "github.com/ethereum/go-ethereum/crypto" +) + +type Message struct { + Flags byte + Signature []byte + Payload []byte +} + +func NewMessage(payload []byte) *Message { + return &Message{Flags: 0, Payload: payload} +} + +func (self *Message) hash() []byte { + return crypto.Sha3(append([]byte{self.Flags}, self.Payload...)) +} + +func (self *Message) sign(key *ecdsa.PrivateKey) (err error) { + self.Flags = 1 + self.Signature, err = crypto.Sign(self.hash(), key) + return +} + +func (self *Message) Recover() *ecdsa.PublicKey { + defer func() { recover() }() // in case of invalid sig + return crypto.SigToPub(self.hash(), self.Signature) +} + +func (self *Message) Encrypt(to *ecdsa.PublicKey) (err error) { + self.Payload, err = crypto.Encrypt(to, self.Payload) + if err != nil { + return err + } + + return nil +} + +func (self *Message) Bytes() []byte { + return append([]byte{self.Flags}, append(self.Signature, self.Payload...)...) +} + +type Opts struct { + From *ecdsa.PrivateKey + To *ecdsa.PublicKey + Ttl time.Duration + Topics [][]byte +} + +func (self *Message) Seal(pow time.Duration, opts Opts) (*Envelope, error) { + if opts.From != nil { + err := self.sign(opts.From) + if err != nil { + return nil, err + } + } + + if opts.To != nil { + err := self.Encrypt(opts.To) + if err != nil { + return nil, err + } + } + + envelope := NewEnvelope(DefaultTtl, opts.Topics, self) + envelope.Seal(pow) + + return envelope, nil +} diff --git a/whisper/messages_test.go b/whisper/messages_test.go new file mode 100644 index 000000000..cba103011 --- /dev/null +++ b/whisper/messages_test.go @@ -0,0 +1,51 @@ +package whisper + +import ( + "bytes" + "crypto/elliptic" + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/crypto" +) + +func TestSign(t *testing.T) { + prv, _ := crypto.GenerateKey() + msg := NewMessage([]byte("hello world")) + msg.sign(prv) + + pubKey := msg.Recover() + p1 := elliptic.Marshal(crypto.S256(), prv.PublicKey.X, prv.PublicKey.Y) + p2 := elliptic.Marshal(crypto.S256(), pubKey.X, pubKey.Y) + + if !bytes.Equal(p1, p2) { + t.Error("recovered pub key did not match") + } +} + +func TestMessageEncryptDecrypt(t *testing.T) { + prv1, _ := crypto.GenerateKey() + prv2, _ := crypto.GenerateKey() + + data := []byte("hello world") + msg := NewMessage(data) + envelope, err := msg.Seal(DefaultPow, Opts{ + From: prv1, + To: &prv2.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + msg1, err := envelope.Open(prv2) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + if !bytes.Equal(msg1.Payload, data) { + fmt.Println("encryption error. data did not match") + t.FailNow() + } +} diff --git a/whisper/peer.go b/whisper/peer.go new file mode 100644 index 000000000..d42b374b5 --- /dev/null +++ b/whisper/peer.go @@ -0,0 +1,128 @@ +package whisper + +import ( + "fmt" + "io/ioutil" + "time" + + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +const ( + protocolVersion = 0x02 +) + +type peer struct { + host *Whisper + peer *p2p.Peer + ws p2p.MsgReadWriter + + // XXX Eventually this is going to reach exceptional large space. We need an expiry here + known *set.Set + + quit chan struct{} +} + +func NewPeer(host *Whisper, p *p2p.Peer, ws p2p.MsgReadWriter) *peer { + return &peer{host, p, ws, set.New(), make(chan struct{})} +} + +func (self *peer) init() error { + if err := self.handleStatus(); err != nil { + return err + } + + return nil +} + +func (self *peer) start() { + go self.update() + self.peer.Infoln("whisper started") +} + +func (self *peer) stop() { + self.peer.Infoln("whisper stopped") + + close(self.quit) +} + +func (self *peer) update() { + relay := time.NewTicker(300 * time.Millisecond) +out: + for { + select { + case <-relay.C: + err := self.broadcast(self.host.envelopes()) + if err != nil { + self.peer.Infoln(err) + break out + } + + case <-self.quit: + break out + } + } +} + +func (self *peer) broadcast(envelopes []*Envelope) error { + envs := make([]interface{}, len(envelopes)) + i := 0 + for _, envelope := range envelopes { + if !self.known.Has(envelope.Hash()) { + envs[i] = envelope + self.known.Add(envelope.Hash()) + i++ + } + } + + if i > 0 { + msg := p2p.NewMsg(envelopesMsg, envs[:i]...) + if err := self.ws.WriteMsg(msg); err != nil { + return err + } + self.peer.Infoln("broadcasted", i, "message(s)") + } + + return nil +} + +func (self *peer) addKnown(envelope *Envelope) { + self.known.Add(envelope.Hash()) +} + +func (self *peer) handleStatus() error { + ws := self.ws + + if err := ws.WriteMsg(self.statusMsg()); err != nil { + return err + } + + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + if msg.Code != statusMsg { + return fmt.Errorf("peer send %x before status msg", msg.Code) + } + + data, err := ioutil.ReadAll(msg.Payload) + if err != nil { + return err + } + + if len(data) == 0 { + return fmt.Errorf("malformed status. data len = 0") + } + + if pv := data[0]; pv != protocolVersion { + return fmt.Errorf("protocol version mismatch %d != %d", pv, protocolVersion) + } + + return nil +} + +func (self *peer) statusMsg() p2p.Msg { + return p2p.NewMsg(statusMsg, protocolVersion) +} diff --git a/whisper/sort.go b/whisper/sort.go new file mode 100644 index 000000000..8c5b46e9e --- /dev/null +++ b/whisper/sort.go @@ -0,0 +1,25 @@ +package whisper + +import "sort" + +type sortedKeys struct { + k []int32 +} + +func (self *sortedKeys) Len() int { return len(self.k) } +func (self *sortedKeys) Less(i, j int) bool { return self.k[i] < self.k[j] } +func (self *sortedKeys) Swap(i, j int) { self.k[i], self.k[j] = self.k[j], self.k[i] } + +func sortKeys(m map[int32]Hash) []int32 { + sorted := new(sortedKeys) + sorted.k = make([]int32, len(m)) + i := 0 + for key, _ := range m { + sorted.k[i] = key + i++ + } + + sort.Sort(sorted) + + return sorted.k +} diff --git a/whisper/sort_test.go b/whisper/sort_test.go new file mode 100644 index 000000000..5d8177d41 --- /dev/null +++ b/whisper/sort_test.go @@ -0,0 +1,19 @@ +package whisper + +import "testing" + +func TestSorting(t *testing.T) { + m := map[int32]Hash{ + 1: HS("1"), + 3: HS("3"), + 2: HS("2"), + 5: HS("5"), + } + exp := []int32{1, 2, 3, 5} + res := sortKeys(m) + for i, k := range res { + if k != exp[i] { + t.Error(k, "failed. Expected", exp[i]) + } + } +} diff --git a/whisper/util.go b/whisper/util.go new file mode 100644 index 000000000..abef1d667 --- /dev/null +++ b/whisper/util.go @@ -0,0 +1,27 @@ +package whisper + +import "github.com/ethereum/go-ethereum/crypto" + +func hashTopic(topic []byte) []byte { + return crypto.Sha3(topic)[:4] +} + +// NOTE this isn't DRY, but I don't want to iterate twice. + +// Returns a formatted topics byte slice. +// data: unformatted data (e.g., no hashes needed) +func Topics(data [][]byte) [][]byte { + d := make([][]byte, len(data)) + for i, byts := range data { + d[i] = hashTopic(byts) + } + return d +} + +func TopicsFromString(data []string) [][]byte { + d := make([][]byte, len(data)) + for i, str := range data { + d[i] = hashTopic([]byte(str)) + } + return d +} diff --git a/whisper/whisper.go b/whisper/whisper.go new file mode 100644 index 000000000..356debd1c --- /dev/null +++ b/whisper/whisper.go @@ -0,0 +1,246 @@ +package whisper + +import ( + "bytes" + "crypto/ecdsa" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/p2p" + "gopkg.in/fatih/set.v0" +) + +// MOVE ME +type Hash struct { + hash string +} + +var EmptyHash Hash + +func H(hash []byte) Hash { + return Hash{string(hash)} +} +func HS(hash string) Hash { + return Hash{hash} +} + +func (self Hash) Compare(other Hash) int { + return bytes.Compare([]byte(self.hash), []byte(other.hash)) +} + +// MOVE ME END + +const ( + statusMsg = 0x0 + envelopesMsg = 0x01 +) + +type MessageEvent struct { + To *ecdsa.PrivateKey + From *ecdsa.PublicKey + Message *Message +} + +const DefaultTtl = 50 * time.Second + +type Whisper struct { + protocol p2p.Protocol + filters *filter.Filters + + mmu sync.RWMutex + messages map[Hash]*Envelope + expiry map[uint32]*set.SetNonTS + + quit chan struct{} + + keys []*ecdsa.PrivateKey +} + +func New() *Whisper { + whisper := &Whisper{ + messages: make(map[Hash]*Envelope), + filters: filter.New(), + expiry: make(map[uint32]*set.SetNonTS), + quit: make(chan struct{}), + } + whisper.filters.Start() + go whisper.update() + + // XXX TODO REMOVE TESTING CODE + msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + envelope, _ := msg.Seal(DefaultPow, Opts{ + Ttl: DefaultTtl, + }) + if err := whisper.Send(envelope); err != nil { + fmt.Println(err) + } + // XXX TODO REMOVE TESTING CODE + + // p2p whisper sub protocol handler + whisper.protocol = p2p.Protocol{ + Name: "shh", + Version: 2, + Length: 2, + Run: whisper.msgHandler, + } + + return whisper +} + +func (self *Whisper) Stop() { + close(self.quit) +} + +func (self *Whisper) Send(envelope *Envelope) error { + return self.add(envelope) +} + +func (self *Whisper) NewIdentity() *ecdsa.PrivateKey { + key, err := crypto.GenerateKey() + if err != nil { + panic(err) + } + self.keys = append(self.keys, key) + + return key +} + +func (self *Whisper) HasIdentity(key *ecdsa.PrivateKey) bool { + for _, key := range self.keys { + if key.D.Cmp(key.D) == 0 { + return true + } + } + return false +} + +func (self *Whisper) Watch(opts Filter) int { + return self.filters.Install(filter.Generic{ + Str1: string(crypto.FromECDSA(opts.To)), + Str2: string(crypto.FromECDSAPub(opts.From)), + Fn: func(data interface{}) { + opts.Fn(data.(*Message)) + }, + }) +} + +// Main handler for passing whisper messages to whisper peer objects +func (self *Whisper) msgHandler(peer *p2p.Peer, ws p2p.MsgReadWriter) error { + wpeer := NewPeer(self, peer, ws) + // initialise whisper peer (handshake/status) + if err := wpeer.init(); err != nil { + return err + } + // kick of the main handler for broadcasting/managing envelopes + go wpeer.start() + defer wpeer.stop() + + // Main *read* loop. Writing is done by the peer it self. + for { + msg, err := ws.ReadMsg() + if err != nil { + return err + } + + envelope, err := NewEnvelopeFromReader(msg.Payload) + if err != nil { + peer.Infoln(err) + continue + } + + if err := self.add(envelope); err != nil { + // TODO Punish peer here. Invalid envelope. + peer.Infoln(err) + } + wpeer.addKnown(envelope) + } +} + +// takes care of adding envelopes to the messages pool. At this moment no sanity checks are being performed. +func (self *Whisper) add(envelope *Envelope) error { + if !envelope.valid() { + return errors.New("invalid pow provided for envelope") + } + + self.mmu.Lock() + defer self.mmu.Unlock() + + hash := envelope.Hash() + self.messages[hash] = envelope + if self.expiry[envelope.Expiry] == nil { + self.expiry[envelope.Expiry] = set.NewNonTS() + } + + if !self.expiry[envelope.Expiry].Has(hash) { + self.expiry[envelope.Expiry].Add(hash) + self.postEvent(envelope) + } + + return nil +} + +func (self *Whisper) update() { + expire := time.NewTicker(800 * time.Millisecond) +out: + for { + select { + case <-expire.C: + self.expire() + case <-self.quit: + break out + } + } +} + +func (self *Whisper) expire() { + self.mmu.Lock() + defer self.mmu.Unlock() + + now := uint32(time.Now().Unix()) + for then, hashSet := range self.expiry { + if then > now { + continue + } + + hashSet.Each(func(v interface{}) bool { + delete(self.messages, v.(Hash)) + return true + }) + self.expiry[then].Clear() + } +} + +func (self *Whisper) envelopes() (envelopes []*Envelope) { + self.mmu.RLock() + defer self.mmu.RUnlock() + + envelopes = make([]*Envelope, len(self.messages)) + i := 0 + for _, envelope := range self.messages { + envelopes[i] = envelope + i++ + } + + return +} + +func (self *Whisper) postEvent(envelope *Envelope) { + for _, key := range self.keys { + if message, err := envelope.Open(key); err == nil { + // Create a custom filter? + self.filters.Notify(filter.Generic{ + Str1: string(crypto.FromECDSA(key)), Str2: string(crypto.FromECDSAPub(message.Recover())), + }, message) + } else { + fmt.Println(err) + } + } +} + +func (self *Whisper) Protocol() p2p.Protocol { + return self.protocol +} diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go new file mode 100644 index 000000000..107cb8c97 --- /dev/null +++ b/whisper/whisper_test.go @@ -0,0 +1,47 @@ +package whisper + +import ( + "fmt" + "testing" + "time" +) + +func TestKeyManagement(t *testing.T) { + whisper := New() + + key := whisper.NewIdentity() + if !whisper.HasIdentity(key) { + t.Error("expected whisper to have identify") + } +} + +func TestEvent(t *testing.T) { + res := make(chan *Message, 1) + whisper := New() + id := whisper.NewIdentity() + whisper.Watch(Filter{ + To: id, + Fn: func(msg *Message) { + res <- msg + }, + }) + + msg := NewMessage([]byte(fmt.Sprintf("Hello world. This is whisper-go. Incase you're wondering; the time is %v", time.Now()))) + envelope, err := msg.Seal(DefaultPow, Opts{ + Ttl: DefaultTtl, + From: id, + To: &id.PublicKey, + }) + if err != nil { + fmt.Println(err) + t.FailNow() + } + + tick := time.NewTicker(time.Second) + whisper.postEvent(envelope) + select { + case <-res: + case <-tick.C: + t.Error("did not receive message") + } +} |