diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.c | 119 |
1 files changed, 70 insertions, 49 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 08d32b37..0d618744 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -19,17 +19,17 @@ static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log); static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); -static void nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data); -static void nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data); -static void nxt_app_delivery_ready(nxt_thread_t *thr, void *obj, void *data); -static void nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, +static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data); +static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); +static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data); -static void nxt_app_delivery_error(nxt_thread_t *thr, void *obj, void *data); -static void nxt_app_delivery_timeout(nxt_thread_t *thr, void *obj, void *data); +static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data); +static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data); -static void nxt_app_delivery_done(nxt_thread_t *thr, nxt_event_conn_t *c); -static void nxt_app_close_request(nxt_thread_t *thr, nxt_app_request_t *r); +static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c); +static void nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r); typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t; @@ -229,6 +229,13 @@ nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) c->socket.fd = s; c->socket.data = r; + c->task.thread = nxt_thread(); + c->task.log = log; + c->task.ident = log->ident; + c->socket.task = &c->task; + c->read_timer.task = &c->task; + c->write_timer.task = &c->task; + r->mem_pool = mp; r->event_conn = c; r->log = log; @@ -480,8 +487,6 @@ nxt_app_http_process_headers(nxt_app_request_t *r) static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) { - static nxt_atomic_t ident = 1; - c->socket.write_ready = 1; c->socket.log = &c->log; @@ -490,11 +495,15 @@ nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) /* The while loop skips possible uint32_t overflow. */ while (c->log.ident == 0) { - c->log.ident = (uint32_t) nxt_atomic_fetch_add(&ident, 1); + c->log.ident = nxt_task_next_ident(); } thr->engine->connections++; + c->task.thread = thr; + c->task.log = &c->log; + c->task.ident = c->log.ident; + c->io = thr->engine->event->io; c->max_chunk = NXT_INT32_T_MAX; c->sendfile = NXT_CONN_SENDFILE_UNSET; @@ -568,10 +577,11 @@ nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len) nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) { - size_t free; - nxt_err_t err; - nxt_buf_t *b, *out, **next; - nxt_uint_t bufs; + size_t free; + nxt_err_t err; + nxt_buf_t *b, *out, **next; + nxt_uint_t bufs; + nxt_event_conn_t *c; out = NULL; next = &out; @@ -609,9 +619,10 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) if (bufs == nxt_app_buf_max_number) { bufs = 0; *next = NULL; + c = r->event_conn; nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - r->event_conn, out, &nxt_main_log); + &c->task, c, out, &nxt_main_log); out = NULL; next = &out; @@ -652,7 +663,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) return NXT_ERROR; } - b->completion_handler = nxt_app_buf_complettion; + b->completion_handler = nxt_app_buf_completion; nxt_app_buf_current_number++; } @@ -664,9 +675,10 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) if (out != NULL) { *next = NULL; + c = r->event_conn; nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - r->event_conn, out, &nxt_main_log); + &c->task, c, out, &nxt_main_log); } return NXT_OK; @@ -676,14 +688,15 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r) { - nxt_buf_t *b, *out; + nxt_buf_t *b, *out; + nxt_event_conn_t *c; b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST); if (nxt_slow_path(b == NULL)) { return NXT_ERROR; } - b->completion_handler = nxt_app_buf_complettion; + b->completion_handler = nxt_app_buf_completion; b->parent = (nxt_buf_t *) r; out = r->output_buf; @@ -696,21 +709,23 @@ nxt_app_write_finish(nxt_app_request_t *r) out = b; } + c = r->event_conn; + nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - r->event_conn, out, &nxt_main_log); + &c->task, c, out, &nxt_main_log); return NXT_OK; } static void -nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data) +nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; b = obj; - nxt_log_debug(thr->log, "app buf completion"); + nxt_debug(task, "app buf completion"); b->next = nxt_app_buf_done; nxt_app_buf_done = b; @@ -718,7 +733,7 @@ nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data) static void -nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_mem_pool_t *mp; @@ -727,7 +742,7 @@ nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data) c = obj; b = data; - nxt_log_debug(thr->log, "app delivery handler"); + nxt_debug(task, "app delivery handler"); if (c->write != NULL) { nxt_buf_chain_add(&c->write, b); @@ -742,21 +757,20 @@ nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data) } c->mem_pool = mp; - nxt_app_conn_update(thr, c, &nxt_main_log); + nxt_app_conn_update(task->thread, c, &nxt_main_log); } if (c->socket.timedout || c->socket.error != 0) { nxt_buf_chain_add(&nxt_app_buf_done, b); - nxt_thread_work_queue_add(thr, c->write_work_queue, - nxt_app_delivery_complettion, c, NULL, - thr->log); + nxt_thread_work_queue_add(task->thread, c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); return; } c->write = b; c->write_state = &nxt_app_delivery_write_state; - nxt_event_conn_write(thr, c); + nxt_event_conn_write(task, c); } @@ -777,24 +791,29 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state static void -nxt_app_delivery_ready(nxt_thread_t *thr, void *obj, void *data) +nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; c = obj; - nxt_thread_work_queue_add(thr, c->write_work_queue, - nxt_app_delivery_complettion, c, NULL, thr->log); + nxt_debug(task, "app delivery ready"); + + nxt_thread_work_queue_add(task->thread, c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); } static void -nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, void *data) +nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b, *bn, *free; + nxt_thread_t *thread; nxt_app_request_t *r; - nxt_log_debug(thr->log, "app delivery complettion"); + thread = task->thread; + + nxt_debug(task, "app delivery completion"); free = NULL; @@ -813,7 +832,7 @@ nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, void *data) if (nxt_buf_is_last(b)) { r = (nxt_app_request_t *) b->parent; - nxt_app_close_request(thr, r); + nxt_app_close_request(task, r); } } @@ -831,35 +850,35 @@ nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, void *data) (void) nxt_thread_mutex_unlock(&nxt_app_mutex); - nxt_thread_time_update(thr); + nxt_thread_time_update(thread); (void) nxt_thread_cond_signal(&nxt_app_cond); } static void -nxt_app_delivery_error(nxt_thread_t *thr, void *obj, void *data) +nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; c = obj; - nxt_log_debug(thr->log, "app delivery error"); + nxt_debug(task, "app delivery error"); - nxt_app_delivery_done(thr, c); + nxt_app_delivery_done(task, c); } static void -nxt_app_delivery_timeout(nxt_thread_t *thr, void *obj, void *data) +nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; c = obj; - nxt_log_debug(thr->log, "app delivery timeout"); + nxt_debug(task, "app delivery timeout"); - nxt_app_delivery_done(thr, c); + nxt_app_delivery_done(task, c); } @@ -872,31 +891,33 @@ nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data) static void -nxt_app_delivery_done(nxt_thread_t *thr, nxt_event_conn_t *c) +nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) { if (c->write == NULL) { return; } + nxt_debug(task, "app delivery done"); + nxt_buf_chain_add(&nxt_app_buf_done, c->write); c->write = NULL; - nxt_thread_work_queue_add(thr, c->write_work_queue, - nxt_app_delivery_complettion, c, NULL, thr->log); + nxt_thread_work_queue_add(task->thread, c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); } static void -nxt_app_close_request(nxt_thread_t *thr, nxt_app_request_t *r) +nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r) { nxt_event_conn_t *c; - nxt_log_debug(thr->log, "app close connection"); + nxt_debug(task, "app close connection"); c = r->event_conn; - nxt_event_conn_close(thr, c); + nxt_event_conn_close(task, c); nxt_mem_pool_destroy(c->mem_pool); nxt_mem_pool_destroy(r->mem_pool); |