diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_application.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to 'src/nxt_application.c')
-rw-r--r-- | src/nxt_application.c | 903 |
1 files changed, 903 insertions, 0 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c new file mode 100644 index 00000000..08d32b37 --- /dev/null +++ b/src/nxt_application.c @@ -0,0 +1,903 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) Valentin V. Bartenev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_cycle.h> +#include <nxt_application.h> + + +#define NXT_PARSE_AGAIN (u_char *) -1 + + +static void nxt_app_thread(void *ctx); +static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, + nxt_log_t *log); +static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, + nxt_log_t *log); +static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); +static void nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data); +static void nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data); +static void nxt_app_delivery_ready(nxt_thread_t *thr, void *obj, void *data); +static void nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_app_delivery_error(nxt_thread_t *thr, void *obj, void *data); +static void nxt_app_delivery_timeout(nxt_thread_t *thr, void *obj, void *data); +static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c, + uintptr_t data); +static void nxt_app_delivery_done(nxt_thread_t *thr, nxt_event_conn_t *c); +static void nxt_app_close_request(nxt_thread_t *thr, nxt_app_request_t *r); + + +typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t; + +struct nxt_app_http_parse_state_s { + u_char *pos; + nxt_int_t (*handler)(nxt_app_request_header_t *h, u_char *start, + u_char *end, nxt_app_http_parse_state_t *state); +}; + +static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, + size_t size); +static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h, + u_char *start, u_char *end, nxt_app_http_parse_state_t *state); +static nxt_int_t nxt_app_http_parse_field_value(nxt_app_request_header_t *h, + u_char *start, u_char *end, nxt_app_http_parse_state_t *state); +static nxt_int_t nxt_app_http_parse_field_name(nxt_app_request_header_t *h, + u_char *start, u_char *end, nxt_app_http_parse_state_t *state); + +static nxt_int_t nxt_app_http_process_headers(nxt_app_request_t *r); + + +static const nxt_event_conn_state_t nxt_app_delivery_write_state; + +static nxt_application_module_t *nxt_app = &nxt_python_module; + +static nxt_thread_mutex_t nxt_app_mutex; +static nxt_thread_cond_t nxt_app_cond; + +static nxt_buf_t *nxt_app_buf_free; +static nxt_buf_t *nxt_app_buf_done; + +static nxt_event_engine_t *nxt_app_engine; +static nxt_mem_pool_t *nxt_app_mem_pool; + +static nxt_uint_t nxt_app_buf_current_number; +static nxt_uint_t nxt_app_buf_max_number = 16; + + +nxt_int_t +nxt_app_start(nxt_cycle_t *cycle) +{ + nxt_thread_link_t *link; + nxt_thread_handle_t handle; + + if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { + return NXT_ERROR; + } + + if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) { + return NXT_ERROR; + } + + link = nxt_malloc(sizeof(nxt_thread_link_t)); + + if (nxt_fast_path(link != NULL)) { + link->start = nxt_app_thread; + link->data = cycle; + link->engine = NULL; + link->exit = NULL; + + return nxt_thread_create(&handle, link); + } + + return NXT_ERROR; +} + + +#define SIZE 4096 + +static void +nxt_app_thread(void *ctx) +{ + ssize_t n; + nxt_err_t err; + nxt_cycle_t *cycle; + nxt_socket_t s; + nxt_thread_t *thr; + nxt_app_request_t *r; + nxt_event_engine_t **engines; + nxt_listen_socket_t *ls; + u_char buf[SIZE]; + const size_t size = SIZE; + nxt_app_header_field_t fields[128]; + + thr = nxt_thread(); + + nxt_log_debug(thr->log, "app thread"); + + cycle = ctx; + engines = cycle->engines->elts; + + nxt_app_engine = engines[0]; + + nxt_app_mem_pool = nxt_mem_pool_create(512); + if (nxt_slow_path(nxt_app_mem_pool == NULL)) { + return; + } + + if (nxt_slow_path(nxt_app->init(thr) != NXT_OK)) { + nxt_log_debug(thr->log, "application init failed"); + } + + ls = cycle->listen_sockets->elts; + + for ( ;; ) { + s = accept(ls->socket, NULL, NULL); + + if (nxt_slow_path(s == -1)) { + err = nxt_socket_errno; + + nxt_log_error(NXT_LOG_ERR, thr->log, "accept(%d) failed %E", + ls->socket, err); + + if (err == EBADF) { + /* STUB: ls->socket has been closed on exit. */ + return; + } + + continue; + } + + nxt_log_debug(thr->log, "accept(%d): %d", ls->socket, s); + + n = recv(s, buf, size, 0); + + if (nxt_slow_path(n <= 0)) { + err = (n == 0) ? 0 : nxt_socket_errno; + + nxt_log_error(NXT_LOG_ERR, thr->log, "recv(%d, %uz) failed %E", + s, size, err); + close(s); + continue; + } + + nxt_log_debug(thr->log, "recv(%d, %uz): %z", s, size, n); + + r = nxt_app_request_create(s, thr->log); + if (nxt_slow_path(r == NULL)) { + goto fail; + } + + r->header.fields = fields; + + //nxt_app->start(r); + + if (nxt_app_http_parse_request(r, buf, n) != NXT_OK) { + nxt_log_debug(thr->log, "nxt_app_http_parse_request() failed"); + nxt_mem_pool_destroy(r->mem_pool); + goto fail; + } + + if (nxt_app_http_process_headers(r) != NXT_OK) { + nxt_log_debug(thr->log, "nxt_app_http_process_headers() failed"); + nxt_mem_pool_destroy(r->mem_pool); + goto fail; + } + + nxt_app->run(r); + + if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) { + goto fail; + } + + continue; + + fail: + + close(s); + nxt_nanosleep(1000000000); /* 1s */ + } +} + + +static nxt_app_request_t * +nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) +{ + nxt_mem_pool_t *mp; + nxt_event_conn_t *c; + nxt_app_request_t *r; + + mp = nxt_mem_pool_create(1024); + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + r = nxt_mem_zalloc(mp, sizeof(nxt_app_request_t)); + if (nxt_slow_path(r == NULL)) { + return NULL; + } + + c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); + if (nxt_slow_path(c == NULL)) { + return NULL; + } + + c->socket.fd = s; + c->socket.data = r; + + r->mem_pool = mp; + r->event_conn = c; + r->log = log; + + return r; +} + + +static nxt_int_t +nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size) +{ + u_char *end; + ssize_t n; + nxt_err_t err; + nxt_socket_t s; + nxt_app_http_parse_state_t state; + + end = buf + size; + + state.pos = buf; + state.handler = nxt_app_http_parse_request_line; + + for ( ;; ) { + switch (state.handler(&r->header, state.pos, end, &state)) { + + case NXT_OK: + continue; + + case NXT_DONE: + r->body_preread.len = end - state.pos; + r->body_preread.data = state.pos; + + return NXT_OK; + + case NXT_AGAIN: + s = r->event_conn->socket.fd; + n = recv(s, end, SIZE - size, 0); + + if (nxt_slow_path(n <= 0)) { + err = (n == 0) ? 0 : nxt_socket_errno; + + nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E", + s, size, err); + + return NXT_ERROR; + } + + nxt_log_debug(r->log, "recv(%d, %uz): %z", s, SIZE - size, n); + + size += n; + end += n; + + continue; + } + + return NXT_ERROR; + } +} + + +static nxt_int_t +nxt_app_http_parse_request_line(nxt_app_request_header_t *h, u_char *start, + u_char *end, nxt_app_http_parse_state_t *state) +{ + u_char *p; + + for (p = start; /* void */; p++) { + + if (nxt_slow_path(p == end)) { + state->pos = p; + return NXT_AGAIN; + } + + if (*p == ' ') { + break; + } + } + + h->method.len = p - start; + h->method.data = start; + + start = p + 1; + + p = nxt_memchr(start, ' ', end - start); + + if (nxt_slow_path(p == NULL)) { + return NXT_AGAIN; + } + + h->path.len = p - start; + h->path.data = start; + + start = p + 1; + + if (nxt_slow_path((size_t) (end - start) < sizeof("HTTP/1.1\n") - 1)) { + return NXT_AGAIN; + } + + h->version.len = sizeof("HTTP/1.1") - 1; + h->version.data = start; + + p = start + sizeof("HTTP/1.1") - 1; + + if (nxt_slow_path(*p == '\n')) { + return nxt_app_http_parse_field_name(h, p + 1, end, state); + } + + if (nxt_slow_path(end - p < 2)) { + return NXT_AGAIN; + } + + return nxt_app_http_parse_field_name(h, p + 2, end, state); +} + + +static nxt_int_t +nxt_app_http_parse_field_name(nxt_app_request_header_t *h, u_char *start, + u_char *end, nxt_app_http_parse_state_t *state) +{ + u_char *p; + nxt_app_header_field_t *fld; + + if (nxt_slow_path(start == end)) { + goto again; + } + + if (nxt_slow_path(*start == '\n')) { + state->pos = start + 1; + return NXT_DONE; + } + + if (*start == '\r') { + if (nxt_slow_path(end - start < 2)) { + goto again; + } + + if (nxt_slow_path(start[1] != '\n')) { + return NXT_ERROR; + } + + state->pos = start + 2; + return NXT_DONE; + } + + p = nxt_memchr(start, ':', end - start); + + if (nxt_slow_path(p == NULL)) { + goto again; + } + + fld = &h->fields[h->fields_num]; + + fld->name.len = p - start; + fld->name.data = start; + + return nxt_app_http_parse_field_value(h, p + 1, end, state); + +again: + + state->pos = start; + state->handler = nxt_app_http_parse_field_name; + + return NXT_AGAIN; +} + + +static nxt_int_t +nxt_app_http_parse_field_value(nxt_app_request_header_t *h, u_char *start, + u_char *end, nxt_app_http_parse_state_t *state) +{ + u_char *p; + nxt_app_header_field_t *fld; + + for ( ;; ) { + if (nxt_slow_path(start == end)) { + goto again; + } + + if (*start != ' ') { + break; + } + + start++; + } + + p = nxt_memchr(start, '\n', end - start); + + if (nxt_slow_path(p == NULL)) { + goto again; + } + + fld = &h->fields[h->fields_num]; + + fld->value.len = p - start; + fld->value.data = start; + + fld->value.len -= (p[-1] == '\r'); + + h->fields_num++; + + state->pos = p + 1; + state->handler = nxt_app_http_parse_field_name; + + return NXT_OK; + +again: + + state->pos = start; + state->handler = nxt_app_http_parse_field_value; + + return NXT_AGAIN; +} + + +static nxt_int_t +nxt_app_http_process_headers(nxt_app_request_t *r) +{ + nxt_uint_t i; + nxt_app_header_field_t *fld; + + static const u_char content_length[14] = "Content-Length"; + static const u_char content_type[12] = "Content-Type"; + + for (i = 0; i < r->header.fields_num; i++) { + fld = &r->header.fields[i]; + + if (fld->name.len == sizeof(content_length) + && nxt_memcasecmp(fld->name.data, content_length, + sizeof(content_length)) == 0) + { + r->header.content_length = &fld->value; + r->body_rest = nxt_off_t_parse(fld->value.data, fld->value.len); + continue; + } + + if (fld->name.len == sizeof(content_type) + && nxt_memcasecmp(fld->name.data, content_type, + sizeof(content_type)) == 0) + { + r->header.content_type = &fld->value; + continue; + } + } + + return NXT_OK; +} + + +static void +nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) +{ + static nxt_atomic_t ident = 1; + + c->socket.write_ready = 1; + + c->socket.log = &c->log; + c->log = *log; + + /* The while loop skips possible uint32_t overflow. */ + + while (c->log.ident == 0) { + c->log.ident = (uint32_t) nxt_atomic_fetch_add(&ident, 1); + } + + thr->engine->connections++; + + c->io = thr->engine->event->io; + c->max_chunk = NXT_INT32_T_MAX; + c->sendfile = NXT_CONN_SENDFILE_UNSET; + + c->socket.read_work_queue = &thr->engine->read_work_queue; + c->socket.write_work_queue = &thr->engine->write_work_queue; + c->read_work_queue = &thr->engine->read_work_queue; + c->write_work_queue = &thr->engine->write_work_queue; + + nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); + nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); + + nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); +} + + +nxt_int_t +nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len) +{ + size_t preread; + ssize_t n; + nxt_err_t err; + + if ((off_t) len > r->body_rest) { + len = (size_t) r->body_rest; + } + + preread = 0; + + if (r->body_preread.len != 0) { + preread = nxt_min(r->body_preread.len, len); + + nxt_memcpy(data, r->body_preread.data, preread); + + r->body_preread.len -= preread; + r->body_preread.data += preread; + + r->body_rest -= preread; + + len -= preread; + } + + if (len == 0) { + return NXT_OK; + } + + n = recv(r->event_conn->socket.fd, data + preread, len, 0); + + if (nxt_slow_path(n < (ssize_t) len)) { + if (n <= 0) { + err = (n == 0) ? 0 : nxt_socket_errno; + + nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E", + r->event_conn->socket.fd, len, err); + + return NXT_ERROR; + } + + nxt_log_error(NXT_LOG_ERR, r->log, + "client prematurely closed connection"); + + return NXT_ERROR; + } + + r->body_rest -= n; + + return NXT_OK; +} + + +nxt_int_t +nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) +{ + size_t free; + nxt_err_t err; + nxt_buf_t *b, *out, **next; + nxt_uint_t bufs; + + out = NULL; + next = &out; + + b = r->output_buf; + + if (b == NULL) { + bufs = 0; + goto get_buf; + } + + bufs = 1; + + for ( ;; ) { + free = nxt_buf_mem_free_size(&b->mem); + + if (free > len) { + b->mem.free = nxt_cpymem(b->mem.free, data, len); + break; + } + + b->mem.free = nxt_cpymem(b->mem.free, data, free); + + data += free; + len -= free; + + *next = b; + next = &b->next; + + if (len == 0) { + b = NULL; + break; + } + + if (bufs == nxt_app_buf_max_number) { + bufs = 0; + *next = NULL; + + nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, + r->event_conn, out, &nxt_main_log); + + out = NULL; + next = &out; + } + + get_buf: + + if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) { + return NXT_ERROR; + } + + for ( ;; ) { + b = nxt_app_buf_free; + + if (b != NULL) { + nxt_app_buf_free = b->next; + break; + } + + if (nxt_app_buf_current_number < nxt_app_buf_max_number) { + break; + } + + err = nxt_thread_cond_wait(&nxt_app_cond, &nxt_app_mutex, + NXT_INFINITE_NSEC); + + if (nxt_slow_path(err != 0)) { + (void) nxt_thread_mutex_unlock(&nxt_app_mutex); + return NXT_ERROR; + } + } + + (void) nxt_thread_mutex_unlock(&nxt_app_mutex); + + if (b == NULL) { + b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + b->completion_handler = nxt_app_buf_complettion; + + nxt_app_buf_current_number++; + } + + bufs++; + } + + r->output_buf = b; + + if (out != NULL) { + *next = NULL; + + nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, + r->event_conn, out, &nxt_main_log); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_app_write_finish(nxt_app_request_t *r) +{ + nxt_buf_t *b, *out; + + b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + b->completion_handler = nxt_app_buf_complettion; + b->parent = (nxt_buf_t *) r; + + out = r->output_buf; + + if (out != NULL) { + r->output_buf = NULL; + out->next = b; + + } else { + out = b; + } + + nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, + r->event_conn, out, &nxt_main_log); + + return NXT_OK; +} + + +static void +nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_buf_t *b; + + b = obj; + + nxt_log_debug(thr->log, "app buf completion"); + + b->next = nxt_app_buf_done; + nxt_app_buf_done = b; +} + + +static void +nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_mem_pool_t *mp; + nxt_event_conn_t *c; + + c = obj; + b = data; + + nxt_log_debug(thr->log, "app delivery handler"); + + if (c->write != NULL) { + nxt_buf_chain_add(&c->write, b); + return; + } + + if (c->mem_pool == NULL) { + mp = nxt_mem_pool_create(256); + if (nxt_slow_path(mp == NULL)) { + close(c->socket.fd); + return; + } + + c->mem_pool = mp; + nxt_app_conn_update(thr, c, &nxt_main_log); + } + + if (c->socket.timedout || c->socket.error != 0) { + nxt_buf_chain_add(&nxt_app_buf_done, b); + nxt_thread_work_queue_add(thr, c->write_work_queue, + nxt_app_delivery_complettion, c, NULL, + thr->log); + return; + } + + c->write = b; + c->write_state = &nxt_app_delivery_write_state; + + nxt_event_conn_write(thr, c); +} + + +static const nxt_event_conn_state_t nxt_app_delivery_write_state + nxt_aligned(64) = +{ + NXT_EVENT_BUF_PROCESS, + NXT_EVENT_TIMER_AUTORESET, + + nxt_app_delivery_ready, + NULL, + nxt_app_delivery_error, + + nxt_app_delivery_timeout, + nxt_app_delivery_timer_value, + 0, +}; + + +static void +nxt_app_delivery_ready(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_thread_work_queue_add(thr, c->write_work_queue, + nxt_app_delivery_complettion, c, NULL, thr->log); +} + + +static void +nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_buf_t *b, *bn, *free; + nxt_app_request_t *r; + + nxt_log_debug(thr->log, "app delivery complettion"); + + free = NULL; + + for (b = nxt_app_buf_done; b; b = bn) { + bn = b->next; + + if (nxt_buf_is_mem(b)) { + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + + b->next = free; + free = b; + + continue; + } + + if (nxt_buf_is_last(b)) { + r = (nxt_app_request_t *) b->parent; + nxt_app_close_request(thr, r); + } + } + + nxt_app_buf_done = NULL; + + if (free == NULL) { + return; + } + + if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) { + return; + } + + nxt_buf_chain_add(&nxt_app_buf_free, free); + + (void) nxt_thread_mutex_unlock(&nxt_app_mutex); + + nxt_thread_time_update(thr); + + (void) nxt_thread_cond_signal(&nxt_app_cond); +} + + +static void +nxt_app_delivery_error(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_log_debug(thr->log, "app delivery error"); + + nxt_app_delivery_done(thr, c); +} + + +static void +nxt_app_delivery_timeout(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_log_debug(thr->log, "app delivery timeout"); + + nxt_app_delivery_done(thr, c); +} + + +static nxt_msec_t +nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data) +{ + /* 30000 ms */ + return 30000; +} + + +static void +nxt_app_delivery_done(nxt_thread_t *thr, nxt_event_conn_t *c) +{ + if (c->write == NULL) { + return; + } + + nxt_buf_chain_add(&nxt_app_buf_done, c->write); + + c->write = NULL; + + nxt_thread_work_queue_add(thr, c->write_work_queue, + nxt_app_delivery_complettion, c, NULL, thr->log); +} + + +static void +nxt_app_close_request(nxt_thread_t *thr, nxt_app_request_t *r) +{ + nxt_event_conn_t *c; + + nxt_log_debug(thr->log, "app close connection"); + + c = r->event_conn; + + nxt_event_conn_close(thr, c); + + nxt_mem_pool_destroy(c->mem_pool); + nxt_mem_pool_destroy(r->mem_pool); +} |