diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-27 11:35:11 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-27 11:35:11 +0300 |
commit | ba0391577b06446307fa073f856f57748557e0dd (patch) | |
tree | b2b871a041edee242662c95197bed292531c3a9a /src | |
parent | 6886b83c1f3bfdc514d58ad6e9ab40873cafcb54 (diff) | |
download | unit-ba0391577b06446307fa073f856f57748557e0dd.tar.gz unit-ba0391577b06446307fa073f856f57748557e0dd.tar.bz2 |
Work queues refactoring.
Diffstat (limited to 'src')
34 files changed, 396 insertions, 762 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 0d618744..e87fcdf4 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -19,6 +19,7 @@ static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log); static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); +static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out); static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); @@ -29,7 +30,7 @@ static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data); static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c); -static void nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r); +static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data); typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t; @@ -40,6 +41,13 @@ struct nxt_app_http_parse_state_s { u_char *end, nxt_app_http_parse_state_t *state); }; + +typedef struct { + nxt_work_t work; + nxt_buf_t buf; +} nxt_app_buf_t; + + static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size); static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h, @@ -83,13 +91,11 @@ nxt_app_start(nxt_cycle_t *cycle) return NXT_ERROR; } - link = nxt_malloc(sizeof(nxt_thread_link_t)); + link = nxt_zalloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { link->start = nxt_app_thread; link->data = cycle; - link->engine = NULL; - link->exit = NULL; return nxt_thread_create(&handle, link); } @@ -136,8 +142,12 @@ nxt_app_thread(void *ctx) ls = cycle->listen_sockets->elts; for ( ;; ) { + nxt_log_debug(thr->log, "wait on accept"); + s = accept(ls->socket, NULL, NULL); + nxt_thread_time_update(thr); + if (nxt_slow_path(s == -1)) { err = nxt_socket_errno; @@ -190,6 +200,8 @@ nxt_app_thread(void *ctx) nxt_app->run(r); + nxt_log_debug(thr->log, "app request done"); + if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) { goto fail; } @@ -577,11 +589,12 @@ nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len) nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) { - size_t free; - nxt_err_t err; - nxt_buf_t *b, *out, **next; - nxt_uint_t bufs; - nxt_event_conn_t *c; + void *start; + size_t free; + nxt_err_t err; + nxt_buf_t *b, *out, **next; + nxt_uint_t bufs; + nxt_app_buf_t *ab; out = NULL; next = &out; @@ -619,10 +632,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) if (bufs == nxt_app_buf_max_number) { bufs = 0; *next = NULL; - c = r->event_conn; - nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - &c->task, c, out, &nxt_main_log); + nxt_app_buf_send(r->event_conn, out); out = NULL; next = &out; @@ -658,11 +669,20 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) (void) nxt_thread_mutex_unlock(&nxt_app_mutex); if (b == NULL) { - b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0); - if (nxt_slow_path(b == NULL)) { + start = nxt_malloc(4096); + if (nxt_slow_path(start == NULL)) { + return NXT_ERROR; + } + + ab = nxt_zalloc(sizeof(nxt_app_buf_t)); + if (nxt_slow_path(ab == NULL)) { return NXT_ERROR; } + b = &ab->buf; + + nxt_buf_mem_init(b, start, 4096); + b->completion_handler = nxt_app_buf_completion; nxt_app_buf_current_number++; @@ -675,10 +695,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) if (out != NULL) { *next = NULL; - c = r->event_conn; - nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - &c->task, c, out, &nxt_main_log); + nxt_app_buf_send(r->event_conn, out); } return NXT_OK; @@ -688,8 +706,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len) static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r) { - nxt_buf_t *b, *out; - nxt_event_conn_t *c; + nxt_buf_t *b, *out; b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST); if (nxt_slow_path(b == NULL)) { @@ -709,16 +726,26 @@ nxt_app_write_finish(nxt_app_request_t *r) out = b; } - c = r->event_conn; - - nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler, - &c->task, c, out, &nxt_main_log); + nxt_app_buf_send(r->event_conn, out); return NXT_OK; } static void +nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) +{ + nxt_app_buf_t *ab; + + ab = nxt_container_of(out, nxt_app_buf_t, buf); + + nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out); + + nxt_event_engine_post(nxt_app_engine, &ab->work); +} + + +static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; @@ -762,8 +789,8 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) if (c->socket.timedout || c->socket.error != 0) { nxt_buf_chain_add(&nxt_app_buf_done, b); - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion, + task, c, NULL); return; } @@ -799,8 +826,8 @@ nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "app delivery ready"); - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); } @@ -808,11 +835,8 @@ static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b, *bn, *free; - nxt_thread_t *thread; nxt_app_request_t *r; - thread = task->thread; - nxt_debug(task, "app delivery completion"); free = NULL; @@ -832,7 +856,9 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) if (nxt_buf_is_last(b)) { r = (nxt_app_request_t *) b->parent; - nxt_app_close_request(task, r); + + nxt_work_queue_add(&task->thread->engine->final_work_queue, + nxt_app_close_request, task, r, NULL); } } @@ -850,7 +876,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) (void) nxt_thread_mutex_unlock(&nxt_app_mutex); - nxt_thread_time_update(thread); + nxt_thread_time_update(task->thread); (void) nxt_thread_cond_signal(&nxt_app_cond); } @@ -903,20 +929,22 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) c->write = NULL; - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - nxt_app_delivery_completion, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, + nxt_app_delivery_completion, task, c, NULL); } static void -nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r) +nxt_app_close_request(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; - - nxt_debug(task, "app close connection"); + nxt_event_conn_t *c; + nxt_app_request_t *r; + r = obj; c = r->event_conn; + nxt_debug(task, "app close connection"); + nxt_event_conn_close(task, c); nxt_mem_pool_destroy(c->mem_pool); diff --git a/src/nxt_buf.c b/src/nxt_buf.c index 88444a3d..d2b6fe7a 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -10,6 +10,18 @@ static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data); +void +nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size) +{ + b->size = NXT_BUF_MEM_SIZE; + + b->mem.start = start; + b->mem.pos = start; + b->mem.free = start; + b->mem.end = start + size; +} + + nxt_buf_t * nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size, nxt_uint_t flags) { diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 240a1d22..85ab3602 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -226,6 +226,7 @@ nxt_buf_used_size(b) \ nxt_buf_mem_used_size(&(b)->mem)) +NXT_EXPORT void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size); NXT_EXPORT nxt_buf_t *nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size, nxt_uint_t flags); NXT_EXPORT nxt_buf_t *nxt_buf_file_alloc(nxt_mem_pool_t *mp, size_t size, diff --git a/src/nxt_chan.c b/src/nxt_chan.c index baa4b9d4..6832ecdc 100644 --- a/src/nxt_chan.c +++ b/src/nxt_chan.c @@ -141,7 +141,7 @@ nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan) chan->socket.task = &chan->task; - chan->socket.write_work_queue = &task->thread->work_queue.main; + chan->socket.write_work_queue = &task->thread->engine->fast_work_queue; chan->socket.write_handler = nxt_chan_write_handler; chan->socket.error_handler = nxt_chan_error_handler; } @@ -290,9 +290,8 @@ nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data) fail: - nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main, - nxt_chan_error_handler, task, &chan->socket, - NULL); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_chan_error_handler, task, &chan->socket, NULL); } @@ -308,7 +307,7 @@ nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan) chan->socket.task = &chan->task; - chan->socket.read_work_queue = &task->thread->work_queue.main; + chan->socket.read_work_queue = &task->thread->engine->fast_work_queue; chan->socket.read_handler = nxt_chan_read_handler; chan->socket.error_handler = nxt_chan_error_handler; @@ -378,9 +377,8 @@ nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data) /* n == 0 || n == NXT_ERROR */ - nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main, - nxt_chan_error_handler, task, - &chan->socket, NULL); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_chan_error_handler, task, &chan->socket, NULL); return; } } diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c index 4b6eeb40..4b19f9da 100644 --- a/src/nxt_cycle.c +++ b/src/nxt_cycle.c @@ -153,8 +153,8 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous, nxt_log_debug(thr->log, "new cycle: %p", cycle); - nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_start, - task, cycle, NULL); + nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_start, + task, cycle, NULL); return NXT_OK; @@ -583,8 +583,8 @@ nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle) nxt_cycle_close_idle_connections(thr, task); if (done) { - nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_exit, - task, cycle, NULL); + nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_exit, + task, cycle, NULL); } } diff --git a/src/nxt_epoll.c b/src/nxt_epoll.c index 0fe81092..0de9763a 100644 --- a/src/nxt_epoll.c +++ b/src/nxt_epoll.c @@ -648,10 +648,9 @@ nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es) nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", es->epoll, ch->op, ch->fd, nxt_errno); - nxt_thread_work_queue_add(task->thread, - &task->thread->work_queue.main, - nxt_epoll_error_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_epoll_error_handler, + ev->task, ev, ev->data); ret = NXT_ERROR; } @@ -719,12 +718,12 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) nxt_main_log_debug("signalfd(): %d", fd); + thr = nxt_thread(); + es->signalfd.data = signals->handler; - es->signalfd.read_work_queue = nxt_thread_main_work_queue(); + es->signalfd.read_work_queue = &thr->engine->fast_work_queue; es->signalfd.read_handler = nxt_epoll_signalfd_handler; es->signalfd.log = &nxt_main_log; - - thr = nxt_thread(); es->signalfd.task = &thr->engine->task; ee.events = EPOLLIN; @@ -805,12 +804,12 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) nxt_main_log_debug("eventfd(): %d", es->eventfd.fd); - es->eventfd.read_work_queue = nxt_thread_main_work_queue(); + thr = nxt_thread(); + + es->eventfd.read_work_queue = &thr->engine->fast_work_queue; es->eventfd.read_handler = nxt_epoll_eventfd_handler; es->eventfd.data = es; es->eventfd.log = &nxt_main_log; - - thr = nxt_thread(); es->eventfd.task = &thr->engine->task; ee.events = EPOLLIN | EPOLLET; @@ -960,9 +959,8 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, error = 0; - nxt_thread_work_queue_add(task->thread, ev->read_work_queue, - ev->read_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(ev->read_work_queue, ev->read_handler, + ev->task, ev, ev->data); } else if (event_set->epoll.mode == 0) { /* Level-triggered mode. */ @@ -981,9 +979,8 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, error = 0; - nxt_thread_work_queue_add(task->thread, ev->write_work_queue, - ev->write_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(ev->write_work_queue, ev->write_handler, + ev->task, ev, ev->data); } else if (event_set->epoll.mode == 0) { /* Level-triggered mode. */ diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c index 6c4fa949..913038f1 100644 --- a/src/nxt_event_conn.c +++ b/src/nxt_event_conn.c @@ -85,8 +85,8 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) c->max_chunk = NXT_INT32_T_MAX; c->sendfile = NXT_CONN_SENDFILE_UNSET; - c->socket.read_work_queue = &thr->work_queue.main; - c->socket.write_work_queue = &thr->work_queue.main; + c->socket.read_work_queue = &thr->engine->fast_work_queue; + c->socket.write_work_queue = &thr->engine->fast_work_queue; nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); @@ -143,10 +143,6 @@ nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c) thr = task->thread; - nxt_thread_work_queue_drop(thr, c); - nxt_thread_work_queue_drop(thr, &c->read_timer); - nxt_thread_work_queue_drop(thr, &c->write_timer); - engine = thr->engine; nxt_event_timer_delete(engine, &c->read_timer); @@ -168,8 +164,8 @@ nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c) handler = nxt_event_conn_shutdown_socket; } - nxt_thread_work_queue_add(thr, wq, handler, task, - (void *) (uintptr_t) c->socket.fd, NULL); + nxt_work_queue_add(wq, handler, task, + (void *) (uintptr_t) c->socket.fd, NULL); } else { nxt_socket_close(c->socket.fd); @@ -188,10 +184,9 @@ nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data) nxt_socket_shutdown(s, SHUT_RDWR); - nxt_thread_work_queue_add(task->thread, - &task->thread->engine->close_work_queue, - nxt_event_conn_close_socket, task, - (void *) (uintptr_t) s, NULL); + nxt_work_queue_add(&task->thread->engine->close_work_queue, + nxt_event_conn_close_socket, task, + (void *) (uintptr_t) s, NULL); } diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h index c35dcf42..0140664b 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_event_conn.h @@ -192,7 +192,7 @@ typedef struct { nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \ do { \ if (thr->engine->batch != 0) { \ - nxt_thread_work_queue_add(thr, wq, handler, task, c, data); \ + nxt_work_queue_add(wq, handler, task, c, data); \ \ } else { \ handler(task, c, data); \ @@ -301,9 +301,8 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, #define \ nxt_event_conn_connect_enqueue(thr, task, c) \ - nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \ - nxt_event_conn_batch_socket, \ - task, c, c->socket.data) + nxt_work_queue_add(&thr->engine->socket_work_queue, \ + nxt_event_conn_batch_socket, task, c, c->socket.data) #define \ @@ -311,8 +310,8 @@ nxt_event_conn_read_enqueue(thr, task, c) \ do { \ c->socket.read_work_queue = &thr->engine->read_work_queue; \ \ - nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \ - c->io->read, task, c, c->socket.data); \ + nxt_work_queue_add(&thr->engine->read_work_queue, \ + c->io->read, task, c, c->socket.data); \ } while (0) @@ -321,8 +320,8 @@ nxt_event_conn_write_enqueue(thr, task, c) \ do { \ c->socket.write_work_queue = &thr->engine->write_work_queue; \ \ - nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \ - c->io->write, task, c, c->socket.data); \ + nxt_work_queue_add(&thr->engine->write_work_queue, \ + c->io->write, task, c, c->socket.data); \ } while (0) diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c index a1339235..27b5ac27 100644 --- a/src/nxt_event_conn_accept.c +++ b/src/nxt_event_conn_accept.c @@ -50,7 +50,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) cls->socket.read_work_queue = &engine->accept_work_queue; } else { - cls->socket.read_work_queue = &task->thread->work_queue.main; + cls->socket.read_work_queue = &engine->fast_work_queue; cls->batch = 1; } @@ -62,7 +62,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) cls->listen = ls; - cls->timer.work_queue = &task->thread->work_queue.main; + cls->timer.work_queue = &engine->fast_work_queue; cls->timer.handler = nxt_event_conn_listen_timer_handler; cls->timer.log = &nxt_main_log; @@ -221,15 +221,15 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, c->listen->handler(task, c, NULL); } else { - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - c->listen->handler, task, c, NULL); + nxt_work_queue_add(c->write_work_queue, c->listen->handler, + task, c, NULL); } next = nxt_event_conn_accept_next(task, cls); if (next != NULL && cls->socket.read_ready) { - nxt_thread_work_queue_add(task->thread, cls->socket.read_work_queue, - cls->accept, task, cls, next); + nxt_work_queue_add(cls->socket.read_work_queue, + cls->accept, task, cls, next); } } diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c index 55554720..3a80b8d7 100644 --- a/src/nxt_event_conn_connect.c +++ b/src/nxt_event_conn_connect.c @@ -17,8 +17,8 @@ nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c) engine = task->thread->engine; if (engine->batch != 0) { - nxt_thread_work_queue_add(task->thread, &engine->socket_work_queue, - nxt_event_conn_batch_socket, task, c, data); + nxt_work_queue_add(&engine->socket_work_queue, + nxt_event_conn_batch_socket, task, c, data); return; } @@ -47,9 +47,8 @@ nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data) handler = c->write_state->error_handler; } - nxt_thread_work_queue_add(task->thread, - &task->thread->engine->connect_work_queue, - handler, task, c, data); + nxt_work_queue_add(&task->thread->engine->connect_work_queue, + handler, task, c, data); } diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c index 86a05969..e9334016 100644 --- a/src/nxt_event_conn_job_sendfile.c +++ b/src/nxt_event_conn_job_sendfile.c @@ -157,9 +157,10 @@ done: fast: - nxt_thread_pool_post(task->thread->thread_pool, - nxt_event_conn_job_sendfile_handler, - &jbs->job.task, jbs, c); + nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, + jbs->job.task, jbs, c); + + nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); } @@ -257,8 +258,8 @@ nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c, break; } - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - b->completion_handler, task, b, b->parent); + nxt_work_queue_add(c->write_work_queue, + b->completion_handler, task, b, b->parent); b = b->next; } diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c index 0a2a6474..a63b18f6 100644 --- a/src/nxt_event_conn_proxy.c +++ b/src/nxt_event_conn_proxy.c @@ -456,9 +456,9 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, } if (rb->mem.start != rb->mem.end) { - nxt_thread_work_queue_push(task->thread, source->read_work_queue, - nxt_event_conn_proxy_read, - task, source, source->socket.data); + nxt_work_queue_add(source->read_work_queue, + nxt_event_conn_proxy_read, + task, source, source->socket.data); break; } @@ -665,9 +665,9 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_buf_free(sink->mem_pool, wb); } - nxt_thread_work_queue_push(task->thread, source->read_work_queue, - nxt_event_conn_proxy_read, task, source, - source->socket.data); + nxt_work_queue_add(source->read_work_queue, + nxt_event_conn_proxy_read, task, source, + source->socket.data); } @@ -1008,8 +1008,6 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) nxt_event_conn_close(task, p->peer); } else if (p->delayed) { - nxt_thread_work_queue_drop(task->thread, &p->peer->write_timer); - nxt_queue_remove(&p->peer->link); nxt_event_timer_delete(task->thread->engine, &p->peer->write_timer); } diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c index f6bbbbd8..93a17ddc 100644 --- a/src/nxt_event_conn_read.c +++ b/src/nxt_event_conn_read.c @@ -20,8 +20,7 @@ nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c) wq = &task->thread->engine->read_work_queue; c->socket.read_work_queue = wq; - nxt_thread_work_queue_add(task->thread, wq, handler, task, c, - c->socket.data); + nxt_work_queue_add( wq, handler, task, c, c->socket.data); return; } @@ -134,8 +133,8 @@ ready: done: if (batch) { - nxt_thread_work_queue_add(task->thread, c->read_work_queue, handler, - task, c, data); + nxt_work_queue_add(c->read_work_queue, handler, task, c, data); + } else { handler(task, c, data); } diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index ead63b72..921de23c 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -20,8 +20,8 @@ static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data); -static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task, - nxt_uint_t signo); +static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, + nxt_task_t **task, void **obj, void **data); nxt_event_engine_t * @@ -52,8 +52,21 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, } } - nxt_thread_work_queue_create(thr, 0); + engine->current_work_queue = &engine->fast_work_queue; + nxt_work_queue_cache_create(&engine->work_queue_cache, 0); + + engine->fast_work_queue.cache = &engine->work_queue_cache; + engine->accept_work_queue.cache = &engine->work_queue_cache; + engine->read_work_queue.cache = &engine->work_queue_cache; + engine->socket_work_queue.cache = &engine->work_queue_cache; + engine->connect_work_queue.cache = &engine->work_queue_cache; + engine->write_work_queue.cache = &engine->work_queue_cache; + engine->shutdown_work_queue.cache = &engine->work_queue_cache; + engine->close_work_queue.cache = &engine->work_queue_cache; + engine->final_work_queue.cache = &engine->work_queue_cache; + + nxt_work_queue_name(&engine->fast_work_queue, "fast"); nxt_work_queue_name(&engine->accept_work_queue, "accept"); nxt_work_queue_name(&engine->read_work_queue, "read"); nxt_work_queue_name(&engine->socket_work_queue, "socket"); @@ -61,12 +74,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, nxt_work_queue_name(&engine->write_work_queue, "write"); nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); nxt_work_queue_name(&engine->close_work_queue, "close"); - -#if (NXT_THREADS) - - nxt_locked_work_queue_create(&engine->work_queue, 7); - -#endif + nxt_work_queue_name(&engine->final_work_queue, "final"); if (signals != NULL) { engine->signals = nxt_event_engine_signals(signals); @@ -134,7 +142,7 @@ event_set_fail: signals_fail: nxt_free(engine->signals); - nxt_thread_work_queue_destroy(thr); + nxt_work_queue_cache_destroy(&engine->work_queue_cache); nxt_free(engine->fibers); fibers_fail: @@ -193,9 +201,9 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) } pipe->event.fd = pipe->fds[0]; - pipe->event.read_work_queue = &engine->task.thread->work_queue.main; + pipe->event.read_work_queue = &engine->fast_work_queue; pipe->event.read_handler = nxt_event_engine_signal_pipe; - pipe->event.write_work_queue = &engine->task.thread->work_queue.main; + pipe->event.write_work_queue = &engine->fast_work_queue; pipe->event.error_handler = nxt_event_engine_signal_pipe_error; pipe->event.log = &nxt_main_log; @@ -237,12 +245,11 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) void -nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler, - nxt_task_t *task, void *obj, void *data, nxt_log_t *log) +nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) { nxt_thread_log_debug("event engine post"); - nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data); + nxt_locked_work_queue_add(&engine->locked_work_queue, work); nxt_event_engine_signal(engine, 0); } @@ -273,12 +280,11 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) { - int i, n; - u_char signo; - nxt_bool_t post; - nxt_event_fd_t *ev; - const nxt_event_sig_t *sigev; - u_char buf[128]; + int i, n; + u_char signo; + nxt_bool_t post; + nxt_event_fd_t *ev; + u_char buf[128]; ev = obj; @@ -299,12 +305,8 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) post = 1; } else { - sigev = nxt_event_engine_signal_find(task, signo); - - if (nxt_fast_path(sigev != NULL)) { - sigev->handler(task, (void *) (uintptr_t) signo, - (void *) sigev->name); - } + nxt_event_engine_signal_handler(task, + (void *) (uintptr_t) signo, NULL); } } @@ -319,12 +321,14 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) { - nxt_thread_t *thread; + nxt_thread_t *thread; + nxt_event_engine_t *engine; thread = task->thread; + engine = thread->engine; - nxt_locked_work_queue_move(thread, &thread->engine->work_queue, - &thread->work_queue.main); + nxt_locked_work_queue_move(thread, &engine->locked_work_queue, + &engine->fast_work_queue); } @@ -351,31 +355,17 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) signo = (uintptr_t) obj; - sigev = nxt_event_engine_signal_find(task, signo); - - if (nxt_fast_path(sigev != NULL)) { - sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name); - } -} - - -static const nxt_event_sig_t * -nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo) -{ - const nxt_event_sig_t *sigev; - for (sigev = task->thread->engine->signals->sigev; sigev->signo != 0; sigev++) { if (signo == (nxt_uint_t) sigev->signo) { - return sigev; + sigev->handler(task, (void *) signo, (void *) sigev->name); + return; } } nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); - - return NULL; } @@ -397,7 +387,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, nxt_event_engine_signals_stop(engine); /* - * Add to thread main work queue the signal events possibly + * Add to engine fast work queue the signal events possibly * received before the blocking signal processing. */ nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); @@ -406,11 +396,11 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, if (engine->pipe != NULL && event_set->enable_post != NULL) { /* * An engine pipe must be closed after all signal events - * added above to thread main work queue will be processed. + * added above to engine fast work queue will be processed. */ - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_event_engine_signal_pipe_close, - &engine->task, engine->pipe, NULL); + nxt_work_queue_add(&engine->final_work_queue, + nxt_event_engine_signal_pipe_close, + &engine->task, engine->pipe, NULL); engine->pipe = NULL; } @@ -455,8 +445,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine) nxt_event_engine_signal_pipe_free(engine); nxt_free(engine->signals); - nxt_locked_work_queue_destroy(&engine->work_queue); - nxt_thread_work_queue_destroy(nxt_thread()); + nxt_work_queue_cache_destroy(&engine->work_queue_cache); engine->event->free(engine->event_set); @@ -466,6 +455,35 @@ nxt_event_engine_free(nxt_event_engine_t *engine) } +static nxt_work_handler_t +nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, + void **obj, void **data) +{ + nxt_work_queue_t *wq; + + wq = engine->current_work_queue; + + if (wq->head == NULL) { + wq = &engine->fast_work_queue; + + while (wq->head == NULL) { + engine->current_work_queue++; + wq = engine->current_work_queue; + + if (wq > &engine->final_work_queue) { + engine->current_work_queue = &engine->fast_work_queue; + + return NULL; + } + } + } + + nxt_debug(&engine->task, "work queue: %s", wq->name); + + return nxt_work_queue_pop(wq, task, obj, data); +} + + void nxt_event_engine_start(nxt_event_engine_t *engine) { @@ -487,40 +505,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine) /* A return point from fibers. */ } - for ( ;; ) { + thr->log = &nxt_main_log; - for ( ;; ) { - handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); - - if (handler == NULL) { - break; - } - - handler(task, obj, data); - - thr->log = &nxt_main_log; - } + for ( ;; ) { for ( ;; ) { - handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data); + handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); if (handler == NULL) { break; } handler(task, obj, data); - - thr->log = &nxt_main_log; } /* Attach some event engine work queues in preferred order. */ - nxt_work_queue_attach(thr, &engine->accept_work_queue); - nxt_work_queue_attach(thr, &engine->read_work_queue); - timeout = nxt_event_timer_find(engine); - engine->event->poll(task, engine->event_set, timeout); + engine->event->poll(&engine->task, engine->event_set, timeout); /* * Look up expired timers only if a new zero timer has been diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 1ff1399a..54b3bcde 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -33,6 +33,9 @@ struct nxt_event_engine_s { */ nxt_event_engine_pipe_t *pipe; + nxt_work_queue_cache_t work_queue_cache; + nxt_work_queue_t *current_work_queue; + nxt_work_queue_t fast_work_queue; nxt_work_queue_t accept_work_queue; nxt_work_queue_t read_work_queue; nxt_work_queue_t socket_work_queue; @@ -40,8 +43,9 @@ struct nxt_event_engine_s { nxt_work_queue_t write_work_queue; nxt_work_queue_t shutdown_work_queue; nxt_work_queue_t close_work_queue; + nxt_work_queue_t final_work_queue; - nxt_locked_work_queue_t work_queue; + nxt_locked_work_queue_t locked_work_queue; nxt_event_signals_t *signals; @@ -68,8 +72,7 @@ NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine); NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine); NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine, - nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data, - nxt_log_t *log); + nxt_work_t *work); NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo); @@ -84,14 +87,4 @@ nxt_thread_event_engine(void) } -nxt_inline nxt_work_queue_t * -nxt_thread_main_work_queue(void) -{ - nxt_thread_t *thr; - - thr = nxt_thread(); - return &thr->work_queue.main; -} - - #endif /* _NXT_EVENT_ENGINE_H_INCLUDED_ */ diff --git a/src/nxt_event_timer.c b/src/nxt_event_timer.c index 4a260705..e9615f92 100644 --- a/src/nxt_event_timer.c +++ b/src/nxt_event_timer.c @@ -313,8 +313,7 @@ nxt_event_timer_expire(nxt_thread_t *thr, nxt_msec_t now) if (ev->state != NXT_EVENT_TIMER_DISABLED) { ev->state = NXT_EVENT_TIMER_DISABLED; - nxt_thread_work_queue_add(thr, ev->work_queue, ev->handler, - ev->task, ev, NULL); + nxt_work_queue_add(ev->work_queue, ev->handler, ev->task, ev, NULL); } } } diff --git a/src/nxt_fiber.c b/src/nxt_fiber.c index a7b89402..f792897d 100644 --- a/src/nxt_fiber.c +++ b/src/nxt_fiber.c @@ -16,7 +16,7 @@ static void nxt_fiber_timer_handler(nxt_task_t *task, void *obj, void *data); #define \ nxt_fiber_enqueue(thr, task, fib) \ - nxt_thread_work_queue_add(thr, &(thr)->work_queue.main, \ + nxt_work_queue_add(&(thr)->engine->fast_work_queue, \ nxt_fiber_switch_handler, task, fib, NULL) @@ -392,7 +392,7 @@ nxt_fiber_sleep(nxt_task_t *task, nxt_msec_t timeout) fib = task->thread->fiber; - fib->timer.work_queue = &task->thread->work_queue.main; + fib->timer.work_queue = &task->thread->engine->fast_work_queue; fib->timer.handler = nxt_fiber_timer_handler; fib->timer.log = &nxt_main_log; diff --git a/src/nxt_job.c b/src/nxt_job.c index e1256d61..86cfc462 100644 --- a/src/nxt_job.c +++ b/src/nxt_job.c @@ -44,8 +44,6 @@ nxt_job_create(nxt_mem_pool_t *mp, size_t size) /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ nxt_queue_self(&job->link); - job->task.ident = nxt_task_next_ident(); - return job; } @@ -58,8 +56,6 @@ nxt_job_init(nxt_job_t *job, size_t size) nxt_job_set_name(job, "job"); nxt_queue_self(&job->link); - - job->task.ident = nxt_task_next_ident(); } @@ -118,8 +114,11 @@ nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) job->engine = task->thread->engine; - ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline, - &job->task, job, (void *) handler); + nxt_work_set(&job->work, nxt_job_thread_trampoline, + job->task, job, (void *) handler); + + ret = nxt_thread_pool_post(job->thread_pool, &job->work); + if (ret == NXT_OK) { return; } @@ -129,7 +128,7 @@ nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) #endif - handler(&job->task, job, job->data); + handler(job->task, job, job->data); } @@ -146,15 +145,13 @@ nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data) job = obj; handler = (nxt_work_handler_t) data; - job->task.log = job->log; - nxt_debug(task, "%s thread", job->name); if (nxt_slow_path(job->cancel)) { nxt_job_return(task, job, job->abort_handler); } else { - handler(&job->task, job, job->data); + handler(job->task, job, job->data); } } @@ -170,8 +167,12 @@ nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) if (job->engine != NULL) { /* A return function is called in thread pool thread context. */ - nxt_event_engine_post(job->engine, nxt_job_thread_return_handler, - &job->task, job, (void *) handler, job->log); + + nxt_work_set(&job->work, nxt_job_thread_return_handler, + job->task, job, (void *) handler); + + nxt_event_engine_post(job->engine, &job->work); + return; } @@ -182,8 +183,8 @@ nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) handler = job->abort_handler; } - nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main, - handler, &job->task, job, job->data); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + handler, job->task, job, job->data); } @@ -198,14 +199,14 @@ nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data) job = obj; handler = (nxt_work_handler_t) data; - job->task.thread = task->thread; + job->task->thread = task->thread; if (nxt_slow_path(job->cancel)) { nxt_debug(task, "%s cancellation", job->name); handler = job->abort_handler; } - handler(&job->task, job, job->data); + handler(job->task, job, job->data); } #endif diff --git a/src/nxt_job.h b/src/nxt_job.h index f1954b57..b1af2e80 100644 --- a/src/nxt_job.h +++ b/src/nxt_job.h @@ -33,7 +33,7 @@ typedef struct { void *data; - nxt_task_t task; + nxt_task_t *task; nxt_work_handler_t abort_handler; @@ -49,6 +49,8 @@ typedef struct { nxt_log_t *log; #endif + nxt_work_t work; + #if (NXT_DEBUG) const char *name; #endif diff --git a/src/nxt_job_resolve.c b/src/nxt_job_resolve.c index e44acd14..f0b34e2c 100644 --- a/src/nxt_job_resolve.c +++ b/src/nxt_job_resolve.c @@ -121,5 +121,5 @@ fail: freeaddrinfo(res); } - nxt_job_return(&jbr->job.task, &jbr->job, handler); + nxt_job_return(jbr->job.task, &jbr->job, handler); } diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue.c index 7370a401..c03cd8f4 100644 --- a/src/nxt_kqueue.c +++ b/src/nxt_kqueue.c @@ -531,11 +531,13 @@ static void nxt_kqueue_error(nxt_kqueue_event_set_t *ks) { struct kevent *kev, *end; - nxt_thread_t *thr; + nxt_thread_t *thread; nxt_event_fd_t *ev; nxt_event_file_t *fev; + nxt_work_queue_t *wq; - thr = nxt_thread(); + thread = nxt_thread(); + wq = &thread->engine->fast_work_queue; end = &ks->changes[ks->nchanges]; for (kev = ks->changes; kev < end; kev++) { @@ -545,16 +547,14 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks) case EVFILT_READ: case EVFILT_WRITE: ev = nxt_kevent_get_udata(kev->udata); - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_kqueue_fd_error_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler, + ev->task, ev, ev->data); break; case EVFILT_VNODE: fev = nxt_kevent_get_udata(kev->udata); - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_kqueue_file_error_handler, - fev->task, fev, fev->data); + nxt_work_queue_add(wq, nxt_kqueue_file_error_handler, + fev->task, fev, fev->data); break; } } @@ -768,7 +768,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, } event_task = task; - wq = &task->thread->work_queue.main; + wq = &task->thread->engine->fast_work_queue; handler = nxt_kqueue_fd_error_handler; obj = nxt_kevent_get_udata(kev->udata); @@ -871,8 +871,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, continue; } - nxt_thread_work_queue_add(task->thread, wq, handler, - event_task, obj, data); + nxt_work_queue_add(wq, handler, event_task, obj, data); } } @@ -938,8 +937,8 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) nxt_event_timer_disable(&c->write_timer); } - nxt_thread_work_queue_add(task->thread, c->write_work_queue, - c->write_state->ready_handler, task, c, data); + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); } @@ -1020,8 +1019,8 @@ nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "kevent fd:%d eof", c->socket.fd); c->socket.closed = 1; - nxt_thread_work_queue_add(task->thread, c->read_work_queue, - c->read_state->close_handler, task, c, data); + nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler, + task, c, data); return; } diff --git a/src/nxt_log.h b/src/nxt_log.h index d04bc3dd..14f07383 100644 --- a/src/nxt_log.h +++ b/src/nxt_log.h @@ -123,7 +123,7 @@ nxt_log_debug(_log, ...) \ #else -#define nxt_log_debug(...) +#define nxt_debug(...) #define \ nxt_log_debug(...) diff --git a/src/nxt_log_moderation.c b/src/nxt_log_moderation.c index 5082270d..a571ae17 100644 --- a/src/nxt_log_moderation.c +++ b/src/nxt_log_moderation.c @@ -58,7 +58,7 @@ nxt_log_moderate_allow(nxt_log_moderation_t *mod) nxt_thread_spin_unlock(&mod->lock); if (timer) { - mod->timer.work_queue = &thr->work_queue.main; + mod->timer.work_queue = &thr->engine->fast_work_queue; mod->timer.handler = nxt_log_moderate_timer_handler; mod->timer.log = &nxt_main_log; diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 8ce6e670..8a11aa16 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -228,7 +228,7 @@ nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle) cycle->timer.log = &nxt_main_log; nxt_event_timer_ident(&cycle->timer, -1); - cycle->timer.work_queue = &thr->work_queue.main; + cycle->timer.work_queue = &thr->engine->fast_work_queue; nxt_event_timer_add(thr->engine, &cycle->timer, 500); diff --git a/src/nxt_poll.c b/src/nxt_poll.c index 2cc0200b..6f6299b0 100644 --- a/src/nxt_poll.c +++ b/src/nxt_poll.c @@ -397,8 +397,8 @@ nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps) break; } - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - ev->error_handler, ev->task, ev, ev->data); + nxt_work_queue_add(&thr->engine->fast_work_queue, ev->error_handler, + ev->task, ev, ev->data); ret = NXT_ERROR; @@ -608,10 +608,9 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, /* Mark the poll entry to ignore it by the kernel. */ pfd->fd = -1; - nxt_thread_work_queue_add(task->thread, - &task->thread->work_queue.main, - ev->error_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(&ev->task->thread->engine->fast_work_queue, + ev->error_handler, + ev->task, ev, ev->data); goto next; } @@ -653,8 +652,8 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0); } - nxt_thread_work_queue_add(task->thread, ev->read_work_queue, - ev->read_handler, ev->task, ev, ev->data); + nxt_work_queue_add(ev->read_work_queue, ev->read_handler, + ev->task, ev, ev->data); } if ((events & POLLOUT) || (error && ev->write_handler != NULL)) { @@ -665,9 +664,8 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0); } - nxt_thread_work_queue_add(task->thread, ev->write_work_queue, - ev->write_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(ev->write_work_queue, ev->write_handler, + ev->task, ev, ev->data); } next: diff --git a/src/nxt_select.c b/src/nxt_select.c index a2982d2f..8ee5808e 100644 --- a/src/nxt_select.c +++ b/src/nxt_select.c @@ -141,9 +141,8 @@ nxt_select_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { thr = nxt_thread(); - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_select_error_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(&thr->engine->fast_work_queue, + nxt_select_error_handler, ev->task, ev, ev->data); return; } @@ -174,9 +173,8 @@ nxt_select_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { thr = nxt_thread(); - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_select_error_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(&thr->engine->fast_work_queue, + nxt_select_error_handler, ev->task, ev, ev->data); return; } @@ -365,8 +363,8 @@ nxt_select_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_select_disable_read(event_set, ev); } - nxt_thread_work_queue_add(task->thread, ev->read_work_queue, - ev->read_handler, ev->task, ev, ev->data); + nxt_work_queue_add(ev->read_work_queue, ev->read_handler, + ev->task, ev, ev->data); found = 1; } @@ -382,9 +380,8 @@ nxt_select_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_select_disable_write(event_set, ev); } - nxt_thread_work_queue_add(task->thread, ev->write_work_queue, - ev->write_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(ev->write_work_queue, ev->write_handler, + ev->task, ev, ev->data); found = 1; } diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index e8a4c190..309d3147 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -343,8 +343,7 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, } } - nxt_thread_work_queue_add(task->thread, wq, b->completion_handler, task, - b, b->parent); + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); b = b->next; } diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index f14f690d..67e20b3f 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -495,7 +495,7 @@ nxt_job_sockaddr_parse(nxt_job_sockaddr_parse_t *jbs) return; } - nxt_job_return(&jbs->resolve.job.task, &jbs->resolve.job, handler); + nxt_job_return(jbs->resolve.job.task, &jbs->resolve.job, handler); } diff --git a/src/nxt_thread.c b/src/nxt_thread.c index 680412c2..24136bde 100644 --- a/src/nxt_thread.c +++ b/src/nxt_thread.c @@ -180,16 +180,22 @@ nxt_thread_time_cleanup(void *data) void nxt_thread_exit(nxt_thread_t *thr) { + nxt_thread_link_t *link; + nxt_log_debug(thr->log, "thread exit"); - if (thr->link != NULL) { - nxt_event_engine_post(thr->link->engine, thr->link->exit, - &thr->link->engine->task, - (void *) (uintptr_t) thr->handle, - NULL, &nxt_main_log); + link = thr->link; + thr->link = NULL; + + if (link != NULL) { + /* + * link->handler is already set to an exit handler, + * and link->task is already set to engine->task. + * The link should be freed by the exit handler. + */ + link->work.obj = thr->handle; - nxt_free(thr->link); - thr->link = NULL; + nxt_event_engine_post(link->engine, &link->work); } nxt_thread_time_free(thr); diff --git a/src/nxt_thread.h b/src/nxt_thread.h index ebff808f..b25bd24a 100644 --- a/src/nxt_thread.h +++ b/src/nxt_thread.h @@ -92,7 +92,7 @@ typedef struct { nxt_thread_start_t start; void *data; nxt_event_engine_t *engine; - nxt_work_handler_t exit; + nxt_work_t work; } nxt_thread_link_t; @@ -179,7 +179,6 @@ struct nxt_thread_s { nxt_thread_time_t time; nxt_event_engine_t *engine; - nxt_thread_work_queue_t work_queue; /* * Although pointer to a current fiber should be a property of diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c index 463bfad4..fd7246c8 100644 --- a/src/nxt_thread_pool.c +++ b/src/nxt_thread_pool.c @@ -38,8 +38,7 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, nxt_int_t -nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, - nxt_task_t *task, void *obj, void *data) +nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work) { nxt_thread_log_debug("thread pool post"); @@ -47,7 +46,7 @@ nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, return NXT_ERROR; } - nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data); + nxt_locked_work_queue_add(&tp->work_queue, work); (void) nxt_sem_post(&tp->sem); @@ -66,6 +65,11 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) return NXT_OK; } + if (tp->max_threads == 0) { + /* The pool is being destroyed. */ + return NXT_ERROR; + } + nxt_thread_spin_lock(&tp->work_queue.lock); ret = NXT_OK; @@ -78,8 +82,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { - nxt_locked_work_queue_create(&tp->work_queue, 0); - link = nxt_malloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { @@ -102,8 +104,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) (void) nxt_atomic_fetch_add(&tp->threads, -1); - nxt_locked_work_queue_destroy(&tp->work_queue); - ret = NXT_ERROR; } @@ -142,8 +142,6 @@ nxt_thread_pool_start(void *ctx) tp->init(); } - nxt_thread_work_queue_create(thr, 8); - for ( ;; ) { nxt_thread_pool_wait(tp); @@ -152,18 +150,8 @@ nxt_thread_pool_start(void *ctx) if (nxt_fast_path(handler != NULL)) { task->thread = thr; - nxt_log_debug(thr->log, "locked work queue"); - handler(task, obj, data); - } - for ( ;; ) { - thr->log = &nxt_main_log; - - handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); - - if (handler == NULL) { - break; - } + nxt_log_debug(thr->log, "locked work queue"); handler(task, obj, data); } @@ -245,8 +233,8 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) thr = nxt_thread(); if (!tp->ready) { - nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, - &tp->task, tp, NULL); + nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit, + &tp->task, tp, NULL); return; } @@ -254,7 +242,9 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) /* Disable new threads creation and mark a pool as being destroyed. */ tp->max_threads = 0; - nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL); + nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL); + + nxt_thread_pool_post(tp, &tp->work); } } @@ -293,24 +283,23 @@ nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "thread pool threads: %A", threads); if (threads > 1) { - nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, - (void *) (uintptr_t) thread->handle); + nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, + (void *) (uintptr_t) thread->handle); + + nxt_thread_pool_post(tp, &tp->work); } else { nxt_debug(task, "thread pool destroy"); - nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp, - (void *) (uintptr_t) thread->handle, - &nxt_main_log); - nxt_sem_destroy(&tp->sem); - nxt_locked_work_queue_destroy(&tp->work_queue); + nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, + (void *) (uintptr_t) thread->handle); - nxt_free(tp); - } + nxt_event_engine_post(tp->engine, &tp->work); - nxt_thread_work_queue_destroy(thread); + /* The "tp" memory should be freed by tp->exit handler. */ + } nxt_thread_exit(thread); diff --git a/src/nxt_thread_pool.h b/src/nxt_thread_pool.h index 75899a06..750b98f8 100644 --- a/src/nxt_thread_pool.h +++ b/src/nxt_thread_pool.h @@ -20,6 +20,7 @@ struct nxt_thread_pool_s { nxt_sem_t sem; nxt_nsec_t timeout; + nxt_work_t work; nxt_task_t task; nxt_locked_work_queue_t work_queue; @@ -37,7 +38,7 @@ NXT_EXPORT nxt_thread_pool_t *nxt_thread_pool_create(nxt_uint_t max_threads, nxt_event_engine_t *engine, nxt_work_handler_t exit); NXT_EXPORT void nxt_thread_pool_destroy(nxt_thread_pool_t *tp); NXT_EXPORT nxt_int_t nxt_thread_pool_post(nxt_thread_pool_t *tp, - nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data); + nxt_work_t *work); #endif /* _NXT_UNIX_THREAD_POOL_H_INCLUDED_ */ diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c index ffb9317f..ecdc65ff 100644 --- a/src/nxt_work_queue.c +++ b/src/nxt_work_queue.c @@ -25,12 +25,7 @@ * a new spare chunk is allocated again. */ -static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, - nxt_thread_spinlock_t *lock); -static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock); -static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr); -static nxt_work_handler_t nxt_locked_work_queue_pop_work( - nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data); +static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache); /* It should be adjusted with the "work_queue_bucket_items" directive. */ @@ -38,32 +33,29 @@ static nxt_uint_t nxt_work_queue_bucket_items = 409; void -nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size) +nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size) { - nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t)); - - nxt_work_queue_name(&thr->work_queue.main, "main"); - nxt_work_queue_name(&thr->work_queue.last, "last"); + nxt_memzero(cache, sizeof(nxt_work_queue_cache_t)); if (chunk_size == 0) { chunk_size = nxt_work_queue_bucket_items; } /* nxt_work_queue_chunk_t already has one work item. */ - thr->work_queue.cache.chunk_size = chunk_size - 1; + cache->chunk_size = chunk_size - 1; - while (thr->work_queue.cache.next == NULL) { - nxt_work_queue_allocate(&thr->work_queue.cache, NULL); + while (cache->next == NULL) { + nxt_work_queue_allocate(cache); } } void -nxt_thread_work_queue_destroy(nxt_thread_t *thr) +nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache) { nxt_work_queue_chunk_t *chunk, *next; - for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) { + for (chunk = cache->chunk; chunk; chunk = next) { next = chunk->next; nxt_free(chunk); } @@ -71,8 +63,7 @@ nxt_thread_work_queue_destroy(nxt_thread_t *thr) static void -nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, - nxt_thread_spinlock_t *lock) +nxt_work_queue_allocate(nxt_work_queue_cache_t *cache) { size_t size; nxt_uint_t i, n; @@ -102,7 +93,6 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, work = NULL; } else { - nxt_work_queue_sleep(lock); return; } @@ -111,36 +101,19 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, } -static void -nxt_work_queue_sleep(nxt_thread_spinlock_t *lock) -{ - if (lock != NULL) { - nxt_thread_spin_unlock(lock); - } - - nxt_nanosleep(100 * 1000000); /* 100ms */ - - if (lock != NULL) { - nxt_thread_spin_lock(lock); - } -} - - /* Add a work to a work queue tail. */ void -nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, - nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) +nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler, + nxt_task_t *task, void *obj, void *data) { nxt_work_t *work; - nxt_work_queue_attach(thr, wq); - for ( ;; ) { - work = thr->work_queue.cache.next; + work = wq->cache->next; if (nxt_fast_path(work != NULL)) { - thr->work_queue.cache.next = work->next; + wq->cache->next = work->next; work->next = NULL; work->handler = handler; @@ -160,366 +133,56 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, return; } - nxt_work_queue_allocate(&thr->work_queue.cache, NULL); + nxt_work_queue_allocate(wq->cache); } } -/* Push a work to a work queue head. */ - -void -nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq, - nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) -{ - nxt_work_t *work; - - nxt_work_queue_attach(thr, wq); - - for ( ;; ) { - work = thr->work_queue.cache.next; - - if (nxt_fast_path(work != NULL)) { - thr->work_queue.cache.next = work->next; - work->next = wq->head; - - work->handler = handler; - work->obj = obj; - work->data = data; - - wq->head = work; - - if (wq->tail == NULL) { - wq->tail = work; - } - - return; - } - - nxt_work_queue_allocate(&thr->work_queue.cache, NULL); - } -} - - -/* Attach a work queue to a thread work queue. */ - -void -nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq) -{ - if (wq->next == NULL && wq != thr->work_queue.tail) { - - if (thr->work_queue.tail != NULL) { - thr->work_queue.tail->next = wq; - - } else { - thr->work_queue.head = wq; - } - - thr->work_queue.tail = wq; - } -} - - -/* Pop a work from a thread work queue head. */ - nxt_work_handler_t -nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj, +nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj, void **data) { - nxt_work_t *work; - nxt_work_queue_t *wq; - - wq = nxt_thread_current_work_queue(thr); - - if (wq != NULL) { - - work = wq->head; - - if (work != NULL) { - wq->head = work->next; - - if (work->next == NULL) { - wq->tail = NULL; - } - - *task = work->task; - *obj = work->obj; - nxt_prefetch(*obj); - *data = work->data; - nxt_prefetch(*data); - - work->next = thr->work_queue.cache.next; - thr->work_queue.cache.next = work; - -#if (NXT_DEBUG) - - if (work->handler == NULL) { - nxt_log_alert(thr->log, "null work handler"); - nxt_abort(); - } - -#endif - - return work->handler; - } - } - - return NULL; -} - - -static nxt_work_queue_t * -nxt_thread_current_work_queue(nxt_thread_t *thr) -{ - nxt_work_queue_t *wq, *next; - - for (wq = thr->work_queue.head; wq != NULL; wq = next) { - - if (wq->head != NULL) { - nxt_log_debug(thr->log, "work queue: %s", wq->name); - return wq; - } - - /* Detach empty work queue. */ - next = wq->next; - wq->next = NULL; - thr->work_queue.head = next; - } - - thr->work_queue.tail = NULL; - - return NULL; -} - - -/* Drop a work with specified data from a thread work queue. */ - -void -nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data) -{ - nxt_work_t *work, *prev, *next, **link; - nxt_work_queue_t *wq; - - for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) { - - prev = NULL; - link = &wq->head; - - for (work = wq->head; work != NULL; work = next) { - - next = work->next; - - if (data != work->obj) { - prev = work; - link = &work->next; - - } else { - if (next == NULL) { - wq->tail = prev; - } - - nxt_log_debug(thr->log, "work queue drop"); - - *link = next; - - work->next = thr->work_queue.cache.next; - thr->work_queue.cache.next = work; - } - } - } -} - - -/* Add a work to the thread last work queue's tail. */ - -void -nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, - void *obj, void *data) -{ nxt_work_t *work; - for ( ;; ) { - work = thr->work_queue.cache.next; - - if (nxt_fast_path(work != NULL)) { - thr->work_queue.cache.next = work->next; - work->next = NULL; - - work->handler = handler; - work->obj = obj; - work->data = data; - - if (thr->work_queue.last.tail != NULL) { - thr->work_queue.last.tail->next = work; - - } else { - thr->work_queue.last.head = work; - } + work = wq->head; - thr->work_queue.last.tail = work; + wq->head = work->next; - return; - } - - nxt_work_queue_allocate(&thr->work_queue.cache, NULL); - } -} - - -/* Pop a work from the thread last work queue's head. */ - -nxt_work_handler_t -nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj, - void **data) -{ - nxt_work_t *work; - - work = thr->work_queue.last.head; - - if (work != NULL) { - nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name); - - thr->work_queue.last.head = work->next; - - if (work->next == NULL) { - thr->work_queue.last.tail = NULL; - } - - *task = work->task; - *obj = work->obj; - nxt_prefetch(*obj); - *data = work->data; - nxt_prefetch(*data); - - work->next = thr->work_queue.cache.next; - thr->work_queue.cache.next = work; - -#if (NXT_DEBUG) - - if (work->handler == NULL) { - nxt_log_alert(thr->log, "null work handler"); - nxt_abort(); - } - -#endif - - return work->handler; - } - - return NULL; -} - - -void -nxt_work_queue_destroy(nxt_work_queue_t *wq) -{ - nxt_thread_t *thr; - nxt_work_queue_t *q; - - thr = nxt_thread(); - - /* Detach from a thread work queue. */ - - if (thr->work_queue.head == wq) { - thr->work_queue.head = wq->next; - q = NULL; - goto found; - } - - for (q = thr->work_queue.head; q != NULL; q = q->next) { - if (q->next == wq) { - q->next = wq->next; - goto found; - } - } - - return; - -found: - - if (thr->work_queue.tail == wq) { - thr->work_queue.tail = q; - } - - /* Move all queue's works to a thread work queue cache. */ - - if (wq->tail != NULL) { - wq->tail->next = thr->work_queue.cache.next; - } - - if (wq->head != NULL) { - thr->work_queue.cache.next = wq->head; - } -} - - -/* Locked work queue operations. */ - -void -nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size) -{ - nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t)); - - if (chunk_size == 0) { - chunk_size = nxt_work_queue_bucket_items; + if (work->next == NULL) { + wq->tail = NULL; } - lwq->cache.chunk_size = chunk_size; + *task = work->task; - while (lwq->cache.next == NULL) { - nxt_work_queue_allocate(&lwq->cache, NULL); - } -} + *obj = work->obj; + nxt_prefetch(*obj); + *data = work->data; + nxt_prefetch(*data); -void -nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq) -{ - nxt_work_queue_chunk_t *chunk, *next; + work->next = wq->cache->next; + wq->cache->next = work; - for (chunk = lwq->cache.chunk; chunk; chunk = next) { - next = chunk->next; - nxt_free(chunk); - } + return work->handler; } /* Add a work to a locked work queue tail. */ void -nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, - nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) +nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work) { - nxt_work_t *work; - nxt_thread_spin_lock(&lwq->lock); - for ( ;; ) { - work = lwq->cache.next; - - if (nxt_fast_path(work != NULL)) { - lwq->cache.next = work->next; - - work->next = NULL; - work->handler = handler; - work->task = task; - work->obj = obj; - work->data = data; - - if (lwq->tail != NULL) { - lwq->tail->next = work; - - } else { - lwq->head = work; - } - - lwq->tail = work; + if (lwq->tail != NULL) { + lwq->tail->next = work; - break; - } - - nxt_work_queue_allocate(&lwq->cache, &lwq->lock); + } else { + lwq->head = work; } + lwq->tail = work; + nxt_thread_spin_unlock(&lwq->lock); } @@ -530,46 +193,36 @@ nxt_work_handler_t nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data) { + nxt_work_t *work; nxt_work_handler_t handler; - nxt_thread_spin_lock(&lwq->lock); - - handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data); - - nxt_thread_spin_unlock(&lwq->lock); + handler = NULL; - return handler; -} + nxt_thread_spin_lock(&lwq->lock); + work = lwq->head; -static nxt_work_handler_t -nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task, - void **obj, void **data) -{ - nxt_work_t *work; + if (work != NULL) { + *task = work->task; - work = lwq->head; + *obj = work->obj; + nxt_prefetch(*obj); - if (work == NULL) { - return NULL; - } + *data = work->data; + nxt_prefetch(*data); - *task = work->task; - *obj = work->obj; - nxt_prefetch(*obj); - *data = work->data; - nxt_prefetch(*data); + lwq->head = work->next; - lwq->head = work->next; + if (work->next == NULL) { + lwq->tail = NULL; + } - if (work->next == NULL) { - lwq->tail = NULL; + handler = work->handler; } - work->next = lwq->cache.next; - lwq->cache.next = work; + nxt_thread_spin_unlock(&lwq->lock); - return work->handler; + return handler; } @@ -579,29 +232,23 @@ void nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, nxt_work_queue_t *wq) { - void *obj, *data; - nxt_task_t *task; - nxt_work_handler_t handler; + nxt_work_t *work; - /* Locked work queue head can be tested without a lock. */ + nxt_thread_spin_lock(&lwq->lock); - if (nxt_fast_path(lwq->head == NULL)) { - return; - } + work = lwq->head; - nxt_thread_spin_lock(&lwq->lock); + lwq->head = NULL; + lwq->tail = NULL; - for ( ;; ) { - handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data); + nxt_thread_spin_unlock(&lwq->lock); - if (handler == NULL) { - break; - } + while (work != NULL) { + work->task->thread = thr; - task->thread = thr; + nxt_work_queue_add(wq, work->handler, work->task, + work->obj, work->data); - nxt_thread_work_queue_add(thr, wq, handler, task, obj, data); + work = work->next; } - - nxt_thread_spin_unlock(&lwq->lock); } diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h index b37fe1a3..4d2422cc 100644 --- a/src/nxt_work_queue.h +++ b/src/nxt_work_queue.h @@ -65,7 +65,7 @@ typedef struct nxt_work_queue_s nxt_work_queue_t; struct nxt_work_queue_s { nxt_work_t *head; nxt_work_t *tail; - nxt_work_queue_t *next; + nxt_work_queue_cache_t *cache; #if (NXT_DEBUG) const char *name; #endif @@ -73,15 +73,6 @@ struct nxt_work_queue_s { typedef struct { - nxt_work_queue_t *head; - nxt_work_queue_t *tail; - nxt_work_queue_t main; - nxt_work_queue_t last; - nxt_work_queue_cache_t cache; -} nxt_thread_work_queue_t; - - -typedef struct { nxt_thread_spinlock_t lock; nxt_work_t *head; nxt_work_t *tail; @@ -89,34 +80,26 @@ typedef struct { } nxt_locked_work_queue_t; -NXT_EXPORT void nxt_thread_work_queue_create(nxt_thread_t *thr, +NXT_EXPORT void nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size); -NXT_EXPORT void nxt_thread_work_queue_destroy(nxt_thread_t *thr); -NXT_EXPORT void nxt_thread_work_queue_add(nxt_thread_t *thr, - nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task, - void *obj, void *data); -NXT_EXPORT void nxt_thread_work_queue_push(nxt_thread_t *thr, - nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task, - void *obj, void *data); -NXT_EXPORT void nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq); -NXT_EXPORT nxt_work_handler_t nxt_thread_work_queue_pop(nxt_thread_t *thr, +NXT_EXPORT void nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache); + +NXT_EXPORT void nxt_work_queue_add(nxt_work_queue_t *wq, + nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data); +NXT_EXPORT nxt_work_handler_t nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj, void **data); -NXT_EXPORT void nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data); -#define \ -nxt_thread_current_work_queue_add(thr, handler, task, obj, data) \ +#define nxt_work_set(_work, _handler, _task, _obj, _data) \ do { \ - nxt_thread_t *_thr = thr; \ + nxt_work_t *work = _work; \ \ - nxt_thread_work_queue_add(_thr, _thr->work_queue.head, \ - handler, task, obj, data); \ + work->handler = _handler; \ + work->task = _task; \ + work->obj = _obj; \ + work->data = _data; \ } while (0) - -NXT_EXPORT void nxt_work_queue_destroy(nxt_work_queue_t *wq); - - #if (NXT_DEBUG) #define \ @@ -131,17 +114,8 @@ nxt_work_queue_name(_wq, _name) #endif -NXT_EXPORT void nxt_thread_last_work_queue_add(nxt_thread_t *thr, - nxt_work_handler_t handler, void *obj, void *data); -NXT_EXPORT nxt_work_handler_t nxt_thread_last_work_queue_pop(nxt_thread_t *thr, - nxt_task_t **task, void **obj, void **data); - - -NXT_EXPORT void nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, - size_t chunk_size); -NXT_EXPORT void nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq); NXT_EXPORT void nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, - nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data); + nxt_work_t *work); NXT_EXPORT nxt_work_handler_t nxt_locked_work_queue_pop( nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data); NXT_EXPORT void nxt_locked_work_queue_move(nxt_thread_t *thr, |