diff options
Diffstat (limited to '')
-rw-r--r-- | go/nxt_cgo_lib.c | 151 | ||||
-rw-r--r-- | go/nxt_cgo_lib.h | 32 | ||||
-rw-r--r-- | go/port.go | 48 | ||||
-rw-r--r-- | go/request.go | 154 | ||||
-rw-r--r-- | go/response.go | 32 | ||||
-rw-r--r-- | go/unit.go | 34 | ||||
-rw-r--r-- | src/nxt_unit.c | 75 | ||||
-rw-r--r-- | src/nxt_unit.h | 2 |
8 files changed, 225 insertions, 303 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c index f7171f55..330697c1 100644 --- a/go/nxt_cgo_lib.c +++ b/go/nxt_cgo_lib.c @@ -10,16 +10,10 @@ #include <nxt_unit_request.h> -static void nxt_cgo_request_handler(nxt_unit_request_info_t *req); -static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, - nxt_unit_sptr_t *sptr, uint32_t length); -static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); -static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port); static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); -static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx); int nxt_cgo_run(uintptr_t handler) @@ -30,12 +24,12 @@ nxt_cgo_run(uintptr_t handler) memset(&init, 0, sizeof(init)); - init.callbacks.request_handler = nxt_cgo_request_handler; - init.callbacks.add_port = nxt_cgo_add_port; - init.callbacks.remove_port = nxt_cgo_remove_port; + init.callbacks.request_handler = nxt_go_request_handler; + init.callbacks.add_port = nxt_go_add_port; + init.callbacks.remove_port = nxt_go_remove_port; init.callbacks.port_send = nxt_cgo_port_send; init.callbacks.port_recv = nxt_cgo_port_recv; - init.callbacks.shm_ack_handler = nxt_cgo_shm_ack_handler; + init.callbacks.shm_ack_handler = nxt_go_shm_ack_handler; init.data = (void *) handler; @@ -52,76 +46,6 @@ nxt_cgo_run(uintptr_t handler) } -static void -nxt_cgo_request_handler(nxt_unit_request_info_t *req) -{ - uint32_t i; - uintptr_t go_req; - nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr; - nxt_unit_field_t *f; - nxt_unit_request_t *r; - - r = req->request; - - go_req = nxt_go_request_create((uintptr_t) req, - nxt_cgo_str_init(&method, &r->method, r->method_length), - nxt_cgo_str_init(&uri, &r->target, r->target_length)); - - nxt_go_request_set_proto(go_req, - nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1); - - for (i = 0; i < r->fields_count; i++) { - f = &r->fields[i]; - - nxt_go_request_add_header(go_req, - nxt_cgo_str_init(&name, &f->name, f->name_length), - nxt_cgo_str_init(&value, &f->value, f->value_length)); - } - - nxt_go_request_set_content_length(go_req, r->content_length); - nxt_go_request_set_host(go_req, - nxt_cgo_str_init(&host, &r->server_name, r->server_name_length)); - nxt_go_request_set_remote_addr(go_req, - nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length)); - - if (r->tls) { - nxt_go_request_set_tls(go_req); - } - - nxt_go_request_handler(go_req, (uintptr_t) req->unit->data); -} - - -static nxt_cgo_str_t * -nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length) -{ - dst->length = length; - dst->start = nxt_unit_sptr_get(sptr); - - return dst; -} - - -static int -nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) -{ - nxt_go_add_port((uintptr_t) ctx, port->id.pid, port->id.id, - port->in_fd, port->out_fd); - - port->in_fd = -1; - port->out_fd = -1; - - return NXT_UNIT_OK; -} - - -static void -nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) -{ - nxt_go_remove_port(port->id.pid, port->id.id); -} - - static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) @@ -140,78 +64,31 @@ nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, } -static void -nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx) -{ - return nxt_go_shm_ack_handler(); -} - - -int -nxt_cgo_response_create(uintptr_t req, int status, int fields, - uint32_t fields_size) -{ - return nxt_unit_response_init((nxt_unit_request_info_t *) req, - status, fields, fields_size); -} - - -int -nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, - uintptr_t value, uint32_t value_len) -{ - return nxt_unit_response_add_field((nxt_unit_request_info_t *) req, - (char *) name, name_len, - (char *) value, value_len); -} - - -int -nxt_cgo_response_send(uintptr_t req) -{ - return nxt_unit_response_send((nxt_unit_request_info_t *) req); -} - - ssize_t -nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len) +nxt_cgo_response_write(nxt_unit_request_info_t *req, uintptr_t start, + uint32_t len) { - return nxt_unit_response_write_nb((nxt_unit_request_info_t *) req, - (void *) start, len, 0); + return nxt_unit_response_write_nb(req, (void *) start, len, 0); } ssize_t -nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len) -{ - return nxt_unit_request_read((nxt_unit_request_info_t *) req, - (void *) dst, dst_len); -} - - -int -nxt_cgo_request_close(uintptr_t req) +nxt_cgo_request_read(nxt_unit_request_info_t *req, uintptr_t dst, + uint32_t dst_len) { - return 0; + return nxt_unit_request_read(req, (void *) dst, dst_len); } void -nxt_cgo_request_done(uintptr_t req, int res) +nxt_cgo_warn(const char *msg, uint32_t msg_len) { - nxt_unit_request_done((nxt_unit_request_info_t *) req, res); -} - - -void -nxt_cgo_unit_run_shared(uintptr_t ctx) -{ - nxt_unit_run_shared((nxt_unit_ctx_t *) ctx); + nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); } void -nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) +nxt_cgo_alert(const char *msg, uint32_t msg_len) { - nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); + nxt_unit_alert(NULL, "%.*s", (int) msg_len, (char *) msg); } diff --git a/go/nxt_cgo_lib.h b/go/nxt_cgo_lib.h index fa515be5..3705d1ef 100644 --- a/go/nxt_cgo_lib.h +++ b/go/nxt_cgo_lib.h @@ -11,32 +11,22 @@ #include <stdint.h> #include <stdlib.h> #include <sys/types.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> -typedef struct { - int length; - char *start; -} nxt_cgo_str_t; +enum { + NXT_FIELDS_OFFSET = offsetof(nxt_unit_request_t, fields) +}; int nxt_cgo_run(uintptr_t handler); -int nxt_cgo_response_create(uintptr_t req, int code, int fields, - uint32_t fields_size); +ssize_t nxt_cgo_response_write(nxt_unit_request_info_t *req, + uintptr_t src, uint32_t len); -int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, - uintptr_t value, uint32_t value_len); +ssize_t nxt_cgo_request_read(nxt_unit_request_info_t *req, + uintptr_t dst, uint32_t dst_len); -int nxt_cgo_response_send(uintptr_t req); - -ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len); - -ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len); - -int nxt_cgo_request_close(uintptr_t req); - -void nxt_cgo_request_done(uintptr_t req, int res); - -void nxt_cgo_unit_run_shared(uintptr_t ctx); - -void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); +void nxt_cgo_warn(const char *msg, uint32_t msg_len); +void nxt_cgo_alert(const char *msg, uint32_t msg_len); #endif /* _NXT_CGO_LIB_H_INCLUDED_ */ @@ -11,6 +11,7 @@ package unit import "C" import ( + "io" "net" "os" "sync" @@ -79,13 +80,13 @@ func getUnixConn(fd int) *net.UnixConn { c, err := net.FileConn(f) if err != nil { - nxt_go_warn("FileConn error %s", err) + nxt_go_alert("FileConn error %s", err) return nil } uc, ok := c.(*net.UnixConn) if !ok { - nxt_go_warn("Not a Unix-domain socket %d", fd) + nxt_go_alert("Not a Unix-domain socket %d", fd) return nil } @@ -93,30 +94,37 @@ func getUnixConn(fd int) *net.UnixConn { } //export nxt_go_add_port -func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) { - p := &port{ +func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int { + + new_port := &port{ key: port_key{ - pid: int(pid), - id: int(id), + pid: int(p.id.pid), + id: int(p.id.id), }, - rcv: getUnixConn(int(rcv)), - snd: getUnixConn(int(snd)), + rcv: getUnixConn(int(p.in_fd)), + snd: getUnixConn(int(p.out_fd)), } - add_port(p) + add_port(new_port) + + p.in_fd = -1 + p.out_fd = -1 - if id == 65535 { - go func(ctx C.uintptr_t) { - C.nxt_cgo_unit_run_shared(ctx); + if new_port.key.id == 65535 { + go func(ctx *C.nxt_unit_ctx_t) { + C.nxt_unit_run_shared(ctx); }(ctx) } + + return C.NXT_UNIT_OK } //export nxt_go_remove_port -func nxt_go_remove_port(pid C.int, id C.int) { +func nxt_go_remove_port(unit *C.nxt_unit_t, p *C.nxt_unit_port_t) { + key := port_key{ - pid: int(pid), - id: int(id), + pid: int(p.id.pid), + id: int(p.id.id), } port_registry_.Lock() @@ -139,7 +147,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, p := find_port(key) if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) + nxt_go_alert("port %d:%d not found", pid, id) return 0 } @@ -167,7 +175,7 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, p := find_port(key) if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) + nxt_go_alert("port %d:%d not found", pid, id) return 0 } @@ -175,6 +183,12 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, GoBytes(oob, oob_size)) if err != nil { + if nerr, ok := err.(*net.OpError); ok { + if nerr.Err == io.EOF { + return 0 + } + } + nxt_go_warn("read result %d (%d), %s", n, oobn, err) n = -1 diff --git a/go/request.go b/go/request.go index 1d8c6702..7e2e848a 100644 --- a/go/request.go +++ b/go/request.go @@ -19,9 +19,9 @@ import ( ) type request struct { - req http.Request - resp *response - c_req C.uintptr_t + req http.Request + resp response + c_req *C.nxt_unit_request_info_t } func (r *request) Read(p []byte) (n int, err error) { @@ -35,110 +35,102 @@ func (r *request) Read(p []byte) (n int, err error) { } func (r *request) Close() error { - C.nxt_cgo_request_close(r.c_req) return nil } -func (r *request) response() *response { - if r.resp == nil { - r.resp = new_response(r.c_req, &r.req) - } +func new_request(c_req *C.nxt_unit_request_info_t) (r *request, err error) { + req := c_req.request - return r.resp -} + uri := GoStringN(&req.target, C.int(req.target_length)) -func (r *request) done() { - resp := r.response() - if !resp.headerSent { - resp.WriteHeader(http.StatusOK) + URL, err := url.ParseRequestURI(uri) + if err != nil { + return nil, err } - C.nxt_cgo_request_done(r.c_req, 0) -} - -func get_request(go_req uintptr) *request { - return (*request)(unsafe.Pointer(go_req)) -} - -//export nxt_go_request_create -func nxt_go_request_create(c_req C.uintptr_t, - c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr { - - uri := C.GoStringN(c_uri.start, c_uri.length) - var URL *url.URL - var err error - if URL, err = url.ParseRequestURI(uri); err != nil { - return 0 - } + proto := GoStringN(&req.version, C.int(req.version_length)) - r := &request{ - req: http.Request{ - Method: C.GoStringN(c_method.start, c_method.length), - URL: URL, - Header: http.Header{}, - Body: nil, + r = &request{ + req: http.Request { + URL: URL, + Header: http.Header{}, RequestURI: uri, + Method: GoStringN(&req.method, C.int(req.method_length)), + Proto: proto, + ProtoMajor: 1, + ProtoMinor: int(proto[7] - '0'), + ContentLength: int64(req.content_length), + Host: GoStringN(&req.server_name, C.int(req.server_name_length)), + RemoteAddr: GoStringN(&req.remote, C.int(req.remote_length)), }, + resp: response{header: http.Header{}, c_req: c_req}, c_req: c_req, } + r.req.Body = r - return uintptr(unsafe.Pointer(r)) -} + if req.tls != 0 { + r.req.TLS = &tls.ConnectionState{ } + r.req.URL.Scheme = "https" -//export nxt_go_request_set_proto -func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t, - maj C.int, min C.int) { + } else { + r.req.URL.Scheme = "http" + } - r := get_request(go_req) - r.req.Proto = C.GoStringN(proto.start, proto.length) - r.req.ProtoMajor = int(maj) - r.req.ProtoMinor = int(min) -} + fields := get_fields(req) -//export nxt_go_request_add_header -func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t, - value *C.nxt_cgo_str_t) { + for i := 0; i < len(fields); i++ { + f := &fields[i] - r := get_request(go_req) - r.req.Header.Add(C.GoStringN(name.start, name.length), - C.GoStringN(value.start, value.length)) -} + n := GoStringN(&f.name, C.int(f.name_length)) + v := GoStringN(&f.value, C.int(f.value_length)) -//export nxt_go_request_set_content_length -func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) { - get_request(go_req).req.ContentLength = int64(l) -} + r.req.Header.Add(n, v) + } -//export nxt_go_request_set_host -func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) { - get_request(go_req).req.Host = C.GoStringN(host.start, host.length) + return r, nil } -//export nxt_go_request_set_url -func nxt_go_request_set_url(go_req uintptr, scheme *C.char) { - get_request(go_req).req.URL.Scheme = C.GoString(scheme) -} +func get_fields(req *C.nxt_unit_request_t) []C.nxt_unit_field_t { + f := uintptr(unsafe.Pointer(req)) + uintptr(C.NXT_FIELDS_OFFSET) -//export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) { + h := &slice_header{ + Data: unsafe.Pointer(f), + Len: int(req.fields_count), + Cap: int(req.fields_count), + } - get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) + return *(*[]C.nxt_unit_field_t)(unsafe.Pointer(h)) } -//export nxt_go_request_set_tls -func nxt_go_request_set_tls(go_req uintptr) { +//export nxt_go_request_handler +func nxt_go_request_handler(c_req *C.nxt_unit_request_info_t) { - get_request(go_req).req.TLS = &tls.ConnectionState{ } -} + go func(c_req *C.nxt_unit_request_info_t, handler http.Handler) { -//export nxt_go_request_handler -func nxt_go_request_handler(go_req uintptr, h uintptr) { - r := get_request(go_req) - handler := get_handler(h) - - go func(r *request) { - handler.ServeHTTP(r.response(), &r.req) - r.done() - }(r) + ctx := c_req.ctx + + for { + r, err := new_request(c_req) + + if err == nil { + handler.ServeHTTP(&r.resp, &r.req) + + if !r.resp.header_sent { + r.resp.WriteHeader(http.StatusOK) + } + + C.nxt_unit_request_done(c_req, C.NXT_UNIT_OK) + + } else { + C.nxt_unit_request_done(c_req, C.NXT_UNIT_ERROR) + } + + c_req = C.nxt_unit_dequeue_request(ctx) + if c_req == nil { + break + } + } + + }(c_req, get_handler(uintptr(c_req.unit.data))) } diff --git a/go/response.go b/go/response.go index bfa79656..a1af30b3 100644 --- a/go/response.go +++ b/go/response.go @@ -16,28 +16,17 @@ import ( type response struct { header http.Header - headerSent bool - req *http.Request - c_req C.uintptr_t + header_sent bool + c_req *C.nxt_unit_request_info_t ch chan int } -func new_response(c_req C.uintptr_t, req *http.Request) *response { - resp := &response{ - header: http.Header{}, - req: req, - c_req: c_req, - } - - return resp -} - func (r *response) Header() http.Header { return r.header } func (r *response) Write(p []byte) (n int, err error) { - if !r.headerSent { + if !r.header_sent { r.WriteHeader(http.StatusOK) } @@ -64,12 +53,11 @@ func (r *response) Write(p []byte) (n int, err error) { } func (r *response) WriteHeader(code int) { - if r.headerSent { - // Note: explicitly using Stderr, as Stdout is our HTTP output. + if r.header_sent { nxt_go_warn("multiple response.WriteHeader calls") return } - r.headerSent = true + r.header_sent = true // Set a default Content-Type if _, hasType := r.header["Content-Type"]; !hasType { @@ -86,21 +74,21 @@ func (r *response) WriteHeader(code int) { } } - C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields), + C.nxt_unit_response_init(r.c_req, C.uint16_t(code), C.uint32_t(fields), C.uint32_t(fields_size)) for k, vv := range r.header { for _, v := range vv { - C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)), + C.nxt_unit_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)), str_ref(v), C.uint32_t(len(v))) } } - C.nxt_cgo_response_send(r.c_req) + C.nxt_unit_response_send(r.c_req) } func (r *response) Flush() { - if !r.headerSent { + if !r.header_sent { r.WriteHeader(http.StatusOK) } } @@ -114,6 +102,6 @@ func wait_shm_ack(c chan int) { } //export nxt_go_shm_ack_handler -func nxt_go_shm_ack_handler() { +func nxt_go_shm_ack_handler(ctx *C.nxt_unit_ctx_t) { observer_registry_.notify(1) } @@ -30,15 +30,15 @@ func buf_ref(buf []byte) C.uintptr_t { return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0]))) } -type StringHeader struct { +type string_header struct { Data unsafe.Pointer Len int } -func str_ref(s string) C.uintptr_t { - header := (*StringHeader)(unsafe.Pointer(&s)) +func str_ref(s string) *C.char { + header := (*string_header)(unsafe.Pointer(&s)) - return C.uintptr_t(uintptr(unsafe.Pointer(header.Data))) + return (*C.char)(header.Data) } func (buf *cbuf) init_bytes(b []byte) { @@ -46,12 +46,7 @@ func (buf *cbuf) init_bytes(b []byte) { buf.s = C.size_t(len(b)) } -func (buf *cbuf) init_string(s string) { - buf.b = str_ref(s) - buf.s = C.size_t(len(s)) -} - -type SliceHeader struct { +type slice_header struct { Data unsafe.Pointer Len int Cap int @@ -63,17 +58,17 @@ func (buf *cbuf) GoBytes() []byte { return b[:0] } - bytesHeader := &SliceHeader{ + header := &slice_header{ Data: unsafe.Pointer(uintptr(buf.b)), Len: int(buf.s), Cap: int(buf.s), } - return *(*[]byte)(unsafe.Pointer(bytesHeader)) + return *(*[]byte)(unsafe.Pointer(header)) } func GoBytes(buf unsafe.Pointer, size C.int) []byte { - bytesHeader := &SliceHeader{ + bytesHeader := &slice_header{ Data: buf, Len: int(size), Cap: int(size), @@ -82,12 +77,25 @@ func GoBytes(buf unsafe.Pointer, size C.int) []byte { return *(*[]byte)(unsafe.Pointer(bytesHeader)) } +func GoStringN(sptr *C.nxt_unit_sptr_t, l C.int) string { + p := unsafe.Pointer(sptr) + b := uintptr(p) + uintptr(*(*C.uint32_t)(p)) + + return C.GoStringN((*C.char)(unsafe.Pointer(b)), l) +} + func nxt_go_warn(format string, args ...interface{}) { str := fmt.Sprintf("[go] " + format, args...) C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str))) } +func nxt_go_alert(format string, args ...interface{}) { + str := fmt.Sprintf("[go] " + format, args...) + + C.nxt_cgo_alert(str_ref(str), C.uint32_t(len(str))) +} + type handler_registry struct { sync.RWMutex next uintptr diff --git a/src/nxt_unit.c b/src/nxt_unit.c index f0c68374..44525d04 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -54,12 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd); -static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, + nxt_unit_request_info_t **preq); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, @@ -904,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) static int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, + nxt_unit_request_info_t **preq) { int rc; pid_t pid; @@ -1040,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; case _NXT_PORT_MSG_REQ_HEADERS: - rc = nxt_unit_process_req_headers(ctx, &recv_msg); + rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq); break; case _NXT_PORT_MSG_REQ_BODY: @@ -1213,7 +1215,8 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) static int -nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_request_info_t **preq) { int res; nxt_unit_impl_t *lib; @@ -1329,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } } - lib->callbacks.request_handler(req); + if (preq == NULL) { + lib->callbacks.request_handler(req); + + } else { + *preq = req; + } } return NXT_UNIT_OK; @@ -2179,7 +2187,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req, resp = req->response; if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { - nxt_unit_req_warn(req, "add_field: too many response fields"); + nxt_unit_req_warn(req, "add_field: too many response fields (%d)", + (int) resp->fields_count); return NXT_UNIT_ERROR; } @@ -2356,6 +2365,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf"); + return NULL; } @@ -4537,7 +4548,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) return rc; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; } @@ -4686,7 +4697,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { - rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL); } else { nxt_unit_read_buf_release(ctx, rbuf); @@ -4793,7 +4804,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) goto retry; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } @@ -4902,7 +4913,7 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) break; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } @@ -4921,6 +4932,46 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) } +nxt_unit_request_info_t * +nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; + + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + req = NULL; + + if (nxt_slow_path(!ctx_impl->online)) { + goto done; + } + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + goto done; + } + + rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (rc != NXT_UNIT_OK) { + goto done; + } + + (void) nxt_unit_process_msg(ctx, rbuf, &req); + +done: + + nxt_unit_ctx_release(ctx); + + return req; +} + + int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) { @@ -4977,7 +5028,7 @@ retry: return rc; } - rc = nxt_unit_process_msg(ctx, rbuf); + rc = nxt_unit_process_msg(ctx, rbuf, NULL); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return NXT_UNIT_ERROR; } diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 90cba2a3..1e1a8dbe 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -213,6 +213,8 @@ int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx); int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); +nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx); + int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx); /* |