aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Godeps/Godeps.json4
-rw-r--r--Godeps/_workspace/src/github.com/rs/cors/.travis.yml4
-rw-r--r--Godeps/_workspace/src/github.com/rs/cors/LICENSE19
-rw-r--r--Godeps/_workspace/src/github.com/rs/cors/README.md84
-rw-r--r--Godeps/_workspace/src/github.com/rs/cors/cors.go308
-rw-r--r--Godeps/_workspace/src/github.com/rs/cors/utils.go27
-rw-r--r--Godeps/_workspace/src/github.com/rs/xhandler/.travis.yml7
-rw-r--r--Godeps/_workspace/src/github.com/rs/xhandler/LICENSE19
-rw-r--r--Godeps/_workspace/src/github.com/rs/xhandler/README.md134
-rw-r--r--Godeps/_workspace/src/github.com/rs/xhandler/chain.go93
-rw-r--r--Godeps/_workspace/src/github.com/rs/xhandler/middleware.go59
-rw-r--r--Godeps/_workspace/src/github.com/rs/xhandler/xhandler.go42
-rw-r--r--cmd/utils/flags.go2
-rw-r--r--rpc/http.go277
-rw-r--r--rpc/server.go60
15 files changed, 903 insertions, 236 deletions
diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json
index 9bcc8c756..e02f15882 100644
--- a/Godeps/Godeps.json
+++ b/Godeps/Godeps.json
@@ -139,6 +139,10 @@
"Rev": "53221230c215611a90762720c9042ac782ef74ee"
},
{
+ "ImportPath": "github.com/rs/cors",
+ "Rev": "5950cf11d77f8a61b432a25dd4d444b4ced01379"
+ },
+ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "ad0d8b2ab58a55ed5c58073aa46451d5e1ca1280"
},
diff --git a/Godeps/_workspace/src/github.com/rs/cors/.travis.yml b/Godeps/_workspace/src/github.com/rs/cors/.travis.yml
new file mode 100644
index 000000000..bbb5185a2
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/cors/.travis.yml
@@ -0,0 +1,4 @@
+language: go
+go:
+- 1.3
+- 1.4
diff --git a/Godeps/_workspace/src/github.com/rs/cors/LICENSE b/Godeps/_workspace/src/github.com/rs/cors/LICENSE
new file mode 100644
index 000000000..d8e2df5a4
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/cors/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2014 Olivier Poitrey <rs@dailymotion.com>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/Godeps/_workspace/src/github.com/rs/cors/README.md b/Godeps/_workspace/src/github.com/rs/cors/README.md
new file mode 100644
index 000000000..6f70c30ac
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/cors/README.md
@@ -0,0 +1,84 @@
+# Go CORS handler [![godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/rs/cors) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/rs/cors/master/LICENSE) [![build](https://img.shields.io/travis/rs/cors.svg?style=flat)](https://travis-ci.org/rs/cors)
+
+CORS is a `net/http` handler implementing [Cross Origin Resource Sharing W3 specification](http://www.w3.org/TR/cors/) in Golang.
+
+## Getting Started
+
+After installing Go and setting up your [GOPATH](http://golang.org/doc/code.html#GOPATH), create your first `.go` file. We'll call it `server.go`.
+
+```go
+package main
+
+import (
+ "net/http"
+
+ "github.com/rs/cors"
+)
+
+func main() {
+ h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ w.Write([]byte("{\"hello\": \"world\"}"))
+ })
+
+ // cors.Default() setup the middleware with default options being
+ // all origins accepted with simple methods (GET, POST). See
+ // documentation below for more options.
+ handler = cors.Default().Handler(h)
+ http.ListenAndServe(":8080", handler)
+}
+```
+
+Install `cors`:
+
+ go get github.com/rs/cors
+
+Then run your server:
+
+ go run server.go
+
+The server now runs on `localhost:8080`:
+
+ $ curl -D - -H 'Origin: http://foo.com' http://localhost:8080/
+ HTTP/1.1 200 OK
+ Access-Control-Allow-Origin: foo.com
+ Content-Type: application/json
+ Date: Sat, 25 Oct 2014 03:43:57 GMT
+ Content-Length: 18
+
+ {"hello": "world"}
+
+### More Examples
+
+* `net/http`: [examples/nethttp/server.go](https://github.com/rs/cors/blob/master/examples/nethttp/server.go)
+* [Goji](https://goji.io): [examples/goji/server.go](https://github.com/rs/cors/blob/master/examples/goji/server.go)
+* [Martini](http://martini.codegangsta.io): [examples/martini/server.go](https://github.com/rs/cors/blob/master/examples/martini/server.go)
+* [Negroni](https://github.com/codegangsta/negroni): [examples/negroni/server.go](https://github.com/rs/cors/blob/master/examples/negroni/server.go)
+* [Alice](https://github.com/justinas/alice): [examples/alice/server.go](https://github.com/rs/cors/blob/master/examples/alice/server.go)
+
+## Parameters
+
+Parameters are passed to the middleware thru the `cors.New` method as follow:
+
+```go
+c := cors.New(cors.Options{
+ AllowedOrigins: []string{"http://foo.com"},
+ AllowCredentials: true,
+})
+
+// Insert the middleware
+handler = c.Handler(handler)
+```
+
+* **AllowedOrigins** `[]string`: A list of origins a cross-domain request can be executed from. If the special `*` value is present in the list, all origins will be allowed. The default value is `*`.
+* **AllowedMethods** `[]string`: A list of methods the client is allowed to use with cross-domain requests.
+* **AllowedHeaders** `[]string`: A list of non simple headers the client is allowed to use with cross-domain requests. Default value is simple methods (`GET` and `POST`)
+* **ExposedHeaders** `[]string`: Indicates which headers are safe to expose to the API of a CORS API specification
+* **AllowCredentials** `bool`: Indicates whether the request can include user credentials like cookies, HTTP authentication or client side SSL certificates. The default is `false`.
+* **MaxAge** `int`: Indicates how long (in seconds) the results of a preflight request can be cached. The default is `0` which stands for no max age.
+
+See [API documentation](http://godoc.org/github.com/rs/cors) for more info.
+
+## Licenses
+
+All source code is licensed under the [MIT License](https://raw.github.com/rs/cors/master/LICENSE).
diff --git a/Godeps/_workspace/src/github.com/rs/cors/cors.go b/Godeps/_workspace/src/github.com/rs/cors/cors.go
new file mode 100644
index 000000000..276bc40bb
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/cors/cors.go
@@ -0,0 +1,308 @@
+/*
+Package cors is net/http handler to handle CORS related requests
+as defined by http://www.w3.org/TR/cors/
+
+You can configure it by passing an option struct to cors.New:
+
+ c := cors.New(cors.Options{
+ AllowedOrigins: []string{"foo.com"},
+ AllowedMethods: []string{"GET", "POST", "DELETE"},
+ AllowCredentials: true,
+ })
+
+Then insert the handler in the chain:
+
+ handler = c.Handler(handler)
+
+See Options documentation for more options.
+
+The resulting handler is a standard net/http handler.
+*/
+package cors
+
+import (
+ "log"
+ "net/http"
+ "os"
+ "strconv"
+ "strings"
+)
+
+// Options is a configuration container to setup the CORS middleware.
+type Options struct {
+ // AllowedOrigins is a list of origins a cross-domain request can be executed from.
+ // If the special "*" value is present in the list, all origins will be allowed.
+ // Default value is ["*"]
+ AllowedOrigins []string
+ // AllowedMethods is a list of methods the client is allowed to use with
+ // cross-domain requests. Default value is simple methods (GET and POST)
+ AllowedMethods []string
+ // AllowedHeaders is list of non simple headers the client is allowed to use with
+ // cross-domain requests.
+ // If the special "*" value is present in the list, all headers will be allowed.
+ // Default value is [] but "Origin" is always appended to the list.
+ AllowedHeaders []string
+ // ExposedHeaders indicates which headers are safe to expose to the API of a CORS
+ // API specification
+ ExposedHeaders []string
+ // AllowCredentials indicates whether the request can include user credentials like
+ // cookies, HTTP authentication or client side SSL certificates.
+ AllowCredentials bool
+ // MaxAge indicates how long (in seconds) the results of a preflight request
+ // can be cached
+ MaxAge int
+ // Debugging flag adds additional output to debug server side CORS issues
+ Debug bool
+ // log object to use when debugging
+ log *log.Logger
+}
+
+type Cors struct {
+ // The CORS Options
+ options Options
+}
+
+// New creates a new Cors handler with the provided options.
+func New(options Options) *Cors {
+ // Normalize options
+ // Note: for origins and methods matching, the spec requires a case-sensitive matching.
+ // As it may error prone, we chose to ignore the spec here.
+ normOptions := Options{
+ AllowedOrigins: convert(options.AllowedOrigins, strings.ToLower),
+ AllowedMethods: convert(options.AllowedMethods, strings.ToUpper),
+ // Origin is always appended as some browsers will always request
+ // for this header at preflight
+ AllowedHeaders: convert(append(options.AllowedHeaders, "Origin"), http.CanonicalHeaderKey),
+ ExposedHeaders: convert(options.ExposedHeaders, http.CanonicalHeaderKey),
+ AllowCredentials: options.AllowCredentials,
+ MaxAge: options.MaxAge,
+ Debug: options.Debug,
+ log: log.New(os.Stdout, "[cors] ", log.LstdFlags),
+ }
+ if len(normOptions.AllowedOrigins) == 0 {
+ // Default is all origins
+ normOptions.AllowedOrigins = []string{"*"}
+ }
+ if len(normOptions.AllowedHeaders) == 1 {
+ // Add some sensible defaults
+ normOptions.AllowedHeaders = []string{"Origin", "Accept", "Content-Type"}
+ }
+ if len(normOptions.AllowedMethods) == 0 {
+ // Default is simple methods
+ normOptions.AllowedMethods = []string{"GET", "POST"}
+ }
+
+ if normOptions.Debug {
+ normOptions.log.Printf("Options: %v", normOptions)
+ }
+ return &Cors{
+ options: normOptions,
+ }
+}
+
+// Default creates a new Cors handler with default options
+func Default() *Cors {
+ return New(Options{})
+}
+
+// Handler apply the CORS specification on the request, and add relevant CORS headers
+// as necessary.
+func (cors *Cors) Handler(h http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "OPTIONS" {
+ cors.logf("Handler: Preflight request")
+ cors.handlePreflight(w, r)
+ // Preflight requests are standalone and should stop the chain as some other
+ // middleware may not handle OPTIONS requests correctly. One typical example
+ // is authentication middleware ; OPTIONS requests won't carry authentication
+ // headers (see #1)
+ } else {
+ cors.logf("Handler: Actual request")
+ cors.handleActualRequest(w, r)
+ h.ServeHTTP(w, r)
+ }
+ })
+}
+
+// Martini compatible handler
+func (cors *Cors) HandlerFunc(w http.ResponseWriter, r *http.Request) {
+ if r.Method == "OPTIONS" {
+ cors.logf("HandlerFunc: Preflight request")
+ cors.handlePreflight(w, r)
+ } else {
+ cors.logf("HandlerFunc: Actual request")
+ cors.handleActualRequest(w, r)
+ }
+}
+
+// Negroni compatible interface
+func (cors *Cors) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
+ if r.Method == "OPTIONS" {
+ cors.logf("ServeHTTP: Preflight request")
+ cors.handlePreflight(w, r)
+ // Preflight requests are standalone and should stop the chain as some other
+ // middleware may not handle OPTIONS requests correctly. One typical example
+ // is authentication middleware ; OPTIONS requests won't carry authentication
+ // headers (see #1)
+ } else {
+ cors.logf("ServeHTTP: Actual request")
+ cors.handleActualRequest(w, r)
+ next(w, r)
+ }
+}
+
+// handlePreflight handles pre-flight CORS requests
+func (cors *Cors) handlePreflight(w http.ResponseWriter, r *http.Request) {
+ options := cors.options
+ headers := w.Header()
+ origin := r.Header.Get("Origin")
+
+ if r.Method != "OPTIONS" {
+ cors.logf(" Preflight aborted: %s!=OPTIONS", r.Method)
+ return
+ }
+ if origin == "" {
+ cors.logf(" Preflight aborted: empty origin")
+ return
+ }
+ if !cors.isOriginAllowed(origin) {
+ cors.logf(" Preflight aborted: origin '%s' not allowed", origin)
+ return
+ }
+
+ reqMethod := r.Header.Get("Access-Control-Request-Method")
+ if !cors.isMethodAllowed(reqMethod) {
+ cors.logf(" Preflight aborted: method '%s' not allowed", reqMethod)
+ return
+ }
+ reqHeaders := parseHeaderList(r.Header.Get("Access-Control-Request-Headers"))
+ if !cors.areHeadersAllowed(reqHeaders) {
+ cors.logf(" Preflight aborted: headers '%v' not allowed", reqHeaders)
+ return
+ }
+ headers.Set("Access-Control-Allow-Origin", origin)
+ headers.Add("Vary", "Origin")
+ // Spec says: Since the list of methods can be unbounded, simply returning the method indicated
+ // by Access-Control-Request-Method (if supported) can be enough
+ headers.Set("Access-Control-Allow-Methods", strings.ToUpper(reqMethod))
+ if len(reqHeaders) > 0 {
+
+ // Spec says: Since the list of headers can be unbounded, simply returning supported headers
+ // from Access-Control-Request-Headers can be enough
+ headers.Set("Access-Control-Allow-Headers", strings.Join(reqHeaders, ", "))
+ }
+ if options.AllowCredentials {
+ headers.Set("Access-Control-Allow-Credentials", "true")
+ }
+ if options.MaxAge > 0 {
+ headers.Set("Access-Control-Max-Age", strconv.Itoa(options.MaxAge))
+ }
+ cors.logf(" Preflight response headers: %v", headers)
+}
+
+// handleActualRequest handles simple cross-origin requests, actual request or redirects
+func (cors *Cors) handleActualRequest(w http.ResponseWriter, r *http.Request) {
+ options := cors.options
+ headers := w.Header()
+ origin := r.Header.Get("Origin")
+
+ if r.Method == "OPTIONS" {
+ cors.logf(" Actual request no headers added: method == %s", r.Method)
+ return
+ }
+ if origin == "" {
+ cors.logf(" Actual request no headers added: missing origin")
+ return
+ }
+ if !cors.isOriginAllowed(origin) {
+ cors.logf(" Actual request no headers added: origin '%s' not allowed", origin)
+ return
+ }
+
+ // Note that spec does define a way to specifically disallow a simple method like GET or
+ // POST. Access-Control-Allow-Methods is only used for pre-flight requests and the
+ // spec doesn't instruct to check the allowed methods for simple cross-origin requests.
+ // We think it's a nice feature to be able to have control on those methods though.
+ if !cors.isMethodAllowed(r.Method) {
+ if cors.options.Debug {
+ cors.logf(" Actual request no headers added: method '%s' not allowed",
+ r.Method)
+ }
+
+ return
+ }
+ headers.Set("Access-Control-Allow-Origin", origin)
+ headers.Add("Vary", "Origin")
+ if len(options.ExposedHeaders) > 0 {
+ headers.Set("Access-Control-Expose-Headers", strings.Join(options.ExposedHeaders, ", "))
+ }
+ if options.AllowCredentials {
+ headers.Set("Access-Control-Allow-Credentials", "true")
+ }
+ cors.logf(" Actual response added headers: %v", headers)
+}
+
+// convenience method. checks if debugging is turned on before printing
+func (cors *Cors) logf(format string, a ...interface{}) {
+ if cors.options.Debug {
+ cors.options.log.Printf(format, a...)
+ }
+}
+
+// isOriginAllowed checks if a given origin is allowed to perform cross-domain requests
+// on the endpoint
+func (cors *Cors) isOriginAllowed(origin string) bool {
+ allowedOrigins := cors.options.AllowedOrigins
+ origin = strings.ToLower(origin)
+ for _, allowedOrigin := range allowedOrigins {
+ switch allowedOrigin {
+ case "*":
+ return true
+ case origin:
+ return true
+ }
+ }
+ return false
+}
+
+// isMethodAllowed checks if a given method can be used as part of a cross-domain request
+// on the endpoing
+func (cors *Cors) isMethodAllowed(method string) bool {
+ allowedMethods := cors.options.AllowedMethods
+ if len(allowedMethods) == 0 {
+ // If no method allowed, always return false, even for preflight request
+ return false
+ }
+ method = strings.ToUpper(method)
+ if method == "OPTIONS" {
+ // Always allow preflight requests
+ return true
+ }
+ for _, allowedMethod := range allowedMethods {
+ if allowedMethod == method {
+ return true
+ }
+ }
+ return false
+}
+
+// areHeadersAllowed checks if a given list of headers are allowed to used within
+// a cross-domain request.
+func (cors *Cors) areHeadersAllowed(requestedHeaders []string) bool {
+ if len(requestedHeaders) == 0 {
+ return true
+ }
+ for _, header := range requestedHeaders {
+ found := false
+ for _, allowedHeader := range cors.options.AllowedHeaders {
+ if allowedHeader == "*" || allowedHeader == header {
+ found = true
+ break
+ }
+ }
+ if !found {
+ return false
+ }
+ }
+ return true
+}
diff --git a/Godeps/_workspace/src/github.com/rs/cors/utils.go b/Godeps/_workspace/src/github.com/rs/cors/utils.go
new file mode 100644
index 000000000..429ab1114
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/cors/utils.go
@@ -0,0 +1,27 @@
+package cors
+
+import (
+ "net/http"
+ "strings"
+)
+
+type converter func(string) string
+
+// convert converts a list of string using the passed converter function
+func convert(s []string, c converter) []string {
+ out := []string{}
+ for _, i := range s {
+ out = append(out, c(i))
+ }
+ return out
+}
+
+func parseHeaderList(headerList string) (headers []string) {
+ for _, header := range strings.Split(headerList, ",") {
+ header = http.CanonicalHeaderKey(strings.TrimSpace(header))
+ if header != "" {
+ headers = append(headers, header)
+ }
+ }
+ return headers
+}
diff --git a/Godeps/_workspace/src/github.com/rs/xhandler/.travis.yml b/Godeps/_workspace/src/github.com/rs/xhandler/.travis.yml
new file mode 100644
index 000000000..b65c7a9f1
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/xhandler/.travis.yml
@@ -0,0 +1,7 @@
+language: go
+go:
+- 1.5
+- tip
+matrix:
+ allow_failures:
+ - go: tip
diff --git a/Godeps/_workspace/src/github.com/rs/xhandler/LICENSE b/Godeps/_workspace/src/github.com/rs/xhandler/LICENSE
new file mode 100644
index 000000000..47c5e9d2d
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/xhandler/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2015 Olivier Poitrey <rs@dailymotion.com>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/Godeps/_workspace/src/github.com/rs/xhandler/README.md b/Godeps/_workspace/src/github.com/rs/xhandler/README.md
new file mode 100644
index 000000000..91c594bd2
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/xhandler/README.md
@@ -0,0 +1,134 @@
+# XHandler
+
+[![godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/rs/xhandler) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/rs/xhandler/master/LICENSE) [![Build Status](https://travis-ci.org/rs/xhandler.svg?branch=master)](https://travis-ci.org/rs/xhandler) [![Coverage](http://gocover.io/_badge/github.com/rs/xhandler)](http://gocover.io/github.com/rs/xhandler)
+
+XHandler is a bridge between [net/context](https://godoc.org/golang.org/x/net/context) and `http.Handler`.
+
+It lets you enforce `net/context` in your handlers without sacrificing compatibility with existing `http.Handlers` nor imposing a specific router.
+
+Thanks to `net/context` deadline management, `xhandler` is able to enforce a per request deadline and will cancel the context when the client closes the connection unexpectedly.
+
+You may create your own `net/context` aware handler pretty much the same way as you would do with http.Handler.
+
+Read more about xhandler on [Dailymotion engineering blog](http://engineering.dailymotion.com/our-way-to-go/).
+
+## Installing
+
+ go get -u github.com/rs/xhandler
+
+## Usage
+
+```go
+package main
+
+import (
+ "log"
+ "net/http"
+ "time"
+
+ "github.com/rs/cors"
+ "github.com/rs/xhandler"
+ "golang.org/x/net/context"
+)
+
+type myMiddleware struct {
+ next xhandler.HandlerC
+}
+
+func (h myMiddleware) ServeHTTPC(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ ctx = context.WithValue(ctx, "test", "World")
+ h.next.ServeHTTPC(ctx, w, r)
+}
+
+func main() {
+ c := xhandler.Chain{}
+
+ // Add close notifier handler so context is cancelled when the client closes
+ // the connection
+ c.UseC(xhandler.CloseHandler)
+
+ // Add timeout handler
+ c.UseC(xhandler.TimeoutHandler(2 * time.Second))
+
+ // Middleware putting something in the context
+ c.UseC(func(next xhandler.HandlerC) xhandler.HandlerC {
+ return myMiddleware{next: next}
+ })
+
+ // Mix it with a non-context-aware middleware handler
+ c.Use(cors.Default().Handler)
+
+ // Final handler (using handlerFuncC), reading from the context
+ xh := xhandler.HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ value := ctx.Value("test").(string)
+ w.Write([]byte("Hello " + value))
+ })
+
+ // Bridge context aware handlers with http.Handler using xhandler.Handle()
+ http.Handle("/test", c.Handler(xh))
+
+ if err := http.ListenAndServe(":8080", nil); err != nil {
+ log.Fatal(err)
+ }
+}
+```
+
+### Using xmux
+
+Xhandler comes with an optional context aware [muxer](https://github.com/rs/xmux) forked from [httprouter](https://github.com/julienschmidt/httprouter):
+
+```go
+package main
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "time"
+
+ "github.com/rs/xhandler"
+ "github.com/rs/xmux"
+ "golang.org/x/net/context"
+)
+
+func main() {
+ c := xhandler.Chain{}
+
+ // Append a context-aware middleware handler
+ c.UseC(xhandler.CloseHandler)
+
+ // Another context-aware middleware handler
+ c.UseC(xhandler.TimeoutHandler(2 * time.Second))
+
+ mux := xmux.New()
+
+ // Use c.Handler to terminate the chain with your final handler
+ mux.GET("/welcome/:name", xhandler.HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, req *http.Request) {
+ fmt.Fprintf(w, "Welcome %s!", xmux.Params(ctx).Get("name"))
+ }))
+
+ if err := http.ListenAndServe(":8080", c.Handler(mux)); err != nil {
+ log.Fatal(err)
+ }
+}
+```
+
+See [xmux](https://github.com/rs/xmux) for more examples.
+
+## Context Aware Middleware
+
+Here is a list of `net/context` aware middleware handlers implementing `xhandler.HandlerC` interface.
+
+Feel free to put up a PR linking your middleware if you have built one:
+
+| Middleware | Author | Description |
+| ---------- | ------ | ----------- |
+| [xmux](https://github.com/rs/xmux) | [Olivier Poitrey](https://github.com/rs) | HTTP request muxer |
+| [xlog](https://github.com/rs/xlog) | [Olivier Poitrey](https://github.com/rs) | HTTP handler logger |
+| [xstats](https://github.com/rs/xstats) | [Olivier Poitrey](https://github.com/rs) | A generic client for service instrumentation |
+| [xaccess](https://github.com/rs/xaccess) | [Olivier Poitrey](https://github.com/rs) | HTTP handler access logger with [xlog](https://github.com/rs/xlog) and [xstats](https://github.com/rs/xstats) |
+| [cors](https://github.com/rs/cors) | [Olivier Poitrey](https://github.com/rs) | [Cross Origin Resource Sharing](http://www.w3.org/TR/cors/) (CORS) support |
+
+## Licenses
+
+All source code is licensed under the [MIT License](https://raw.github.com/rs/xhandler/master/LICENSE).
diff --git a/Godeps/_workspace/src/github.com/rs/xhandler/chain.go b/Godeps/_workspace/src/github.com/rs/xhandler/chain.go
new file mode 100644
index 000000000..ffac67e8a
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/xhandler/chain.go
@@ -0,0 +1,93 @@
+package xhandler
+
+import (
+ "net/http"
+
+ "github.com/ethereum/go-ethereum/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+// Chain is an helper to chain middleware handlers together for an easier
+// management.
+type Chain []func(next HandlerC) HandlerC
+
+// UseC appends a context-aware handler to the middleware chain.
+func (c *Chain) UseC(f func(next HandlerC) HandlerC) {
+ *c = append(*c, f)
+}
+
+// Use appends a standard http.Handler to the middleware chain without
+// lossing track of the context when inserted between two context aware handlers.
+//
+// Caveat: the f function will be called on each request so you are better to put
+// any initialization sequence outside of this function.
+func (c *Chain) Use(f func(next http.Handler) http.Handler) {
+ xf := func(next HandlerC) HandlerC {
+ return HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ n := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ next.ServeHTTPC(ctx, w, r)
+ })
+ f(n).ServeHTTP(w, r)
+ })
+ }
+ *c = append(*c, xf)
+}
+
+// Handler wraps the provided final handler with all the middleware appended to
+// the chain and return a new standard http.Handler instance.
+// The context.Background() context is injected automatically.
+func (c Chain) Handler(xh HandlerC) http.Handler {
+ ctx := context.Background()
+ return c.HandlerCtx(ctx, xh)
+}
+
+// HandlerFC is an helper to provide a function (HandlerFuncC) to Handler().
+//
+// HandlerFC is equivalent to:
+// c.Handler(xhandler.HandlerFuncC(xhc))
+func (c Chain) HandlerFC(xhf HandlerFuncC) http.Handler {
+ ctx := context.Background()
+ return c.HandlerCtx(ctx, HandlerFuncC(xhf))
+}
+
+// HandlerH is an helper to provide a standard http handler (http.HandlerFunc)
+// to Handler(). Your final handler won't have access the context though.
+func (c Chain) HandlerH(h http.Handler) http.Handler {
+ ctx := context.Background()
+ return c.HandlerCtx(ctx, HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ h.ServeHTTP(w, r)
+ }))
+}
+
+// HandlerF is an helper to provide a standard http handler function
+// (http.HandlerFunc) to Handler(). Your final handler won't have access
+// the context though.
+func (c Chain) HandlerF(hf http.HandlerFunc) http.Handler {
+ ctx := context.Background()
+ return c.HandlerCtx(ctx, HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ hf(w, r)
+ }))
+}
+
+// HandlerCtx wraps the provided final handler with all the middleware appended to
+// the chain and return a new standard http.Handler instance.
+func (c Chain) HandlerCtx(ctx context.Context, xh HandlerC) http.Handler {
+ return New(ctx, c.HandlerC(xh))
+}
+
+// HandlerC wraps the provided final handler with all the middleware appended to
+// the chain and returns a HandlerC instance.
+func (c Chain) HandlerC(xh HandlerC) HandlerC {
+ for i := len(c) - 1; i >= 0; i-- {
+ xh = c[i](xh)
+ }
+ return xh
+}
+
+// HandlerCF wraps the provided final handler func with all the middleware appended to
+// the chain and returns a HandlerC instance.
+//
+// HandlerCF is equivalent to:
+// c.HandlerC(xhandler.HandlerFuncC(xhc))
+func (c Chain) HandlerCF(xhc HandlerFuncC) HandlerC {
+ return c.HandlerC(HandlerFuncC(xhc))
+}
diff --git a/Godeps/_workspace/src/github.com/rs/xhandler/middleware.go b/Godeps/_workspace/src/github.com/rs/xhandler/middleware.go
new file mode 100644
index 000000000..64b180323
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/xhandler/middleware.go
@@ -0,0 +1,59 @@
+package xhandler
+
+import (
+ "net/http"
+ "time"
+
+ "github.com/ethereum/go-ethereum/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+// CloseHandler returns a Handler cancelling the context when the client
+// connection close unexpectedly.
+func CloseHandler(next HandlerC) HandlerC {
+ return HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ // Cancel the context if the client closes the connection
+ if wcn, ok := w.(http.CloseNotifier); ok {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithCancel(ctx)
+ defer cancel()
+
+ notify := wcn.CloseNotify()
+ go func() {
+ select {
+ case <-notify:
+ cancel()
+ case <-ctx.Done():
+ }
+ }()
+ }
+
+ next.ServeHTTPC(ctx, w, r)
+ })
+}
+
+// TimeoutHandler returns a Handler which adds a timeout to the context.
+//
+// Child handlers have the responsability to obey the context deadline and to return
+// an appropriate error (or not) response in case of timeout.
+func TimeoutHandler(timeout time.Duration) func(next HandlerC) HandlerC {
+ return func(next HandlerC) HandlerC {
+ return HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ ctx, _ = context.WithTimeout(ctx, timeout)
+ next.ServeHTTPC(ctx, w, r)
+ })
+ }
+}
+
+// If is a special handler that will skip insert the condNext handler only if a condition
+// applies at runtime.
+func If(cond func(ctx context.Context, w http.ResponseWriter, r *http.Request) bool, condNext func(next HandlerC) HandlerC) func(next HandlerC) HandlerC {
+ return func(next HandlerC) HandlerC {
+ return HandlerFuncC(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ if cond(ctx, w, r) {
+ condNext(next).ServeHTTPC(ctx, w, r)
+ } else {
+ next.ServeHTTPC(ctx, w, r)
+ }
+ })
+ }
+}
diff --git a/Godeps/_workspace/src/github.com/rs/xhandler/xhandler.go b/Godeps/_workspace/src/github.com/rs/xhandler/xhandler.go
new file mode 100644
index 000000000..b71789804
--- /dev/null
+++ b/Godeps/_workspace/src/github.com/rs/xhandler/xhandler.go
@@ -0,0 +1,42 @@
+// Package xhandler provides a bridge between http.Handler and net/context.
+//
+// xhandler enforces net/context in your handlers without sacrificing
+// compatibility with existing http.Handlers nor imposing a specific router.
+//
+// Thanks to net/context deadline management, xhandler is able to enforce
+// a per request deadline and will cancel the context in when the client close
+// the connection unexpectedly.
+//
+// You may create net/context aware middlewares pretty much the same way as
+// you would do with http.Handler.
+package xhandler
+
+import (
+ "net/http"
+
+ "github.com/ethereum/go-ethereum/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+// HandlerC is a net/context aware http.Handler
+type HandlerC interface {
+ ServeHTTPC(context.Context, http.ResponseWriter, *http.Request)
+}
+
+// HandlerFuncC type is an adapter to allow the use of ordinary functions
+// as a xhandler.Handler. If f is a function with the appropriate signature,
+// xhandler.HandlerFuncC(f) is a xhandler.Handler object that calls f.
+type HandlerFuncC func(context.Context, http.ResponseWriter, *http.Request)
+
+// ServeHTTPC calls f(ctx, w, r).
+func (f HandlerFuncC) ServeHTTPC(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+ f(ctx, w, r)
+}
+
+// New creates a conventional http.Handler injecting the provided root
+// context to sub handlers. This handler is used as a bridge between conventional
+// http.Handler and context aware handlers.
+func New(ctx context.Context, h HandlerC) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ h.ServeHTTPC(ctx, w, r)
+ })
+}
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 69fb0b9db..07265ee20 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -245,7 +245,7 @@ var (
}
RPCCORSDomainFlag = cli.StringFlag{
Name: "rpccorsdomain",
- Usage: "Domains from which to accept cross origin requests (browser enforced)",
+ Usage: "Comma separated list of domains from which to accept cross origin requests (browser enforced)",
Value: "",
}
RPCApiFlag = cli.StringFlag{
diff --git a/rpc/http.go b/rpc/http.go
index d9053b003..af3d29014 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -17,240 +17,23 @@
package rpc
import (
- "bufio"
"bytes"
"encoding/json"
"fmt"
- "io"
"io/ioutil"
- "net"
"net/http"
"net/url"
- "strconv"
"strings"
- "time"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "gopkg.in/fatih/set.v0"
+ "io"
+
+ "github.com/rs/cors"
)
const (
- httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request
+ maxHTTPRequestContentLength = 1024 * 128
)
-// httpMessageStream is the glue between a HTTP connection which is message based
-// and the RPC codecs that expect json requests to be read from a stream. It will
-// parse HTTP messages and offer the bodies of these requests as a stream through
-// the Read method. This will require full control of the connection and thus need
-// a "hijacked" HTTP connection.
-type httpMessageStream struct {
- conn net.Conn // TCP connection
- rw *bufio.ReadWriter // buffered where HTTP requests/responses are read/written from/to
- currentReq *http.Request // pending request, codec can pass in a too small buffer for a single read we need to keep track of the current requests if it was not read at once
- payloadBytesRead int64 // number of bytes which are read from the current request
- allowedOrigins *set.Set // allowed CORS domains
- origin string // origin of this connection/request
-}
-
-// NewHttpMessageStream will create a new http message stream parser that can be
-// used by the codes in the RPC package. It will take full control of the given
-// connection and thus needs to be hijacked. It will read and write HTTP messages
-// from the passed rwbuf. The allowed origins are the RPC CORS domains the user has supplied.
-func NewHTTPMessageStream(c net.Conn, rwbuf *bufio.ReadWriter, initialReq *http.Request, allowdOrigins []string) *httpMessageStream {
- r := &httpMessageStream{conn: c, rw: rwbuf, currentReq: initialReq, allowedOrigins: set.New()}
- for _, origin := range allowdOrigins {
- r.allowedOrigins.Add(origin)
- }
- return r
-}
-
-// handleOptionsRequest handles the HTTP preflight requests (OPTIONS) that browsers
-// make to enforce CORS rules. Only the POST method is allowed and the origin must
-// be on the rpccorsdomain list the user has specified.
-func (h *httpMessageStream) handleOptionsRequest(req *http.Request) error {
- headers := req.Header
-
- if !strings.EqualFold(req.Method, "OPTIONS") {
- return fmt.Errorf("preflight aborted: %s!=OPTIONS", req.Method)
- }
-
- origin := headers.Get("Origin")
- if origin == "" {
- return fmt.Errorf("preflight aborted: empty origin")
- }
-
- responseHeaders := make(http.Header)
- responseHeaders.Set("Access-Control-Allow-Methods", "POST")
- if h.allowedOrigins.Has(origin) || h.allowedOrigins.Has("*") {
- responseHeaders.Set("Access-Control-Allow-Origin", origin)
- } else {
- glog.V(logger.Info).Infof("origin '%s' not allowed", origin)
- }
- responseHeaders.Set("Access-Control-Allow-Headers", "Content-Type")
- responseHeaders.Set("Date", string(httpTimestamp(time.Now())))
- responseHeaders.Set("Content-Type", "text/plain; charset=utf-8")
- responseHeaders.Set("Content-Length", "0")
- responseHeaders.Set("Vary", "Origin")
-
- defer h.rw.Flush()
-
- if _, err := h.rw.WriteString("HTTP/1.1 200 OK\r\n"); err != nil {
- glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err)
- return err
- }
- if err := responseHeaders.Write(h.rw); err != nil {
- glog.V(logger.Error).Infof("unable to write OPTIONS headers: %v\n", err)
- }
- if _, err := h.rw.WriteString("\r\n"); err != nil {
- glog.V(logger.Error).Infof("unable to write OPTIONS response: %v\n", err)
- }
-
- return nil
-}
-
-// Read will read incoming HTTP requests and reads the body data from these requests
-// as an endless stream of data.
-func (h *httpMessageStream) Read(buf []byte) (n int, err error) {
- h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine))
- for {
- // if the last request was read completely try to read the next request
- if h.currentReq == nil {
- if h.currentReq, err = http.ReadRequest(bufio.NewReader(h.rw)); err != nil {
- return 0, err
- }
- }
-
- // The "options" method is http specific and not interested for the RPC server.
- // Handle it internally and wait for the next request.
- if strings.EqualFold(h.currentReq.Method, "OPTIONS") {
- if err = h.handleOptionsRequest(h.currentReq); err != nil {
- glog.V(logger.Info).Infof("RPC/HTTP OPTIONS error: %v\n", err)
- h.currentReq = nil
- return 0, err
- }
-
- // processed valid request -> reset deadline
- h.conn.SetReadDeadline(time.Now().Add(httpReadDeadLine))
- h.currentReq = nil
- continue
- }
-
- if strings.EqualFold(h.currentReq.Method, "GET") || strings.EqualFold(h.currentReq.Method, "POST") {
- n, err := h.currentReq.Body.Read(buf)
- h.payloadBytesRead += int64(n)
-
- // entire payload read, read new request next time
- if err == io.EOF || h.payloadBytesRead >= h.currentReq.ContentLength {
- h.origin = h.currentReq.Header.Get("origin")
- h.payloadBytesRead = 0
- h.currentReq.Body.Close()
- h.currentReq = nil
- err = nil // io.EOF is not an error
- } else if err != nil {
- // unable to read body
- h.currentReq.Body.Close()
- h.currentReq = nil
- h.payloadBytesRead = 0
- }
- // partial read of body
- return n, err
- }
- return 0, fmt.Errorf("unsupported HTTP method '%s'", h.currentReq.Method)
- }
-}
-
-// Write will create a HTTP response with the given payload and send it to the peer.
-func (h *httpMessageStream) Write(payload []byte) (int, error) {
- defer h.rw.Flush()
-
- responseHeaders := make(http.Header)
- responseHeaders.Set("Content-Type", "application/json")
- responseHeaders.Set("Content-Length", strconv.Itoa(len(payload)))
- if h.origin != "" {
- responseHeaders.Set("Access-Control-Allow-Origin", h.origin)
- }
-
- h.rw.WriteString("HTTP/1.1 200 OK\r\n")
- responseHeaders.Write(h.rw)
- h.rw.WriteString("\r\n")
-
- return h.rw.Write(payload)
-}
-
-// Close will close the underlying TCP connection this instance has taken ownership over.
-func (h *httpMessageStream) Close() error {
- h.rw.Flush()
- return h.conn.Close()
-}
-
-// TimeFormat is the time format to use with time.Parse and time.Time.Format when
-// parsing or generating times in HTTP headers. It is like time.RFC1123 but hard
-// codes GMT as the time zone.
-const TimeFormat = "Mon, 02 Jan 2006 15:04:05 GMT"
-
-// httpTimestamp formats the given t as specified in RFC1123.
-func httpTimestamp(t time.Time) []byte {
- const days = "SunMonTueWedThuFriSat"
- const months = "JanFebMarAprMayJunJulAugSepOctNovDec"
-
- b := make([]byte, 0)
- t = t.UTC()
- yy, mm, dd := t.Date()
- hh, mn, ss := t.Clock()
- day := days[3*t.Weekday():]
- mon := months[3*(mm-1):]
-
- return append(b,
- day[0], day[1], day[2], ',', ' ',
- byte('0'+dd/10), byte('0'+dd%10), ' ',
- mon[0], mon[1], mon[2], ' ',
- byte('0'+yy/1000), byte('0'+(yy/100)%10), byte('0'+(yy/10)%10), byte('0'+yy%10), ' ',
- byte('0'+hh/10), byte('0'+hh%10), ':',
- byte('0'+mn/10), byte('0'+mn%10), ':',
- byte('0'+ss/10), byte('0'+ss%10), ' ',
- 'G', 'M', 'T')
-}
-
-// httpConnHijacker is a http.Handler implementation that will hijack the HTTP
-// connection, wraps it in a HttpMessageStream that is then wrapped in a JSON
-// codec which will be served on the rpcServer.
-type httpConnHijacker struct {
- corsdomains []string
- rpcServer *Server
-}
-
-// ServeHTTP will hijack the connection, wraps the captured connection in a
-// HttpMessageStream which is then used as codec.
-func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- hj, ok := w.(http.Hijacker)
- if !ok {
- http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError)
- return
- }
-
- conn, rwbuf, err := hj.Hijack()
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-
- httpRequestStream := NewHTTPMessageStream(conn, rwbuf, req, h.corsdomains)
-
- codec := NewJSONCodec(httpRequestStream)
- go h.rpcServer.ServeCodec(codec)
-}
-
-// NewHTTPServer creates a new HTTP RPC server around an API provider.
-func NewHTTPServer(cors string, handler *Server) *http.Server {
- return &http.Server{
- Handler: &httpConnHijacker{
- corsdomains: strings.Split(cors, ","),
- rpcServer: handler,
- },
- }
-}
-
// httpClient connects to a geth RPC server over HTTP.
type httpClient struct {
endpoint *url.URL // HTTP-RPC server endpoint
@@ -313,3 +96,55 @@ func (client *httpClient) Close() {
func (client *httpClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
}
+
+// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
+type httpReadWriteNopCloser struct {
+ io.Reader
+ io.Writer
+}
+
+// Close does nothing and returns always nil
+func (t *httpReadWriteNopCloser) Close() error {
+ return nil
+}
+
+// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests,
+// send the request to the given API provider and sends the response back to the caller.
+func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.ContentLength > maxHTTPRequestContentLength {
+ http.Error(w,
+ fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
+ http.StatusRequestEntityTooLarge)
+ return
+ }
+
+ w.Header().Set("content-type", "application/json")
+
+ // create a codec that reads direct from the request body until
+ // EOF and writes the response to w and order the server to process
+ // a single request.
+ codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
+ defer codec.Close()
+ srv.ServeSingleRequest(codec)
+ }
+}
+
+// NewHTTPServer creates a new HTTP RPC server around an API provider.
+func NewHTTPServer(corsString string, srv *Server) *http.Server {
+ var allowedOrigins []string
+ for _, domain := range strings.Split(corsString, ",") {
+ allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
+ }
+
+ c := cors.New(cors.Options{
+ AllowedOrigins: allowedOrigins,
+ AllowedMethods: []string{"POST", "GET"},
+ })
+
+ handler := c.Handler(newJSONHTTPHandler(srv))
+
+ return &http.Server{
+ Handler: handler,
+ }
+}
diff --git a/rpc/server.go b/rpc/server.go
index f42ee2d37..22448f8e3 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -117,14 +117,12 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil
}
-// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
-// response back using the given codec. It will block until the codec is closed.
-//
-// This server will:
-// 1. allow for asynchronous and parallel request execution
-// 2. supports notifications (pub/sub)
-// 3. supports request batches
-func (s *Server) ServeCodec(codec ServerCodec) {
+// serveRequest will reads requests from the codec, calls the RPC callback and
+// writes the response to the given codec.
+// If singleShot is true it will process a single request, otherwise it will handle
+// requests until the codec returns an error when reading a request (in most cases
+// an EOF). It executes requests in parallel when singleShot is false.
+func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
@@ -132,7 +130,12 @@ func (s *Server) ServeCodec(codec ServerCodec) {
buf = buf[:runtime.Stack(buf, false)]
glog.Errorln(string(buf))
}
- codec.Close()
+
+ s.codecsMu.Lock()
+ s.codecs.Remove(codec)
+ s.codecsMu.Unlock()
+
+ return
}()
ctx, cancel := context.WithCancel(context.Background())
@@ -141,20 +144,22 @@ func (s *Server) ServeCodec(codec ServerCodec) {
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock()
- return
+ return &shutdownError{}
}
s.codecs.Add(codec)
s.codecsMu.Unlock()
+ // test if the server is ordered to stop
for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec)
-
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
- break
+ return nil
}
+ // check if server is ordered to shutdown and return an error
+ // telling the client that his request failed.
if atomic.LoadInt32(&s.run) != 1 {
err = &shutdownError{}
if batch {
@@ -166,15 +171,42 @@ func (s *Server) ServeCodec(codec ServerCodec) {
} else {
codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
}
- break
+ return nil
}
- if batch {
+ if singleShot && batch {
+ s.execBatch(ctx, codec, reqs)
+ return nil
+ } else if singleShot && !batch {
+ s.exec(ctx, codec, reqs[0])
+ return nil
+ } else if !singleShot && batch {
go s.execBatch(ctx, codec, reqs)
} else {
go s.exec(ctx, codec, reqs[0])
}
}
+
+ return nil
+}
+
+// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
+// response back using the given codec. It will block until the codec is closed or the server is
+// stopped. In either case the codec is closed.
+//
+// This server will:
+// 1. allow for asynchronous and parallel request execution
+// 2. supports notifications (pub/sub)
+// 3. supports request batches
+func (s *Server) ServeCodec(codec ServerCodec) {
+ defer codec.Close()
+ s.serveRequest(codec, false)
+}
+
+// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not
+// close the codec unless a non-recoverable error has occurred.
+func (s *Server) ServeSingleRequest(codec ServerCodec) {
+ s.serveRequest(codec, true)
}
// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish,