diff options
Diffstat (limited to 'src/go/unit')
-rw-r--r-- | src/go/unit/cbytes-1.6.go | 15 | ||||
-rw-r--r-- | src/go/unit/cbytes-1.7.go | 15 | ||||
-rw-r--r-- | src/go/unit/nxt_go_lib.c | 22 | ||||
-rw-r--r-- | src/go/unit/nxt_go_lib.h | 12 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port.c | 41 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.c | 4 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.h | 3 | ||||
-rw-r--r-- | src/go/unit/port.go | 32 | ||||
-rw-r--r-- | src/go/unit/request.go | 138 | ||||
-rw-r--r-- | src/go/unit/response.go | 6 | ||||
-rw-r--r-- | src/go/unit/unit.go | 52 |
11 files changed, 96 insertions, 244 deletions
diff --git a/src/go/unit/cbytes-1.6.go b/src/go/unit/cbytes-1.6.go deleted file mode 100644 index f756b1de..00000000 --- a/src/go/unit/cbytes-1.6.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build !go1.7 - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -import "C" -import "unsafe" - -func getCBytes(p []byte) unsafe.Pointer { - return unsafe.Pointer(C.CString(string(p))) // go <= 1.6 -} diff --git a/src/go/unit/cbytes-1.7.go b/src/go/unit/cbytes-1.7.go deleted file mode 100644 index e0de283d..00000000 --- a/src/go/unit/cbytes-1.7.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build go1.7 - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -import "C" -import "unsafe" - -func getCBytes(p []byte) unsafe.Pointer { - return C.CBytes(p) // go >= 1.7 -} diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c index 691688ab..84d64c0e 100644 --- a/src/go/unit/nxt_go_lib.c +++ b/src/go/unit/nxt_go_lib.c @@ -13,7 +13,7 @@ #include <nxt_main.h> int -nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) +nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len) { nxt_int_t rc; nxt_go_run_ctx_t *ctx; @@ -25,14 +25,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) nxt_go_debug("write: %d", (int) len); ctx = (nxt_go_run_ctx_t *) r; - rc = nxt_go_ctx_write(ctx, buf, len); + rc = nxt_go_ctx_write(ctx, (void *) buf, len); return rc == NXT_OK ? len : -1; } int -nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) +nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len) { size_t res; nxt_go_run_ctx_t *ctx; @@ -43,19 +43,19 @@ nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) ctx = (nxt_go_run_ctx_t *) r; - dst_len = nxt_min(dst_len, ctx->r.body.preread_size); + dst_len = nxt_min(dst_len, ctx->request.body.preread_size); - res = nxt_go_ctx_read_raw(ctx, dst, dst_len); + res = nxt_go_ctx_read_raw(ctx, (void *) dst, dst_len); - ctx->r.body.preread_size -= res; + ctx->request.body.preread_size -= res; return res; } int -nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, - void *src, size_t src_len) +nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len, + uintptr_t src, size_t src_len) { nxt_go_run_ctx_t *ctx; @@ -65,7 +65,7 @@ nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, ctx = (nxt_go_run_ctx_t *) r; - nxt_go_ctx_add_msg(ctx, src, src_len); + nxt_go_ctx_add_msg(ctx, (void *) src, src_len); return nxt_go_request_read(r, dst, dst_len); } @@ -137,7 +137,7 @@ nxt_go_ready() nxt_go_request_t -nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len) +nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, uintptr_t oob, size_t oob_len) { - return nxt_go_port_on_read(buf, buf_len, oob, oob_len); + return nxt_go_port_on_read((void *) buf, buf_len, (void *) oob, oob_len); } diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h index b3a86be9..33acf334 100644 --- a/src/go/unit/nxt_go_lib.h +++ b/src/go/unit/nxt_go_lib.h @@ -19,12 +19,12 @@ typedef struct { typedef uintptr_t nxt_go_request_t; -int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len); +int nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len); -int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len); +int nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len); -int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, - void *src, size_t src_len); +int nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len, + uintptr_t src, size_t src_len); int nxt_go_request_close(nxt_go_request_t r); @@ -32,8 +32,8 @@ int nxt_go_request_done(nxt_go_request_t r); void nxt_go_ready(); -nxt_go_request_t nxt_go_process_port_msg(void *buf, size_t buf_len, - void *oob, size_t oob_len); +nxt_go_request_t nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, + uintptr_t oob, size_t oob_len); #endif /* _NXT_GO_LIB_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c index 8ea9a5ca..cbf12ab2 100644 --- a/src/go/unit/nxt_go_port.c +++ b/src/go/unit/nxt_go_port.c @@ -27,16 +27,15 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) nxt_go_request_t r; nxt_app_request_header_t *h; - r = nxt_go_find_request(port_msg->stream); - if (r != 0) { - return r; - } + ctx = malloc(sizeof(nxt_go_run_ctx_t) + size); + + memcpy(ctx + 1, port_msg, size); + port_msg = (nxt_port_msg_t *) (ctx + 1); - ctx = malloc(sizeof(nxt_go_run_ctx_t)); nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t)); r = (nxt_go_request_t)(ctx); - h = &ctx->r.header; + h = &ctx->request.header; nxt_go_ctx_read_str(ctx, &h->method); nxt_go_ctx_read_str(ctx, &h->target); @@ -58,18 +57,20 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) h->path = h->target; } - nxt_go_new_request(r, port_msg->stream, nxt_go_str(&h->method), - nxt_go_str(&h->target)); + ctx->go_request = nxt_go_new_request(r, port_msg->stream, + nxt_go_str(&h->method), + nxt_go_str(&h->target)); nxt_go_ctx_read_str(ctx, &h->version); - nxt_go_request_set_proto(r, nxt_go_str(&h->version), + nxt_go_request_set_proto(ctx->go_request, nxt_go_str(&h->version), h->version.start[5] - '0', h->version.start[7] - '0'); - nxt_go_ctx_read_str(ctx, &ctx->r.remote); - if (ctx->r.remote.start != NULL) { - nxt_go_request_set_remote_addr(r, nxt_go_str(&ctx->r.remote)); + nxt_go_ctx_read_str(ctx, &ctx->request.remote); + if (ctx->request.remote.start != NULL) { + nxt_go_request_set_remote_addr(ctx->go_request, + nxt_go_str(&ctx->request.remote)); } nxt_go_ctx_read_str(ctx, &h->host); @@ -78,7 +79,7 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) nxt_go_ctx_read_str(ctx, &h->content_length); if (h->host.start != NULL) { - nxt_go_request_set_host(r, nxt_go_str(&h->host)); + nxt_go_request_set_host(ctx->go_request, nxt_go_str(&h->host)); } nxt_go_ctx_read_size(ctx, &s); @@ -92,21 +93,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) } rc = nxt_go_ctx_read_str(ctx, &v); - nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v)); + nxt_go_request_add_header(ctx->go_request, nxt_go_str(&n), + nxt_go_str(&v)); } while(1); nxt_go_ctx_read_size(ctx, &s); - ctx->r.body.preread_size = s; + ctx->request.body.preread_size = s; if (h->parsed_content_length > 0) { - nxt_go_request_set_content_length(r, h->parsed_content_length); + nxt_go_request_set_content_length(ctx->go_request, + h->parsed_content_length); } - if (ctx->r.body.preread_size < h->parsed_content_length) { - nxt_go_request_create_channel(r); + if (ctx->request.body.preread_size < h->parsed_content_length) { + nxt_go_warn("preread_size < content_length"); } - return r; + return ctx->go_request; } nxt_go_request_t diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c index 902872ce..c5e41ee8 100644 --- a/src/go/unit/nxt_go_run_ctx.c +++ b/src/go/unit/nxt_go_run_ctx.c @@ -185,7 +185,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) msg->start_offset = ctx->msg_last->start_offset; if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->r.body.preread_size; + msg->start_offset += ctx->request.body.preread_size; } else { msg->start_offset += ctx->msg_last->data_size; @@ -219,6 +219,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) &ctx->wport_msg, sizeof(nxt_port_msg_t) + ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0); + nxt_go_debug(" port send res = %d", rc); + ctx->nwbuf = 0; memset(&ctx->wbuf, 0, sizeof(ctx->wbuf)); diff --git a/src/go/unit/nxt_go_run_ctx.h b/src/go/unit/nxt_go_run_ctx.h index a5a972c6..f35b48de 100644 --- a/src/go/unit/nxt_go_run_ctx.h +++ b/src/go/unit/nxt_go_run_ctx.h @@ -47,7 +47,8 @@ typedef struct { nxt_port_msg_t wport_msg; char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ]; - nxt_app_request_t r; + nxt_app_request_t request; + uintptr_t go_request; nxt_go_msg_t *msg_last; } nxt_go_run_ctx_t; diff --git a/src/go/unit/port.go b/src/go/unit/port.go index 2dc273fd..51733176 100644 --- a/src/go/unit/port.go +++ b/src/go/unit/port.go @@ -12,7 +12,6 @@ package unit import "C" import ( - "fmt" "net" "net/http" "os" @@ -74,6 +73,9 @@ func main_port() *port { } func add_port(p *port) { + + nxt_go_debug("add_port: %d:%d", p.key.pid, p.key.id); + port_registry_.Lock() if port_registry_.m == nil { port_registry_.m = make(map[port_key]*port) @@ -105,13 +107,13 @@ func getUnixConn(fd int) *net.UnixConn { c, err := net.FileConn(f) if err != nil { - fmt.Printf("FileConn error %s\n", err) + nxt_go_warn("FileConn error %s", err) return nil } uc, ok := c.(*net.UnixConn) if !ok { - fmt.Printf("Not a Unix-domain socket %d\n", fd) + nxt_go_warn("Not a Unix-domain socket %d", fd) return nil } @@ -135,6 +137,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, p := find_port(key) if p == nil { + nxt_go_warn("port %d:%d not found", pid, id) return 0 } @@ -142,7 +145,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, C.GoBytes(oob, oob_size), nil) if err != nil { - fmt.Printf("write result %d (%d), %s\n", n, oobn, err) + nxt_go_warn("write result %d (%d), %s", n, oobn, err) } return C.int(n) @@ -163,7 +166,7 @@ func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, C.GoBytes(oob, oob_size), nil) if err != nil { - fmt.Printf("write result %d (%d), %s\n", n, oobn, err) + nxt_go_warn("write result %d (%d), %s", n, oobn, err) } return C.int(n) @@ -188,6 +191,7 @@ func new_port(pid int, id int, t int, rcv int, snd int) *port { func (p *port) read(handler http.Handler) error { var buf [16384]byte var oob [1024]byte + var c_buf, c_oob cbuf n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:]) @@ -195,24 +199,16 @@ func (p *port) read(handler http.Handler) error { return err } - m := new_cmsg(buf[:n], oob[:oobn]) + c_buf.init(buf[:n]) + c_oob.init(oob[:oobn]) - c_req := C.nxt_go_process_port_msg(m.buf.b, m.buf.s, m.oob.b, m.oob.s) + go_req := C.nxt_go_process_port_msg(c_buf.b, c_buf.s, c_oob.b, c_oob.s) - if c_req == 0 { - m.Close() + if go_req == 0 { return nil } - r := find_request(c_req) - - if len(r.msgs) == 0 { - r.push(m) - } else if r.ch != nil { - r.ch <- m - } else { - m.Close() - } + r := get_request(go_req) go func(r *request) { if handler == nil { diff --git a/src/go/unit/request.go b/src/go/unit/request.go index 5af3db7b..4a839bdb 100644 --- a/src/go/unit/request.go +++ b/src/go/unit/request.go @@ -13,7 +13,7 @@ import "C" import ( "net/http" "net/url" - "sync" + "unsafe" ) type request struct { @@ -21,28 +21,14 @@ type request struct { resp *response c_req C.nxt_go_request_t id C.uint32_t - msgs []*cmsg - ch chan *cmsg } func (r *request) Read(p []byte) (n int, err error) { c := C.size_t(cap(p)) - b := C.malloc(c) - res := C.nxt_go_request_read(r.c_req, b, c) - - if res == -2 /* NXT_AGAIN */ { - m := <-r.ch + b := C.uintptr_t(uintptr(unsafe.Pointer(&p[0]))) - res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, - m.buf.s) - r.push(m) - } - - if res > 0 { - copy(p, C.GoBytes(b, res)) - } + res := C.nxt_go_request_read(r.c_req, b, c) - C.free(b) return int(res), nil } @@ -51,53 +37,6 @@ func (r *request) Close() error { return nil } -type request_registry struct { - sync.RWMutex - m map[C.nxt_go_request_t]*request - id map[C.uint32_t]*request -} - -var request_registry_ request_registry - -func find_request(c_req C.nxt_go_request_t) *request { - request_registry_.RLock() - res := request_registry_.m[c_req] - request_registry_.RUnlock() - - return res -} - -func find_request_by_id(id C.uint32_t) *request { - request_registry_.RLock() - res := request_registry_.id[id] - request_registry_.RUnlock() - - return res -} - -func add_request(r *request) { - request_registry_.Lock() - if request_registry_.m == nil { - request_registry_.m = make(map[C.nxt_go_request_t]*request) - request_registry_.id = make(map[C.uint32_t]*request) - } - - request_registry_.m[r.c_req] = r - request_registry_.id[r.id] = r - - request_registry_.Unlock() -} - -func remove_request(r *request) { - request_registry_.Lock() - if request_registry_.m != nil { - delete(request_registry_.m, r.c_req) - delete(request_registry_.id, r.id) - } - - request_registry_.Unlock() -} - func (r *request) response() *response { if r.resp == nil { r.resp = new_response(r.c_req, &r.req) @@ -107,33 +46,23 @@ func (r *request) response() *response { } func (r *request) done() { - remove_request(r) - C.nxt_go_request_done(r.c_req) - - for _, m := range r.msgs { - m.Close() - } - - if r.ch != nil { - close(r.ch) - } } -func (r *request) push(m *cmsg) { - r.msgs = append(r.msgs, m) +func get_request(go_req C.nxt_go_request_t) *request { + return (*request)(unsafe.Pointer(uintptr(go_req))) } //export nxt_go_new_request func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, - c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) { + c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) uintptr { uri := C.GoStringN(c_uri.start, c_uri.length) var URL *url.URL var err error if URL, err = url.ParseRequestURI(uri); err != nil { - return + return 0 } r := &request{ @@ -146,76 +75,49 @@ func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, }, c_req: c_req, id: id, - msgs: make([]*cmsg, 0, 1), } r.req.Body = r - add_request(r) -} - -//export nxt_go_find_request -func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t { - r := find_request_by_id(id) - - if r != nil { - return r.c_req - } - - return 0 + return uintptr(unsafe.Pointer(r)) } //export nxt_go_request_set_proto -func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, +func nxt_go_request_set_proto(go_req C.nxt_go_request_t, proto *C.nxt_go_str_t, maj C.int, min C.int) { - r := find_request(c_req) + r := get_request(go_req) r.req.Proto = C.GoStringN(proto.start, proto.length) r.req.ProtoMajor = int(maj) r.req.ProtoMinor = int(min) } //export nxt_go_request_add_header -func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, +func nxt_go_request_add_header(go_req C.nxt_go_request_t, name *C.nxt_go_str_t, value *C.nxt_go_str_t) { - r := find_request(c_req) + r := get_request(go_req) r.req.Header.Add(C.GoStringN(name.start, name.length), C.GoStringN(value.start, value.length)) } //export nxt_go_request_set_content_length -func nxt_go_request_set_content_length(c_req C.nxt_go_request_t, l C.int64_t) { - find_request(c_req).req.ContentLength = int64(l) -} - -//export nxt_go_request_create_channel -func nxt_go_request_create_channel(c_req C.nxt_go_request_t) { - find_request(c_req).ch = make(chan *cmsg) +func nxt_go_request_set_content_length(go_req C.nxt_go_request_t, l C.int64_t) { + get_request(go_req).req.ContentLength = int64(l) } //export nxt_go_request_set_host -func nxt_go_request_set_host(c_req C.nxt_go_request_t, host *C.nxt_go_str_t) { - find_request(c_req).req.Host = C.GoStringN(host.start, host.length) +func nxt_go_request_set_host(go_req C.nxt_go_request_t, host *C.nxt_go_str_t) { + get_request(go_req).req.Host = C.GoStringN(host.start, host.length) } //export nxt_go_request_set_url -func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) { - find_request(c_req).req.URL.Scheme = C.GoString(scheme) +func nxt_go_request_set_url(go_req C.nxt_go_request_t, scheme *C.char) { + get_request(go_req).req.URL.Scheme = C.GoString(scheme) } //export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, +func nxt_go_request_set_remote_addr(go_req C.nxt_go_request_t, addr *C.nxt_go_str_t) { - find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) -} - -//export nxt_go_request_serve -func nxt_go_request_serve(c_req C.nxt_go_request_t) { - r := find_request(c_req) - - go func(r *request) { - http.DefaultServeMux.ServeHTTP(r.response(), &r.req) - r.done() - }(r) + get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) } diff --git a/src/go/unit/response.go b/src/go/unit/response.go index 18daa2f3..258a82c9 100644 --- a/src/go/unit/response.go +++ b/src/go/unit/response.go @@ -13,7 +13,6 @@ import "C" import ( "fmt" "net/http" - "os" ) type response struct { @@ -43,16 +42,15 @@ func (r *response) Write(p []byte) (n int, err error) { } l := C.size_t(len(p)) - b := getCBytes(p) + b := buf_ref(p) res := C.nxt_go_response_write(r.c_req, b, l) - C.free(b) return int(res), nil } func (r *response) WriteHeader(code int) { if r.headerSent { // Note: explicitly using Stderr, as Stdout is our HTTP output. - fmt.Fprintf(os.Stderr, "CGI attempted to write header twice") + nxt_go_warn("multiple response.WriteHeader calls") return } r.headerSent = true diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go index 74e53ecf..82a54ecc 100644 --- a/src/go/unit/unit.go +++ b/src/go/unit/unit.go @@ -20,32 +20,21 @@ import ( ) type cbuf struct { - b unsafe.Pointer + b C.uintptr_t s C.size_t - f bool } -func new_cbuf(buf []byte) *cbuf { +func buf_ref(buf []byte) C.uintptr_t { if len(buf) == 0 { - return nil + return 0 } - return &cbuf{ - getCBytes(buf), C.size_t(len(buf)), true, - } + return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0]))) } -func (buf *cbuf) Close() { - if buf == nil { - return - } - - if buf.f && buf.s > 0 { - C.free(buf.b) - buf.f = false - buf.b = nil - buf.s = 0 - } +func (buf *cbuf) init(b []byte) { + buf.b = buf_ref(b) + buf.s = C.size_t(len(b)) } func (buf *cbuf) GoBytes() []byte { @@ -54,24 +43,7 @@ func (buf *cbuf) GoBytes() []byte { return b[:0] } - return C.GoBytes(buf.b, C.int(buf.s)) -} - -type cmsg struct { - buf cbuf - oob cbuf -} - -func new_cmsg(buf []byte, oob []byte) *cmsg { - return &cmsg{ - buf: cbuf{getCBytes(buf), C.size_t(len(buf)), true}, - oob: cbuf{getCBytes(oob), C.size_t(len(oob)), true}, - } -} - -func (msg *cmsg) Close() { - msg.buf.Close() - msg.oob.Close() + return C.GoBytes(unsafe.Pointer(uintptr(buf.b)), C.int(buf.s)) } var nxt_go_quit bool = false @@ -81,6 +53,14 @@ func nxt_go_set_quit() { nxt_go_quit = true } +func nxt_go_warn(format string, args ...interface{}) { + fmt.Fprintf(os.Stderr, "[go warn] " + format + "\n", args...) +} + +func nxt_go_debug(format string, args ...interface{}) { + // fmt.Fprintf(os.Stderr, "[go debug] " + format + "\n", args...) +} + func ListenAndServe(addr string, handler http.Handler) error { var read_port *port |