summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-11-18 22:33:53 +0300
committerMax Romanov <max.romanov@nginx.com>2020-11-18 22:33:53 +0300
commit8132e1f700934a32bc9e3fb0ab66f550a335a326 (patch)
treec9e276355d4a1f6883efc85eaf2a1f3d65d2598a
parentd26afcb481d97cd71db014b16bde44e807043a2b (diff)
downloadunit-8132e1f700934a32bc9e3fb0ab66f550a335a326.tar.gz
unit-8132e1f700934a32bc9e3fb0ab66f550a335a326.tar.bz2
Go: removing C proxy functions and re-using goroutines.
Diffstat (limited to '')
-rw-r--r--go/nxt_cgo_lib.c151
-rw-r--r--go/nxt_cgo_lib.h32
-rw-r--r--go/port.go48
-rw-r--r--go/request.go154
-rw-r--r--go/response.go32
-rw-r--r--go/unit.go34
-rw-r--r--src/nxt_unit.c75
-rw-r--r--src/nxt_unit.h2
8 files changed, 225 insertions, 303 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c
index f7171f55..330697c1 100644
--- a/go/nxt_cgo_lib.c
+++ b/go/nxt_cgo_lib.c
@@ -10,16 +10,10 @@
#include <nxt_unit_request.h>
-static void nxt_cgo_request_handler(nxt_unit_request_info_t *req);
-static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
- nxt_unit_sptr_t *sptr, uint32_t length);
-static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
-static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port);
static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size);
-static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx);
int
nxt_cgo_run(uintptr_t handler)
@@ -30,12 +24,12 @@ nxt_cgo_run(uintptr_t handler)
memset(&init, 0, sizeof(init));
- init.callbacks.request_handler = nxt_cgo_request_handler;
- init.callbacks.add_port = nxt_cgo_add_port;
- init.callbacks.remove_port = nxt_cgo_remove_port;
+ init.callbacks.request_handler = nxt_go_request_handler;
+ init.callbacks.add_port = nxt_go_add_port;
+ init.callbacks.remove_port = nxt_go_remove_port;
init.callbacks.port_send = nxt_cgo_port_send;
init.callbacks.port_recv = nxt_cgo_port_recv;
- init.callbacks.shm_ack_handler = nxt_cgo_shm_ack_handler;
+ init.callbacks.shm_ack_handler = nxt_go_shm_ack_handler;
init.data = (void *) handler;
@@ -52,76 +46,6 @@ nxt_cgo_run(uintptr_t handler)
}
-static void
-nxt_cgo_request_handler(nxt_unit_request_info_t *req)
-{
- uint32_t i;
- uintptr_t go_req;
- nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr;
- nxt_unit_field_t *f;
- nxt_unit_request_t *r;
-
- r = req->request;
-
- go_req = nxt_go_request_create((uintptr_t) req,
- nxt_cgo_str_init(&method, &r->method, r->method_length),
- nxt_cgo_str_init(&uri, &r->target, r->target_length));
-
- nxt_go_request_set_proto(go_req,
- nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1);
-
- for (i = 0; i < r->fields_count; i++) {
- f = &r->fields[i];
-
- nxt_go_request_add_header(go_req,
- nxt_cgo_str_init(&name, &f->name, f->name_length),
- nxt_cgo_str_init(&value, &f->value, f->value_length));
- }
-
- nxt_go_request_set_content_length(go_req, r->content_length);
- nxt_go_request_set_host(go_req,
- nxt_cgo_str_init(&host, &r->server_name, r->server_name_length));
- nxt_go_request_set_remote_addr(go_req,
- nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length));
-
- if (r->tls) {
- nxt_go_request_set_tls(go_req);
- }
-
- nxt_go_request_handler(go_req, (uintptr_t) req->unit->data);
-}
-
-
-static nxt_cgo_str_t *
-nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length)
-{
- dst->length = length;
- dst->start = nxt_unit_sptr_get(sptr);
-
- return dst;
-}
-
-
-static int
-nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
-{
- nxt_go_add_port((uintptr_t) ctx, port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
-
- port->in_fd = -1;
- port->out_fd = -1;
-
- return NXT_UNIT_OK;
-}
-
-
-static void
-nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
-{
- nxt_go_remove_port(port->id.pid, port->id.id);
-}
-
-
static ssize_t
nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
@@ -140,78 +64,31 @@ nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
}
-static void
-nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx)
-{
- return nxt_go_shm_ack_handler();
-}
-
-
-int
-nxt_cgo_response_create(uintptr_t req, int status, int fields,
- uint32_t fields_size)
-{
- return nxt_unit_response_init((nxt_unit_request_info_t *) req,
- status, fields, fields_size);
-}
-
-
-int
-nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
- uintptr_t value, uint32_t value_len)
-{
- return nxt_unit_response_add_field((nxt_unit_request_info_t *) req,
- (char *) name, name_len,
- (char *) value, value_len);
-}
-
-
-int
-nxt_cgo_response_send(uintptr_t req)
-{
- return nxt_unit_response_send((nxt_unit_request_info_t *) req);
-}
-
-
ssize_t
-nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len)
+nxt_cgo_response_write(nxt_unit_request_info_t *req, uintptr_t start,
+ uint32_t len)
{
- return nxt_unit_response_write_nb((nxt_unit_request_info_t *) req,
- (void *) start, len, 0);
+ return nxt_unit_response_write_nb(req, (void *) start, len, 0);
}
ssize_t
-nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len)
-{
- return nxt_unit_request_read((nxt_unit_request_info_t *) req,
- (void *) dst, dst_len);
-}
-
-
-int
-nxt_cgo_request_close(uintptr_t req)
+nxt_cgo_request_read(nxt_unit_request_info_t *req, uintptr_t dst,
+ uint32_t dst_len)
{
- return 0;
+ return nxt_unit_request_read(req, (void *) dst, dst_len);
}
void
-nxt_cgo_request_done(uintptr_t req, int res)
+nxt_cgo_warn(const char *msg, uint32_t msg_len)
{
- nxt_unit_request_done((nxt_unit_request_info_t *) req, res);
-}
-
-
-void
-nxt_cgo_unit_run_shared(uintptr_t ctx)
-{
- nxt_unit_run_shared((nxt_unit_ctx_t *) ctx);
+ nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg);
}
void
-nxt_cgo_warn(uintptr_t msg, uint32_t msg_len)
+nxt_cgo_alert(const char *msg, uint32_t msg_len)
{
- nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg);
+ nxt_unit_alert(NULL, "%.*s", (int) msg_len, (char *) msg);
}
diff --git a/go/nxt_cgo_lib.h b/go/nxt_cgo_lib.h
index fa515be5..3705d1ef 100644
--- a/go/nxt_cgo_lib.h
+++ b/go/nxt_cgo_lib.h
@@ -11,32 +11,22 @@
#include <stdint.h>
#include <stdlib.h>
#include <sys/types.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
-typedef struct {
- int length;
- char *start;
-} nxt_cgo_str_t;
+enum {
+ NXT_FIELDS_OFFSET = offsetof(nxt_unit_request_t, fields)
+};
int nxt_cgo_run(uintptr_t handler);
-int nxt_cgo_response_create(uintptr_t req, int code, int fields,
- uint32_t fields_size);
+ssize_t nxt_cgo_response_write(nxt_unit_request_info_t *req,
+ uintptr_t src, uint32_t len);
-int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
- uintptr_t value, uint32_t value_len);
+ssize_t nxt_cgo_request_read(nxt_unit_request_info_t *req,
+ uintptr_t dst, uint32_t dst_len);
-int nxt_cgo_response_send(uintptr_t req);
-
-ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len);
-
-ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len);
-
-int nxt_cgo_request_close(uintptr_t req);
-
-void nxt_cgo_request_done(uintptr_t req, int res);
-
-void nxt_cgo_unit_run_shared(uintptr_t ctx);
-
-void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len);
+void nxt_cgo_warn(const char *msg, uint32_t msg_len);
+void nxt_cgo_alert(const char *msg, uint32_t msg_len);
#endif /* _NXT_CGO_LIB_H_INCLUDED_ */
diff --git a/go/port.go b/go/port.go
index 64004d91..78351322 100644
--- a/go/port.go
+++ b/go/port.go
@@ -11,6 +11,7 @@ package unit
import "C"
import (
+ "io"
"net"
"os"
"sync"
@@ -79,13 +80,13 @@ func getUnixConn(fd int) *net.UnixConn {
c, err := net.FileConn(f)
if err != nil {
- nxt_go_warn("FileConn error %s", err)
+ nxt_go_alert("FileConn error %s", err)
return nil
}
uc, ok := c.(*net.UnixConn)
if !ok {
- nxt_go_warn("Not a Unix-domain socket %d", fd)
+ nxt_go_alert("Not a Unix-domain socket %d", fd)
return nil
}
@@ -93,30 +94,37 @@ func getUnixConn(fd int) *net.UnixConn {
}
//export nxt_go_add_port
-func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) {
- p := &port{
+func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int {
+
+ new_port := &port{
key: port_key{
- pid: int(pid),
- id: int(id),
+ pid: int(p.id.pid),
+ id: int(p.id.id),
},
- rcv: getUnixConn(int(rcv)),
- snd: getUnixConn(int(snd)),
+ rcv: getUnixConn(int(p.in_fd)),
+ snd: getUnixConn(int(p.out_fd)),
}
- add_port(p)
+ add_port(new_port)
+
+ p.in_fd = -1
+ p.out_fd = -1
- if id == 65535 {
- go func(ctx C.uintptr_t) {
- C.nxt_cgo_unit_run_shared(ctx);
+ if new_port.key.id == 65535 {
+ go func(ctx *C.nxt_unit_ctx_t) {
+ C.nxt_unit_run_shared(ctx);
}(ctx)
}
+
+ return C.NXT_UNIT_OK
}
//export nxt_go_remove_port
-func nxt_go_remove_port(pid C.int, id C.int) {
+func nxt_go_remove_port(unit *C.nxt_unit_t, p *C.nxt_unit_port_t) {
+
key := port_key{
- pid: int(pid),
- id: int(id),
+ pid: int(p.id.pid),
+ id: int(p.id.id),
}
port_registry_.Lock()
@@ -139,7 +147,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
p := find_port(key)
if p == nil {
- nxt_go_warn("port %d:%d not found", pid, id)
+ nxt_go_alert("port %d:%d not found", pid, id)
return 0
}
@@ -167,7 +175,7 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
p := find_port(key)
if p == nil {
- nxt_go_warn("port %d:%d not found", pid, id)
+ nxt_go_alert("port %d:%d not found", pid, id)
return 0
}
@@ -175,6 +183,12 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
GoBytes(oob, oob_size))
if err != nil {
+ if nerr, ok := err.(*net.OpError); ok {
+ if nerr.Err == io.EOF {
+ return 0
+ }
+ }
+
nxt_go_warn("read result %d (%d), %s", n, oobn, err)
n = -1
diff --git a/go/request.go b/go/request.go
index 1d8c6702..7e2e848a 100644
--- a/go/request.go
+++ b/go/request.go
@@ -19,9 +19,9 @@ import (
)
type request struct {
- req http.Request
- resp *response
- c_req C.uintptr_t
+ req http.Request
+ resp response
+ c_req *C.nxt_unit_request_info_t
}
func (r *request) Read(p []byte) (n int, err error) {
@@ -35,110 +35,102 @@ func (r *request) Read(p []byte) (n int, err error) {
}
func (r *request) Close() error {
- C.nxt_cgo_request_close(r.c_req)
return nil
}
-func (r *request) response() *response {
- if r.resp == nil {
- r.resp = new_response(r.c_req, &r.req)
- }
+func new_request(c_req *C.nxt_unit_request_info_t) (r *request, err error) {
+ req := c_req.request
- return r.resp
-}
+ uri := GoStringN(&req.target, C.int(req.target_length))
-func (r *request) done() {
- resp := r.response()
- if !resp.headerSent {
- resp.WriteHeader(http.StatusOK)
+ URL, err := url.ParseRequestURI(uri)
+ if err != nil {
+ return nil, err
}
- C.nxt_cgo_request_done(r.c_req, 0)
-}
-
-func get_request(go_req uintptr) *request {
- return (*request)(unsafe.Pointer(go_req))
-}
-
-//export nxt_go_request_create
-func nxt_go_request_create(c_req C.uintptr_t,
- c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr {
-
- uri := C.GoStringN(c_uri.start, c_uri.length)
- var URL *url.URL
- var err error
- if URL, err = url.ParseRequestURI(uri); err != nil {
- return 0
- }
+ proto := GoStringN(&req.version, C.int(req.version_length))
- r := &request{
- req: http.Request{
- Method: C.GoStringN(c_method.start, c_method.length),
- URL: URL,
- Header: http.Header{},
- Body: nil,
+ r = &request{
+ req: http.Request {
+ URL: URL,
+ Header: http.Header{},
RequestURI: uri,
+ Method: GoStringN(&req.method, C.int(req.method_length)),
+ Proto: proto,
+ ProtoMajor: 1,
+ ProtoMinor: int(proto[7] - '0'),
+ ContentLength: int64(req.content_length),
+ Host: GoStringN(&req.server_name, C.int(req.server_name_length)),
+ RemoteAddr: GoStringN(&req.remote, C.int(req.remote_length)),
},
+ resp: response{header: http.Header{}, c_req: c_req},
c_req: c_req,
}
+
r.req.Body = r
- return uintptr(unsafe.Pointer(r))
-}
+ if req.tls != 0 {
+ r.req.TLS = &tls.ConnectionState{ }
+ r.req.URL.Scheme = "https"
-//export nxt_go_request_set_proto
-func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t,
- maj C.int, min C.int) {
+ } else {
+ r.req.URL.Scheme = "http"
+ }
- r := get_request(go_req)
- r.req.Proto = C.GoStringN(proto.start, proto.length)
- r.req.ProtoMajor = int(maj)
- r.req.ProtoMinor = int(min)
-}
+ fields := get_fields(req)
-//export nxt_go_request_add_header
-func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t,
- value *C.nxt_cgo_str_t) {
+ for i := 0; i < len(fields); i++ {
+ f := &fields[i]
- r := get_request(go_req)
- r.req.Header.Add(C.GoStringN(name.start, name.length),
- C.GoStringN(value.start, value.length))
-}
+ n := GoStringN(&f.name, C.int(f.name_length))
+ v := GoStringN(&f.value, C.int(f.value_length))
-//export nxt_go_request_set_content_length
-func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) {
- get_request(go_req).req.ContentLength = int64(l)
-}
+ r.req.Header.Add(n, v)
+ }
-//export nxt_go_request_set_host
-func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) {
- get_request(go_req).req.Host = C.GoStringN(host.start, host.length)
+ return r, nil
}
-//export nxt_go_request_set_url
-func nxt_go_request_set_url(go_req uintptr, scheme *C.char) {
- get_request(go_req).req.URL.Scheme = C.GoString(scheme)
-}
+func get_fields(req *C.nxt_unit_request_t) []C.nxt_unit_field_t {
+ f := uintptr(unsafe.Pointer(req)) + uintptr(C.NXT_FIELDS_OFFSET)
-//export nxt_go_request_set_remote_addr
-func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) {
+ h := &slice_header{
+ Data: unsafe.Pointer(f),
+ Len: int(req.fields_count),
+ Cap: int(req.fields_count),
+ }
- get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
+ return *(*[]C.nxt_unit_field_t)(unsafe.Pointer(h))
}
-//export nxt_go_request_set_tls
-func nxt_go_request_set_tls(go_req uintptr) {
+//export nxt_go_request_handler
+func nxt_go_request_handler(c_req *C.nxt_unit_request_info_t) {
- get_request(go_req).req.TLS = &tls.ConnectionState{ }
-}
+ go func(c_req *C.nxt_unit_request_info_t, handler http.Handler) {
-//export nxt_go_request_handler
-func nxt_go_request_handler(go_req uintptr, h uintptr) {
- r := get_request(go_req)
- handler := get_handler(h)
-
- go func(r *request) {
- handler.ServeHTTP(r.response(), &r.req)
- r.done()
- }(r)
+ ctx := c_req.ctx
+
+ for {
+ r, err := new_request(c_req)
+
+ if err == nil {
+ handler.ServeHTTP(&r.resp, &r.req)
+
+ if !r.resp.header_sent {
+ r.resp.WriteHeader(http.StatusOK)
+ }
+
+ C.nxt_unit_request_done(c_req, C.NXT_UNIT_OK)
+
+ } else {
+ C.nxt_unit_request_done(c_req, C.NXT_UNIT_ERROR)
+ }
+
+ c_req = C.nxt_unit_dequeue_request(ctx)
+ if c_req == nil {
+ break
+ }
+ }
+
+ }(c_req, get_handler(uintptr(c_req.unit.data)))
}
diff --git a/go/response.go b/go/response.go
index bfa79656..a1af30b3 100644
--- a/go/response.go
+++ b/go/response.go
@@ -16,28 +16,17 @@ import (
type response struct {
header http.Header
- headerSent bool
- req *http.Request
- c_req C.uintptr_t
+ header_sent bool
+ c_req *C.nxt_unit_request_info_t
ch chan int
}
-func new_response(c_req C.uintptr_t, req *http.Request) *response {
- resp := &response{
- header: http.Header{},
- req: req,
- c_req: c_req,
- }
-
- return resp
-}
-
func (r *response) Header() http.Header {
return r.header
}
func (r *response) Write(p []byte) (n int, err error) {
- if !r.headerSent {
+ if !r.header_sent {
r.WriteHeader(http.StatusOK)
}
@@ -64,12 +53,11 @@ func (r *response) Write(p []byte) (n int, err error) {
}
func (r *response) WriteHeader(code int) {
- if r.headerSent {
- // Note: explicitly using Stderr, as Stdout is our HTTP output.
+ if r.header_sent {
nxt_go_warn("multiple response.WriteHeader calls")
return
}
- r.headerSent = true
+ r.header_sent = true
// Set a default Content-Type
if _, hasType := r.header["Content-Type"]; !hasType {
@@ -86,21 +74,21 @@ func (r *response) WriteHeader(code int) {
}
}
- C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields),
+ C.nxt_unit_response_init(r.c_req, C.uint16_t(code), C.uint32_t(fields),
C.uint32_t(fields_size))
for k, vv := range r.header {
for _, v := range vv {
- C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)),
+ C.nxt_unit_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)),
str_ref(v), C.uint32_t(len(v)))
}
}
- C.nxt_cgo_response_send(r.c_req)
+ C.nxt_unit_response_send(r.c_req)
}
func (r *response) Flush() {
- if !r.headerSent {
+ if !r.header_sent {
r.WriteHeader(http.StatusOK)
}
}
@@ -114,6 +102,6 @@ func wait_shm_ack(c chan int) {
}
//export nxt_go_shm_ack_handler
-func nxt_go_shm_ack_handler() {
+func nxt_go_shm_ack_handler(ctx *C.nxt_unit_ctx_t) {
observer_registry_.notify(1)
}
diff --git a/go/unit.go b/go/unit.go
index 1534479e..b5dd4f6c 100644
--- a/go/unit.go
+++ b/go/unit.go
@@ -30,15 +30,15 @@ func buf_ref(buf []byte) C.uintptr_t {
return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0])))
}
-type StringHeader struct {
+type string_header struct {
Data unsafe.Pointer
Len int
}
-func str_ref(s string) C.uintptr_t {
- header := (*StringHeader)(unsafe.Pointer(&s))
+func str_ref(s string) *C.char {
+ header := (*string_header)(unsafe.Pointer(&s))
- return C.uintptr_t(uintptr(unsafe.Pointer(header.Data)))
+ return (*C.char)(header.Data)
}
func (buf *cbuf) init_bytes(b []byte) {
@@ -46,12 +46,7 @@ func (buf *cbuf) init_bytes(b []byte) {
buf.s = C.size_t(len(b))
}
-func (buf *cbuf) init_string(s string) {
- buf.b = str_ref(s)
- buf.s = C.size_t(len(s))
-}
-
-type SliceHeader struct {
+type slice_header struct {
Data unsafe.Pointer
Len int
Cap int
@@ -63,17 +58,17 @@ func (buf *cbuf) GoBytes() []byte {
return b[:0]
}
- bytesHeader := &SliceHeader{
+ header := &slice_header{
Data: unsafe.Pointer(uintptr(buf.b)),
Len: int(buf.s),
Cap: int(buf.s),
}
- return *(*[]byte)(unsafe.Pointer(bytesHeader))
+ return *(*[]byte)(unsafe.Pointer(header))
}
func GoBytes(buf unsafe.Pointer, size C.int) []byte {
- bytesHeader := &SliceHeader{
+ bytesHeader := &slice_header{
Data: buf,
Len: int(size),
Cap: int(size),
@@ -82,12 +77,25 @@ func GoBytes(buf unsafe.Pointer, size C.int) []byte {
return *(*[]byte)(unsafe.Pointer(bytesHeader))
}
+func GoStringN(sptr *C.nxt_unit_sptr_t, l C.int) string {
+ p := unsafe.Pointer(sptr)
+ b := uintptr(p) + uintptr(*(*C.uint32_t)(p))
+
+ return C.GoStringN((*C.char)(unsafe.Pointer(b)), l)
+}
+
func nxt_go_warn(format string, args ...interface{}) {
str := fmt.Sprintf("[go] " + format, args...)
C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str)))
}
+func nxt_go_alert(format string, args ...interface{}) {
+ str := fmt.Sprintf("[go] " + format, args...)
+
+ C.nxt_cgo_alert(str_ref(str), C.uint32_t(len(str)))
+}
+
type handler_registry struct {
sync.RWMutex
next uintptr
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index f0c68374..44525d04 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -54,12 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
int queue_fd);
-static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
+ nxt_unit_request_info_t **preq);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
@@ -904,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
static int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
+ nxt_unit_request_info_t **preq)
{
int rc;
pid_t pid;
@@ -1040,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break;
case _NXT_PORT_MSG_REQ_HEADERS:
- rc = nxt_unit_process_req_headers(ctx, &recv_msg);
+ rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
break;
case _NXT_PORT_MSG_REQ_BODY:
@@ -1213,7 +1215,8 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
static int
-nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_request_info_t **preq)
{
int res;
nxt_unit_impl_t *lib;
@@ -1329,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
}
- lib->callbacks.request_handler(req);
+ if (preq == NULL) {
+ lib->callbacks.request_handler(req);
+
+ } else {
+ *preq = req;
+ }
}
return NXT_UNIT_OK;
@@ -2179,7 +2187,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req,
resp = req->response;
if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
- nxt_unit_req_warn(req, "add_field: too many response fields");
+ nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
+ (int) resp->fields_count);
return NXT_UNIT_ERROR;
}
@@ -2356,6 +2365,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
+
return NULL;
}
@@ -4537,7 +4548,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
return rc;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
@@ -4686,7 +4697,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
- rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
} else {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4793,7 +4804,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
goto retry;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4902,7 +4913,7 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
break;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4921,6 +4932,46 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
}
+nxt_unit_request_info_t *
+nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_t *req;
+
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ req = NULL;
+
+ if (nxt_slow_path(!ctx_impl->online)) {
+ goto done;
+ }
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ goto done;
+ }
+
+ rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ if (rc != NXT_UNIT_OK) {
+ goto done;
+ }
+
+ (void) nxt_unit_process_msg(ctx, rbuf, &req);
+
+done:
+
+ nxt_unit_ctx_release(ctx);
+
+ return req;
+}
+
+
int
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
{
@@ -4977,7 +5028,7 @@ retry:
return rc;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 90cba2a3..1e1a8dbe 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -213,6 +213,8 @@ int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
+nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx);
+
int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx);
/*