diff options
Diffstat (limited to 'src/nxt_application.c')
-rw-r--r-- | src/nxt_application.c | 104 |
1 files changed, 66 insertions, 38 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 0d618744..e87fcdf4 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -19,6 +19,7 @@ static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log); static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); +static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out); static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); @@ -29,7 +30,7 @@ static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data); static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c); -static void nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r); +static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data); typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t; @@ -40,6 +41,13 @@ struct nxt_app_http_parse_state_s { u_char *end, nxt_app_http_parse_state_t *state); }; + +typedef struct { + nxt_work_t work; + nxt_buf_t buf; +} nxt_app_buf_t; + + static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size); static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h, @@ -83,13 +91,11 @@ nxt_app_start(nxt_cycle_t *cycle) return NXT_ERROR; } - link = nxt_malloc(sizeof(nxt_thread_link_t)); + link = nxt_zalloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { link->start = nxt_app_thread; link->data = cycle; - link->engine = NULL; - link->exit = NULL; return nxt_thread_create(&handle, link); } @@ -136,8 +142,12 @@ nxt_app_thread(void *ctx) ls = cycle->listen_sockets->elts; for ( ;; ) { + nxt_log_debug(thr->log, "wait on accept"); + s = accept(ls->socket, NULL, NULL); + nxt_thread_time_update(thr); + if (nxt_slow_path(s == -1)) { err = nxt_socket_errno; @@ -190,6 +200,8 @@ nxt_app_thread(void *ctx) nxt_app->run(r); + nxt_log_debug(thr->log, "app request done"); + if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) { goto fail; } @@ -577,11 +589,12 @@ nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len) nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) { - size_t free; - nxt_err_t err; - nxt_buf_t *b, *out, **next; - nxt_uint_t bufs; - nxt_event_conn_t *c; + void *start; + size_t free; + nxt_err_t err; + nxt_buf_t *b, *out, **next; + nxt_uint_t bufs; + nxt_app_buf_t *ab; out = NULL; next = &out; @@ -619,10 +632,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) if (bufs == nxt_app_buf_max_number) { bufs = 0; *next = NULL; - c = r->event_conn; - nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - &c->task, c, out, &nxt_main_log); + nxt_app_buf_send(r->event_conn, out); out = NULL; next = &out; @@ -658,11 +669,20 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) (void) nxt_thread_mutex_unlock(&nxt_app_mutex); if (b == NULL) { - b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0); - if (nxt_slow_path(b == NULL)) { + start = nxt_malloc(4096); + if (nxt_slow_path(start == NULL)) { + return NXT_ERROR; + } + + ab = nxt_zalloc(sizeof(nxt_app_buf_t)); + if (nxt_slow_path(ab == NULL)) { return NXT_ERROR; } + b = &ab->buf; + + nxt_buf_mem_init(b, start, 4096); + b->completion_handler = nxt_app_buf_completion; nxt_app_buf_current_number++; @@ -675,10 +695,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) if (out != NULL) { *next = NULL; - c = r->event_conn; - nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - &c->task, c, out, &nxt_main_log); + nxt_app_buf_send(r->event_conn, out); } return NXT_OK; @@ -688,8 +706,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r) { - nxt_buf_t *b, *out; - nxt_event_conn_t *c; + nxt_buf_t *b, *out; b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST); if (nxt_slow_path(b == NULL)) { @@ -709,16 +726,26 @@ nxt_app_write_finish(nxt_app_request_t *r) out = b; } - c = r->event_conn; - - nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - &c->task, c, out, &nxt_main_log); + nxt_app_buf_send(r->event_conn, out); return NXT_OK; } static void +nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) +{ + nxt_app_buf_t *ab; + + ab = nxt_container_of(out, nxt_app_buf_t, buf); + + nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out); + + nxt_event_engine_post(nxt_app_engine, &ab->work); +} + + +static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; @@ -762,8 +789,8 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) if (c->socket.timedout || c->socket.error != 0) { nxt_buf_chain_add(&nxt_app_buf_done, b); - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion, + task, c, NULL); return; } @@ -799,8 +826,8 @@ nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "app delivery ready"); - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); } @@ -808,11 +835,8 @@ static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b, *bn, *free; - nxt_thread_t *thread; nxt_app_request_t *r; - thread = task->thread; - nxt_debug(task, "app delivery completion"); free = NULL; @@ -832,7 +856,9 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) if (nxt_buf_is_last(b)) { r = (nxt_app_request_t *) b->parent; - nxt_app_close_request(task, r); + + nxt_work_queue_add(&task->thread->engine->final_work_queue, + nxt_app_close_request, task, r, NULL); } } @@ -850,7 +876,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) (void) nxt_thread_mutex_unlock(&nxt_app_mutex); - nxt_thread_time_update(thread); + nxt_thread_time_update(task->thread); (void) nxt_thread_cond_signal(&nxt_app_cond); } @@ -903,20 +929,22 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) c->write = NULL; - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); } static void -nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r) +nxt_app_close_request(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; - - nxt_debug(task, "app close connection"); + nxt_event_conn_t *c; + nxt_app_request_t *r; + r = obj; c = r->event_conn; + nxt_debug(task, "app close connection"); + nxt_event_conn_close(task, c); nxt_mem_pool_destroy(c->mem_pool); |