summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nxt_application.c1079
-rw-r--r--src/nxt_application.h192
-rw-r--r--src/nxt_master_process.c2
-rw-r--r--src/nxt_master_process.h1
-rw-r--r--src/nxt_runtime.h2
-rw-r--r--src/nxt_worker_process.c9
6 files changed, 520 insertions, 765 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index f3d014cb..d7393de7 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -1,5 +1,6 @@
/*
+ * Copyright (C) Max Romanov
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
@@ -10,83 +11,17 @@
#include <nxt_application.h>
-#define NXT_PARSE_AGAIN (u_char *) -1
-
-
-static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
-static void nxt_app_thread(void *ctx);
-static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
- nxt_log_t *log);
-static void nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c,
- nxt_log_t *log);
-static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
-static void nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out);
-static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
-static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
-static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data);
-static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
-static nxt_msec_t nxt_app_delivery_timer_value(nxt_conn_t *c,
- uintptr_t data);
-static void nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c);
-static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);
-
-
-typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t;
-
-struct nxt_app_http_parse_state_s {
- u_char *pos;
- nxt_int_t (*handler)(nxt_app_request_header_t *h, u_char *start,
- u_char *end, nxt_app_http_parse_state_t *state);
-};
-
-
-typedef struct {
- nxt_work_t work;
- nxt_buf_t buf;
-} nxt_app_buf_t;
-
-
-static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf,
- size_t size);
-static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h,
- u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
-static nxt_int_t nxt_app_http_parse_field_value(nxt_app_request_header_t *h,
- u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
-static nxt_int_t nxt_app_http_parse_field_name(nxt_app_request_header_t *h,
- u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
-
-static nxt_int_t nxt_app_http_process_headers(nxt_app_request_t *r);
-
-
-static const nxt_event_conn_state_t nxt_app_delivery_write_state;
-
nxt_application_module_t *nxt_app;
static nxt_thread_mutex_t nxt_app_mutex;
static nxt_thread_cond_t nxt_app_cond;
-static nxt_buf_t *nxt_app_buf_free;
-static nxt_buf_t *nxt_app_buf_done;
-
-static nxt_event_engine_t *nxt_app_engine;
-static nxt_mp_t *nxt_app_mem_pool;
-
-static nxt_uint_t nxt_app_buf_current_number;
-static nxt_uint_t nxt_app_buf_max_number = 16;
-
+static nxt_http_fields_hash_entry_t nxt_app_request_fields[];
+static nxt_http_fields_hash_t *nxt_app_request_fields_hash;
nxt_int_t
nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt)
{
- nxt_thread_link_t *link;
- nxt_thread_handle_t handle;
-
- if (nxt_app_listen_socket(task, rt) != NXT_OK) {
- return NXT_ERROR;
- }
-
if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
return NXT_ERROR;
}
@@ -95,901 +30,551 @@ nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
- link = nxt_zalloc(sizeof(nxt_thread_link_t));
-
- if (nxt_fast_path(link != NULL)) {
- link->start = nxt_app_thread;
- link->work.data = rt;
-
- return nxt_thread_create(&handle, link);
+ if (nxt_slow_path(nxt_app->init(task) != NXT_OK)) {
+ nxt_debug(task, "application init failed");
}
- return NXT_ERROR;
+ return NXT_OK;
}
-static nxt_int_t
-nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt)
+nxt_int_t
+nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt)
{
- nxt_sockaddr_t *sa;
- nxt_listen_socket_t *ls;
-
- sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
- NXT_INET_ADDR_STR_LEN);
- if (sa == NULL) {
- return NXT_ERROR;
- }
-
- sa->type = SOCK_STREAM;
- sa->u.sockaddr_in.sin_family = AF_INET;
- sa->u.sockaddr_in.sin_port = htons(8080);
-
- nxt_sockaddr_text(sa);
+ nxt_http_fields_hash_t *hash;
- ls = nxt_runtime_listen_socket_add(rt, sa);
- if (ls == NULL) {
+ hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool);
+ if (nxt_slow_path(hash == NULL)) {
return NXT_ERROR;
}
- ls->read_after_accept = 1;
-
- if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
- return NXT_ERROR;
- }
+ nxt_app_request_fields_hash = hash;
return NXT_OK;
}
-#define SIZE 4096
-
-static void
-nxt_app_thread(void *ctx)
+void
+nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- ssize_t n;
- nxt_err_t err;
- nxt_socket_t s;
- nxt_thread_t *thr;
- nxt_runtime_t *rt;
- nxt_queue_link_t *link;
- nxt_app_request_t *r;
- nxt_listen_socket_t *ls;
- u_char buf[SIZE];
- const size_t size = SIZE;
- nxt_app_header_field_t fields[128];
-
- thr = nxt_thread();
-
- nxt_log_debug(thr->log, "app thread");
-
- rt = ctx;
-
- link = nxt_queue_first(&rt->engines);
- nxt_app_engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
-
- nxt_app_mem_pool = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(nxt_app_mem_pool == NULL)) {
- return;
- }
-
- if (nxt_slow_path(nxt_app->init(thr) != NXT_OK)) {
- nxt_log_debug(thr->log, "application init failed");
- }
-
- ls = rt->listen_sockets->elts;
-
- for ( ;; ) {
- nxt_log_debug(thr->log, "wait on accept");
-
- s = accept(ls->socket, NULL, NULL);
-
- nxt_thread_time_update(thr);
-
- if (nxt_slow_path(s == -1)) {
- err = nxt_socket_errno;
-
- nxt_log_error(NXT_LOG_ERR, thr->log, "accept(%d) failed %E",
- ls->socket, err);
-
- if (err == EBADF) {
- /* STUB: ls->socket has been closed on exit. */
- return;
- }
-
- continue;
- }
-
- nxt_log_debug(thr->log, "accept(%d): %d", ls->socket, s);
-
- n = recv(s, buf, size, 0);
-
- if (nxt_slow_path(n <= 0)) {
- err = (n == 0) ? 0 : nxt_socket_errno;
-
- nxt_log_error(NXT_LOG_ERR, thr->log, "recv(%d, %uz) failed %E",
- s, size, err);
- close(s);
- continue;
- }
-
- nxt_log_debug(thr->log, "recv(%d, %uz): %z", s, size, n);
-
- r = nxt_app_request_create(s, thr->log);
- if (nxt_slow_path(r == NULL)) {
- goto fail;
- }
-
- r->header.fields = fields;
-
- //nxt_app->start(r);
-
- if (nxt_app_http_parse_request(r, buf, n) != NXT_OK) {
- nxt_log_debug(thr->log, "nxt_app_http_parse_request() failed");
- nxt_mp_destroy(r->mem_pool);
- goto fail;
- }
-
- if (nxt_app_http_process_headers(r) != NXT_OK) {
- nxt_log_debug(thr->log, "nxt_app_http_process_headers() failed");
- nxt_mp_destroy(r->mem_pool);
- goto fail;
- }
-
- nxt_app->run(r);
-
- nxt_log_debug(thr->log, "app request done");
-
- if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) {
- goto fail;
- }
-
- continue;
-
- fail:
-
- close(s);
- nxt_nanosleep(1000000000); /* 1s */
- }
-}
-
+ size_t dump_size;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_app_rmsg_t rmsg = { msg->buf };
+ nxt_app_wmsg_t wmsg;
-static nxt_app_request_t *
-nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
-{
- nxt_mp_t *mp;
- nxt_conn_t *c;
- nxt_app_request_t *r;
+ b = msg->buf;
+ dump_size = b->mem.free - b->mem.pos;
- mp = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(mp == NULL)) {
- return NULL;
+ if (dump_size > 300) {
+ dump_size = 300;
}
- r = nxt_mp_zalloc(mp, sizeof(nxt_app_request_t));
- if (nxt_slow_path(r == NULL)) {
- return NULL;
- }
+ nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
- c = nxt_mp_zalloc(mp, sizeof(nxt_conn_t));
- if (nxt_slow_path(c == NULL)) {
- return NULL;
+ port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(port == NULL)) {
+ //
}
- c->socket.fd = s;
- c->socket.data = r;
-
- c->task.thread = nxt_thread();
- c->task.log = log;
- c->task.ident = log->ident;
- c->socket.task = &c->task;
- c->read_timer.task = &c->task;
- c->write_timer.task = &c->task;
+ wmsg.port = port;
+ wmsg.write = NULL;
+ wmsg.buf = &wmsg.write;
+ wmsg.stream = msg->port_msg.stream;
- r->mem_pool = mp;
- r->event_conn = c;
- r->log = log;
-
- return r;
+ nxt_app->run(task, &rmsg, &wmsg);
}
-static nxt_int_t
-nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size)
+nxt_inline nxt_port_t *
+nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg)
{
- u_char *end;
- ssize_t n;
- nxt_err_t err;
- nxt_socket_t s;
- nxt_app_http_parse_state_t state;
-
- end = buf + size;
+ return msg->port;
+}
- state.pos = buf;
- state.handler = nxt_app_http_parse_request_line;
- for ( ;; ) {
- switch (state.handler(&r->header, state.pos, end, &state)) {
+u_char *
+nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
+{
+ size_t free_size;
+ u_char *res;
+ nxt_buf_t *b;
+ nxt_port_t *port;
- case NXT_OK:
- continue;
+ res = NULL;
- case NXT_DONE:
- r->body_preread.length = end - state.pos;
- r->body_preread.start = state.pos;
+ do {
+ b = *msg->buf;
- return NXT_OK;
+ if (b == NULL) {
+ port = nxt_app_msg_get_port(task, msg);
+ if (nxt_slow_path(port == NULL)) {
+ return NULL;
+ }
- case NXT_AGAIN:
- s = r->event_conn->socket.fd;
- n = recv(s, end, SIZE - size, 0);
+ b = nxt_port_mmap_get_buf(task, port, size);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
- if (nxt_slow_path(n <= 0)) {
- err = (n == 0) ? 0 : nxt_socket_errno;
+ *msg->buf = b;
- nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
- s, size, err);
+ free_size = nxt_buf_mem_free_size(&b->mem);
- return NXT_ERROR;
+ if (nxt_slow_path(free_size < size)) {
+ nxt_debug(task, "requested buffer too big (%z < %z)",
+ free_size, size);
+ return NULL;
}
- nxt_log_debug(r->log, "recv(%d, %uz): %z", s, SIZE - size, n);
-
- size += n;
- end += n;
-
- continue;
}
- return NXT_ERROR;
- }
-}
-
-
-static nxt_int_t
-nxt_app_http_parse_request_line(nxt_app_request_header_t *h, u_char *start,
- u_char *end, nxt_app_http_parse_state_t *state)
-{
- u_char *p;
+ free_size = nxt_buf_mem_free_size(&b->mem);
- for (p = start; /* void */; p++) {
+ if (free_size >= size) {
+ res = b->mem.free;
+ b->mem.free += size;
- if (nxt_slow_path(p == end)) {
- state->pos = p;
- return NXT_AGAIN;
+ return res;
}
- if (*p == ' ') {
- break;
- }
- }
-
- h->method.length = p - start;
- h->method.start = start;
-
- start = p + 1;
-
- p = nxt_memchr(start, ' ', end - start);
-
- if (nxt_slow_path(p == NULL)) {
- return NXT_AGAIN;
- }
-
- h->path.length = p - start;
- h->path.start = start;
-
- start = p + 1;
-
- if (nxt_slow_path((size_t) (end - start) < sizeof("HTTP/1.1\n") - 1)) {
- return NXT_AGAIN;
- }
-
- h->version.length = sizeof("HTTP/1.1") - 1;
- h->version.start = start;
-
- p = start + sizeof("HTTP/1.1") - 1;
+ if (nxt_port_mmap_increase_buf(task, b, size) == NXT_OK) {
+ res = b->mem.free;
+ b->mem.free += size;
- if (nxt_slow_path(*p == '\n')) {
- return nxt_app_http_parse_field_name(h, p + 1, end, state);
- }
-
- if (nxt_slow_path(end - p < 2)) {
- return NXT_AGAIN;
- }
+ return res;
+ }
- return nxt_app_http_parse_field_name(h, p + 2, end, state);
+ msg->buf = &b->next;
+ } while(1);
}
-static nxt_int_t
-nxt_app_http_parse_field_name(nxt_app_request_header_t *h, u_char *start,
- u_char *end, nxt_app_http_parse_state_t *state)
+nxt_int_t
+nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
{
- u_char *p;
- nxt_app_header_field_t *fld;
-
- if (nxt_slow_path(start == end)) {
- goto again;
- }
+ u_char *dst;
+ size_t dst_length;
- if (nxt_slow_path(*start == '\n')) {
- state->pos = start + 1;
- return NXT_DONE;
- }
+ if (c != NULL) {
+ dst_length = size + (size < 128 ? 1 : 4) + 1;
- if (*start == '\r') {
- if (nxt_slow_path(end - start < 2)) {
- goto again;
- }
-
- if (nxt_slow_path(start[1] != '\n')) {
+ dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
+ if (nxt_slow_path(dst == NULL)) {
+ nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
+ dst_length);
return NXT_ERROR;
}
- state->pos = start + 2;
- return NXT_DONE;
- }
+ dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
- p = nxt_memchr(start, ':', end - start);
+ nxt_memcpy(dst, c, size);
+ dst[size] = 0;
- if (nxt_slow_path(p == NULL)) {
- goto again;
- }
-
- fld = &h->fields[h->fields_num];
-
- fld->name.length = p - start;
- fld->name.start = start;
+ nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c);
+ } else {
+ dst_length = 1;
- return nxt_app_http_parse_field_value(h, p + 1, end, state);
+ dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
+ if (nxt_slow_path(dst == NULL)) {
+ nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
+ dst_length);
+ return NXT_ERROR;
+ }
-again:
+ dst = nxt_app_msg_write_length(dst, 0);
- state->pos = start;
- state->handler = nxt_app_http_parse_field_name;
+ nxt_debug(task, "nxt_app_msg_write: NULL");
+ }
- return NXT_AGAIN;
+ return NXT_OK;
}
-static nxt_int_t
-nxt_app_http_parse_field_value(nxt_app_request_header_t *h, u_char *start,
- u_char *end, nxt_app_http_parse_state_t *state)
+nxt_int_t
+nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ const nxt_str_t *prefix, const nxt_str_t *v)
{
- u_char *p;
- nxt_app_header_field_t *fld;
-
- for ( ;; ) {
- if (nxt_slow_path(start == end)) {
- goto again;
- }
-
- if (*start != ' ') {
- break;
- }
+ u_char *dst, *src;
+ size_t i, length, dst_length;
- start++;
- }
+ length = prefix->length + v->length;
- p = nxt_memchr(start, '\n', end - start);
+ dst_length = length + (length < 128 ? 1 : 4) + 1;
- if (nxt_slow_path(p == NULL)) {
- goto again;
+ dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
+ if (nxt_slow_path(dst == NULL)) {
+ return NXT_ERROR;
}
- fld = &h->fields[h->fields_num];
-
- fld->value.length = p - start;
- fld->value.start = start;
-
- fld->value.length -= (p[-1] == '\r');
-
- h->fields_num++;
-
- state->pos = p + 1;
- state->handler = nxt_app_http_parse_field_name;
-
- return NXT_OK;
-
-again:
-
- state->pos = start;
- state->handler = nxt_app_http_parse_field_value;
-
- return NXT_AGAIN;
-}
-
+ dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
-static nxt_int_t
-nxt_app_http_process_headers(nxt_app_request_t *r)
-{
- nxt_uint_t i;
- nxt_app_header_field_t *fld;
+ nxt_memcpy(dst, prefix->start, prefix->length);
+ dst += prefix->length;
- static const u_char content_length[14] = "Content-Length";
- static const u_char content_type[12] = "Content-Type";
+ src = v->start;
+ for (i = 0; i < v->length; i++, dst++, src++) {
- for (i = 0; i < r->header.fields_num; i++) {
- fld = &r->header.fields[i];
-
- if (fld->name.length == sizeof(content_length)
- && nxt_memcasecmp(fld->name.start, content_length,
- sizeof(content_length)) == 0)
- {
- r->header.content_length = &fld->value;
- r->body_rest = nxt_off_t_parse(fld->value.start, fld->value.length);
+ if (*src >= 'a' && *src <= 'z') {
+ *dst = *src & ~0x20;
continue;
}
- if (fld->name.length == sizeof(content_type)
- && nxt_memcasecmp(fld->name.start, content_type,
- sizeof(content_type)) == 0)
- {
- r->header.content_type = &fld->value;
+ if (*src == '-') {
+ *dst = '_';
continue;
}
- }
- return NXT_OK;
-}
-
-
-static void
-nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log)
-{
- c->socket.write_ready = 1;
-
- c->socket.log = &c->log;
- c->log = *log;
-
- /* The while loop skips possible uint32_t overflow. */
-
- while (c->log.ident == 0) {
- c->log.ident = nxt_task_next_ident();
+ *dst = *src;
}
- thr->engine->connections++;
-
- c->task.thread = thr;
- c->task.log = &c->log;
- c->task.ident = c->log.ident;
+ *dst = 0;
- c->io = thr->engine->event.io;
- c->max_chunk = NXT_INT32_T_MAX;
- c->sendfile = NXT_CONN_SENDFILE_UNSET;
-
- c->socket.read_work_queue = &thr->engine->read_work_queue;
- c->socket.write_work_queue = &thr->engine->write_work_queue;
- c->read_work_queue = &thr->engine->read_work_queue;
- c->write_work_queue = &thr->engine->write_work_queue;
-
- nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
- nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
-
- nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
+ return NXT_OK;
}
nxt_int_t
-nxt_app_http_read_body(nxt_app_request_t *r, u_char *start, size_t length)
+nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
{
- size_t preread;
- ssize_t n;
- nxt_err_t err;
+ size_t length;
+ nxt_buf_t *buf;
- if ((off_t) length > r->body_rest) {
- length = (size_t) r->body_rest;
- }
+ do {
+ buf = msg->buf;
- preread = 0;
-
- if (r->body_preread.length != 0) {
- preread = nxt_min(r->body_preread.length, length);
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_DONE;
+ }
- nxt_memcpy(start, r->body_preread.start, preread);
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
+ if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ msg->buf = buf->next;
+ continue;
+ }
+ return NXT_ERROR;
+ }
- r->body_preread.length -= preread;
- r->body_preread.start += preread;
+ if (buf->mem.pos[0] >= 128) {
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
+ return NXT_ERROR;
+ }
+ }
- r->body_rest -= preread;
+ break;
+ } while (1);
- length -= preread;
- }
+ buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length);
- if (length == 0) {
- return NXT_OK;
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length))
+ {
+ return NXT_ERROR;
}
- n = recv(r->event_conn->socket.fd, start + preread, length, 0);
-
- if (nxt_slow_path(n < (ssize_t) length)) {
- if (n <= 0) {
- err = (n == 0) ? 0 : nxt_socket_errno;
+ if (length > 0) {
+ str->start = buf->mem.pos;
+ str->length = length - 1;
- nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
- r->event_conn->socket.fd, length, err);
-
- return NXT_ERROR;
- }
+ buf->mem.pos += length;
- nxt_log_error(NXT_LOG_ERR, r->log,
- "client prematurely closed connection");
+ nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1,
+ (int)length - 1, str->start);
+ } else {
+ str->start = NULL;
+ str->length = 0;
- return NXT_ERROR;
+ nxt_debug(task, "nxt_read_str: NULL");
}
- r->body_rest -= n;
-
return NXT_OK;
}
nxt_int_t
-nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t length)
+nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
+ nxt_str_t *v)
{
- void *start;
- size_t free;
- nxt_err_t err;
- nxt_buf_t *b, *out, **next;
- nxt_uint_t bufs;
- nxt_app_buf_t *ab;
-
- out = NULL;
- next = &out;
+ nxt_int_t rc;
- b = r->output_buf;
-
- if (b == NULL) {
- bufs = 0;
- goto get_buf;
+ rc = nxt_app_msg_read_str(task, rmsg, n);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
}
- bufs = 1;
-
- for ( ;; ) {
- free = nxt_buf_mem_free_size(&b->mem);
-
- if (free > length) {
- b->mem.free = nxt_cpymem(b->mem.free, data, length);
- break;
- }
-
- b->mem.free = nxt_cpymem(b->mem.free, data, free);
+ rc = nxt_app_msg_read_str(task, rmsg, v);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
+ }
- data += free;
- length -= free;
+ return rc;
+}
- *next = b;
- next = &b->next;
- if (length == 0) {
- b = NULL;
- break;
- }
-
- if (bufs == nxt_app_buf_max_number) {
- bufs = 0;
- *next = NULL;
+nxt_int_t
+nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
+{
+ nxt_buf_t *buf;
- nxt_app_buf_send(r->event_conn, out);
+ do {
+ buf = msg->buf;
- out = NULL;
- next = &out;
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_DONE;
}
- get_buf:
-
- if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) {
- return NXT_ERROR;
- }
-
- for ( ;; ) {
- b = nxt_app_buf_free;
-
- if (b != NULL) {
- nxt_app_buf_free = b->next;
- break;
- }
-
- if (nxt_app_buf_current_number < nxt_app_buf_max_number) {
- break;
- }
-
- err = nxt_thread_cond_wait(&nxt_app_cond, &nxt_app_mutex,
- NXT_INFINITE_NSEC);
-
- if (nxt_slow_path(err != 0)) {
- (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
- return NXT_ERROR;
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
+ if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ msg->buf = buf->next;
+ continue;
}
+ return NXT_ERROR;
}
- (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
-
- if (b == NULL) {
- start = nxt_malloc(4096);
- if (nxt_slow_path(start == NULL)) {
+ if (buf->mem.pos[0] >= 128) {
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
return NXT_ERROR;
}
-
- ab = nxt_zalloc(sizeof(nxt_app_buf_t));
- if (nxt_slow_path(ab == NULL)) {
- return NXT_ERROR;
- }
-
- b = &ab->buf;
-
- nxt_buf_mem_init(b, start, 4096);
-
- b->completion_handler = NULL;
-
- nxt_app_buf_current_number++;
}
- bufs++;
- }
+ break;
+ } while (1);
- r->output_buf = b;
+ buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
- if (out != NULL) {
- *next = NULL;
-
- nxt_app_buf_send(r->event_conn, out);
- }
+ nxt_debug(task, "nxt_read_size: %d", (int)*size);
return NXT_OK;
}
static nxt_int_t
-nxt_app_write_finish(nxt_app_request_t *r)
+nxt_app_request_content_length(void *ctx, nxt_http_field_t *field,
+ nxt_log_t *log)
{
- nxt_buf_t *b, *out;
+ nxt_str_t *v;
+ nxt_app_parse_ctx_t *c;
+ nxt_app_request_header_t *h;
- b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST);
- if (nxt_slow_path(b == NULL)) {
- return NXT_ERROR;
- }
-
- b->completion_handler = NULL;
- b->parent = (nxt_buf_t *) r;
+ c = ctx;
+ h = &c->r.header;
+ v = &field->value;
- out = r->output_buf;
-
- if (out != NULL) {
- r->output_buf = NULL;
- out->next = b;
-
- } else {
- out = b;
- }
-
- nxt_app_buf_send(r->event_conn, out);
+ h->content_length = *v;
+ h->parsed_content_length = nxt_off_t_parse(v->start, v->length);
return NXT_OK;
}
-static void
-nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out)
+static nxt_int_t
+nxt_app_request_content_type(void *ctx, nxt_http_field_t *field,
+ nxt_log_t *log)
{
- nxt_app_buf_t *ab;
+ nxt_app_parse_ctx_t *c;
+ nxt_app_request_header_t *h;
- ab = nxt_container_of(out, nxt_app_buf_t, buf);
+ c = ctx;
+ h = &c->r.header;
- nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out);
+ h->content_type = field->value;
- nxt_event_engine_post(nxt_app_engine, &ab->work);
+ return NXT_OK;
}
-static void
-nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
+static nxt_int_t
+nxt_app_request_cookie(void *ctx, nxt_http_field_t *field,
+ nxt_log_t *log)
{
- nxt_mp_t *mp;
- nxt_buf_t *b;
- nxt_conn_t *c;
+ nxt_app_parse_ctx_t *c;
+ nxt_app_request_header_t *h;
- c = obj;
- b = data;
+ c = ctx;
+ h = &c->r.header;
- nxt_debug(task, "app delivery handler");
+ h->cookie = field->value;
- if (c->write != NULL) {
- nxt_buf_chain_add(&c->write, b);
- return;
- }
+ return NXT_OK;
+}
- if (c->mem_pool == NULL) {
- mp = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(mp == NULL)) {
- close(c->socket.fd);
- return;
- }
- c->mem_pool = mp;
- nxt_app_conn_update(task->thread, c, &nxt_main_log);
- }
+static nxt_int_t
+nxt_app_request_host(void *ctx, nxt_http_field_t *field,
+ nxt_log_t *log)
+{
+ nxt_app_parse_ctx_t *c;
+ nxt_app_request_header_t *h;
- if (c->socket.timedout || c->socket.error != 0) {
- nxt_buf_chain_add(&nxt_app_buf_done, b);
- nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion,
- task, c, NULL);
- return;
- }
+ c = ctx;
+ h = &c->r.header;
- c->write = b;
- c->write_state = &nxt_app_delivery_write_state;
+ h->host = field->value;
- nxt_conn_write(task->thread->engine, c);
+ return NXT_OK;
}
-static const nxt_event_conn_state_t nxt_app_delivery_write_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_app_delivery_ready,
- .error_handler = nxt_app_delivery_error,
+static nxt_http_fields_hash_entry_t nxt_app_request_fields[] = {
+ { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 },
+ { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 },
+ { nxt_string("Cookie"), &nxt_app_request_cookie, 0 },
+ { nxt_string("Host"), &nxt_app_request_host, 0 },
- .timer_handler = nxt_app_delivery_timeout,
- .timer_value = nxt_app_delivery_timer_value,
- .timer_data = 0,
- .timer_autoreset = 1,
+ { nxt_null_string, NULL, 0 }
};
-static void
-nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
+nxt_int_t
+nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
{
- nxt_buf_t *b, *next;
- nxt_conn_t *c;
+ nxt_int_t rc;
- c = obj;
+ ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32);
- nxt_debug(task, "app delivery ready");
-
- for (b = c->write; b != NULL; b = next) {
-
- if (nxt_buf_is_mem(b)) {
- if (b->mem.pos != b->mem.free) {
- break;
- }
- }
-
- next = b->next;
- b->next = nxt_app_buf_done;
- nxt_app_buf_done = b;
+ rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
}
- nxt_work_queue_add(c->write_work_queue,
- nxt_app_delivery_completion, task, c, NULL);
+ ctx->parser.fields_hash = nxt_app_request_fields_hash;
+
+ return NXT_OK;
}
-static const nxt_event_conn_state_t nxt_app_delivery_close_state
- nxt_aligned(64) =
+nxt_int_t
+nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
+ nxt_buf_t *buf)
{
- .ready_handler = nxt_app_close_request,
-};
+ nxt_int_t rc;
+ nxt_app_request_body_t *b;
+ nxt_http_request_parse_t *p;
+ nxt_app_request_header_t *h;
+ p = &ctx->parser;
+ b = &ctx->r.body;
+ h = &ctx->r.header;
-static void
-nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
-{
- nxt_buf_t *b, *bn, *free;
- nxt_conn_t *c;
- nxt_app_request_t *r;
+ if (h->done == 0) {
+ rc = nxt_http_parse_request(p, &buf->mem);
- nxt_debug(task, "app delivery completion");
+ if (nxt_slow_path(rc != NXT_DONE)) {
+ return rc;
+ }
- free = NULL;
+ rc = nxt_http_fields_process(p->fields, ctx, task->log);
- for (b = nxt_app_buf_done; b; b = bn) {
- bn = b->next;
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
+ }
- if (nxt_buf_is_mem(b)) {
- b->mem.pos = b->mem.start;
- b->mem.free = b->mem.start;
+ h->fields = p->fields;
+ h->done = 1;
- b->next = free;
- free = b;
+ h->version.start = p->version.str;
+ h->version.length = nxt_strlen(p->version.str);
- continue;
- }
+ h->method = p->method;
+
+ h->path.start = p->target_start;
+ h->path.length = p->target_end - p->target_start;
+
+ h->path_no_query = h->path;
- if (nxt_buf_is_last(b)) {
- r = (nxt_app_request_t *) b->parent;
+ if (p->args_start != NULL) {
+ h->query.start = p->args_start;
+ h->query.length = p->target_end - p->args_start;
- c = r->event_conn;
- c->write_state = &nxt_app_delivery_close_state;
+ if (p->args_start > p->target_start) {
+ h->path_no_query.length = p->args_start - p->target_start - 1;
+ }
+ }
- nxt_conn_close(task->thread->engine, c);
+ if (h->parsed_content_length == 0) {
+ b->done = 1;
}
}
- nxt_app_buf_done = NULL;
+ if (b->done == 0) {
+ b->preread.length = buf->mem.free - buf->mem.pos;
+ b->preread.start = buf->mem.pos;
- if (free == NULL) {
- return;
+ b->done = b->preread.length >= (size_t) h->parsed_content_length;
}
- if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) {
- return;
+ if (h->done == 1 && b->done == 1) {
+ return NXT_DONE;
}
- nxt_buf_chain_add(&nxt_app_buf_free, free);
-
- (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
-
- nxt_thread_time_update(task->thread);
-
- (void) nxt_thread_cond_signal(&nxt_app_cond);
+ return NXT_AGAIN;
}
-static void
-nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data)
+nxt_int_t
+nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
{
- nxt_conn_t *c;
-
- c = obj;
+ nxt_mp_destroy(ctx->mem_pool);
- nxt_debug(task, "app delivery error");
-
- nxt_app_delivery_done(task, c);
+ return NXT_OK;
}
-static void
-nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data)
+nxt_int_t
+nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
{
- nxt_conn_t *c;
-
- c = obj;
-
- nxt_debug(task, "app delivery timeout");
+ nxt_int_t rc;
+ nxt_buf_t *b;
+ nxt_port_t *port;
- nxt_app_delivery_done(task, c);
-}
+ rc = NXT_OK;
+ port = nxt_app_msg_get_port(task, msg);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
-static nxt_msec_t
-nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data)
-{
- /* 30000 ms */
- return 30000;
-}
+ if (nxt_slow_path(last == 1)) {
+ do {
+ b = *msg->buf;
+ if (b == NULL) {
+ b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
+ *msg->buf = b;
+ break;
+ }
-static void
-nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c)
-{
- if (c->write == NULL) {
- return;
+ msg->buf = &b->next;
+ } while(1);
}
- nxt_debug(task, "app delivery done");
+ if (nxt_slow_path(msg->write != NULL)) {
+ rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
+ -1, msg->stream, 0, msg->write);
- nxt_buf_chain_add(&nxt_app_buf_done, c->write);
-
- c->write = NULL;
+ msg->write = NULL;
+ msg->buf = &msg->write;
+ }
- nxt_work_queue_add(c->write_work_queue,
- nxt_app_delivery_completion, task, c, NULL);
+ return rc;
}
-static void
-nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
+nxt_int_t
+nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
+ size_t size)
{
- nxt_conn_t *c;
- nxt_app_request_t *r;
+ u_char *dst;
- c = obj;
+ dst = nxt_app_msg_write_get_buf(task, msg, size);
+ if (nxt_slow_path(dst == NULL)) {
+ return NXT_ERROR;
+ }
- nxt_debug(task, "app close connection");
+ nxt_memcpy(dst, c, size);
- r = c->socket.data;
+ nxt_debug(task, "nxt_app_msg_write_raw: %d %*s", (int)size,
+ (int)size, c);
- nxt_mp_destroy(c->mem_pool);
- nxt_mp_destroy(r->mem_pool);
+ return NXT_OK;
}
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 7995ec0d..fe36ea4d 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -1,5 +1,6 @@
/*
+ * Copyright (C) Max Romanov
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
@@ -17,36 +18,124 @@ typedef struct {
typedef struct {
nxt_str_t method;
nxt_str_t path;
+ nxt_str_t path_no_query;
+ nxt_str_t query;
nxt_str_t version;
- nxt_uint_t fields_num;
- nxt_app_header_field_t *fields;
- nxt_str_t *content_length;
- nxt_str_t *content_type;
+ nxt_list_t *fields;
+
+ nxt_str_t cookie;
+ nxt_str_t content_length;
+ nxt_str_t content_type;
+ nxt_str_t host;
+
+ off_t parsed_content_length;
+ nxt_bool_t done;
} nxt_app_request_header_t;
typedef struct {
- nxt_event_engine_t *engine;
- nxt_mp_t *mem_pool;
- nxt_conn_t *event_conn;
- nxt_log_t *log;
+ nxt_str_t preread;
+ nxt_bool_t done;
+} nxt_app_request_body_t;
- nxt_buf_t *output_buf;
+typedef struct {
nxt_app_request_header_t header;
- nxt_str_t body_preread;
- off_t body_rest;
- void *ctx;
+ nxt_app_request_body_t body;
} nxt_app_request_t;
typedef struct {
- nxt_int_t (*init)(nxt_thread_t *thr);
- nxt_int_t (*start)(nxt_app_request_t *r);
- nxt_int_t (*header)(nxt_app_request_t *r,
- nxt_app_header_field_t *field);
- nxt_int_t (*run)(nxt_app_request_t *r);
+ nxt_app_request_t r;
+ nxt_http_request_parse_t parser;
+ nxt_mp_t *mem_pool;
+} nxt_app_parse_ctx_t;
+
+
+nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
+
+nxt_int_t nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
+ nxt_buf_t *buf);
+
+nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
+
+nxt_int_t nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt);
+
+
+typedef struct nxt_app_wmsg_s nxt_app_wmsg_t;
+typedef struct nxt_app_rmsg_s nxt_app_rmsg_t;
+
+struct nxt_app_wmsg_s {
+ nxt_port_t *port; /* where prepared buf will be sent */
+ nxt_buf_t *write;
+ nxt_buf_t **buf;
+ uint32_t stream;
+};
+
+struct nxt_app_rmsg_s {
+ nxt_buf_t *buf; /* current buffer to read */
+};
+
+
+nxt_inline u_char *
+nxt_app_msg_write_length(u_char *dst, size_t length);
+
+/* TODO asynchronous mmap buffer assignment */
+u_char *nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ size_t size);
+
+nxt_int_t nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ u_char *c, size_t size);
+
+nxt_int_t nxt_app_msg_write_prefixed_upcase(nxt_task_t *task,
+ nxt_app_wmsg_t *msg, const nxt_str_t *prefix, const nxt_str_t *v);
+
+nxt_inline nxt_int_t
+nxt_app_msg_write_nvp_(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ u_char *n, size_t nsize, u_char *v, size_t vsize);
+
+
+#define nxt_app_msg_write_const(task, msg, c) \
+ nxt_app_msg_write((task), (msg), (u_char *)(c), sizeof(c) - 1)
+
+#define nxt_app_msg_write_str(task, msg, str) \
+ nxt_app_msg_write((task), (msg), (str)->start, (str)->length)
+
+#define nxt_app_msg_write_cstr(task, msg, c) \
+ nxt_app_msg_write((task), (msg), (c), nxt_strlen(c))
+
+#define nxt_app_msg_write_nvp(task, msg, n, v) \
+ nxt_app_msg_write_nvp_((task), (msg), (u_char *)(n), sizeof(n) - 1, \
+ (v)->start, (v)->length)
+
+nxt_inline nxt_int_t nxt_app_msg_write_size(nxt_task_t *task,
+ nxt_app_wmsg_t *msg, size_t size);
+
+nxt_int_t nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ nxt_bool_t last);
+
+nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ const u_char *c, size_t size);
+
+nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg,
+ nxt_str_t *str);
+
+nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
+ nxt_str_t *n, nxt_str_t *v);
+
+nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
+ size_t *size);
+
+
+typedef struct {
+ nxt_int_t (*init)(nxt_task_t *task);
+ nxt_int_t (*prepare_msg)(nxt_task_t *task,
+ nxt_app_request_t *r,
+ nxt_app_wmsg_t *wmsg);
+ nxt_int_t (*run)(nxt_task_t *task,
+ nxt_app_rmsg_t *rmsg,
+ nxt_app_wmsg_t *wmsg);
} nxt_application_module_t;
@@ -58,5 +147,74 @@ nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data,
size_t len);
nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len);
+nxt_inline u_char *
+nxt_app_msg_write_length(u_char *dst, size_t length)
+{
+ if (length < 128) {
+ *dst = length;
+ dst++;
+ } else {
+ dst[0] = 0x80U | (length >> 24);
+ dst[1] = 0xFFU & (length >> 16);
+ dst[2] = 0xFFU & (length >> 8);
+ dst[3] = 0xFFU & length;
+ dst += 4;
+ }
+
+ return dst;
+}
+
+
+nxt_inline nxt_int_t
+nxt_app_msg_write_nvp_(nxt_task_t *task, nxt_app_wmsg_t *msg,
+ u_char *n, size_t nsize, u_char *v, size_t vsize)
+{
+ nxt_int_t rc;
+
+ rc = nxt_app_msg_write(task, msg, n, nsize);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
+ }
+
+ return nxt_app_msg_write(task, msg, v, vsize);
+}
+
+
+nxt_inline nxt_int_t
+nxt_app_msg_write_size(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
+{
+ u_char *dst;
+ size_t dst_length;
+
+ dst_length = size < 128 ? 1 : 4;
+
+ dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
+ if (nxt_slow_path(dst == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_app_msg_write_length(dst, size);
+
+ return NXT_OK;
+}
+
+
+nxt_inline u_char *
+nxt_app_msg_read_length(u_char *src, size_t *length)
+{
+ if (src[0] < 128) {
+ *length = src[0];
+ src++;
+ } else {
+ *length = ((src[0] & 0x7fU) << 24) +
+ (src[1] << 16) +
+ (src[2] << 8) +
+ src[3];
+ src += 4;
+ }
+
+ return src;
+}
+
#endif /* _NXT_APPLICATION_H_INCLIDED_ */
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 7a23baf2..54b5c11c 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -189,7 +189,7 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
init->start = nxt_app_start;
init->name = "worker process";
init->user_cred = &rt->user_cred;
- init->port_handlers = nxt_worker_process_port_handlers;
+ init->port_handlers = nxt_app_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_WORKER;
diff --git a/src/nxt_master_process.h b/src/nxt_master_process.h
index 46c570be..ff84ca9f 100644
--- a/src/nxt_master_process.h
+++ b/src/nxt_master_process.h
@@ -17,6 +17,7 @@ nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt);
extern nxt_port_handler_t nxt_worker_process_port_handlers[];
+extern nxt_port_handler_t nxt_app_process_port_handlers[];
extern const nxt_sig_event_t nxt_master_process_signals[];
extern const nxt_sig_event_t nxt_worker_process_signals[];
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 13072021..8316d1e3 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -135,6 +135,8 @@ void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log,
void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt);
+void nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+
#define nxt_runtime_process_each(rt, process) \
do { \
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index 6407c734..4c5b6888 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -30,6 +30,15 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = {
};
+nxt_port_handler_t nxt_app_process_port_handlers[] = {
+ nxt_worker_process_quit_handler,
+ nxt_port_new_port_handler,
+ nxt_port_change_log_file_handler,
+ nxt_port_mmap_handler,
+ nxt_port_app_data_handler,
+};
+
+
const nxt_sig_event_t nxt_worker_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler),
nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler),