summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--go/nxt_cgo_lib.c11
-rw-r--r--go/nxt_cgo_lib.h2
-rw-r--r--go/port.go8
-rw-r--r--src/nodejs/unit-http/unit.cpp22
-rw-r--r--src/nxt_http_websocket.c32
-rw-r--r--src/nxt_port.c2
-rw-r--r--src/nxt_port.h11
-rw-r--r--src/nxt_router.c1504
-rw-r--r--src/nxt_router.h14
-rw-r--r--src/nxt_router_request.h37
-rw-r--r--src/nxt_unit.c467
-rw-r--r--src/nxt_unit.h13
12 files changed, 894 insertions, 1229 deletions
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c
index 937996b0..f7171f55 100644
--- a/go/nxt_cgo_lib.c
+++ b/go/nxt_cgo_lib.c
@@ -44,7 +44,7 @@ nxt_cgo_run(uintptr_t handler)
return NXT_UNIT_ERROR;
}
- rc = nxt_unit_run(ctx);
+ rc = nxt_unit_run_ctx(ctx);
nxt_unit_done(ctx);
@@ -105,7 +105,7 @@ nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length)
static int
nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- nxt_go_add_port(port->id.pid, port->id.id,
+ nxt_go_add_port((uintptr_t) ctx, port->id.pid, port->id.id,
port->in_fd, port->out_fd);
port->in_fd = -1;
@@ -204,6 +204,13 @@ nxt_cgo_request_done(uintptr_t req, int res)
void
+nxt_cgo_unit_run_shared(uintptr_t ctx)
+{
+ nxt_unit_run_shared((nxt_unit_ctx_t *) ctx);
+}
+
+
+void
nxt_cgo_warn(uintptr_t msg, uint32_t msg_len)
{
nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg);
diff --git a/go/nxt_cgo_lib.h b/go/nxt_cgo_lib.h
index 5317380b..fa515be5 100644
--- a/go/nxt_cgo_lib.h
+++ b/go/nxt_cgo_lib.h
@@ -35,6 +35,8 @@ int nxt_cgo_request_close(uintptr_t req);
void nxt_cgo_request_done(uintptr_t req, int res);
+void nxt_cgo_unit_run_shared(uintptr_t ctx);
+
void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len);
#endif /* _NXT_CGO_LIB_H_INCLUDED_ */
diff --git a/go/port.go b/go/port.go
index 59a13f8b..64004d91 100644
--- a/go/port.go
+++ b/go/port.go
@@ -93,7 +93,7 @@ func getUnixConn(fd int) *net.UnixConn {
}
//export nxt_go_add_port
-func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) {
+func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) {
p := &port{
key: port_key{
pid: int(pid),
@@ -104,6 +104,12 @@ func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) {
}
add_port(p)
+
+ if id == 65535 {
+ go func(ctx C.uintptr_t) {
+ C.nxt_cgo_unit_run_shared(ctx);
+ }(ctx)
+ }
}
//export nxt_go_remove_port
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 468acf96..1ee5b742 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -20,7 +20,7 @@ napi_ref Unit::constructor_;
struct port_data_t {
nxt_unit_ctx_t *ctx;
- nxt_unit_port_id_t port_id;
+ nxt_unit_port_t *port;
uv_poll_t poll;
};
@@ -351,7 +351,11 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
static void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
- nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
+ port_data_t *data;
+
+ data = (port_data_t *) handle->data;
+
+ nxt_unit_process_port_msg(data->ctx, data->port);
}
@@ -396,21 +400,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->data = data;
data->ctx = ctx;
- data->port_id = port->id;
- data->poll.data = ctx;
+ data->port = port;
+ data->poll.data = data;
}
return NXT_UNIT_OK;
}
-inline bool
-operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
-{
- return p1.pid == p2.pid && p1.id == p2.id;
-}
-
-
void
Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
{
@@ -419,10 +416,9 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
if (port->data != NULL) {
data = (port_data_t *) port->data;
- if (data->port_id == port->id) {
+ if (data->port == port) {
uv_poll_stop(&data->poll);
- data->poll.data = data;
uv_close((uv_handle_t *) &data->poll, delete_port_data);
}
}
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
index 4d31b320..393c20ac 100644
--- a/src/nxt_http_websocket.c
+++ b/src/nxt_http_websocket.c
@@ -33,15 +33,13 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *out, *buf, **out_tail, *b, *next;
nxt_int_t res;
nxt_http_request_t *r;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
nxt_websocket_header_t *wsh;
r = obj;
+ req_rpc_data = r->req_rpc_data;
- if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
- || (req_app_link = req_rpc_data->req_app_link) == NULL))
- {
+ if (nxt_slow_path(req_rpc_data == NULL)) {
nxt_debug(task, "websocket client frame for destroyed request");
return;
@@ -69,8 +67,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
if (buf == NULL || buf_free_size == 0) {
buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
- buf = nxt_port_mmap_get_buf(task,
- &req_app_link->app_port->process->outgoing,
+ buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
buf_free_size);
*out_tail = buf;
@@ -101,10 +98,10 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
b = next;
}
- res = nxt_port_socket_twrite(task, req_app_link->app_port,
+ res = nxt_port_socket_twrite(task, req_rpc_data->app_port,
NXT_PORT_MSG_WEBSOCKET, -1,
- req_app_link->stream,
- req_app_link->reply_port->id, out, NULL);
+ req_rpc_data->stream,
+ task->thread->engine->port->id, out, NULL);
if (nxt_slow_path(res != NXT_OK)) {
// TODO: handle
}
@@ -130,32 +127,27 @@ static void
nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
nxt_debug(task, "http websocket error handler");
r = obj;
+ req_rpc_data = r->req_rpc_data;
- if ((req_rpc_data = r->req_rpc_data) == NULL) {
+ if (req_rpc_data == NULL) {
nxt_debug(task, " req_rpc_data is NULL");
goto close_handler;
}
- if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
- nxt_debug(task, " req_app_link is NULL");
- goto close_handler;
- }
-
- if (req_app_link->app_port == NULL) {
+ if (req_rpc_data->app_port == NULL) {
nxt_debug(task, " app_port is NULL");
goto close_handler;
}
- (void) nxt_port_socket_twrite(task, req_app_link->app_port,
+ (void) nxt_port_socket_twrite(task, req_rpc_data->app_port,
NXT_PORT_MSG_WEBSOCKET_LAST,
- -1, req_app_link->stream,
- req_app_link->reply_port->id, NULL, NULL);
+ -1, req_rpc_data->stream,
+ task->thread->engine->port->id, NULL, NULL);
close_handler:
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 7232c465..54434d70 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -67,8 +67,6 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex);
- nxt_queue_init(&port->pending_requests);
- nxt_queue_init(&port->active_websockets);
} else {
nxt_mp_destroy(mp);
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 3a8a200a..9a933e75 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -41,6 +41,7 @@ struct nxt_port_handlers_s {
/* Request headers. */
nxt_port_handler_t req_headers;
+ nxt_port_handler_t req_headers_ack;
/* Websocket frame. */
nxt_port_handler_t websocket_frame;
@@ -89,6 +90,7 @@ typedef enum {
_NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
_NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
+ _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
_NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
@@ -113,7 +115,8 @@ typedef enum {
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
- | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
+ | NXT_PORT_MSG_SYNC,
+ NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
@@ -193,6 +196,7 @@ struct nxt_port_s {
nxt_queue_link_t app_link; /* for nxt_app_t.ports */
nxt_app_t *app;
+ nxt_port_t *main_app_port;
nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
nxt_msec_t idle_start;
@@ -205,11 +209,10 @@ struct nxt_port_s {
/* Maximum interleave of message parts. */
uint32_t max_share;
- uint32_t app_pending_responses;
uint32_t app_responses;
- nxt_queue_t pending_requests;
- nxt_queue_t active_websockets;
+ uint32_t active_websockets;
+ uint32_t active_requests;
nxt_port_handler_t handler;
nxt_port_handler_t *data;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 4df1489d..44b303e4 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -61,59 +61,13 @@ typedef struct {
} nxt_app_rpc_t;
-struct nxt_port_select_state_s {
- nxt_app_t *app;
- nxt_request_app_link_t *req_app_link;
-
- nxt_port_t *failed_port;
- int failed_port_use_delta;
-
- uint8_t start_process; /* 1 bit */
- nxt_request_app_link_t *shared_ra;
- nxt_port_t *port;
-};
-
-typedef struct nxt_port_select_state_s nxt_port_select_state_t;
-
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
nxt_port_t *controller_port);
-static void nxt_router_port_select(nxt_task_t *task,
- nxt_port_select_state_t *state);
-
-static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
- nxt_port_select_state_t *state);
-
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
-static void nxt_request_app_link_update_peer(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link);
-
-
-nxt_inline void
-nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
-{
- nxt_atomic_fetch_add(&req_app_link->use_count, 1);
-}
-
-nxt_inline void
-nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
-{
-#if (NXT_DEBUG)
- int c;
-
- c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
-
- nxt_assert((c + i) > 0);
-#else
- (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
-#endif
-}
-
-static void nxt_request_app_link_use(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link, int i);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
@@ -196,6 +150,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_t *skcf);
@@ -220,6 +176,8 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task,
static void nxt_router_app_port_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
+ nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
@@ -227,13 +185,15 @@ static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action);
-static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link);
+static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_rpc_data_t *req_rpc_data);
+static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link);
+ nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
- nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
+ nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
@@ -250,7 +210,7 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_joint_use(nxt_task_t *task,
nxt_app_joint_t *app_joint, int i);
-static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
+static void nxt_router_http_request_release_post(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
@@ -501,83 +461,6 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
}
-nxt_inline void
-nxt_request_app_link_init(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
-{
- nxt_buf_t *body;
- nxt_event_engine_t *engine;
-
- engine = task->thread->engine;
-
- nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
-
- req_app_link->stream = req_rpc_data->stream;
- req_app_link->use_count = 1;
- req_app_link->req_rpc_data = req_rpc_data;
- req_rpc_data->req_app_link = req_app_link;
- req_app_link->reply_port = engine->port;
- req_app_link->request = req_rpc_data->request;
- req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
-
- req_app_link->work.handler = NULL;
- req_app_link->work.task = &engine->task;
- req_app_link->work.obj = req_app_link;
- req_app_link->work.data = engine;
-
- body = req_rpc_data->request->body;
-
- if (body != NULL && nxt_buf_is_file(body)) {
- req_app_link->body_fd = body->file->fd;
-
- body->file->fd = -1;
-
- } else {
- req_app_link->body_fd = -1;
- }
-}
-
-
-nxt_inline nxt_request_app_link_t *
-nxt_request_app_link_alloc(nxt_task_t *task,
- nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
-{
- nxt_mp_t *mp;
- nxt_request_app_link_t *req_app_link;
-
- if (ra_src != NULL && ra_src->mem_pool != NULL) {
- return ra_src;
- }
-
- mp = req_rpc_data->request->mem_pool;
-
- req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
-
- if (nxt_slow_path(req_app_link == NULL)) {
-
- req_rpc_data->req_app_link = NULL;
-
- if (ra_src != NULL) {
- ra_src->req_rpc_data = NULL;
- }
-
- return NULL;
- }
-
- nxt_mp_retain(mp);
-
- nxt_request_app_link_init(task, req_app_link, req_rpc_data);
-
- if (ra_src != NULL) {
- req_app_link->body_fd = ra_src->body_fd;
- }
-
- req_app_link->mem_pool = mp;
-
- return req_app_link;
-}
-
-
nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
uint32_t stream)
@@ -614,198 +497,6 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
}
-static void
-nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
- void *data)
-{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = obj;
-
- nxt_request_app_link_update_peer(task, req_app_link);
-
- nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
-static void
-nxt_request_app_link_update_peer(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_event_engine_t *engine;
- nxt_request_rpc_data_t *req_rpc_data;
-
- engine = req_app_link->work.data;
-
- if (task->thread->engine != engine) {
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
- req_app_link->work.task = &engine->task;
- req_app_link->work.next = NULL;
-
- nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
- req_app_link->stream, engine);
-
- nxt_event_engine_post(engine, &req_app_link->work);
-
- return;
- }
-
- nxt_debug(task, "req_app_link stream #%uD update peer",
- req_app_link->stream);
-
- req_rpc_data = req_app_link->req_rpc_data;
-
- if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
- req_app_link->app_port->pid);
- }
-}
-
-
-static void
-nxt_request_app_link_release(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_mp_t *mp;
- nxt_http_request_t *r;
- nxt_request_rpc_data_t *req_rpc_data;
-
- nxt_assert(task->thread->engine == req_app_link->work.data);
- nxt_assert(req_app_link->use_count == 0);
-
- nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
-
- req_rpc_data = req_app_link->req_rpc_data;
-
- if (req_rpc_data != NULL) {
- if (nxt_slow_path(req_app_link->err_code != 0)) {
- nxt_http_request_error(task, req_rpc_data->request,
- req_app_link->err_code);
-
- } else {
- req_rpc_data->app_port = req_app_link->app_port;
- req_rpc_data->apr_action = req_app_link->apr_action;
- req_rpc_data->msg_info = req_app_link->msg_info;
-
- if (req_rpc_data->app->timeout != 0) {
- r = req_rpc_data->request;
-
- r->timer.handler = nxt_router_app_timeout;
- r->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine, &r->timer,
- req_rpc_data->app->timeout);
- }
-
- req_app_link->app_port = NULL;
- req_app_link->msg_info.buf = NULL;
- }
-
- req_rpc_data->req_app_link = NULL;
- req_app_link->req_rpc_data = NULL;
- }
-
- if (req_app_link->app_port != NULL) {
- nxt_router_app_port_release(task, req_app_link->app_port,
- req_app_link->apr_action);
-
- req_app_link->app_port = NULL;
- }
-
- if (req_app_link->body_fd != -1) {
- nxt_fd_close(req_app_link->body_fd);
-
- req_app_link->body_fd = -1;
- }
-
- nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
-
- mp = req_app_link->mem_pool;
-
- if (mp != NULL) {
- nxt_mp_free(mp, req_app_link);
- nxt_mp_release(mp);
- }
-}
-
-
-static void
-nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = obj;
-
- nxt_assert(req_app_link->work.data == data);
-
- nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
-static void
-nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
- int i)
-{
- int c;
- nxt_event_engine_t *engine;
-
- c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
-
- if (i < 0 && c == -i) {
- engine = req_app_link->work.data;
-
- if (task->thread->engine == engine) {
- nxt_request_app_link_release(task, req_app_link);
-
- return;
- }
-
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->work.handler = nxt_request_app_link_release_handler;
- req_app_link->work.task = &engine->task;
- req_app_link->work.next = NULL;
-
- nxt_debug(task, "req_app_link stream #%uD post release to %p",
- req_app_link->stream, engine);
-
- nxt_event_engine_post(engine, &req_app_link->work);
- }
-}
-
-
-nxt_inline void
-nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link, const char *str)
-{
- req_app_link->app_port = NULL;
- req_app_link->err_code = 500;
- req_app_link->err_str = str;
-
- nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
- &app->name, str, req_app_link->stream);
-}
-
-
-nxt_inline void
-nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
- &req_app_link->link_port_pending);
- nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
-
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
- + app->res_timeout;
-
- nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
- req_app_link->stream);
-}
-
-
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
@@ -825,8 +516,9 @@ nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_request_rpc_data_t *req_rpc_data)
{
- int ra_use_delta;
- nxt_request_app_link_t *req_app_link;
+ nxt_http_request_t *r;
+
+ nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
if (req_rpc_data->app_port != NULL) {
nxt_router_app_port_release(task, req_rpc_data->app_port,
@@ -835,53 +527,34 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
req_rpc_data->app_port = NULL;
}
- nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
-
- req_app_link = req_rpc_data->req_app_link;
- if (req_app_link != NULL) {
- req_rpc_data->req_app_link = NULL;
- req_app_link->req_rpc_data = NULL;
-
- ra_use_delta = 0;
-
- nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+ if (req_rpc_data->app != NULL) {
+ nxt_router_app_use(task, req_rpc_data->app, -1);
- if (req_app_link->link_app_requests.next == NULL
- && req_app_link->link_port_pending.next == NULL
- && req_app_link->link_app_pending.next == NULL
- && req_app_link->link_port_websockets.next == NULL)
- {
- req_app_link = NULL;
+ req_rpc_data->app = NULL;
+ }
- } else {
- ra_use_delta -=
- nxt_queue_chk_remove(&req_app_link->link_app_requests)
- + nxt_queue_chk_remove(&req_app_link->link_port_pending)
- + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
+ r = req_rpc_data->request;
- nxt_queue_chk_remove(&req_app_link->link_app_pending);
- }
+ if (r != NULL) {
+ r->timer_data = NULL;
- nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+ nxt_router_http_request_release_post(task, r);
- if (req_app_link != NULL) {
- nxt_request_app_link_use(task, req_app_link, ra_use_delta);
- }
+ r->req_rpc_data = NULL;
+ req_rpc_data->request = NULL;
}
- if (req_rpc_data->app != NULL) {
- nxt_router_app_use(task, req_rpc_data->app, -1);
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_fd_close(req_rpc_data->msg_info.body_fd);
- req_rpc_data->app = NULL;
+ req_rpc_data->msg_info.body_fd = -1;
}
- if (req_rpc_data->request != NULL) {
- req_rpc_data->request->timer_data = NULL;
-
- nxt_router_http_request_done(task, req_rpc_data->request);
+ if (req_rpc_data->rpc_cancel) {
+ req_rpc_data->rpc_cancel = 0;
- req_rpc_data->request->req_rpc_data = NULL;
- req_rpc_data->request = NULL;
+ nxt_port_rpc_cancel(task, task->thread->engine->port,
+ req_rpc_data->stream);
}
}
@@ -889,25 +562,62 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ nxt_app_t *app;
+ nxt_port_t *port, *main_app_port;
+ nxt_runtime_t *rt;
+
nxt_port_new_port_handler(task, msg);
- if (msg->u.new_port != NULL
- && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
- {
+ port = msg->u.new_port;
+
+ if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
nxt_router_greet_controller(task, msg->u.new_port);
}
- if (msg->port_msg.stream == 0) {
- return;
- }
+ if (port == NULL || port->type != NXT_PROCESS_APP) {
+
+ if (msg->port_msg.stream == 0) {
+ return;
+ }
- if (msg->u.new_port == NULL
- || msg->u.new_port->type != NXT_PROCESS_APP)
- {
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
}
- nxt_port_rpc_handler(task, msg);
+ if (msg->port_msg.stream != 0) {
+ nxt_port_rpc_handler(task, msg);
+ return;
+ }
+
+ /*
+ * Port with "id == 0" is application 'main' port and it always
+ * should come with non-zero stream.
+ */
+ nxt_assert(port->id != 0);
+
+ /* Find 'main' app port and get app reference. */
+ rt = task->thread->runtime;
+
+ /*
+ * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
+ * sent to main port (with id == 0) and processed in main thread.
+ */
+ main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
+ nxt_assert(main_app_port != NULL);
+
+ app = main_app_port->app;
+ nxt_assert(app != NULL);
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ /* TODO here should be find-and-add code because there can be
+ port waiters in port_hash */
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ port->app = app;
+ port->main_app_port = main_app_port;
}
@@ -1100,8 +810,10 @@ nxt_router_app_can_start(nxt_app_t *app)
nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t *app)
{
- return app->idle_processes + app->pending_processes
- < app->spare_processes;
+ return (app->active_requests
+ > app->port_hash_count + app->pending_processes)
+ || (app->spare_processes
+ > app->idle_processes + app->pending_processes);
}
@@ -1530,6 +1242,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_str_t *t, *s, *targets;
nxt_uint_t n, i;
+ nxt_port_t *port;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value, *websocket;
@@ -1744,8 +1457,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->spare_ports);
nxt_queue_init(&app->idle_ports);
- nxt_queue_init(&app->requests);
- nxt_queue_init(&app->pending);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -1758,7 +1469,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->idle_timeout = apcf.idle_timeout;
- app->max_pending_responses = 2;
app->max_requests = apcf.requests;
app->targets = targets;
@@ -1789,6 +1499,25 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app_joint->free_app_work.handler = nxt_router_free_app;
app_joint->free_app_work.task = &engine->task;
app_joint->free_app_work.obj = app_joint;
+
+ port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
+ NXT_PROCESS_APP);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return NXT_ERROR;
+ }
+
+ nxt_port_write_enable(task, port);
+ port->app = app;
+
+ app->shared_port = port;
+
+ nxt_thread_mutex_create(&app->outgoing.mutex);
}
}
@@ -2522,7 +2251,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app = rpc->app;
port = msg->u.new_port;
+
+ nxt_assert(port != NULL);
+ nxt_assert(port->type == NXT_PROCESS_APP);
+ nxt_assert(port->id == 0);
+
port->app = app;
+ port->main_app_port = port;
app->pending_processes--;
app->processes++;
@@ -2532,11 +2267,15 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_insert_tail(&app->ports, &port->app_link);
nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
port->idle_start = 0;
nxt_port_inc_use(port);
+ nxt_router_app_shared_port_send(task, port);
+
nxt_work_queue_add(&engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
}
@@ -2939,10 +2678,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
static nxt_port_handlers_t nxt_router_app_port_handlers = {
- .rpc_error = nxt_port_rpc_handler,
- .mmap = nxt_port_mmap_handler,
- .data = nxt_port_rpc_handler,
- .oosm = nxt_router_oosm_handler,
+ .rpc_error = nxt_port_rpc_handler,
+ .mmap = nxt_port_mmap_handler,
+ .data = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
+ .req_headers_ack = nxt_port_rpc_handler,
};
@@ -3736,22 +3476,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
+ nxt_app_t *app;
nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
nxt_http_request_t *r;
nxt_unit_response_t *resp;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
- b = msg->buf;
req_rpc_data = data;
- if (msg->size == 0) {
- b = NULL;
- }
-
r = req_rpc_data->request;
if (nxt_slow_path(r == NULL)) {
return;
@@ -3762,19 +3497,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
+ app = req_rpc_data->app;
+ nxt_assert(app != NULL);
+
+ if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
+ nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
+
+ return;
+ }
+
+ b = (msg->size == 0) ? NULL : msg->buf;
+
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
nxt_buf_chain_add(&b, nxt_http_buf_last(r));
+ req_rpc_data->rpc_cancel = 0;
+ req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
+
nxt_request_rpc_data_unlink(task, req_rpc_data);
} else {
- if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) {
+ if (app->timeout != 0) {
r->timer.handler = nxt_router_app_timeout;
r->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine, &r->timer,
- req_rpc_data->app->timeout);
+ nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
}
}
@@ -3870,39 +3618,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (r->websocket_handshake
&& r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
{
- req_app_link = nxt_request_app_link_alloc(task,
- req_rpc_data->req_app_link,
- req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- app_port = req_app_link->app_port;
-
- if (app_port == NULL && req_rpc_data->app_port != NULL) {
- req_app_link->app_port = req_rpc_data->app_port;
- app_port = req_app_link->app_port;
- req_app_link->apr_action = req_rpc_data->apr_action;
-
- req_rpc_data->app_port = NULL;
- }
-
+ app_port = req_rpc_data->app_port;
if (nxt_slow_path(app_port == NULL)) {
goto fail;
}
- nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+ nxt_thread_mutex_lock(&app->mutex);
- nxt_queue_insert_tail(&app_port->active_websockets,
- &req_app_link->link_port_websockets);
+ app_port->main_app_port->active_websockets++;
- nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+ nxt_thread_mutex_unlock(&app->mutex);
nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
- req_app_link->apr_action = NXT_APR_CLOSE;
+ req_rpc_data->apr_action = NXT_APR_CLOSE;
- nxt_debug(task, "req_app_link stream #%uD upgrade",
- req_app_link->stream);
+ nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
r->state = &nxt_http_websocket;
@@ -3921,6 +3651,94 @@ fail:
}
+static void
+nxt_router_req_headers_ack_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
+{
+ nxt_app_t *app;
+ nxt_bool_t start_process;
+ nxt_port_t *app_port, *main_app_port, *idle_port;
+ nxt_queue_link_t *idle_lnk;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "stream #%uD: got ack from %PI:%d",
+ req_rpc_data->stream,
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
+ msg->port_msg.pid);
+
+ app = req_rpc_data->app;
+
+ start_process = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(app_port == NULL)) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ r = req_rpc_data->request;
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+
+ return;
+ }
+
+ main_app_port = app_port->main_app_port;
+
+ if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
+ app->idle_processes--;
+
+ /* Check port was in 'spare_ports' using idle_start field. */
+ if (main_app_port->idle_start == 0
+ && app->idle_processes >= app->spare_processes)
+ {
+ /*
+ * If there is a vacant space in spare ports,
+ * move the last idle to spare_ports.
+ */
+ nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
+
+ idle_lnk = nxt_queue_last(&app->idle_ports);
+ idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
+ nxt_queue_remove(idle_lnk);
+
+ nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
+
+ idle_port->idle_start = 0;
+ }
+
+ if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ start_process = 1;
+ }
+ }
+
+ main_app_port->active_requests++;
+
+ nxt_port_inc_use(app_port);
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
+ nxt_port_use(task, req_rpc_data->app_port, -1);
+
+ req_rpc_data->app_port = app_port;
+
+ if (app->timeout != 0) {
+ r = req_rpc_data->request;
+
+ r->timer.handler = nxt_router_app_timeout;
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
+ }
+}
+
+
static const nxt_http_request_state_t nxt_http_request_send_state
nxt_aligned(64) =
{
@@ -3949,42 +3767,14 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_bool_t cancelled;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
req_rpc_data = data;
- req_app_link = req_rpc_data->req_app_link;
-
- if (req_app_link != NULL) {
- cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
- req_app_link->stream);
- if (cancelled) {
- res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
-
- if (res == NXT_OK) {
- port = req_app_link->app_port;
-
- if (nxt_slow_path(port == NULL)) {
- nxt_log(task, NXT_LOG_ERR,
- "port is NULL in cancelled req_app_link");
- return;
- }
-
- nxt_port_rpc_ex_set_peer(task, task->thread->engine->port,
- req_rpc_data, port->pid);
-
- nxt_router_app_prepare_request(task, req_app_link);
- }
+ req_rpc_data->rpc_cancel = 0;
- msg->port_msg.last = 0;
-
- return;
- }
- }
+ /* TODO cancel message and return if cancelled. */
+ // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
if (req_rpc_data->request != NULL) {
nxt_http_request_error(task, req_rpc_data->request,
@@ -4008,6 +3798,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_assert(app_joint != NULL);
nxt_assert(port != NULL);
+ nxt_assert(port->type == NXT_PROCESS_APP);
+ nxt_assert(port->id == 0);
app = app_joint->app;
@@ -4022,6 +3814,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
port->app = app;
+ port->main_app_port = port;
nxt_thread_mutex_lock(&app->mutex);
@@ -4029,24 +3822,60 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app->pending_processes--;
app->processes++;
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
nxt_thread_mutex_unlock(&app->mutex);
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
+ nxt_router_app_shared_port_send(task, port);
+
nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}
+static nxt_int_t
+nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
+{
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_port_msg_new_port_t *msg;
+
+ b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
+ sizeof(nxt_port_data_t));
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port = app_port->app->shared_port;
+
+ nxt_debug(task, "send port %FD to process %PI",
+ port->pair[0], app_port->pid);
+
+ b->mem.free += sizeof(nxt_port_msg_new_port_t);
+ msg = (nxt_port_msg_new_port_t *) b->mem.pos;
+
+ msg->id = port->id;
+ msg->pid = port->pid;
+ msg->max_size = port->max_size;
+ msg->max_share = port->max_share;
+ msg->type = port->type;
+
+ return nxt_port_socket_twrite(task, app_port,
+ NXT_PORT_MSG_NEW_PORT,
+ port->pair[0],
+ 0, 0, b, NULL);
+}
+
+
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *lnk;
- nxt_request_app_link_t *req_app_link;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
app_joint = data;
@@ -4070,32 +3899,11 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app->pending_processes--;
- if (!nxt_queue_is_empty(&app->requests)) {
- lnk = nxt_queue_last(&app->requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_requests);
-
- } else {
- req_app_link = NULL;
- }
-
nxt_thread_mutex_unlock(&app->mutex);
- if (req_app_link != NULL) {
- nxt_debug(task, "app '%V' %p abort next stream #%uD",
- &app->name, app, req_app_link->stream);
-
- nxt_request_app_link_error(task, app, req_app_link,
- "Failed to start application process");
- nxt_request_app_link_use(task, req_app_link, -1);
- }
+ /* TODO req_app_link to cancel first pending message */
}
-nxt_inline nxt_port_t *
-nxt_router_app_get_port_for_quit(nxt_app_t *app);
void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
@@ -4116,63 +3924,6 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
}
-nxt_inline nxt_bool_t
-nxt_router_app_first_port_busy(nxt_app_t *app)
-{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&app->ports);
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
-
- return port->app_pending_responses > 0;
-}
-
-
-nxt_inline nxt_port_t *
-nxt_router_pop_first_port(nxt_app_t *app)
-{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&app->ports);
- nxt_queue_remove(lnk);
-
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
-
- port->app_pending_responses++;
-
- if (nxt_queue_chk_remove(&port->idle_link)) {
- app->idle_processes--;
-
- if (port->idle_start == 0) {
- nxt_assert(app->idle_processes < app->spare_processes);
-
- } else {
- nxt_assert(app->idle_processes >= app->spare_processes);
-
- port->idle_start = 0;
- }
- }
-
- if ((app->max_pending_responses == 0
- || port->app_pending_responses < app->max_pending_responses)
- && (app->max_requests == 0
- || port->app_responses + port->app_pending_responses
- < app->max_requests))
- {
- nxt_queue_insert_tail(&app->ports, lnk);
-
- nxt_port_inc_use(port);
-
- } else {
- lnk->next = NULL;
- }
-
- return port;
-}
-
-
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_app_t *app)
{
@@ -4184,12 +3935,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
- if (port->app_pending_responses > 0) {
- port = NULL;
-
- continue;
- }
-
/* Caller is responsible to decrease port use count. */
nxt_queue_chk_remove(&port->app_link);
@@ -4197,6 +3942,9 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
app->idle_processes--;
}
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
port->app = NULL;
app->processes--;
@@ -4222,71 +3970,36 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
static void
-nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
-{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = data;
-
-#if (NXT_DEBUG)
- {
- nxt_app_t *app;
-
- app = obj;
-
- nxt_assert(app != NULL);
- nxt_assert(req_app_link != NULL);
- nxt_assert(req_app_link->app_port != NULL);
-
- nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, req_app_link->stream);
- }
-#endif
-
- nxt_router_app_prepare_request(task, req_app_link);
-
- nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
-static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action)
{
- int inc_use;
- uint32_t dec_pending, got_response;
- nxt_app_t *app;
- nxt_bool_t port_unchained;
- nxt_bool_t send_quit, cancelled, adjust_idle_timer;
- nxt_queue_link_t *lnk;
- nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra;
- nxt_port_select_state_t state;
+ int inc_use;
+ uint32_t got_response, dec_requests;
+ nxt_app_t *app;
+ nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
+ nxt_port_t *main_app_port;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
- req_app_link = NULL;
-
app = port->app;
inc_use = 0;
- dec_pending = 0;
got_response = 0;
+ dec_requests = 0;
switch (action) {
case NXT_APR_NEW_PORT:
break;
case NXT_APR_REQUEST_FAILED:
- dec_pending = 1;
+ dec_requests = 1;
inc_use = -1;
break;
case NXT_APR_GOT_RESPONSE:
- dec_pending = 1;
got_response = 1;
inc_use = -1;
break;
case NXT_APR_UPGRADE:
- dec_pending = 1;
got_response = 1;
break;
case NXT_APR_CLOSE:
@@ -4294,120 +4007,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
break;
}
- nxt_thread_mutex_lock(&app->mutex);
-
- port->app_pending_responses -= dec_pending;
- port->app_responses += got_response;
+ nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
+ port->pid, port->id,
+ (int) inc_use, (int) got_response);
- if (port->pair[1] != -1
- && (app->max_pending_responses == 0
- || port->app_pending_responses < app->max_pending_responses)
- && (app->max_requests == 0
- || port->app_responses + port->app_pending_responses
- < app->max_requests))
- {
- if (port->app_link.next == NULL) {
- if (port->app_pending_responses > 0) {
- nxt_queue_insert_tail(&app->ports, &port->app_link);
+ if (port == app->shared_port) {
+ nxt_thread_mutex_lock(&app->mutex);
- } else {
- nxt_queue_insert_head(&app->ports, &port->app_link);
- }
+ app->active_requests -= got_response + dec_requests;
- nxt_port_inc_use(port);
+ nxt_thread_mutex_unlock(&app->mutex);
- } else {
- if (port->app_pending_responses == 0
- && nxt_queue_first(&app->ports) != &port->app_link)
- {
- nxt_queue_remove(&port->app_link);
- nxt_queue_insert_head(&app->ports, &port->app_link);
- }
- }
+ goto adjust_use;
}
- if (!nxt_queue_is_empty(&app->ports)
- && !nxt_queue_is_empty(&app->requests))
- {
- lnk = nxt_queue_first(&app->requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_requests);
+ main_app_port = port->main_app_port;
- req_app_link->app_port = nxt_router_pop_first_port(app);
+ nxt_thread_mutex_lock(&app->mutex);
- if (req_app_link->app_port->app_pending_responses > 1) {
- nxt_request_app_link_pending(task, app, req_app_link);
- }
- }
+ main_app_port->app_responses += got_response;
+ main_app_port->active_requests -= got_response + dec_requests;
+ app->active_requests -= got_response + dec_requests;
- /* Pop first pending request for this port. */
- if (dec_pending > 0
- && !nxt_queue_is_empty(&port->pending_requests))
+ if (main_app_port->pair[1] != -1
+ && (app->max_requests == 0
+ || main_app_port->app_responses < app->max_requests))
{
- lnk = nxt_queue_first(&port->pending_requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_port_pending);
-
- nxt_assert(pending_ra->link_app_pending.next != NULL);
-
- nxt_queue_remove(&pending_ra->link_app_pending);
- pending_ra->link_app_pending.next = NULL;
-
- } else {
- pending_ra = NULL;
- }
-
- /* Try to cancel and re-schedule first stalled request for this app. */
- if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
- lnk = nxt_queue_first(&app->pending);
-
- re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_pending);
-
- if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
+ if (main_app_port->app_link.next == NULL) {
+ nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
- nxt_debug(task, "app '%V' stalled request #%uD detected",
- &app->name, re_ra->stream);
-
- cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
- re_ra->stream);
-
- if (cancelled) {
- state.req_app_link = re_ra;
- state.app = app;
-
- /*
- * Need to increment use count "in advance" because
- * nxt_router_port_select() will remove re_ra from lists
- * and decrement use count.
- */
- nxt_request_app_link_inc_use(re_ra);
-
- nxt_router_port_select(task, &state);
-
- goto re_ra_cancelled;
- }
+ nxt_port_inc_use(main_app_port);
}
}
- re_ra = NULL;
-
-re_ra_cancelled:
-
send_quit = (app->max_requests > 0
- && port->app_pending_responses == 0
- && port->app_responses >= app->max_requests);
+ && main_app_port->app_responses >= app->max_requests);
if (send_quit) {
- port_unchained = nxt_queue_chk_remove(&port->app_link);
+ port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
- port->app = NULL;
+ nxt_port_hash_remove(&app->port_hash, main_app_port);
+ app->port_hash_count--;
+
+ main_app_port->app = NULL;
app->processes--;
} else {
@@ -4416,9 +4058,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
- if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
- && nxt_queue_is_empty(&port->active_websockets)
- && port->idle_link.next == NULL)
+ if (main_app_port->pair[1] != -1 && !send_quit
+ && main_app_port->active_requests == 0
+ && main_app_port->active_websockets == 0
+ && main_app_port->idle_link.next == NULL)
{
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
@@ -4429,12 +4072,12 @@ re_ra_cancelled:
}
if (app->idle_processes < app->spare_processes) {
- nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+ nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
} else {
- nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
+ nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
- port->idle_start = task->thread->engine->timers.now;
+ main_app_port->idle_start = task->thread->engine->timers.now;
}
app->idle_processes++;
@@ -4447,60 +4090,22 @@ re_ra_cancelled:
nxt_event_engine_post(app->engine, &app->adjust_idle_work);
}
- if (pending_ra != NULL) {
- nxt_request_app_link_use(task, pending_ra, -1);
- }
-
- if (re_ra != NULL) {
- if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- /*
- * Reference counter already incremented above, this will
- * keep re_ra while nxt_router_app_process_request()
- * task is in queue. Reference counter decreased in
- * nxt_router_app_process_request() after processing.
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, re_ra);
-
- } else {
- nxt_request_app_link_use(task, re_ra, -1);
- }
- }
-
- if (req_app_link != NULL) {
- /*
- * There should be call nxt_request_app_link_inc_use(req_app_link),
- * because of one more link in the queue. But one link was
- * recently removed from app->requests linked list.
- * Corresponding decrement is in nxt_router_app_process_request().
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, req_app_link);
-
- goto adjust_use;
- }
-
/* ? */
- if (port->pair[1] == -1) {
+ if (main_app_port->pair[1] == -1) {
nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
- &app->name, app, port, port->pid);
+ &app->name, app, main_app_port, main_app_port->pid);
goto adjust_use;
}
if (send_quit) {
- nxt_debug(task, "app '%V' %p send QUIT to port",
- &app->name, app);
+ nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
- -1, 0, 0, NULL);
+ nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
if (port_unchained) {
- nxt_port_use(task, port, -1);
+ nxt_port_use(task, main_app_port, -1);
}
goto adjust_use;
@@ -4529,6 +4134,18 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_thread_mutex_lock(&app->mutex);
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
+ if (port->id != 0) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
+ port->pid, port->id);
+
+ return;
+ }
+
unchain = nxt_queue_chk_remove(&port->app_link);
if (nxt_queue_chk_remove(&port->idle_link)) {
@@ -4553,8 +4170,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
start_process = !task->thread->engine->shutdown
&& nxt_router_app_can_start(app)
- && (!nxt_queue_is_empty(&app->requests)
- || nxt_router_app_need_start(app));
+ && nxt_router_app_need_start(app);
if (start_process) {
app->pending_processes++;
@@ -4603,6 +4219,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
app->adjust_idle_work.data = NULL;
}
+ nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
+ &app->name,
+ (int) app->idle_processes, (int) app->spare_processes);
+
while (app->idle_processes > app->spare_processes) {
nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
@@ -4612,6 +4232,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
timeout = port->idle_start + app->idle_timeout;
+ nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
+ &app->name, port->pid,
+ port->idle_start, timeout, threshold);
+
if (timeout > threshold) {
break;
}
@@ -4621,6 +4245,9 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_queue_chk_remove(&port->app_link);
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
app->idle_processes--;
app->processes--;
port->app = NULL;
@@ -4704,12 +4331,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
}
nxt_assert(app->processes == 0);
+ nxt_assert(app->active_requests == 0);
+ nxt_assert(app->port_hash_count == 0);
nxt_assert(app->idle_processes == 0);
- nxt_assert(nxt_queue_is_empty(&app->requests));
nxt_assert(nxt_queue_is_empty(&app->ports));
nxt_assert(nxt_queue_is_empty(&app->spare_ports));
nxt_assert(nxt_queue_is_empty(&app->idle_ports));
+ nxt_port_mmaps_destroy(&app->outgoing, 1);
+
+ nxt_thread_mutex_destroy(&app->outgoing.mutex);
+
+ if (app->shared_port != NULL) {
+ app->shared_port->app = NULL;
+ nxt_port_close(task, app->shared_port);
+ nxt_port_use(task, app->shared_port, -1);
+ }
+
nxt_thread_mutex_destroy(&app->mutex);
nxt_mp_destroy(app->mem_pool);
@@ -4726,178 +4364,34 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
-{
- int ra_use_delta;
- nxt_app_t *app;
- nxt_bool_t can_start_process;
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = state->req_app_link;
- app = state->app;
-
- state->failed_port_use_delta = 0;
- ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
-
- if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
- {
- nxt_assert(req_app_link->link_app_pending.next != NULL);
-
- nxt_queue_remove(&req_app_link->link_app_pending);
- req_app_link->link_app_pending.next = NULL;
-
- ra_use_delta--;
- }
-
- state->failed_port = req_app_link->app_port;
-
- if (req_app_link->app_port != NULL) {
- state->failed_port_use_delta--;
-
- state->failed_port->app_pending_responses--;
-
- if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
- state->failed_port_use_delta--;
- }
-
- req_app_link->app_port = NULL;
- }
-
- can_start_process = nxt_router_app_can_start(app);
-
- state->port = NULL;
- state->start_process = 0;
-
- if (nxt_queue_is_empty(&app->ports)
- || (can_start_process && nxt_router_app_first_port_busy(app)) )
- {
- req_app_link = nxt_request_app_link_alloc(task, req_app_link,
- req_app_link->req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- if (nxt_slow_path(state->failed_port != NULL)) {
- nxt_queue_insert_head(&app->requests,
- &req_app_link->link_app_requests);
-
- } else {
- nxt_queue_insert_tail(&app->requests,
- &req_app_link->link_app_requests);
- }
-
- nxt_request_app_link_inc_use(req_app_link);
-
- nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
- req_app_link->stream);
-
- if (can_start_process) {
- app->pending_processes++;
- state->start_process = 1;
- }
-
- } else {
- state->port = nxt_router_pop_first_port(app);
-
- if (state->port->app_pending_responses > 1) {
- req_app_link = nxt_request_app_link_alloc(task, req_app_link,
- req_app_link->req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- req_app_link->app_port = state->port;
-
- nxt_request_app_link_pending(task, app, req_app_link);
- }
-
- if (can_start_process && nxt_router_app_need_start(app)) {
- app->pending_processes++;
- state->start_process = 1;
- }
- }
-
- nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
-
-fail:
-
- state->shared_ra = req_app_link;
-}
-
-
-static nxt_int_t
-nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
+nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_int_t res;
- nxt_app_t *app;
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = state->shared_ra;
- app = state->app;
-
- if (state->failed_port_use_delta != 0) {
- nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
- }
-
- if (nxt_slow_path(req_app_link == NULL)) {
- if (state->port != NULL) {
- nxt_port_use(task, state->port, -1);
- }
-
- nxt_request_app_link_error(task, app, state->req_app_link,
- "Failed to allocate shared req<->app link");
-
- return NXT_ERROR;
- }
-
- if (state->port != NULL) {
- nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
+ nxt_bool_t start_process;
+ nxt_port_t *port;
- req_app_link->app_port = state->port;
+ start_process = 0;
- if (state->start_process) {
- nxt_router_start_app_process(task, app);
- }
+ nxt_thread_mutex_lock(&app->mutex);
- return NXT_OK;
- }
+ port = app->shared_port;
+ nxt_port_inc_use(port);
- if (!state->start_process) {
- nxt_debug(task, "app '%V' %p too many running or pending processes",
- &app->name, app);
+ app->active_requests++;
- return NXT_AGAIN;
+ if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ start_process = 1;
}
- res = nxt_router_start_app_process(task, app);
+ nxt_thread_mutex_unlock(&app->mutex);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, app, req_app_link,
- "Failed to start app process");
+ req_rpc_data->app_port = port;
+ req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
- return NXT_ERROR;
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
}
-
- return NXT_AGAIN;
-}
-
-
-static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_port_select_state_t state;
-
- state.req_app_link = req_app_link;
- state.app = app;
-
- nxt_thread_mutex_lock(&app->mutex);
-
- nxt_router_port_select(task, &state);
-
- nxt_thread_mutex_unlock(&app->mutex);
-
- return nxt_router_port_post_select(task, &state);
}
@@ -4905,10 +4399,7 @@ void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_app_t *app)
{
- nxt_int_t res;
- nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_request_app_link_t ra_local, *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
engine = task->thread->engine;
@@ -4927,7 +4418,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
* in port handlers. Need to fixup request memory pool. Counterpart
* release will be called via following call chain:
* nxt_request_rpc_data_unlink() ->
- * nxt_router_http_request_done() ->
+ * nxt_router_http_request_release_post() ->
* nxt_router_http_request_release()
*/
nxt_mp_retain(r->mem_pool);
@@ -4939,29 +4430,37 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
+ req_rpc_data->msg_info.body_fd = -1;
+ req_rpc_data->rpc_cancel = 1;
nxt_router_app_use(task, app, 1);
req_rpc_data->request = r;
r->req_rpc_data = req_rpc_data;
- req_app_link = &ra_local;
- nxt_request_app_link_init(task, req_app_link, req_rpc_data);
+ if (r->last != NULL) {
+ r->last->completion_handler = nxt_router_http_request_done;
+ }
- res = nxt_router_app_port(task, app, req_app_link);
- req_app_link = req_rpc_data->req_app_link;
+ nxt_router_app_port_get(task, app, req_rpc_data);
+ nxt_router_app_prepare_request(task, req_rpc_data);
+}
- if (res == NXT_OK) {
- port = req_app_link->app_port;
- nxt_assert(port != NULL);
+static void
+nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = data;
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
+ nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
- nxt_router_app_prepare_request(task, req_app_link);
+ if (r->req_rpc_data) {
+ nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}
- nxt_request_app_link_use(task, req_app_link, -1);
+ nxt_http_request_close_handler(task, r, r->proto.any);
}
@@ -4973,76 +4472,80 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_buf_t *buf;
+ nxt_app_t *app;
+ nxt_buf_t *buf, *body;
nxt_int_t res;
nxt_port_t *port, *reply_port;
- nxt_apr_action_t apr_action;
- nxt_assert(req_app_link->app_port != NULL);
+ app = req_rpc_data->app;
- port = req_app_link->app_port;
- reply_port = req_app_link->reply_port;
+ nxt_assert(app != NULL);
- apr_action = NXT_APR_REQUEST_FAILED;
+ port = req_rpc_data->app_port;
- buf = nxt_router_prepare_msg(task, req_app_link->request, port,
- nxt_app_msg_prefix[port->app->type]);
+ nxt_assert(port != NULL);
+
+ reply_port = task->thread->engine->port;
+ buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
+ nxt_app_msg_prefix[app->type]);
if (nxt_slow_path(buf == NULL)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to prepare message for application");
- goto release_port;
+ nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
+ req_rpc_data->stream, &app->name);
+
+ nxt_http_request_error(task, req_rpc_data->request,
+ NXT_HTTP_INTERNAL_SERVER_ERROR);
+
+ return;
}
nxt_debug(task, "about to send %O bytes buffer to app process port %d",
nxt_buf_used_size(buf),
port->socket.fd);
- apr_action = NXT_APR_NEW_PORT;
-
- req_app_link->msg_info.buf = buf;
- req_app_link->msg_info.completion_handler = buf->completion_handler;
+ req_rpc_data->msg_info.buf = buf;
+ req_rpc_data->msg_info.completion_handler = buf->completion_handler;
- for (; buf; buf = buf->next) {
+ do {
buf->completion_handler = nxt_router_dummy_buf_completion;
- }
+ buf = buf->next;
+ } while (buf != NULL);
- buf = req_app_link->msg_info.buf;
+ buf = req_rpc_data->msg_info.buf;
- res = nxt_port_mmap_get_tracking(task, &port->process->outgoing,
- &req_app_link->msg_info.tracking,
- req_app_link->stream);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to get tracking area");
- goto release_port;
- }
+ body = req_rpc_data->request->body;
- if (req_app_link->body_fd != -1) {
- nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
- req_app_link->body_fd);
+ if (body != NULL && nxt_buf_is_file(body)) {
+ req_rpc_data->msg_info.body_fd = body->file->fd;
- lseek(req_app_link->body_fd, 0, SEEK_SET);
+ body->file->fd = -1;
+
+ } else {
+ req_rpc_data->msg_info.body_fd = -1;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
- req_app_link->body_fd,
- req_app_link->stream, reply_port->id, buf,
- &req_app_link->msg_info.tracking);
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+ req_rpc_data->msg_info.body_fd);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to send message to application");
- goto release_port;
+ lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
}
-release_port:
+ res = nxt_port_socket_twrite(task, port,
+ NXT_PORT_MSG_REQ_HEADERS,
+ req_rpc_data->msg_info.body_fd,
+ req_rpc_data->stream, reply_port->id, buf,
+ NULL);
- nxt_router_app_port_release(task, port, apr_action);
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
+ req_rpc_data->stream, &app->name);
- nxt_request_app_link_update_peer(task, req_app_link);
+ nxt_http_request_error(task, req_rpc_data->request,
+ NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
}
@@ -5100,7 +4603,7 @@ nxt_fields_next(nxt_fields_iter_t *i)
static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
- nxt_port_t *port, const nxt_str_t *prefix)
+ nxt_app_t *app, const nxt_str_t *prefix)
{
void *target_pos, *query_pos;
u_char *pos, *end, *p, c;
@@ -5141,7 +4644,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
return NULL;
}
- out = nxt_port_mmap_get_buf(task, &port->process->outgoing,
+ out = nxt_port_mmap_get_buf(task, &app->outgoing,
nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
if (nxt_slow_path(out == NULL)) {
return NULL;
@@ -5323,8 +4826,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (buf == NULL) {
free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
- buf = nxt_port_mmap_get_buf(task, &port->process->outgoing,
- free_size);
+ buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
@@ -5372,15 +4874,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_bool_t cancelled, unlinked;
- nxt_port_t *port;
nxt_timer_t *timer;
- nxt_queue_link_t *lnk;
nxt_http_request_t *r;
- nxt_request_app_link_t *pending_ra;
nxt_request_rpc_data_t *req_rpc_data;
- nxt_port_select_state_t state;
timer = obj;
@@ -5388,94 +4884,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
r = nxt_timer_data(timer, nxt_http_request_t, timer);
req_rpc_data = r->timer_data;
- app = req_rpc_data->app;
-
- if (app == NULL) {
- goto generate_error;
- }
-
- port = NULL;
- pending_ra = NULL;
-
- if (req_rpc_data->app_port != NULL) {
- port = req_rpc_data->app_port;
- req_rpc_data->app_port = NULL;
- }
-
- if (port == NULL && req_rpc_data->req_app_link != NULL
- && req_rpc_data->req_app_link->app_port != NULL)
- {
- port = req_rpc_data->req_app_link->app_port;
- req_rpc_data->req_app_link->app_port = NULL;
- }
-
- if (port == NULL) {
- goto generate_error;
- }
-
- nxt_thread_mutex_lock(&app->mutex);
-
- unlinked = nxt_queue_chk_remove(&port->app_link);
-
- if (!nxt_queue_is_empty(&port->pending_requests)) {
- lnk = nxt_queue_first(&port->pending_requests);
-
- pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_port_pending);
-
- nxt_assert(pending_ra->link_app_pending.next != NULL);
-
- nxt_debug(task, "app '%V' pending request #%uD found",
- &app->name, pending_ra->stream);
-
- cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
- pending_ra->stream);
-
- if (cancelled) {
- state.req_app_link = pending_ra;
- state.app = app;
-
- /*
- * Need to increment use count "in advance" because
- * nxt_router_port_select() will remove pending_ra from lists
- * and decrement use count.
- */
- nxt_request_app_link_inc_use(pending_ra);
-
- nxt_router_port_select(task, &state);
-
- } else {
- pending_ra = NULL;
- }
- }
-
- nxt_thread_mutex_unlock(&app->mutex);
-
- if (pending_ra != NULL) {
- if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- /*
- * Reference counter already incremented above, this will
- * keep pending_ra while nxt_router_app_process_request()
- * task is in queue. Reference counter decreased in
- * nxt_router_app_process_request() after processing.
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, pending_ra);
-
- } else {
- nxt_request_app_link_use(task, pending_ra, -1);
- }
- }
-
- nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
-
- nxt_port_use(task, port, unlinked ? -2 : -1);
-
-generate_error:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
@@ -5483,13 +4891,11 @@ generate_error:
}
-static nxt_int_t
-nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r)
+static void
+nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
{
r->timer.handler = nxt_router_http_request_release;
nxt_timer_add(task->thread->engine, &r->timer, 0);
-
- return NXT_OK;
}
@@ -5498,7 +4904,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
- nxt_debug(task, "http app release");
+ nxt_debug(task, "http request pool release");
r = nxt_timer_data(obj, nxt_http_request_t, timer);
@@ -5593,7 +4999,18 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_assert(port->type == NXT_PROCESS_APP);
- mmaps = &port->process->outgoing;
+ if (nxt_slow_path(port->app == NULL)) {
+ nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
+ port->pid, port->id);
+
+ // FIXME
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
+
+ return;
+ }
+
+ mmaps = &port->app->outgoing;
nxt_thread_mutex_lock(&mmaps->mutex);
if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
@@ -5602,6 +5019,9 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
(int) get_mmap_msg->id);
+ // FIXME
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
return;
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index d8e93be6..7e0dc7d7 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -101,18 +101,20 @@ typedef struct {
struct nxt_app_s {
- nxt_thread_mutex_t mutex; /* Protects ports queue. */
- nxt_queue_t ports; /* of nxt_port_t.app_link */
+ nxt_thread_mutex_t mutex; /* Protects ports queue. */
+ nxt_queue_t ports; /* of nxt_port_t.app_link */
+ nxt_lvlhsh_t port_hash; /* of nxt_port_t */
nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */
nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */
nxt_work_t adjust_idle_work;
nxt_event_engine_t *engine;
- nxt_queue_t requests; /* of nxt_request_app_link_t */
- nxt_queue_t pending; /* of nxt_request_app_link_t */
nxt_str_t name;
+ uint32_t port_hash_count;
+
+ uint32_t active_requests;
uint32_t pending_processes;
uint32_t processes;
uint32_t idle_processes;
@@ -120,7 +122,6 @@ struct nxt_app_s {
uint32_t max_processes;
uint32_t spare_processes;
uint32_t max_pending_processes;
- uint32_t max_pending_responses;
uint32_t max_requests;
nxt_msec_t timeout;
@@ -139,6 +140,9 @@ struct nxt_app_s {
nxt_atomic_t use_count;
nxt_app_joint_t *joint;
+ nxt_port_t *shared_port;
+
+ nxt_port_mmaps_t outgoing;
};
diff --git a/src/nxt_router_request.h b/src/nxt_router_request.h
index a38980ee..1271520d 100644
--- a/src/nxt_router_request.h
+++ b/src/nxt_router_request.h
@@ -9,14 +9,12 @@
typedef struct nxt_msg_info_s {
nxt_buf_t *buf;
+ nxt_fd_t body_fd;
nxt_port_mmap_tracking_t tracking;
nxt_work_handler_t completion_handler;
} nxt_msg_info_t;
-typedef struct nxt_request_app_link_s nxt_request_app_link_t;
-
-
typedef enum {
NXT_APR_NEW_PORT,
NXT_APR_REQUEST_FAILED,
@@ -35,38 +33,9 @@ typedef struct {
nxt_http_request_t *request;
nxt_msg_info_t msg_info;
- nxt_request_app_link_t *req_app_link;
-} nxt_request_rpc_data_t;
-
-
-struct nxt_request_app_link_s {
- uint32_t stream;
- nxt_atomic_t use_count;
-
- nxt_port_t *app_port;
- nxt_apr_action_t apr_action;
-
- nxt_port_t *reply_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_request_rpc_data_t *req_rpc_data;
- nxt_fd_t body_fd;
- nxt_nsec_t res_time;
-
- nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
- /* for nxt_port_t.pending_requests */
- nxt_queue_link_t link_port_pending;
- nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
- /* for nxt_port_t.active_websockets */
- nxt_queue_link_t link_port_websockets;
-
- nxt_mp_t *mem_pool;
- nxt_work_t work;
-
- int err_code;
- const char *err_str;
-};
+ nxt_bool_t rpc_cancel;
+} nxt_request_rpc_data_t;
#endif /* _NXT_ROUTER_REQUEST_H_INCLUDED_ */
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index b321a0d4..7fb2826d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
-nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl);
-nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl);
+nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
+nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
@@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_port_id_t *port_id);
+static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
-static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
- nxt_unit_read_buf_t *rbuf);
-static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
+static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
+static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
+static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
-static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx,
- nxt_unit_port_t *port, void *buf, size_t buf_size,
- void *oob, size_t oob_size);
+static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
@@ -308,6 +311,7 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_port_t *router_port;
+ nxt_unit_port_t *shared_port;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
@@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init)
fail:
- nxt_unit_ctx_release(&lib->main_ctx);
+ nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL;
}
@@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->use_count = 0;
lib->router_port = NULL;
+ lib->shared_port = NULL;
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_inline void
-nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}
nxt_inline void
-nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
- long c;
+ long c;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
@@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_port_release(lib->router_port);
}
+ if (nxt_fast_path(lib->shared_port != NULL)) {
+ nxt_unit_port_release(lib->shared_port);
+ }
+
free(lib);
}
}
@@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
recv_msg.incoming_buf = NULL;
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ if (nxt_slow_path(rbuf->size == 0)) {
+ nxt_unit_debug(ctx, "read port closed");
+
+ nxt_unit_quit(ctx);
+ rc = NXT_UNIT_OK;
+
+ goto fail;
+ }
+
nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail;
}
@@ -946,6 +971,13 @@ fail:
nxt_unit_process_release(recv_msg.process);
}
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+#if (NXT_DEBUG)
+ memset(rbuf->buf, 0xAC, rbuf->size);
+#endif
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
return rc;
}
@@ -954,6 +986,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
+ nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -978,21 +1011,33 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd);
- nb = 0;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
- if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
- nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
- "failed: %s (%d)",
- recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+ new_port.in_fd = recv_msg->fd;
+ new_port.out_fd = -1;
- return NXT_UNIT_ERROR;
+ } else {
+ nb = 0;
+
+ if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
+ "failed: %s (%d)",
+ recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+ new_port_msg->id);
+
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd;
}
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
- new_port.in_fd = -1;
- new_port.out_fd = recv_msg->fd;
new_port.data = NULL;
recv_msg->fd = -1;
@@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- nxt_unit_port_release(port);
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ lib->shared_port = port;
+
+ } else {
+ nxt_unit_port_release(port);
+ }
return NXT_UNIT_OK;
}
@@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return res;
+ }
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->callbacks.request_handler(req);
@@ -1221,6 +1276,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
static int
+nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.stream = req_impl->stream;
+ msg.pid = lib->pid;
+ msg.reply_port = ctx_impl->read_port->id.id;
+ msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
+
+ res = nxt_unit_port_send(req->ctx, req->response_port,
+ &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
@@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- nxt_unit_read_buf(ctx, rbuf);
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4218,26 +4305,23 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
-
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ int rc;
+ nxt_unit_impl_t *lib;
- nxt_unit_ctx_use(ctx_impl);
+ nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_run_once(ctx);
+ rc = nxt_unit_run_once_impl(ctx);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
}
- nxt_unit_ctx_release(ctx_impl);
+ nxt_unit_ctx_release(ctx);
return rc;
}
@@ -4246,109 +4330,163 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_queue_link_t *link;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_read_buf_t *rbuf;
+ int rc;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ nxt_unit_ctx_use(ctx);
- nxt_unit_ctx_use(ctx_impl);
+ rc = nxt_unit_run_once_impl(ctx);
- pthread_mutex_lock(&ctx_impl->mutex);
+ nxt_unit_ctx_release(ctx);
- if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ return rc;
+}
-next_pending:
- link = nxt_queue_first(&ctx_impl->pending_rbuf);
- nxt_queue_remove(link);
+static int
+nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_read_buf_t *rbuf;
- rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rc = nxt_unit_read_buf(ctx, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
- } else {
- rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+ return rc;
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
- if (nxt_slow_path(rbuf == NULL)) {
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_ctx_release(ctx_impl);
+ nxt_unit_process_ready_req(ctx);
- return NXT_UNIT_ERROR;
- }
+ return rc;
+}
- nxt_unit_read_buf(ctx, rbuf);
- }
- if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx, rbuf);
+static int
+nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+{
+ int res, err;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ struct pollfd fds[2];
-#if (NXT_DEBUG)
- if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
- memset(rbuf->buf, 0xAC, rbuf->size);
- }
-#endif
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- } else {
- rc = NXT_UNIT_ERROR;
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
+ return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
}
- if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
- rc = NXT_UNIT_OK;
+retry:
- } else {
- nxt_unit_read_buf_release(ctx, rbuf);
- }
+ fds[0].fd = ctx_impl->read_port->in_fd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
- if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
- rc = NXT_UNIT_OK;
- }
+ fds[1].fd = lib->shared_port->in_fd;
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
- if (nxt_fast_path(rc == NXT_UNIT_OK)) {
- pthread_mutex_lock(&ctx_impl->mutex);
+ res = poll(fds, 2, -1);
+ if (nxt_slow_path(res < 0)) {
+ err = errno;
- if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
- goto next_pending;
+ if (err == EINTR) {
+ goto retry;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
+ nxt_unit_alert(ctx, "poll() failed: %s (%d)",
+ strerror(err), err);
- nxt_unit_process_ready_req(ctx_impl);
+ rbuf->size = -1;
+
+ return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
- nxt_unit_ctx_release(ctx_impl);
+ if ((fds[0].revents & POLLIN) != 0) {
+ return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+ }
- return rc;
+ if ((fds[1].revents & POLLIN) != 0) {
+ return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
+ }
+
+ rbuf->size = -1;
+
+ return NXT_UNIT_ERROR;
}
-static void
-nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+static int
+nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
+ int rc;
+ nxt_queue_t pending_rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ nxt_queue_init(&pending_rbuf);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
- rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ rc = NXT_UNIT_OK;
+
+ nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
+
+ if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ } nxt_queue_loop;
+
+ return rc;
}
static void
-nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
nxt_queue_t ready_req;
nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
nxt_queue_init(&ready_req);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
@@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
{
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+ (void) nxt_unit_send_req_headers_ack(&req_impl->req);
+
lib->callbacks.request_handler(&req_impl->req);
} nxt_queue_loop;
}
-void
-nxt_unit_done(nxt_unit_ctx_t *ctx)
+int
+nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
+ int rc;
+ nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- nxt_unit_ctx_release(ctx_impl);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ }
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+int
+nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ }
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+int
+nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+
+ nxt_unit_ctx_use(ctx);
+
+ rc = nxt_unit_process_port_msg_impl(ctx, port);
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+static int
+nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+ nxt_unit_read_buf_t *rbuf;
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ rc = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ return rc;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_process_ready_req(ctx);
+
+ return rc;
+}
+
+
+void
+nxt_unit_done(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_release(ctx);
}
@@ -5056,12 +5295,11 @@ retry:
}
-static ssize_t
+static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+ nxt_unit_read_buf_t *rbuf)
{
- int fd;
- ssize_t res;
+ int fd, err;
struct iovec iov[1];
struct msghdr msg;
nxt_unit_impl_t *lib;
@@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (lib->callbacks.port_recv != NULL) {
- return lib->callbacks.port_recv(ctx, port,
- buf, buf_size, oob, oob_size);
+ rbuf->size = lib->callbacks.port_recv(ctx, port,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+
+ if (nxt_slow_path(rbuf->size < 0)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
}
- iov[0].iov_base = buf;
- iov[0].iov_len = buf_size;
+ iov[0].iov_base = rbuf->buf;
+ iov[0].iov_len = sizeof(rbuf->buf);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
- msg.msg_control = oob;
- msg.msg_controllen = oob_size;
+ msg.msg_control = rbuf->oob;
+ msg.msg_controllen = sizeof(rbuf->oob);
fd = port->in_fd;
retry:
- res = recvmsg(fd, &msg, 0);
+ rbuf->size = recvmsg(fd, &msg, 0);
- if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ if (nxt_slow_path(rbuf->size == -1)) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
+ if (err == EAGAIN) {
+ nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
+ fd, strerror(errno), errno);
+
+ return NXT_UNIT_AGAIN;
+ }
+
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
fd, strerror(errno), errno);
- } else {
- nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
+ return NXT_UNIT_ERROR;
}
- return res;
+ nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
+
+ return NXT_UNIT_OK;
}
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 79157f5f..0f16773f 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -202,8 +202,21 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
*/
int nxt_unit_run(nxt_unit_ctx_t *);
+int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
+
+int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
+
+/*
+ * Receive and process one message, invoke configured callbacks.
+ *
+ * If application implements it's own event loop, each datagram received
+ * from port socket should be initially processed by unit. This function
+ * may invoke other application-defined callback for message processing.
+ */
int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
+int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+
/* Destroy application library object. */
void nxt_unit_done(nxt_unit_ctx_t *);