summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.c75
-rw-r--r--src/nxt_unit.h2
2 files changed, 65 insertions, 12 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index f0c68374..44525d04 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -54,12 +54,13 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
int queue_fd);
-static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
+ nxt_unit_request_info_t **preq);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
@@ -904,7 +905,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
static int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
+ nxt_unit_request_info_t **preq)
{
int rc;
pid_t pid;
@@ -1040,7 +1042,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break;
case _NXT_PORT_MSG_REQ_HEADERS:
- rc = nxt_unit_process_req_headers(ctx, &recv_msg);
+ rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
break;
case _NXT_PORT_MSG_REQ_BODY:
@@ -1213,7 +1215,8 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
static int
-nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_request_info_t **preq)
{
int res;
nxt_unit_impl_t *lib;
@@ -1329,7 +1332,12 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
}
- lib->callbacks.request_handler(req);
+ if (preq == NULL) {
+ lib->callbacks.request_handler(req);
+
+ } else {
+ *preq = req;
+ }
}
return NXT_UNIT_OK;
@@ -2179,7 +2187,8 @@ nxt_unit_response_add_field(nxt_unit_request_info_t *req,
resp = req->response;
if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
- nxt_unit_req_warn(req, "add_field: too many response fields");
+ nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
+ (int) resp->fields_count);
return NXT_UNIT_ERROR;
}
@@ -2356,6 +2365,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
+
return NULL;
}
@@ -4537,7 +4548,7 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
return rc;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
@@ -4686,7 +4697,7 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
- rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
} else {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4793,7 +4804,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
goto retry;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4902,7 +4913,7 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
break;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
@@ -4921,6 +4932,46 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
}
+nxt_unit_request_info_t *
+nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_t *req;
+
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ req = NULL;
+
+ if (nxt_slow_path(!ctx_impl->online)) {
+ goto done;
+ }
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ goto done;
+ }
+
+ rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ if (rc != NXT_UNIT_OK) {
+ goto done;
+ }
+
+ (void) nxt_unit_process_msg(ctx, rbuf, &req);
+
+done:
+
+ nxt_unit_ctx_release(ctx);
+
+ return req;
+}
+
+
int
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
{
@@ -4977,7 +5028,7 @@ retry:
return rc;
}
- rc = nxt_unit_process_msg(ctx, rbuf);
+ rc = nxt_unit_process_msg(ctx, rbuf, NULL);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 90cba2a3..1e1a8dbe 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -213,6 +213,8 @@ int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
+nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx);
+
int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx);
/*