From 3af211dd65d6690afce9976a9f47ab1cdddb8d58 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 30 Sep 2014 23:26:52 +0200 Subject: Implemented WebSocket package --- block_pool.go | 5 +- ethchain/state_transition.go | 14 ++--- peer.go | 2 +- websocket/client.go | 122 +++++++++++++++++++++++++++++++++++++++++ websocket/message.go | 14 +++++ websocket/server.go | 127 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 272 insertions(+), 12 deletions(-) create mode 100644 websocket/client.go create mode 100644 websocket/message.go create mode 100644 websocket/server.go diff --git a/block_pool.go b/block_pool.go index 957b7601b..003d1db58 100644 --- a/block_pool.go +++ b/block_pool.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethutil" - "github.com/ethereum/eth-go/ethwire" ) var poollogger = ethlog.NewLogger("BPOOL") @@ -99,8 +98,8 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) { self.pool[hash] = &block{peer, peer, b, time.Now(), 0} if !self.eth.BlockChain().HasBlock(b.PrevHash) && !self.fetchingHashes { - poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) - peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) + //poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4]) + //peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.PrevHash, uint32(256)})) } } else if self.pool[hash] != nil { self.pool[hash].block = b diff --git a/ethchain/state_transition.go b/ethchain/state_transition.go index c1180a641..096464963 100644 --- a/ethchain/state_transition.go +++ b/ethchain/state_transition.go @@ -142,14 +142,12 @@ func (self *StateTransition) preCheck() (err error) { func (self *StateTransition) TransitionState() (err error) { statelogger.Debugf("(~) %x\n", self.tx.Hash()) - /* - defer func() { - if r := recover(); r != nil { - logger.Infoln(r) - err = fmt.Errorf("state transition err %v", r) - } - }() - */ + defer func() { + if r := recover(); r != nil { + statelogger.Infoln(r) + err = fmt.Errorf("state transition err %v", r) + } + }() // XXX Transactions after this point are considered valid. if err = self.preCheck(); err != nil { diff --git a/peer.go b/peer.go index f5d0fe4ed..318294509 100644 --- a/peer.go +++ b/peer.go @@ -320,7 +320,7 @@ out: case msg := <-p.outputQueue: if !p.statusKnown { switch msg.Type { - case ethwire.MsgStatusTy, ethwire.MsgGetTxsTy, ethwire.MsgTxTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockTy: + case ethwire.MsgGetTxsTy, ethwire.MsgTxTy, ethwire.MsgGetBlockHashesTy, ethwire.MsgBlockHashesTy, ethwire.MsgGetBlocksTy, ethwire.MsgBlockTy: break skip } } diff --git a/websocket/client.go b/websocket/client.go new file mode 100644 index 000000000..1ff0d3f64 --- /dev/null +++ b/websocket/client.go @@ -0,0 +1,122 @@ +package websocket + +import ( + "fmt" + "io" + + ws "code.google.com/p/go.net/websocket" +) + +const channelBufSize = 100 + +var maxId int = 0 + +type MsgFunc func(c *Client, msg *Message) + +// Chat client. +type Client struct { + id int + ws *ws.Conn + server *Server + ch chan *Message + doneCh chan bool + + onMessage MsgFunc +} + +// Create new chat client. +func NewClient(ws *ws.Conn, server *Server) *Client { + + if ws == nil { + panic("ws cannot be nil") + } + + if server == nil { + panic("server cannot be nil") + } + + maxId++ + ch := make(chan *Message, channelBufSize) + doneCh := make(chan bool) + + return &Client{maxId, ws, server, ch, doneCh, nil} +} + +func (c *Client) Id() int { + return c.id +} + +func (c *Client) Conn() *ws.Conn { + return c.ws +} + +func (c *Client) Write(data interface{}, seed int) { + msg := &Message{Seed: seed, Data: data} + select { + case c.ch <- msg: + default: + c.server.Del(c) + err := fmt.Errorf("client %d is disconnected.", c.id) + c.server.Err(err) + } +} + +func (c *Client) Done() { + c.doneCh <- true +} + +// Listen Write and Read request via chanel +func (c *Client) Listen() { + go c.listenWrite() + c.listenRead() +} + +// Listen write request via chanel +func (c *Client) listenWrite() { + logger.Debugln("Listening write to client") + for { + select { + + // send message to the client + case msg := <-c.ch: + logger.Debugln("Send:", msg) + ws.JSON.Send(c.ws, msg) + + // receive done request + case <-c.doneCh: + c.server.Del(c) + c.doneCh <- true // for listenRead method + return + } + } +} + +// Listen read request via chanel +func (c *Client) listenRead() { + logger.Debugln("Listening read from client") + for { + select { + + // receive done request + case <-c.doneCh: + c.server.Del(c) + c.doneCh <- true // for listenWrite method + return + + // read data from ws connection + default: + var msg Message + err := ws.JSON.Receive(c.ws, &msg) + if err == io.EOF { + c.doneCh <- true + } else if err != nil { + c.server.Err(err) + } else { + logger.Debugln(&msg) + if c.onMessage != nil { + c.onMessage(c, &msg) + } + } + } + } +} diff --git a/websocket/message.go b/websocket/message.go new file mode 100644 index 000000000..30e284814 --- /dev/null +++ b/websocket/message.go @@ -0,0 +1,14 @@ +package websocket + +import "github.com/ethereum/eth-go/ethutil" + +type Message struct { + Call string `json:"call"` + Args []interface{} `json:"args"` + Seed int `json:"seed"` + Data interface{} `json:"data"` +} + +func (self *Message) Arguments() *ethutil.Value { + return ethutil.NewValue(self.Args) +} diff --git a/websocket/server.go b/websocket/server.go new file mode 100644 index 000000000..648f3fdc3 --- /dev/null +++ b/websocket/server.go @@ -0,0 +1,127 @@ +package websocket + +import ( + "net/http" + + "github.com/ethereum/eth-go/ethlog" + + ws "code.google.com/p/go.net/websocket" +) + +var logger = ethlog.NewLogger("WS") + +// Chat server. +type Server struct { + httpServ string + pattern string + messages []*Message + clients map[int]*Client + addCh chan *Client + delCh chan *Client + sendAllCh chan string + doneCh chan bool + errCh chan error + msgFunc MsgFunc +} + +// Create new chat server. +func NewServer(pattern, httpServ string) *Server { + clients := make(map[int]*Client) + addCh := make(chan *Client) + delCh := make(chan *Client) + sendAllCh := make(chan string) + doneCh := make(chan bool) + errCh := make(chan error) + + return &Server{ + httpServ, + pattern, + nil, + clients, + addCh, + delCh, + sendAllCh, + doneCh, + errCh, + nil, + } +} + +func (s *Server) Add(c *Client) { + s.addCh <- c +} + +func (s *Server) Del(c *Client) { + s.delCh <- c +} + +func (s *Server) SendAll(msg string) { + s.sendAllCh <- msg +} + +func (s *Server) Done() { + s.doneCh <- true +} + +func (s *Server) Err(err error) { + s.errCh <- err +} + +func (s *Server) servHTTP() { + logger.Debugln("Serving http", s.httpServ) + err := http.ListenAndServe(s.httpServ, nil) + + logger.Warnln(err) +} + +func (s *Server) MessageFunc(f MsgFunc) { + s.msgFunc = f +} + +// Listen and serve. +// It serves client connection and broadcast request. +func (s *Server) Listen() { + logger.Debugln("Listening server...") + + // ws handler + onConnected := func(ws *ws.Conn) { + defer func() { + err := ws.Close() + if err != nil { + s.errCh <- err + } + }() + + client := NewClient(ws, s) + client.onMessage = s.msgFunc + s.Add(client) + client.Listen() + } + // Disable Origin check. Request don't need to come necessarily from origin. + http.HandleFunc(s.pattern, func(w http.ResponseWriter, req *http.Request) { + s := ws.Server{Handler: ws.Handler(onConnected)} + s.ServeHTTP(w, req) + }) + logger.Debugln("Created handler") + + go s.servHTTP() + + for { + select { + + // Add new a client + case c := <-s.addCh: + s.clients[c.id] = c + + // del a client + case c := <-s.delCh: + delete(s.clients, c.id) + + case err := <-s.errCh: + logger.Debugln("Error:", err.Error()) + + case <-s.doneCh: + return + } + } +} -- cgit v1.2.3