diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-06-23 19:20:08 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-06-23 19:20:08 +0300 |
commit | e7a0634a718ca1f2379f4694c17ef4219f5538fa (patch) | |
tree | fea96a90de03c1297b9378c0cfc92bd463ed5795 /src/nxt_application.c | |
parent | 3b9aa27625e50dea5fed2644e80ea454605a3006 (diff) | |
download | unit-e7a0634a718ca1f2379f4694c17ef4219f5538fa.tar.gz unit-e7a0634a718ca1f2379f4694c17ef4219f5538fa.tar.bz2 |
Application-side message processing.
Usage on the router side:
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t parse_ctx;
nxt_app_http_req_init(task, &parse_ctx);
/* parse incoming request data */
if (nxt_app_http_req_parse(task, &parse_ctx, buf) == NXT_DONE) {
/* choose app */
nxt_app = nxt_select_app(... &parse_ctx.r ...);
/* find port */
wmsg.port = nxt_get_app_port(... nxt_app ...);
wmsg.buf = &wmsg.write;
/* fill write message buffer in shared mem */
nxt_app->prepare_msg(task, &parse_ctx.r, &wmsg);
/* send message to app for processing */
nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, 0, 0, wmsg.write);
}
Diffstat (limited to 'src/nxt_application.c')
-rw-r--r-- | src/nxt_application.c | 1079 |
1 files changed, 332 insertions, 747 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index f3d014cb..d7393de7 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -1,5 +1,6 @@ /* + * Copyright (C) Max Romanov * Copyright (C) Igor Sysoev * Copyright (C) Valentin V. Bartenev * Copyright (C) NGINX, Inc. @@ -10,83 +11,17 @@ #include <nxt_application.h> -#define NXT_PARSE_AGAIN (u_char *) -1 - - -static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); -static void nxt_app_thread(void *ctx); -static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, - nxt_log_t *log); -static void nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, - nxt_log_t *log); -static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); -static void nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out); -static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); -static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, - void *data); -static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data); -static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_app_delivery_timer_value(nxt_conn_t *c, - uintptr_t data); -static void nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c); -static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data); - - -typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t; - -struct nxt_app_http_parse_state_s { - u_char *pos; - nxt_int_t (*handler)(nxt_app_request_header_t *h, u_char *start, - u_char *end, nxt_app_http_parse_state_t *state); -}; - - -typedef struct { - nxt_work_t work; - nxt_buf_t buf; -} nxt_app_buf_t; - - -static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, - size_t size); -static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h, - u_char *start, u_char *end, nxt_app_http_parse_state_t *state); -static nxt_int_t nxt_app_http_parse_field_value(nxt_app_request_header_t *h, - u_char *start, u_char *end, nxt_app_http_parse_state_t *state); -static nxt_int_t nxt_app_http_parse_field_name(nxt_app_request_header_t *h, - u_char *start, u_char *end, nxt_app_http_parse_state_t *state); - -static nxt_int_t nxt_app_http_process_headers(nxt_app_request_t *r); - - -static const nxt_event_conn_state_t nxt_app_delivery_write_state; - nxt_application_module_t *nxt_app; static nxt_thread_mutex_t nxt_app_mutex; static nxt_thread_cond_t nxt_app_cond; -static nxt_buf_t *nxt_app_buf_free; -static nxt_buf_t *nxt_app_buf_done; - -static nxt_event_engine_t *nxt_app_engine; -static nxt_mp_t *nxt_app_mem_pool; - -static nxt_uint_t nxt_app_buf_current_number; -static nxt_uint_t nxt_app_buf_max_number = 16; - +static nxt_http_fields_hash_entry_t nxt_app_request_fields[]; +static nxt_http_fields_hash_t *nxt_app_request_fields_hash; nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_thread_link_t *link; - nxt_thread_handle_t handle; - - if (nxt_app_listen_socket(task, rt) != NXT_OK) { - return NXT_ERROR; - } - if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { return NXT_ERROR; } @@ -95,901 +30,551 @@ nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - link = nxt_zalloc(sizeof(nxt_thread_link_t)); - - if (nxt_fast_path(link != NULL)) { - link->start = nxt_app_thread; - link->work.data = rt; - - return nxt_thread_create(&handle, link); + if (nxt_slow_path(nxt_app->init(task) != NXT_OK)) { + nxt_debug(task, "application init failed"); } - return NXT_ERROR; + return NXT_OK; } -static nxt_int_t -nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt) +nxt_int_t +nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_sockaddr_t *sa; - nxt_listen_socket_t *ls; - - sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), - NXT_INET_ADDR_STR_LEN); - if (sa == NULL) { - return NXT_ERROR; - } - - sa->type = SOCK_STREAM; - sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8080); - - nxt_sockaddr_text(sa); + nxt_http_fields_hash_t *hash; - ls = nxt_runtime_listen_socket_add(rt, sa); - if (ls == NULL) { + hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool); + if (nxt_slow_path(hash == NULL)) { return NXT_ERROR; } - ls->read_after_accept = 1; - - if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { - return NXT_ERROR; - } + nxt_app_request_fields_hash = hash; return NXT_OK; } -#define SIZE 4096 - -static void -nxt_app_thread(void *ctx) +void +nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - ssize_t n; - nxt_err_t err; - nxt_socket_t s; - nxt_thread_t *thr; - nxt_runtime_t *rt; - nxt_queue_link_t *link; - nxt_app_request_t *r; - nxt_listen_socket_t *ls; - u_char buf[SIZE]; - const size_t size = SIZE; - nxt_app_header_field_t fields[128]; - - thr = nxt_thread(); - - nxt_log_debug(thr->log, "app thread"); - - rt = ctx; - - link = nxt_queue_first(&rt->engines); - nxt_app_engine = nxt_queue_link_data(link, nxt_event_engine_t, link); - - nxt_app_mem_pool = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(nxt_app_mem_pool == NULL)) { - return; - } - - if (nxt_slow_path(nxt_app->init(thr) != NXT_OK)) { - nxt_log_debug(thr->log, "application init failed"); - } - - ls = rt->listen_sockets->elts; - - for ( ;; ) { - nxt_log_debug(thr->log, "wait on accept"); - - s = accept(ls->socket, NULL, NULL); - - nxt_thread_time_update(thr); - - if (nxt_slow_path(s == -1)) { - err = nxt_socket_errno; - - nxt_log_error(NXT_LOG_ERR, thr->log, "accept(%d) failed %E", - ls->socket, err); - - if (err == EBADF) { - /* STUB: ls->socket has been closed on exit. */ - return; - } - - continue; - } - - nxt_log_debug(thr->log, "accept(%d): %d", ls->socket, s); - - n = recv(s, buf, size, 0); - - if (nxt_slow_path(n <= 0)) { - err = (n == 0) ? 0 : nxt_socket_errno; - - nxt_log_error(NXT_LOG_ERR, thr->log, "recv(%d, %uz) failed %E", - s, size, err); - close(s); - continue; - } - - nxt_log_debug(thr->log, "recv(%d, %uz): %z", s, size, n); - - r = nxt_app_request_create(s, thr->log); - if (nxt_slow_path(r == NULL)) { - goto fail; - } - - r->header.fields = fields; - - //nxt_app->start(r); - - if (nxt_app_http_parse_request(r, buf, n) != NXT_OK) { - nxt_log_debug(thr->log, "nxt_app_http_parse_request() failed"); - nxt_mp_destroy(r->mem_pool); - goto fail; - } - - if (nxt_app_http_process_headers(r) != NXT_OK) { - nxt_log_debug(thr->log, "nxt_app_http_process_headers() failed"); - nxt_mp_destroy(r->mem_pool); - goto fail; - } - - nxt_app->run(r); - - nxt_log_debug(thr->log, "app request done"); - - if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) { - goto fail; - } - - continue; - - fail: - - close(s); - nxt_nanosleep(1000000000); /* 1s */ - } -} - + size_t dump_size; + nxt_buf_t *b; + nxt_port_t *port; + nxt_app_rmsg_t rmsg = { msg->buf }; + nxt_app_wmsg_t wmsg; -static nxt_app_request_t * -nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) -{ - nxt_mp_t *mp; - nxt_conn_t *c; - nxt_app_request_t *r; + b = msg->buf; + dump_size = b->mem.free - b->mem.pos; - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NULL; + if (dump_size > 300) { + dump_size = 300; } - r = nxt_mp_zalloc(mp, sizeof(nxt_app_request_t)); - if (nxt_slow_path(r == NULL)) { - return NULL; - } + nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos); - c = nxt_mp_zalloc(mp, sizeof(nxt_conn_t)); - if (nxt_slow_path(c == NULL)) { - return NULL; + port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + // } - c->socket.fd = s; - c->socket.data = r; - - c->task.thread = nxt_thread(); - c->task.log = log; - c->task.ident = log->ident; - c->socket.task = &c->task; - c->read_timer.task = &c->task; - c->write_timer.task = &c->task; + wmsg.port = port; + wmsg.write = NULL; + wmsg.buf = &wmsg.write; + wmsg.stream = msg->port_msg.stream; - r->mem_pool = mp; - r->event_conn = c; - r->log = log; - - return r; + nxt_app->run(task, &rmsg, &wmsg); } -static nxt_int_t -nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size) +nxt_inline nxt_port_t * +nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg) { - u_char *end; - ssize_t n; - nxt_err_t err; - nxt_socket_t s; - nxt_app_http_parse_state_t state; - - end = buf + size; + return msg->port; +} - state.pos = buf; - state.handler = nxt_app_http_parse_request_line; - for ( ;; ) { - switch (state.handler(&r->header, state.pos, end, &state)) { +u_char * +nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) +{ + size_t free_size; + u_char *res; + nxt_buf_t *b; + nxt_port_t *port; - case NXT_OK: - continue; + res = NULL; - case NXT_DONE: - r->body_preread.length = end - state.pos; - r->body_preread.start = state.pos; + do { + b = *msg->buf; - return NXT_OK; + if (b == NULL) { + port = nxt_app_msg_get_port(task, msg); + if (nxt_slow_path(port == NULL)) { + return NULL; + } - case NXT_AGAIN: - s = r->event_conn->socket.fd; - n = recv(s, end, SIZE - size, 0); + b = nxt_port_mmap_get_buf(task, port, size); + if (nxt_slow_path(b == NULL)) { + return NULL; + } - if (nxt_slow_path(n <= 0)) { - err = (n == 0) ? 0 : nxt_socket_errno; + *msg->buf = b; - nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E", - s, size, err); + free_size = nxt_buf_mem_free_size(&b->mem); - return NXT_ERROR; + if (nxt_slow_path(free_size < size)) { + nxt_debug(task, "requested buffer too big (%z < %z)", + free_size, size); + return NULL; } - nxt_log_debug(r->log, "recv(%d, %uz): %z", s, SIZE - size, n); - - size += n; - end += n; - - continue; } - return NXT_ERROR; - } -} - - -static nxt_int_t -nxt_app_http_parse_request_line(nxt_app_request_header_t *h, u_char *start, - u_char *end, nxt_app_http_parse_state_t *state) -{ - u_char *p; + free_size = nxt_buf_mem_free_size(&b->mem); - for (p = start; /* void */; p++) { + if (free_size >= size) { + res = b->mem.free; + b->mem.free += size; - if (nxt_slow_path(p == end)) { - state->pos = p; - return NXT_AGAIN; + return res; } - if (*p == ' ') { - break; - } - } - - h->method.length = p - start; - h->method.start = start; - - start = p + 1; - - p = nxt_memchr(start, ' ', end - start); - - if (nxt_slow_path(p == NULL)) { - return NXT_AGAIN; - } - - h->path.length = p - start; - h->path.start = start; - - start = p + 1; - - if (nxt_slow_path((size_t) (end - start) < sizeof("HTTP/1.1\n") - 1)) { - return NXT_AGAIN; - } - - h->version.length = sizeof("HTTP/1.1") - 1; - h->version.start = start; - - p = start + sizeof("HTTP/1.1") - 1; + if (nxt_port_mmap_increase_buf(task, b, size) == NXT_OK) { + res = b->mem.free; + b->mem.free += size; - if (nxt_slow_path(*p == '\n')) { - return nxt_app_http_parse_field_name(h, p + 1, end, state); - } - - if (nxt_slow_path(end - p < 2)) { - return NXT_AGAIN; - } + return res; + } - return nxt_app_http_parse_field_name(h, p + 2, end, state); + msg->buf = &b->next; + } while(1); } -static nxt_int_t -nxt_app_http_parse_field_name(nxt_app_request_header_t *h, u_char *start, - u_char *end, nxt_app_http_parse_state_t *state) +nxt_int_t +nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size) { - u_char *p; - nxt_app_header_field_t *fld; - - if (nxt_slow_path(start == end)) { - goto again; - } + u_char *dst; + size_t dst_length; - if (nxt_slow_path(*start == '\n')) { - state->pos = start + 1; - return NXT_DONE; - } + if (c != NULL) { + dst_length = size + (size < 128 ? 1 : 4) + 1; - if (*start == '\r') { - if (nxt_slow_path(end - start < 2)) { - goto again; - } - - if (nxt_slow_path(start[1] != '\n')) { + dst = nxt_app_msg_write_get_buf(task, msg, dst_length); + if (nxt_slow_path(dst == NULL)) { + nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", + dst_length); return NXT_ERROR; } - state->pos = start + 2; - return NXT_DONE; - } + dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */ - p = nxt_memchr(start, ':', end - start); + nxt_memcpy(dst, c, size); + dst[size] = 0; - if (nxt_slow_path(p == NULL)) { - goto again; - } - - fld = &h->fields[h->fields_num]; - - fld->name.length = p - start; - fld->name.start = start; + nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c); + } else { + dst_length = 1; - return nxt_app_http_parse_field_value(h, p + 1, end, state); + dst = nxt_app_msg_write_get_buf(task, msg, dst_length); + if (nxt_slow_path(dst == NULL)) { + nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", + dst_length); + return NXT_ERROR; + } -again: + dst = nxt_app_msg_write_length(dst, 0); - state->pos = start; - state->handler = nxt_app_http_parse_field_name; + nxt_debug(task, "nxt_app_msg_write: NULL"); + } - return NXT_AGAIN; + return NXT_OK; } -static nxt_int_t -nxt_app_http_parse_field_value(nxt_app_request_header_t *h, u_char *start, - u_char *end, nxt_app_http_parse_state_t *state) +nxt_int_t +nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, + const nxt_str_t *prefix, const nxt_str_t *v) { - u_char *p; - nxt_app_header_field_t *fld; - - for ( ;; ) { - if (nxt_slow_path(start == end)) { - goto again; - } - - if (*start != ' ') { - break; - } + u_char *dst, *src; + size_t i, length, dst_length; - start++; - } + length = prefix->length + v->length; - p = nxt_memchr(start, '\n', end - start); + dst_length = length + (length < 128 ? 1 : 4) + 1; - if (nxt_slow_path(p == NULL)) { - goto again; + dst = nxt_app_msg_write_get_buf(task, msg, dst_length); + if (nxt_slow_path(dst == NULL)) { + return NXT_ERROR; } - fld = &h->fields[h->fields_num]; - - fld->value.length = p - start; - fld->value.start = start; - - fld->value.length -= (p[-1] == '\r'); - - h->fields_num++; - - state->pos = p + 1; - state->handler = nxt_app_http_parse_field_name; - - return NXT_OK; - -again: - - state->pos = start; - state->handler = nxt_app_http_parse_field_value; - - return NXT_AGAIN; -} - + dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */ -static nxt_int_t -nxt_app_http_process_headers(nxt_app_request_t *r) -{ - nxt_uint_t i; - nxt_app_header_field_t *fld; + nxt_memcpy(dst, prefix->start, prefix->length); + dst += prefix->length; - static const u_char content_length[14] = "Content-Length"; - static const u_char content_type[12] = "Content-Type"; + src = v->start; + for (i = 0; i < v->length; i++, dst++, src++) { - for (i = 0; i < r->header.fields_num; i++) { - fld = &r->header.fields[i]; - - if (fld->name.length == sizeof(content_length) - && nxt_memcasecmp(fld->name.start, content_length, - sizeof(content_length)) == 0) - { - r->header.content_length = &fld->value; - r->body_rest = nxt_off_t_parse(fld->value.start, fld->value.length); + if (*src >= 'a' && *src <= 'z') { + *dst = *src & ~0x20; continue; } - if (fld->name.length == sizeof(content_type) - && nxt_memcasecmp(fld->name.start, content_type, - sizeof(content_type)) == 0) - { - r->header.content_type = &fld->value; + if (*src == '-') { + *dst = '_'; continue; } - } - return NXT_OK; -} - - -static void -nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log) -{ - c->socket.write_ready = 1; - - c->socket.log = &c->log; - c->log = *log; - - /* The while loop skips possible uint32_t overflow. */ - - while (c->log.ident == 0) { - c->log.ident = nxt_task_next_ident(); + *dst = *src; } - thr->engine->connections++; - - c->task.thread = thr; - c->task.log = &c->log; - c->task.ident = c->log.ident; + *dst = 0; - c->io = thr->engine->event.io; - c->max_chunk = NXT_INT32_T_MAX; - c->sendfile = NXT_CONN_SENDFILE_UNSET; - - c->socket.read_work_queue = &thr->engine->read_work_queue; - c->socket.write_work_queue = &thr->engine->write_work_queue; - c->read_work_queue = &thr->engine->read_work_queue; - c->write_work_queue = &thr->engine->write_work_queue; - - nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); - nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); - - nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); + return NXT_OK; } nxt_int_t -nxt_app_http_read_body(nxt_app_request_t *r, u_char *start, size_t length) +nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) { - size_t preread; - ssize_t n; - nxt_err_t err; + size_t length; + nxt_buf_t *buf; - if ((off_t) length > r->body_rest) { - length = (size_t) r->body_rest; - } + do { + buf = msg->buf; - preread = 0; - - if (r->body_preread.length != 0) { - preread = nxt_min(r->body_preread.length, length); + if (nxt_slow_path(buf == NULL)) { + return NXT_DONE; + } - nxt_memcpy(start, r->body_preread.start, preread); + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { + if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + msg->buf = buf->next; + continue; + } + return NXT_ERROR; + } - r->body_preread.length -= preread; - r->body_preread.start += preread; + if (buf->mem.pos[0] >= 128) { + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { + return NXT_ERROR; + } + } - r->body_rest -= preread; + break; + } while (1); - length -= preread; - } + buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length); - if (length == 0) { - return NXT_OK; + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) + { + return NXT_ERROR; } - n = recv(r->event_conn->socket.fd, start + preread, length, 0); - - if (nxt_slow_path(n < (ssize_t) length)) { - if (n <= 0) { - err = (n == 0) ? 0 : nxt_socket_errno; + if (length > 0) { + str->start = buf->mem.pos; + str->length = length - 1; - nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E", - r->event_conn->socket.fd, length, err); - - return NXT_ERROR; - } + buf->mem.pos += length; - nxt_log_error(NXT_LOG_ERR, r->log, - "client prematurely closed connection"); + nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1, + (int)length - 1, str->start); + } else { + str->start = NULL; + str->length = 0; - return NXT_ERROR; + nxt_debug(task, "nxt_read_str: NULL"); } - r->body_rest -= n; - return NXT_OK; } nxt_int_t -nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t length) +nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, + nxt_str_t *v) { - void *start; - size_t free; - nxt_err_t err; - nxt_buf_t *b, *out, **next; - nxt_uint_t bufs; - nxt_app_buf_t *ab; - - out = NULL; - next = &out; + nxt_int_t rc; - b = r->output_buf; - - if (b == NULL) { - bufs = 0; - goto get_buf; + rc = nxt_app_msg_read_str(task, rmsg, n); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; } - bufs = 1; - - for ( ;; ) { - free = nxt_buf_mem_free_size(&b->mem); - - if (free > length) { - b->mem.free = nxt_cpymem(b->mem.free, data, length); - break; - } - - b->mem.free = nxt_cpymem(b->mem.free, data, free); + rc = nxt_app_msg_read_str(task, rmsg, v); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } - data += free; - length -= free; + return rc; +} - *next = b; - next = &b->next; - if (length == 0) { - b = NULL; - break; - } - - if (bufs == nxt_app_buf_max_number) { - bufs = 0; - *next = NULL; +nxt_int_t +nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) +{ + nxt_buf_t *buf; - nxt_app_buf_send(r->event_conn, out); + do { + buf = msg->buf; - out = NULL; - next = &out; + if (nxt_slow_path(buf == NULL)) { + return NXT_DONE; } - get_buf: - - if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) { - return NXT_ERROR; - } - - for ( ;; ) { - b = nxt_app_buf_free; - - if (b != NULL) { - nxt_app_buf_free = b->next; - break; - } - - if (nxt_app_buf_current_number < nxt_app_buf_max_number) { - break; - } - - err = nxt_thread_cond_wait(&nxt_app_cond, &nxt_app_mutex, - NXT_INFINITE_NSEC); - - if (nxt_slow_path(err != 0)) { - (void) nxt_thread_mutex_unlock(&nxt_app_mutex); - return NXT_ERROR; + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { + if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + msg->buf = buf->next; + continue; } + return NXT_ERROR; } - (void) nxt_thread_mutex_unlock(&nxt_app_mutex); - - if (b == NULL) { - start = nxt_malloc(4096); - if (nxt_slow_path(start == NULL)) { + if (buf->mem.pos[0] >= 128) { + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { return NXT_ERROR; } - - ab = nxt_zalloc(sizeof(nxt_app_buf_t)); - if (nxt_slow_path(ab == NULL)) { - return NXT_ERROR; - } - - b = &ab->buf; - - nxt_buf_mem_init(b, start, 4096); - - b->completion_handler = NULL; - - nxt_app_buf_current_number++; } - bufs++; - } + break; + } while (1); - r->output_buf = b; + buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); - if (out != NULL) { - *next = NULL; - - nxt_app_buf_send(r->event_conn, out); - } + nxt_debug(task, "nxt_read_size: %d", (int)*size); return NXT_OK; } static nxt_int_t -nxt_app_write_finish(nxt_app_request_t *r) +nxt_app_request_content_length(void *ctx, nxt_http_field_t *field, + nxt_log_t *log) { - nxt_buf_t *b, *out; + nxt_str_t *v; + nxt_app_parse_ctx_t *c; + nxt_app_request_header_t *h; - b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - b->completion_handler = NULL; - b->parent = (nxt_buf_t *) r; + c = ctx; + h = &c->r.header; + v = &field->value; - out = r->output_buf; - - if (out != NULL) { - r->output_buf = NULL; - out->next = b; - - } else { - out = b; - } - - nxt_app_buf_send(r->event_conn, out); + h->content_length = *v; + h->parsed_content_length = nxt_off_t_parse(v->start, v->length); return NXT_OK; } -static void -nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out) +static nxt_int_t +nxt_app_request_content_type(void *ctx, nxt_http_field_t *field, + nxt_log_t *log) { - nxt_app_buf_t *ab; + nxt_app_parse_ctx_t *c; + nxt_app_request_header_t *h; - ab = nxt_container_of(out, nxt_app_buf_t, buf); + c = ctx; + h = &c->r.header; - nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out); + h->content_type = field->value; - nxt_event_engine_post(nxt_app_engine, &ab->work); + return NXT_OK; } -static void -nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) +static nxt_int_t +nxt_app_request_cookie(void *ctx, nxt_http_field_t *field, + nxt_log_t *log) { - nxt_mp_t *mp; - nxt_buf_t *b; - nxt_conn_t *c; + nxt_app_parse_ctx_t *c; + nxt_app_request_header_t *h; - c = obj; - b = data; + c = ctx; + h = &c->r.header; - nxt_debug(task, "app delivery handler"); + h->cookie = field->value; - if (c->write != NULL) { - nxt_buf_chain_add(&c->write, b); - return; - } + return NXT_OK; +} - if (c->mem_pool == NULL) { - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - close(c->socket.fd); - return; - } - c->mem_pool = mp; - nxt_app_conn_update(task->thread, c, &nxt_main_log); - } +static nxt_int_t +nxt_app_request_host(void *ctx, nxt_http_field_t *field, + nxt_log_t *log) +{ + nxt_app_parse_ctx_t *c; + nxt_app_request_header_t *h; - if (c->socket.timedout || c->socket.error != 0) { - nxt_buf_chain_add(&nxt_app_buf_done, b); - nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion, - task, c, NULL); - return; - } + c = ctx; + h = &c->r.header; - c->write = b; - c->write_state = &nxt_app_delivery_write_state; + h->host = field->value; - nxt_conn_write(task->thread->engine, c); + return NXT_OK; } -static const nxt_event_conn_state_t nxt_app_delivery_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_app_delivery_ready, - .error_handler = nxt_app_delivery_error, +static nxt_http_fields_hash_entry_t nxt_app_request_fields[] = { + { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 }, + { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 }, + { nxt_string("Cookie"), &nxt_app_request_cookie, 0 }, + { nxt_string("Host"), &nxt_app_request_host, 0 }, - .timer_handler = nxt_app_delivery_timeout, - .timer_value = nxt_app_delivery_timer_value, - .timer_data = 0, - .timer_autoreset = 1, + { nxt_null_string, NULL, 0 } }; -static void -nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) +nxt_int_t +nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) { - nxt_buf_t *b, *next; - nxt_conn_t *c; + nxt_int_t rc; - c = obj; + ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32); - nxt_debug(task, "app delivery ready"); - - for (b = c->write; b != NULL; b = next) { - - if (nxt_buf_is_mem(b)) { - if (b->mem.pos != b->mem.free) { - break; - } - } - - next = b->next; - b->next = nxt_app_buf_done; - nxt_app_buf_done = b; + rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; } - nxt_work_queue_add(c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + ctx->parser.fields_hash = nxt_app_request_fields_hash; + + return NXT_OK; } -static const nxt_event_conn_state_t nxt_app_delivery_close_state - nxt_aligned(64) = +nxt_int_t +nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, + nxt_buf_t *buf) { - .ready_handler = nxt_app_close_request, -}; + nxt_int_t rc; + nxt_app_request_body_t *b; + nxt_http_request_parse_t *p; + nxt_app_request_header_t *h; + p = &ctx->parser; + b = &ctx->r.body; + h = &ctx->r.header; -static void -nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b, *bn, *free; - nxt_conn_t *c; - nxt_app_request_t *r; + if (h->done == 0) { + rc = nxt_http_parse_request(p, &buf->mem); - nxt_debug(task, "app delivery completion"); + if (nxt_slow_path(rc != NXT_DONE)) { + return rc; + } - free = NULL; + rc = nxt_http_fields_process(p->fields, ctx, task->log); - for (b = nxt_app_buf_done; b; b = bn) { - bn = b->next; + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } - if (nxt_buf_is_mem(b)) { - b->mem.pos = b->mem.start; - b->mem.free = b->mem.start; + h->fields = p->fields; + h->done = 1; - b->next = free; - free = b; + h->version.start = p->version.str; + h->version.length = nxt_strlen(p->version.str); - continue; - } + h->method = p->method; + + h->path.start = p->target_start; + h->path.length = p->target_end - p->target_start; + + h->path_no_query = h->path; - if (nxt_buf_is_last(b)) { - r = (nxt_app_request_t *) b->parent; + if (p->args_start != NULL) { + h->query.start = p->args_start; + h->query.length = p->target_end - p->args_start; - c = r->event_conn; - c->write_state = &nxt_app_delivery_close_state; + if (p->args_start > p->target_start) { + h->path_no_query.length = p->args_start - p->target_start - 1; + } + } - nxt_conn_close(task->thread->engine, c); + if (h->parsed_content_length == 0) { + b->done = 1; } } - nxt_app_buf_done = NULL; + if (b->done == 0) { + b->preread.length = buf->mem.free - buf->mem.pos; + b->preread.start = buf->mem.pos; - if (free == NULL) { - return; + b->done = b->preread.length >= (size_t) h->parsed_content_length; } - if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) { - return; + if (h->done == 1 && b->done == 1) { + return NXT_DONE; } - nxt_buf_chain_add(&nxt_app_buf_free, free); - - (void) nxt_thread_mutex_unlock(&nxt_app_mutex); - - nxt_thread_time_update(task->thread); - - (void) nxt_thread_cond_signal(&nxt_app_cond); + return NXT_AGAIN; } -static void -nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data) +nxt_int_t +nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) { - nxt_conn_t *c; - - c = obj; + nxt_mp_destroy(ctx->mem_pool); - nxt_debug(task, "app delivery error"); - - nxt_app_delivery_done(task, c); + return NXT_OK; } -static void -nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data) +nxt_int_t +nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) { - nxt_conn_t *c; - - c = obj; - - nxt_debug(task, "app delivery timeout"); + nxt_int_t rc; + nxt_buf_t *b; + nxt_port_t *port; - nxt_app_delivery_done(task, c); -} + rc = NXT_OK; + port = nxt_app_msg_get_port(task, msg); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } -static nxt_msec_t -nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data) -{ - /* 30000 ms */ - return 30000; -} + if (nxt_slow_path(last == 1)) { + do { + b = *msg->buf; + if (b == NULL) { + b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); + *msg->buf = b; + break; + } -static void -nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c) -{ - if (c->write == NULL) { - return; + msg->buf = &b->next; + } while(1); } - nxt_debug(task, "app delivery done"); + if (nxt_slow_path(msg->write != NULL)) { + rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, + -1, msg->stream, 0, msg->write); - nxt_buf_chain_add(&nxt_app_buf_done, c->write); - - c->write = NULL; + msg->write = NULL; + msg->buf = &msg->write; + } - nxt_work_queue_add(c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + return rc; } -static void -nxt_app_close_request(nxt_task_t *task, void *obj, void *data) +nxt_int_t +nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, + size_t size) { - nxt_conn_t *c; - nxt_app_request_t *r; + u_char *dst; - c = obj; + dst = nxt_app_msg_write_get_buf(task, msg, size); + if (nxt_slow_path(dst == NULL)) { + return NXT_ERROR; + } - nxt_debug(task, "app close connection"); + nxt_memcpy(dst, c, size); - r = c->socket.data; + nxt_debug(task, "nxt_app_msg_write_raw: %d %*s", (int)size, + (int)size, c); - nxt_mp_destroy(c->mem_pool); - nxt_mp_destroy(r->mem_pool); + return NXT_OK; } |