summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-27 11:35:11 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-27 11:35:11 +0300
commitba0391577b06446307fa073f856f57748557e0dd (patch)
treeb2b871a041edee242662c95197bed292531c3a9a /src
parent6886b83c1f3bfdc514d58ad6e9ab40873cafcb54 (diff)
downloadunit-ba0391577b06446307fa073f856f57748557e0dd.tar.gz
unit-ba0391577b06446307fa073f856f57748557e0dd.tar.bz2
Work queues refactoring.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_application.c104
-rw-r--r--src/nxt_buf.c12
-rw-r--r--src/nxt_buf.h1
-rw-r--r--src/nxt_chan.c14
-rw-r--r--src/nxt_cycle.c8
-rw-r--r--src/nxt_epoll.c29
-rw-r--r--src/nxt_event_conn.c19
-rw-r--r--src/nxt_event_conn.h15
-rw-r--r--src/nxt_event_conn_accept.c12
-rw-r--r--src/nxt_event_conn_connect.c9
-rw-r--r--src/nxt_event_conn_job_sendfile.c11
-rw-r--r--src/nxt_event_conn_proxy.c14
-rw-r--r--src/nxt_event_conn_read.c7
-rw-r--r--src/nxt_event_engine.c147
-rw-r--r--src/nxt_event_engine.h19
-rw-r--r--src/nxt_event_timer.c3
-rw-r--r--src/nxt_fiber.c4
-rw-r--r--src/nxt_job.c33
-rw-r--r--src/nxt_job.h4
-rw-r--r--src/nxt_job_resolve.c2
-rw-r--r--src/nxt_kqueue.c29
-rw-r--r--src/nxt_log.h2
-rw-r--r--src/nxt_log_moderation.c2
-rw-r--r--src/nxt_master_process.c2
-rw-r--r--src/nxt_poll.c20
-rw-r--r--src/nxt_select.c19
-rw-r--r--src/nxt_sendbuf.c3
-rw-r--r--src/nxt_sockaddr.c2
-rw-r--r--src/nxt_thread.c20
-rw-r--r--src/nxt_thread.h3
-rw-r--r--src/nxt_thread_pool.c55
-rw-r--r--src/nxt_thread_pool.h3
-rw-r--r--src/nxt_work_queue.c477
-rw-r--r--src/nxt_work_queue.h54
34 files changed, 396 insertions, 762 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 0d618744..e87fcdf4 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -19,6 +19,7 @@ static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
nxt_log_t *log);
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
+static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
@@ -29,7 +30,7 @@ static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
uintptr_t data);
static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c);
-static void nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r);
+static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);
typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t;
@@ -40,6 +41,13 @@ struct nxt_app_http_parse_state_s {
u_char *end, nxt_app_http_parse_state_t *state);
};
+
+typedef struct {
+ nxt_work_t work;
+ nxt_buf_t buf;
+} nxt_app_buf_t;
+
+
static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf,
size_t size);
static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h,
@@ -83,13 +91,11 @@ nxt_app_start(nxt_cycle_t *cycle)
return NXT_ERROR;
}
- link = nxt_malloc(sizeof(nxt_thread_link_t));
+ link = nxt_zalloc(sizeof(nxt_thread_link_t));
if (nxt_fast_path(link != NULL)) {
link->start = nxt_app_thread;
link->data = cycle;
- link->engine = NULL;
- link->exit = NULL;
return nxt_thread_create(&handle, link);
}
@@ -136,8 +142,12 @@ nxt_app_thread(void *ctx)
ls = cycle->listen_sockets->elts;
for ( ;; ) {
+ nxt_log_debug(thr->log, "wait on accept");
+
s = accept(ls->socket, NULL, NULL);
+ nxt_thread_time_update(thr);
+
if (nxt_slow_path(s == -1)) {
err = nxt_socket_errno;
@@ -190,6 +200,8 @@ nxt_app_thread(void *ctx)
nxt_app->run(r);
+ nxt_log_debug(thr->log, "app request done");
+
if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) {
goto fail;
}
@@ -577,11 +589,12 @@ nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len)
nxt_int_t
nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
{
- size_t free;
- nxt_err_t err;
- nxt_buf_t *b, *out, **next;
- nxt_uint_t bufs;
- nxt_event_conn_t *c;
+ void *start;
+ size_t free;
+ nxt_err_t err;
+ nxt_buf_t *b, *out, **next;
+ nxt_uint_t bufs;
+ nxt_app_buf_t *ab;
out = NULL;
next = &out;
@@ -619,10 +632,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
if (bufs == nxt_app_buf_max_number) {
bufs = 0;
*next = NULL;
- c = r->event_conn;
- nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
- &c->task, c, out, &nxt_main_log);
+ nxt_app_buf_send(r->event_conn, out);
out = NULL;
next = &out;
@@ -658,11 +669,20 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
(void) nxt_thread_mutex_unlock(&nxt_app_mutex);
if (b == NULL) {
- b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0);
- if (nxt_slow_path(b == NULL)) {
+ start = nxt_malloc(4096);
+ if (nxt_slow_path(start == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ab = nxt_zalloc(sizeof(nxt_app_buf_t));
+ if (nxt_slow_path(ab == NULL)) {
return NXT_ERROR;
}
+ b = &ab->buf;
+
+ nxt_buf_mem_init(b, start, 4096);
+
b->completion_handler = nxt_app_buf_completion;
nxt_app_buf_current_number++;
@@ -675,10 +695,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
if (out != NULL) {
*next = NULL;
- c = r->event_conn;
- nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
- &c->task, c, out, &nxt_main_log);
+ nxt_app_buf_send(r->event_conn, out);
}
return NXT_OK;
@@ -688,8 +706,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
static nxt_int_t
nxt_app_write_finish(nxt_app_request_t *r)
{
- nxt_buf_t *b, *out;
- nxt_event_conn_t *c;
+ nxt_buf_t *b, *out;
b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(b == NULL)) {
@@ -709,16 +726,26 @@ nxt_app_write_finish(nxt_app_request_t *r)
out = b;
}
- c = r->event_conn;
-
- nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
- &c->task, c, out, &nxt_main_log);
+ nxt_app_buf_send(r->event_conn, out);
return NXT_OK;
}
static void
+nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
+{
+ nxt_app_buf_t *ab;
+
+ ab = nxt_container_of(out, nxt_app_buf_t, buf);
+
+ nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out);
+
+ nxt_event_engine_post(nxt_app_engine, &ab->work);
+}
+
+
+static void
nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
@@ -762,8 +789,8 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
if (c->socket.timedout || c->socket.error != 0) {
nxt_buf_chain_add(&nxt_app_buf_done, b);
- nxt_thread_work_queue_add(task->thread, c->write_work_queue,
- nxt_app_delivery_completion, task, c, NULL);
+ nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion,
+ task, c, NULL);
return;
}
@@ -799,8 +826,8 @@ nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "app delivery ready");
- nxt_thread_work_queue_add(task->thread, c->write_work_queue,
- nxt_app_delivery_completion, task, c, NULL);
+ nxt_work_queue_add(c->write_work_queue,
+ nxt_app_delivery_completion, task, c, NULL);
}
@@ -808,11 +835,8 @@ static void
nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b, *bn, *free;
- nxt_thread_t *thread;
nxt_app_request_t *r;
- thread = task->thread;
-
nxt_debug(task, "app delivery completion");
free = NULL;
@@ -832,7 +856,9 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
if (nxt_buf_is_last(b)) {
r = (nxt_app_request_t *) b->parent;
- nxt_app_close_request(task, r);
+
+ nxt_work_queue_add(&task->thread->engine->final_work_queue,
+ nxt_app_close_request, task, r, NULL);
}
}
@@ -850,7 +876,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
(void) nxt_thread_mutex_unlock(&nxt_app_mutex);
- nxt_thread_time_update(thread);
+ nxt_thread_time_update(task->thread);
(void) nxt_thread_cond_signal(&nxt_app_cond);
}
@@ -903,20 +929,22 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)
c->write = NULL;
- nxt_thread_work_queue_add(task->thread, c->write_work_queue,
- nxt_app_delivery_completion, task, c, NULL);
+ nxt_work_queue_add(c->write_work_queue,
+ nxt_app_delivery_completion, task, c, NULL);
}
static void
-nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r)
+nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
-
- nxt_debug(task, "app close connection");
+ nxt_event_conn_t *c;
+ nxt_app_request_t *r;
+ r = obj;
c = r->event_conn;
+ nxt_debug(task, "app close connection");
+
nxt_event_conn_close(task, c);
nxt_mem_pool_destroy(c->mem_pool);
diff --git a/src/nxt_buf.c b/src/nxt_buf.c
index 88444a3d..d2b6fe7a 100644
--- a/src/nxt_buf.c
+++ b/src/nxt_buf.c
@@ -10,6 +10,18 @@
static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data);
+void
+nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size)
+{
+ b->size = NXT_BUF_MEM_SIZE;
+
+ b->mem.start = start;
+ b->mem.pos = start;
+ b->mem.free = start;
+ b->mem.end = start + size;
+}
+
+
nxt_buf_t *
nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size, nxt_uint_t flags)
{
diff --git a/src/nxt_buf.h b/src/nxt_buf.h
index 240a1d22..85ab3602 100644
--- a/src/nxt_buf.h
+++ b/src/nxt_buf.h
@@ -226,6 +226,7 @@ nxt_buf_used_size(b) \
nxt_buf_mem_used_size(&(b)->mem))
+NXT_EXPORT void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size);
NXT_EXPORT nxt_buf_t *nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size,
nxt_uint_t flags);
NXT_EXPORT nxt_buf_t *nxt_buf_file_alloc(nxt_mem_pool_t *mp, size_t size,
diff --git a/src/nxt_chan.c b/src/nxt_chan.c
index baa4b9d4..6832ecdc 100644
--- a/src/nxt_chan.c
+++ b/src/nxt_chan.c
@@ -141,7 +141,7 @@ nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan)
chan->socket.task = &chan->task;
- chan->socket.write_work_queue = &task->thread->work_queue.main;
+ chan->socket.write_work_queue = &task->thread->engine->fast_work_queue;
chan->socket.write_handler = nxt_chan_write_handler;
chan->socket.error_handler = nxt_chan_error_handler;
}
@@ -290,9 +290,8 @@ nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data)
fail:
- nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
- nxt_chan_error_handler, task, &chan->socket,
- NULL);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_chan_error_handler, task, &chan->socket, NULL);
}
@@ -308,7 +307,7 @@ nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan)
chan->socket.task = &chan->task;
- chan->socket.read_work_queue = &task->thread->work_queue.main;
+ chan->socket.read_work_queue = &task->thread->engine->fast_work_queue;
chan->socket.read_handler = nxt_chan_read_handler;
chan->socket.error_handler = nxt_chan_error_handler;
@@ -378,9 +377,8 @@ nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data)
/* n == 0 || n == NXT_ERROR */
- nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
- nxt_chan_error_handler, task,
- &chan->socket, NULL);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_chan_error_handler, task, &chan->socket, NULL);
return;
}
}
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c
index 4b6eeb40..4b19f9da 100644
--- a/src/nxt_cycle.c
+++ b/src/nxt_cycle.c
@@ -153,8 +153,8 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous,
nxt_log_debug(thr->log, "new cycle: %p", cycle);
- nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_start,
- task, cycle, NULL);
+ nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_start,
+ task, cycle, NULL);
return NXT_OK;
@@ -583,8 +583,8 @@ nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle)
nxt_cycle_close_idle_connections(thr, task);
if (done) {
- nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_exit,
- task, cycle, NULL);
+ nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_exit,
+ task, cycle, NULL);
}
}
diff --git a/src/nxt_epoll.c b/src/nxt_epoll.c
index 0fe81092..0de9763a 100644
--- a/src/nxt_epoll.c
+++ b/src/nxt_epoll.c
@@ -648,10 +648,9 @@ nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es)
nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
es->epoll, ch->op, ch->fd, nxt_errno);
- nxt_thread_work_queue_add(task->thread,
- &task->thread->work_queue.main,
- nxt_epoll_error_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_epoll_error_handler,
+ ev->task, ev, ev->data);
ret = NXT_ERROR;
}
@@ -719,12 +718,12 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals)
nxt_main_log_debug("signalfd(): %d", fd);
+ thr = nxt_thread();
+
es->signalfd.data = signals->handler;
- es->signalfd.read_work_queue = nxt_thread_main_work_queue();
+ es->signalfd.read_work_queue = &thr->engine->fast_work_queue;
es->signalfd.read_handler = nxt_epoll_signalfd_handler;
es->signalfd.log = &nxt_main_log;
-
- thr = nxt_thread();
es->signalfd.task = &thr->engine->task;
ee.events = EPOLLIN;
@@ -805,12 +804,12 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
nxt_main_log_debug("eventfd(): %d", es->eventfd.fd);
- es->eventfd.read_work_queue = nxt_thread_main_work_queue();
+ thr = nxt_thread();
+
+ es->eventfd.read_work_queue = &thr->engine->fast_work_queue;
es->eventfd.read_handler = nxt_epoll_eventfd_handler;
es->eventfd.data = es;
es->eventfd.log = &nxt_main_log;
-
- thr = nxt_thread();
es->eventfd.task = &thr->engine->task;
ee.events = EPOLLIN | EPOLLET;
@@ -960,9 +959,8 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
error = 0;
- nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
- ev->read_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
+ ev->task, ev, ev->data);
} else if (event_set->epoll.mode == 0) {
/* Level-triggered mode. */
@@ -981,9 +979,8 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
error = 0;
- nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
- ev->write_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
+ ev->task, ev, ev->data);
} else if (event_set->epoll.mode == 0) {
/* Level-triggered mode. */
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c
index 6c4fa949..913038f1 100644
--- a/src/nxt_event_conn.c
+++ b/src/nxt_event_conn.c
@@ -85,8 +85,8 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
c->max_chunk = NXT_INT32_T_MAX;
c->sendfile = NXT_CONN_SENDFILE_UNSET;
- c->socket.read_work_queue = &thr->work_queue.main;
- c->socket.write_work_queue = &thr->work_queue.main;
+ c->socket.read_work_queue = &thr->engine->fast_work_queue;
+ c->socket.write_work_queue = &thr->engine->fast_work_queue;
nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
@@ -143,10 +143,6 @@ nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c)
thr = task->thread;
- nxt_thread_work_queue_drop(thr, c);
- nxt_thread_work_queue_drop(thr, &c->read_timer);
- nxt_thread_work_queue_drop(thr, &c->write_timer);
-
engine = thr->engine;
nxt_event_timer_delete(engine, &c->read_timer);
@@ -168,8 +164,8 @@ nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c)
handler = nxt_event_conn_shutdown_socket;
}
- nxt_thread_work_queue_add(thr, wq, handler, task,
- (void *) (uintptr_t) c->socket.fd, NULL);
+ nxt_work_queue_add(wq, handler, task,
+ (void *) (uintptr_t) c->socket.fd, NULL);
} else {
nxt_socket_close(c->socket.fd);
@@ -188,10 +184,9 @@ nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data)
nxt_socket_shutdown(s, SHUT_RDWR);
- nxt_thread_work_queue_add(task->thread,
- &task->thread->engine->close_work_queue,
- nxt_event_conn_close_socket, task,
- (void *) (uintptr_t) s, NULL);
+ nxt_work_queue_add(&task->thread->engine->close_work_queue,
+ nxt_event_conn_close_socket, task,
+ (void *) (uintptr_t) s, NULL);
}
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h
index c35dcf42..0140664b 100644
--- a/src/nxt_event_conn.h
+++ b/src/nxt_event_conn.h
@@ -192,7 +192,7 @@ typedef struct {
nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \
do { \
if (thr->engine->batch != 0) { \
- nxt_thread_work_queue_add(thr, wq, handler, task, c, data); \
+ nxt_work_queue_add(wq, handler, task, c, data); \
\
} else { \
handler(task, c, data); \
@@ -301,9 +301,8 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
#define \
nxt_event_conn_connect_enqueue(thr, task, c) \
- nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \
- nxt_event_conn_batch_socket, \
- task, c, c->socket.data)
+ nxt_work_queue_add(&thr->engine->socket_work_queue, \
+ nxt_event_conn_batch_socket, task, c, c->socket.data)
#define \
@@ -311,8 +310,8 @@ nxt_event_conn_read_enqueue(thr, task, c) \
do { \
c->socket.read_work_queue = &thr->engine->read_work_queue; \
\
- nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \
- c->io->read, task, c, c->socket.data); \
+ nxt_work_queue_add(&thr->engine->read_work_queue, \
+ c->io->read, task, c, c->socket.data); \
} while (0)
@@ -321,8 +320,8 @@ nxt_event_conn_write_enqueue(thr, task, c) \
do { \
c->socket.write_work_queue = &thr->engine->write_work_queue; \
\
- nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \
- c->io->write, task, c, c->socket.data); \
+ nxt_work_queue_add(&thr->engine->write_work_queue, \
+ c->io->write, task, c, c->socket.data); \
} while (0)
diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c
index a1339235..27b5ac27 100644
--- a/src/nxt_event_conn_accept.c
+++ b/src/nxt_event_conn_accept.c
@@ -50,7 +50,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->socket.read_work_queue = &engine->accept_work_queue;
} else {
- cls->socket.read_work_queue = &task->thread->work_queue.main;
+ cls->socket.read_work_queue = &engine->fast_work_queue;
cls->batch = 1;
}
@@ -62,7 +62,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->listen = ls;
- cls->timer.work_queue = &task->thread->work_queue.main;
+ cls->timer.work_queue = &engine->fast_work_queue;
cls->timer.handler = nxt_event_conn_listen_timer_handler;
cls->timer.log = &nxt_main_log;
@@ -221,15 +221,15 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
c->listen->handler(task, c, NULL);
} else {
- nxt_thread_work_queue_add(task->thread, c->write_work_queue,
- c->listen->handler, task, c, NULL);
+ nxt_work_queue_add(c->write_work_queue, c->listen->handler,
+ task, c, NULL);
}
next = nxt_event_conn_accept_next(task, cls);
if (next != NULL && cls->socket.read_ready) {
- nxt_thread_work_queue_add(task->thread, cls->socket.read_work_queue,
- cls->accept, task, cls, next);
+ nxt_work_queue_add(cls->socket.read_work_queue,
+ cls->accept, task, cls, next);
}
}
diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c
index 55554720..3a80b8d7 100644
--- a/src/nxt_event_conn_connect.c
+++ b/src/nxt_event_conn_connect.c
@@ -17,8 +17,8 @@ nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c)
engine = task->thread->engine;
if (engine->batch != 0) {
- nxt_thread_work_queue_add(task->thread, &engine->socket_work_queue,
- nxt_event_conn_batch_socket, task, c, data);
+ nxt_work_queue_add(&engine->socket_work_queue,
+ nxt_event_conn_batch_socket, task, c, data);
return;
}
@@ -47,9 +47,8 @@ nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data)
handler = c->write_state->error_handler;
}
- nxt_thread_work_queue_add(task->thread,
- &task->thread->engine->connect_work_queue,
- handler, task, c, data);
+ nxt_work_queue_add(&task->thread->engine->connect_work_queue,
+ handler, task, c, data);
}
diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c
index 86a05969..e9334016 100644
--- a/src/nxt_event_conn_job_sendfile.c
+++ b/src/nxt_event_conn_job_sendfile.c
@@ -157,9 +157,10 @@ done:
fast:
- nxt_thread_pool_post(task->thread->thread_pool,
- nxt_event_conn_job_sendfile_handler,
- &jbs->job.task, jbs, c);
+ nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
+ jbs->job.task, jbs, c);
+
+ nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
}
@@ -257,8 +258,8 @@ nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
break;
}
- nxt_thread_work_queue_add(task->thread, c->write_work_queue,
- b->completion_handler, task, b, b->parent);
+ nxt_work_queue_add(c->write_work_queue,
+ b->completion_handler, task, b, b->parent);
b = b->next;
}
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c
index 0a2a6474..a63b18f6 100644
--- a/src/nxt_event_conn_proxy.c
+++ b/src/nxt_event_conn_proxy.c
@@ -456,9 +456,9 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
}
if (rb->mem.start != rb->mem.end) {
- nxt_thread_work_queue_push(task->thread, source->read_work_queue,
- nxt_event_conn_proxy_read,
- task, source, source->socket.data);
+ nxt_work_queue_add(source->read_work_queue,
+ nxt_event_conn_proxy_read,
+ task, source, source->socket.data);
break;
}
@@ -665,9 +665,9 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_buf_free(sink->mem_pool, wb);
}
- nxt_thread_work_queue_push(task->thread, source->read_work_queue,
- nxt_event_conn_proxy_read, task, source,
- source->socket.data);
+ nxt_work_queue_add(source->read_work_queue,
+ nxt_event_conn_proxy_read, task, source,
+ source->socket.data);
}
@@ -1008,8 +1008,6 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
nxt_event_conn_close(task, p->peer);
} else if (p->delayed) {
- nxt_thread_work_queue_drop(task->thread, &p->peer->write_timer);
-
nxt_queue_remove(&p->peer->link);
nxt_event_timer_delete(task->thread->engine, &p->peer->write_timer);
}
diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c
index f6bbbbd8..93a17ddc 100644
--- a/src/nxt_event_conn_read.c
+++ b/src/nxt_event_conn_read.c
@@ -20,8 +20,7 @@ nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c)
wq = &task->thread->engine->read_work_queue;
c->socket.read_work_queue = wq;
- nxt_thread_work_queue_add(task->thread, wq, handler, task, c,
- c->socket.data);
+ nxt_work_queue_add( wq, handler, task, c, c->socket.data);
return;
}
@@ -134,8 +133,8 @@ ready:
done:
if (batch) {
- nxt_thread_work_queue_add(task->thread, c->read_work_queue, handler,
- task, c, data);
+ nxt_work_queue_add(c->read_work_queue, handler, task, c, data);
+
} else {
handler(task, c, data);
}
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index ead63b72..921de23c 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -20,8 +20,8 @@ static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
void *data);
-static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task,
- nxt_uint_t signo);
+static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
+ nxt_task_t **task, void **obj, void **data);
nxt_event_engine_t *
@@ -52,8 +52,21 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
}
}
- nxt_thread_work_queue_create(thr, 0);
+ engine->current_work_queue = &engine->fast_work_queue;
+ nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
+
+ engine->fast_work_queue.cache = &engine->work_queue_cache;
+ engine->accept_work_queue.cache = &engine->work_queue_cache;
+ engine->read_work_queue.cache = &engine->work_queue_cache;
+ engine->socket_work_queue.cache = &engine->work_queue_cache;
+ engine->connect_work_queue.cache = &engine->work_queue_cache;
+ engine->write_work_queue.cache = &engine->work_queue_cache;
+ engine->shutdown_work_queue.cache = &engine->work_queue_cache;
+ engine->close_work_queue.cache = &engine->work_queue_cache;
+ engine->final_work_queue.cache = &engine->work_queue_cache;
+
+ nxt_work_queue_name(&engine->fast_work_queue, "fast");
nxt_work_queue_name(&engine->accept_work_queue, "accept");
nxt_work_queue_name(&engine->read_work_queue, "read");
nxt_work_queue_name(&engine->socket_work_queue, "socket");
@@ -61,12 +74,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
nxt_work_queue_name(&engine->write_work_queue, "write");
nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
nxt_work_queue_name(&engine->close_work_queue, "close");
-
-#if (NXT_THREADS)
-
- nxt_locked_work_queue_create(&engine->work_queue, 7);
-
-#endif
+ nxt_work_queue_name(&engine->final_work_queue, "final");
if (signals != NULL) {
engine->signals = nxt_event_engine_signals(signals);
@@ -134,7 +142,7 @@ event_set_fail:
signals_fail:
nxt_free(engine->signals);
- nxt_thread_work_queue_destroy(thr);
+ nxt_work_queue_cache_destroy(&engine->work_queue_cache);
nxt_free(engine->fibers);
fibers_fail:
@@ -193,9 +201,9 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
}
pipe->event.fd = pipe->fds[0];
- pipe->event.read_work_queue = &engine->task.thread->work_queue.main;
+ pipe->event.read_work_queue = &engine->fast_work_queue;
pipe->event.read_handler = nxt_event_engine_signal_pipe;
- pipe->event.write_work_queue = &engine->task.thread->work_queue.main;
+ pipe->event.write_work_queue = &engine->fast_work_queue;
pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
pipe->event.log = &nxt_main_log;
@@ -237,12 +245,11 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
void
-nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler,
- nxt_task_t *task, void *obj, void *data, nxt_log_t *log)
+nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
nxt_thread_log_debug("event engine post");
- nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data);
+ nxt_locked_work_queue_add(&engine->locked_work_queue, work);
nxt_event_engine_signal(engine, 0);
}
@@ -273,12 +280,11 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
static void
nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
{
- int i, n;
- u_char signo;
- nxt_bool_t post;
- nxt_event_fd_t *ev;
- const nxt_event_sig_t *sigev;
- u_char buf[128];
+ int i, n;
+ u_char signo;
+ nxt_bool_t post;
+ nxt_event_fd_t *ev;
+ u_char buf[128];
ev = obj;
@@ -299,12 +305,8 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
post = 1;
} else {
- sigev = nxt_event_engine_signal_find(task, signo);
-
- if (nxt_fast_path(sigev != NULL)) {
- sigev->handler(task, (void *) (uintptr_t) signo,
- (void *) sigev->name);
- }
+ nxt_event_engine_signal_handler(task,
+ (void *) (uintptr_t) signo, NULL);
}
}
@@ -319,12 +321,14 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
static void
nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_thread_t *thread;
+ nxt_thread_t *thread;
+ nxt_event_engine_t *engine;
thread = task->thread;
+ engine = thread->engine;
- nxt_locked_work_queue_move(thread, &thread->engine->work_queue,
- &thread->work_queue.main);
+ nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
+ &engine->fast_work_queue);
}
@@ -351,31 +355,17 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
signo = (uintptr_t) obj;
- sigev = nxt_event_engine_signal_find(task, signo);
-
- if (nxt_fast_path(sigev != NULL)) {
- sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name);
- }
-}
-
-
-static const nxt_event_sig_t *
-nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo)
-{
- const nxt_event_sig_t *sigev;
-
for (sigev = task->thread->engine->signals->sigev;
sigev->signo != 0;
sigev++)
{
if (signo == (nxt_uint_t) sigev->signo) {
- return sigev;
+ sigev->handler(task, (void *) signo, (void *) sigev->name);
+ return;
}
}
nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
-
- return NULL;
}
@@ -397,7 +387,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
nxt_event_engine_signals_stop(engine);
/*
- * Add to thread main work queue the signal events possibly
+ * Add to engine fast work queue the signal events possibly
* received before the blocking signal processing.
*/
nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
@@ -406,11 +396,11 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
if (engine->pipe != NULL && event_set->enable_post != NULL) {
/*
* An engine pipe must be closed after all signal events
- * added above to thread main work queue will be processed.
+ * added above to engine fast work queue will be processed.
*/
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_event_engine_signal_pipe_close,
- &engine->task, engine->pipe, NULL);
+ nxt_work_queue_add(&engine->final_work_queue,
+ nxt_event_engine_signal_pipe_close,
+ &engine->task, engine->pipe, NULL);
engine->pipe = NULL;
}
@@ -455,8 +445,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
nxt_event_engine_signal_pipe_free(engine);
nxt_free(engine->signals);
- nxt_locked_work_queue_destroy(&engine->work_queue);
- nxt_thread_work_queue_destroy(nxt_thread());
+ nxt_work_queue_cache_destroy(&engine->work_queue_cache);
engine->event->free(engine->event_set);
@@ -466,6 +455,35 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
}
+static nxt_work_handler_t
+nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
+ void **obj, void **data)
+{
+ nxt_work_queue_t *wq;
+
+ wq = engine->current_work_queue;
+
+ if (wq->head == NULL) {
+ wq = &engine->fast_work_queue;
+
+ while (wq->head == NULL) {
+ engine->current_work_queue++;
+ wq = engine->current_work_queue;
+
+ if (wq > &engine->final_work_queue) {
+ engine->current_work_queue = &engine->fast_work_queue;
+
+ return NULL;
+ }
+ }
+ }
+
+ nxt_debug(&engine->task, "work queue: %s", wq->name);
+
+ return nxt_work_queue_pop(wq, task, obj, data);
+}
+
+
void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
@@ -487,40 +505,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
/* A return point from fibers. */
}
- for ( ;; ) {
+ thr->log = &nxt_main_log;
- for ( ;; ) {
- handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
-
- if (handler == NULL) {
- break;
- }
-
- handler(task, obj, data);
-
- thr->log = &nxt_main_log;
- }
+ for ( ;; ) {
for ( ;; ) {
- handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data);
+ handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
if (handler == NULL) {
break;
}
handler(task, obj, data);
-
- thr->log = &nxt_main_log;
}
/* Attach some event engine work queues in preferred order. */
- nxt_work_queue_attach(thr, &engine->accept_work_queue);
- nxt_work_queue_attach(thr, &engine->read_work_queue);
-
timeout = nxt_event_timer_find(engine);
- engine->event->poll(task, engine->event_set, timeout);
+ engine->event->poll(&engine->task, engine->event_set, timeout);
/*
* Look up expired timers only if a new zero timer has been
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index 1ff1399a..54b3bcde 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -33,6 +33,9 @@ struct nxt_event_engine_s {
*/
nxt_event_engine_pipe_t *pipe;
+ nxt_work_queue_cache_t work_queue_cache;
+ nxt_work_queue_t *current_work_queue;
+ nxt_work_queue_t fast_work_queue;
nxt_work_queue_t accept_work_queue;
nxt_work_queue_t read_work_queue;
nxt_work_queue_t socket_work_queue;
@@ -40,8 +43,9 @@ struct nxt_event_engine_s {
nxt_work_queue_t write_work_queue;
nxt_work_queue_t shutdown_work_queue;
nxt_work_queue_t close_work_queue;
+ nxt_work_queue_t final_work_queue;
- nxt_locked_work_queue_t work_queue;
+ nxt_locked_work_queue_t locked_work_queue;
nxt_event_signals_t *signals;
@@ -68,8 +72,7 @@ NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine);
NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine);
NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data,
- nxt_log_t *log);
+ nxt_work_t *work);
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
nxt_uint_t signo);
@@ -84,14 +87,4 @@ nxt_thread_event_engine(void)
}
-nxt_inline nxt_work_queue_t *
-nxt_thread_main_work_queue(void)
-{
- nxt_thread_t *thr;
-
- thr = nxt_thread();
- return &thr->work_queue.main;
-}
-
-
#endif /* _NXT_EVENT_ENGINE_H_INCLUDED_ */
diff --git a/src/nxt_event_timer.c b/src/nxt_event_timer.c
index 4a260705..e9615f92 100644
--- a/src/nxt_event_timer.c
+++ b/src/nxt_event_timer.c
@@ -313,8 +313,7 @@ nxt_event_timer_expire(nxt_thread_t *thr, nxt_msec_t now)
if (ev->state != NXT_EVENT_TIMER_DISABLED) {
ev->state = NXT_EVENT_TIMER_DISABLED;
- nxt_thread_work_queue_add(thr, ev->work_queue, ev->handler,
- ev->task, ev, NULL);
+ nxt_work_queue_add(ev->work_queue, ev->handler, ev->task, ev, NULL);
}
}
}
diff --git a/src/nxt_fiber.c b/src/nxt_fiber.c
index a7b89402..f792897d 100644
--- a/src/nxt_fiber.c
+++ b/src/nxt_fiber.c
@@ -16,7 +16,7 @@ static void nxt_fiber_timer_handler(nxt_task_t *task, void *obj, void *data);
#define \
nxt_fiber_enqueue(thr, task, fib) \
- nxt_thread_work_queue_add(thr, &(thr)->work_queue.main, \
+ nxt_work_queue_add(&(thr)->engine->fast_work_queue, \
nxt_fiber_switch_handler, task, fib, NULL)
@@ -392,7 +392,7 @@ nxt_fiber_sleep(nxt_task_t *task, nxt_msec_t timeout)
fib = task->thread->fiber;
- fib->timer.work_queue = &task->thread->work_queue.main;
+ fib->timer.work_queue = &task->thread->engine->fast_work_queue;
fib->timer.handler = nxt_fiber_timer_handler;
fib->timer.log = &nxt_main_log;
diff --git a/src/nxt_job.c b/src/nxt_job.c
index e1256d61..86cfc462 100644
--- a/src/nxt_job.c
+++ b/src/nxt_job.c
@@ -44,8 +44,6 @@ nxt_job_create(nxt_mem_pool_t *mp, size_t size)
/* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
nxt_queue_self(&job->link);
- job->task.ident = nxt_task_next_ident();
-
return job;
}
@@ -58,8 +56,6 @@ nxt_job_init(nxt_job_t *job, size_t size)
nxt_job_set_name(job, "job");
nxt_queue_self(&job->link);
-
- job->task.ident = nxt_task_next_ident();
}
@@ -118,8 +114,11 @@ nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
job->engine = task->thread->engine;
- ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
- &job->task, job, (void *) handler);
+ nxt_work_set(&job->work, nxt_job_thread_trampoline,
+ job->task, job, (void *) handler);
+
+ ret = nxt_thread_pool_post(job->thread_pool, &job->work);
+
if (ret == NXT_OK) {
return;
}
@@ -129,7 +128,7 @@ nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
#endif
- handler(&job->task, job, job->data);
+ handler(job->task, job, job->data);
}
@@ -146,15 +145,13 @@ nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
job = obj;
handler = (nxt_work_handler_t) data;
- job->task.log = job->log;
-
nxt_debug(task, "%s thread", job->name);
if (nxt_slow_path(job->cancel)) {
nxt_job_return(task, job, job->abort_handler);
} else {
- handler(&job->task, job, job->data);
+ handler(job->task, job, job->data);
}
}
@@ -170,8 +167,12 @@ nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
if (job->engine != NULL) {
/* A return function is called in thread pool thread context. */
- nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
- &job->task, job, (void *) handler, job->log);
+
+ nxt_work_set(&job->work, nxt_job_thread_return_handler,
+ job->task, job, (void *) handler);
+
+ nxt_event_engine_post(job->engine, &job->work);
+
return;
}
@@ -182,8 +183,8 @@ nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
handler = job->abort_handler;
}
- nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main,
- handler, &job->task, job, job->data);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ handler, job->task, job, job->data);
}
@@ -198,14 +199,14 @@ nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
job = obj;
handler = (nxt_work_handler_t) data;
- job->task.thread = task->thread;
+ job->task->thread = task->thread;
if (nxt_slow_path(job->cancel)) {
nxt_debug(task, "%s cancellation", job->name);
handler = job->abort_handler;
}
- handler(&job->task, job, job->data);
+ handler(job->task, job, job->data);
}
#endif
diff --git a/src/nxt_job.h b/src/nxt_job.h
index f1954b57..b1af2e80 100644
--- a/src/nxt_job.h
+++ b/src/nxt_job.h
@@ -33,7 +33,7 @@
typedef struct {
void *data;
- nxt_task_t task;
+ nxt_task_t *task;
nxt_work_handler_t abort_handler;
@@ -49,6 +49,8 @@ typedef struct {
nxt_log_t *log;
#endif
+ nxt_work_t work;
+
#if (NXT_DEBUG)
const char *name;
#endif
diff --git a/src/nxt_job_resolve.c b/src/nxt_job_resolve.c
index e44acd14..f0b34e2c 100644
--- a/src/nxt_job_resolve.c
+++ b/src/nxt_job_resolve.c
@@ -121,5 +121,5 @@ fail:
freeaddrinfo(res);
}
- nxt_job_return(&jbr->job.task, &jbr->job, handler);
+ nxt_job_return(jbr->job.task, &jbr->job, handler);
}
diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue.c
index 7370a401..c03cd8f4 100644
--- a/src/nxt_kqueue.c
+++ b/src/nxt_kqueue.c
@@ -531,11 +531,13 @@ static void
nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
{
struct kevent *kev, *end;
- nxt_thread_t *thr;
+ nxt_thread_t *thread;
nxt_event_fd_t *ev;
nxt_event_file_t *fev;
+ nxt_work_queue_t *wq;
- thr = nxt_thread();
+ thread = nxt_thread();
+ wq = &thread->engine->fast_work_queue;
end = &ks->changes[ks->nchanges];
for (kev = ks->changes; kev < end; kev++) {
@@ -545,16 +547,14 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
case EVFILT_READ:
case EVFILT_WRITE:
ev = nxt_kevent_get_udata(kev->udata);
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_kqueue_fd_error_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
+ ev->task, ev, ev->data);
break;
case EVFILT_VNODE:
fev = nxt_kevent_get_udata(kev->udata);
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_kqueue_file_error_handler,
- fev->task, fev, fev->data);
+ nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
+ fev->task, fev, fev->data);
break;
}
}
@@ -768,7 +768,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
}
event_task = task;
- wq = &task->thread->work_queue.main;
+ wq = &task->thread->engine->fast_work_queue;
handler = nxt_kqueue_fd_error_handler;
obj = nxt_kevent_get_udata(kev->udata);
@@ -871,8 +871,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
continue;
}
- nxt_thread_work_queue_add(task->thread, wq, handler,
- event_task, obj, data);
+ nxt_work_queue_add(wq, handler, event_task, obj, data);
}
}
@@ -938,8 +937,8 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
nxt_event_timer_disable(&c->write_timer);
}
- nxt_thread_work_queue_add(task->thread, c->write_work_queue,
- c->write_state->ready_handler, task, c, data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
+ task, c, data);
}
@@ -1020,8 +1019,8 @@ nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
c->socket.closed = 1;
- nxt_thread_work_queue_add(task->thread, c->read_work_queue,
- c->read_state->close_handler, task, c, data);
+ nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
+ task, c, data);
return;
}
diff --git a/src/nxt_log.h b/src/nxt_log.h
index d04bc3dd..14f07383 100644
--- a/src/nxt_log.h
+++ b/src/nxt_log.h
@@ -123,7 +123,7 @@ nxt_log_debug(_log, ...) \
#else
-#define nxt_log_debug(...)
+#define nxt_debug(...)
#define \
nxt_log_debug(...)
diff --git a/src/nxt_log_moderation.c b/src/nxt_log_moderation.c
index 5082270d..a571ae17 100644
--- a/src/nxt_log_moderation.c
+++ b/src/nxt_log_moderation.c
@@ -58,7 +58,7 @@ nxt_log_moderate_allow(nxt_log_moderation_t *mod)
nxt_thread_spin_unlock(&mod->lock);
if (timer) {
- mod->timer.work_queue = &thr->work_queue.main;
+ mod->timer.work_queue = &thr->engine->fast_work_queue;
mod->timer.handler = nxt_log_moderate_timer_handler;
mod->timer.log = &nxt_main_log;
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 8ce6e670..8a11aa16 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -228,7 +228,7 @@ nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle)
cycle->timer.log = &nxt_main_log;
nxt_event_timer_ident(&cycle->timer, -1);
- cycle->timer.work_queue = &thr->work_queue.main;
+ cycle->timer.work_queue = &thr->engine->fast_work_queue;
nxt_event_timer_add(thr->engine, &cycle->timer, 500);
diff --git a/src/nxt_poll.c b/src/nxt_poll.c
index 2cc0200b..6f6299b0 100644
--- a/src/nxt_poll.c
+++ b/src/nxt_poll.c
@@ -397,8 +397,8 @@ nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps)
break;
}
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- ev->error_handler, ev->task, ev, ev->data);
+ nxt_work_queue_add(&thr->engine->fast_work_queue, ev->error_handler,
+ ev->task, ev, ev->data);
ret = NXT_ERROR;
@@ -608,10 +608,9 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
/* Mark the poll entry to ignore it by the kernel. */
pfd->fd = -1;
- nxt_thread_work_queue_add(task->thread,
- &task->thread->work_queue.main,
- ev->error_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(&ev->task->thread->engine->fast_work_queue,
+ ev->error_handler,
+ ev->task, ev, ev->data);
goto next;
}
@@ -653,8 +652,8 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
}
- nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
- ev->read_handler, ev->task, ev, ev->data);
+ nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
+ ev->task, ev, ev->data);
}
if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
@@ -665,9 +664,8 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
}
- nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
- ev->write_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
+ ev->task, ev, ev->data);
}
next:
diff --git a/src/nxt_select.c b/src/nxt_select.c
index a2982d2f..8ee5808e 100644
--- a/src/nxt_select.c
+++ b/src/nxt_select.c
@@ -141,9 +141,8 @@ nxt_select_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
thr = nxt_thread();
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_select_error_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(&thr->engine->fast_work_queue,
+ nxt_select_error_handler, ev->task, ev, ev->data);
return;
}
@@ -174,9 +173,8 @@ nxt_select_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
thr = nxt_thread();
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_select_error_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(&thr->engine->fast_work_queue,
+ nxt_select_error_handler, ev->task, ev, ev->data);
return;
}
@@ -365,8 +363,8 @@ nxt_select_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_select_disable_read(event_set, ev);
}
- nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
- ev->read_handler, ev->task, ev, ev->data);
+ nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
+ ev->task, ev, ev->data);
found = 1;
}
@@ -382,9 +380,8 @@ nxt_select_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_select_disable_write(event_set, ev);
}
- nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
- ev->write_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
+ ev->task, ev, ev->data);
found = 1;
}
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index e8a4c190..309d3147 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -343,8 +343,7 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
}
}
- nxt_thread_work_queue_add(task->thread, wq, b->completion_handler, task,
- b, b->parent);
+ nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
}
diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c
index f14f690d..67e20b3f 100644
--- a/src/nxt_sockaddr.c
+++ b/src/nxt_sockaddr.c
@@ -495,7 +495,7 @@ nxt_job_sockaddr_parse(nxt_job_sockaddr_parse_t *jbs)
return;
}
- nxt_job_return(&jbs->resolve.job.task, &jbs->resolve.job, handler);
+ nxt_job_return(jbs->resolve.job.task, &jbs->resolve.job, handler);
}
diff --git a/src/nxt_thread.c b/src/nxt_thread.c
index 680412c2..24136bde 100644
--- a/src/nxt_thread.c
+++ b/src/nxt_thread.c
@@ -180,16 +180,22 @@ nxt_thread_time_cleanup(void *data)
void
nxt_thread_exit(nxt_thread_t *thr)
{
+ nxt_thread_link_t *link;
+
nxt_log_debug(thr->log, "thread exit");
- if (thr->link != NULL) {
- nxt_event_engine_post(thr->link->engine, thr->link->exit,
- &thr->link->engine->task,
- (void *) (uintptr_t) thr->handle,
- NULL, &nxt_main_log);
+ link = thr->link;
+ thr->link = NULL;
+
+ if (link != NULL) {
+ /*
+ * link->handler is already set to an exit handler,
+ * and link->task is already set to engine->task.
+ * The link should be freed by the exit handler.
+ */
+ link->work.obj = thr->handle;
- nxt_free(thr->link);
- thr->link = NULL;
+ nxt_event_engine_post(link->engine, &link->work);
}
nxt_thread_time_free(thr);
diff --git a/src/nxt_thread.h b/src/nxt_thread.h
index ebff808f..b25bd24a 100644
--- a/src/nxt_thread.h
+++ b/src/nxt_thread.h
@@ -92,7 +92,7 @@ typedef struct {
nxt_thread_start_t start;
void *data;
nxt_event_engine_t *engine;
- nxt_work_handler_t exit;
+ nxt_work_t work;
} nxt_thread_link_t;
@@ -179,7 +179,6 @@ struct nxt_thread_s {
nxt_thread_time_t time;
nxt_event_engine_t *engine;
- nxt_thread_work_queue_t work_queue;
/*
* Although pointer to a current fiber should be a property of
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c
index 463bfad4..fd7246c8 100644
--- a/src/nxt_thread_pool.c
+++ b/src/nxt_thread_pool.c
@@ -38,8 +38,7 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
nxt_int_t
-nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
- nxt_task_t *task, void *obj, void *data)
+nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
{
nxt_thread_log_debug("thread pool post");
@@ -47,7 +46,7 @@ nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
return NXT_ERROR;
}
- nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data);
+ nxt_locked_work_queue_add(&tp->work_queue, work);
(void) nxt_sem_post(&tp->sem);
@@ -66,6 +65,11 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
return NXT_OK;
}
+ if (tp->max_threads == 0) {
+ /* The pool is being destroyed. */
+ return NXT_ERROR;
+ }
+
nxt_thread_spin_lock(&tp->work_queue.lock);
ret = NXT_OK;
@@ -78,8 +82,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
- nxt_locked_work_queue_create(&tp->work_queue, 0);
-
link = nxt_malloc(sizeof(nxt_thread_link_t));
if (nxt_fast_path(link != NULL)) {
@@ -102,8 +104,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
(void) nxt_atomic_fetch_add(&tp->threads, -1);
- nxt_locked_work_queue_destroy(&tp->work_queue);
-
ret = NXT_ERROR;
}
@@ -142,8 +142,6 @@ nxt_thread_pool_start(void *ctx)
tp->init();
}
- nxt_thread_work_queue_create(thr, 8);
-
for ( ;; ) {
nxt_thread_pool_wait(tp);
@@ -152,18 +150,8 @@ nxt_thread_pool_start(void *ctx)
if (nxt_fast_path(handler != NULL)) {
task->thread = thr;
- nxt_log_debug(thr->log, "locked work queue");
- handler(task, obj, data);
- }
- for ( ;; ) {
- thr->log = &nxt_main_log;
-
- handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
-
- if (handler == NULL) {
- break;
- }
+ nxt_log_debug(thr->log, "locked work queue");
handler(task, obj, data);
}
@@ -245,8 +233,8 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
thr = nxt_thread();
if (!tp->ready) {
- nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
- &tp->task, tp, NULL);
+ nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
+ &tp->task, tp, NULL);
return;
}
@@ -254,7 +242,9 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
/* Disable new threads creation and mark a pool as being destroyed. */
tp->max_threads = 0;
- nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL);
+ nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);
+
+ nxt_thread_pool_post(tp, &tp->work);
}
}
@@ -293,24 +283,23 @@ nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "thread pool threads: %A", threads);
if (threads > 1) {
- nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp,
- (void *) (uintptr_t) thread->handle);
+ nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
+ (void *) (uintptr_t) thread->handle);
+
+ nxt_thread_pool_post(tp, &tp->work);
} else {
nxt_debug(task, "thread pool destroy");
- nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp,
- (void *) (uintptr_t) thread->handle,
- &nxt_main_log);
-
nxt_sem_destroy(&tp->sem);
- nxt_locked_work_queue_destroy(&tp->work_queue);
+ nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
+ (void *) (uintptr_t) thread->handle);
- nxt_free(tp);
- }
+ nxt_event_engine_post(tp->engine, &tp->work);
- nxt_thread_work_queue_destroy(thread);
+ /* The "tp" memory should be freed by tp->exit handler. */
+ }
nxt_thread_exit(thread);
diff --git a/src/nxt_thread_pool.h b/src/nxt_thread_pool.h
index 75899a06..750b98f8 100644
--- a/src/nxt_thread_pool.h
+++ b/src/nxt_thread_pool.h
@@ -20,6 +20,7 @@ struct nxt_thread_pool_s {
nxt_sem_t sem;
nxt_nsec_t timeout;
+ nxt_work_t work;
nxt_task_t task;
nxt_locked_work_queue_t work_queue;
@@ -37,7 +38,7 @@ NXT_EXPORT nxt_thread_pool_t *nxt_thread_pool_create(nxt_uint_t max_threads,
nxt_event_engine_t *engine, nxt_work_handler_t exit);
NXT_EXPORT void nxt_thread_pool_destroy(nxt_thread_pool_t *tp);
NXT_EXPORT nxt_int_t nxt_thread_pool_post(nxt_thread_pool_t *tp,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
+ nxt_work_t *work);
#endif /* _NXT_UNIX_THREAD_POOL_H_INCLUDED_ */
diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c
index ffb9317f..ecdc65ff 100644
--- a/src/nxt_work_queue.c
+++ b/src/nxt_work_queue.c
@@ -25,12 +25,7 @@
* a new spare chunk is allocated again.
*/
-static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
- nxt_thread_spinlock_t *lock);
-static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
-static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
-static nxt_work_handler_t nxt_locked_work_queue_pop_work(
- nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
+static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
/* It should be adjusted with the "work_queue_bucket_items" directive. */
@@ -38,32 +33,29 @@ static nxt_uint_t nxt_work_queue_bucket_items = 409;
void
-nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)
+nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{
- nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t));
-
- nxt_work_queue_name(&thr->work_queue.main, "main");
- nxt_work_queue_name(&thr->work_queue.last, "last");
+ nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));
if (chunk_size == 0) {
chunk_size = nxt_work_queue_bucket_items;
}
/* nxt_work_queue_chunk_t already has one work item. */
- thr->work_queue.cache.chunk_size = chunk_size - 1;
+ cache->chunk_size = chunk_size - 1;
- while (thr->work_queue.cache.next == NULL) {
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ while (cache->next == NULL) {
+ nxt_work_queue_allocate(cache);
}
}
void
-nxt_thread_work_queue_destroy(nxt_thread_t *thr)
+nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
{
nxt_work_queue_chunk_t *chunk, *next;
- for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) {
+ for (chunk = cache->chunk; chunk; chunk = next) {
next = chunk->next;
nxt_free(chunk);
}
@@ -71,8 +63,7 @@ nxt_thread_work_queue_destroy(nxt_thread_t *thr)
static void
-nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
- nxt_thread_spinlock_t *lock)
+nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
{
size_t size;
nxt_uint_t i, n;
@@ -102,7 +93,6 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
work = NULL;
} else {
- nxt_work_queue_sleep(lock);
return;
}
@@ -111,36 +101,19 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
}
-static void
-nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
-{
- if (lock != NULL) {
- nxt_thread_spin_unlock(lock);
- }
-
- nxt_nanosleep(100 * 1000000); /* 100ms */
-
- if (lock != NULL) {
- nxt_thread_spin_lock(lock);
- }
-}
-
-
/* Add a work to a work queue tail. */
void
-nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
+nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
+ nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
- nxt_work_queue_attach(thr, wq);
-
for ( ;; ) {
- work = thr->work_queue.cache.next;
+ work = wq->cache->next;
if (nxt_fast_path(work != NULL)) {
- thr->work_queue.cache.next = work->next;
+ wq->cache->next = work->next;
work->next = NULL;
work->handler = handler;
@@ -160,366 +133,56 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
return;
}
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ nxt_work_queue_allocate(wq->cache);
}
}
-/* Push a work to a work queue head. */
-
-void
-nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
-{
- nxt_work_t *work;
-
- nxt_work_queue_attach(thr, wq);
-
- for ( ;; ) {
- work = thr->work_queue.cache.next;
-
- if (nxt_fast_path(work != NULL)) {
- thr->work_queue.cache.next = work->next;
- work->next = wq->head;
-
- work->handler = handler;
- work->obj = obj;
- work->data = data;
-
- wq->head = work;
-
- if (wq->tail == NULL) {
- wq->tail = work;
- }
-
- return;
- }
-
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
- }
-}
-
-
-/* Attach a work queue to a thread work queue. */
-
-void
-nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
-{
- if (wq->next == NULL && wq != thr->work_queue.tail) {
-
- if (thr->work_queue.tail != NULL) {
- thr->work_queue.tail->next = wq;
-
- } else {
- thr->work_queue.head = wq;
- }
-
- thr->work_queue.tail = wq;
- }
-}
-
-
-/* Pop a work from a thread work queue head. */
-
nxt_work_handler_t
-nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
+nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
void **data)
{
- nxt_work_t *work;
- nxt_work_queue_t *wq;
-
- wq = nxt_thread_current_work_queue(thr);
-
- if (wq != NULL) {
-
- work = wq->head;
-
- if (work != NULL) {
- wq->head = work->next;
-
- if (work->next == NULL) {
- wq->tail = NULL;
- }
-
- *task = work->task;
- *obj = work->obj;
- nxt_prefetch(*obj);
- *data = work->data;
- nxt_prefetch(*data);
-
- work->next = thr->work_queue.cache.next;
- thr->work_queue.cache.next = work;
-
-#if (NXT_DEBUG)
-
- if (work->handler == NULL) {
- nxt_log_alert(thr->log, "null work handler");
- nxt_abort();
- }
-
-#endif
-
- return work->handler;
- }
- }
-
- return NULL;
-}
-
-
-static nxt_work_queue_t *
-nxt_thread_current_work_queue(nxt_thread_t *thr)
-{
- nxt_work_queue_t *wq, *next;
-
- for (wq = thr->work_queue.head; wq != NULL; wq = next) {
-
- if (wq->head != NULL) {
- nxt_log_debug(thr->log, "work queue: %s", wq->name);
- return wq;
- }
-
- /* Detach empty work queue. */
- next = wq->next;
- wq->next = NULL;
- thr->work_queue.head = next;
- }
-
- thr->work_queue.tail = NULL;
-
- return NULL;
-}
-
-
-/* Drop a work with specified data from a thread work queue. */
-
-void
-nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
-{
- nxt_work_t *work, *prev, *next, **link;
- nxt_work_queue_t *wq;
-
- for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) {
-
- prev = NULL;
- link = &wq->head;
-
- for (work = wq->head; work != NULL; work = next) {
-
- next = work->next;
-
- if (data != work->obj) {
- prev = work;
- link = &work->next;
-
- } else {
- if (next == NULL) {
- wq->tail = prev;
- }
-
- nxt_log_debug(thr->log, "work queue drop");
-
- *link = next;
-
- work->next = thr->work_queue.cache.next;
- thr->work_queue.cache.next = work;
- }
- }
- }
-}
-
-
-/* Add a work to the thread last work queue's tail. */
-
-void
-nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
- void *obj, void *data)
-{
nxt_work_t *work;
- for ( ;; ) {
- work = thr->work_queue.cache.next;
-
- if (nxt_fast_path(work != NULL)) {
- thr->work_queue.cache.next = work->next;
- work->next = NULL;
-
- work->handler = handler;
- work->obj = obj;
- work->data = data;
-
- if (thr->work_queue.last.tail != NULL) {
- thr->work_queue.last.tail->next = work;
-
- } else {
- thr->work_queue.last.head = work;
- }
+ work = wq->head;
- thr->work_queue.last.tail = work;
+ wq->head = work->next;
- return;
- }
-
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
- }
-}
-
-
-/* Pop a work from the thread last work queue's head. */
-
-nxt_work_handler_t
-nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
- void **data)
-{
- nxt_work_t *work;
-
- work = thr->work_queue.last.head;
-
- if (work != NULL) {
- nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);
-
- thr->work_queue.last.head = work->next;
-
- if (work->next == NULL) {
- thr->work_queue.last.tail = NULL;
- }
-
- *task = work->task;
- *obj = work->obj;
- nxt_prefetch(*obj);
- *data = work->data;
- nxt_prefetch(*data);
-
- work->next = thr->work_queue.cache.next;
- thr->work_queue.cache.next = work;
-
-#if (NXT_DEBUG)
-
- if (work->handler == NULL) {
- nxt_log_alert(thr->log, "null work handler");
- nxt_abort();
- }
-
-#endif
-
- return work->handler;
- }
-
- return NULL;
-}
-
-
-void
-nxt_work_queue_destroy(nxt_work_queue_t *wq)
-{
- nxt_thread_t *thr;
- nxt_work_queue_t *q;
-
- thr = nxt_thread();
-
- /* Detach from a thread work queue. */
-
- if (thr->work_queue.head == wq) {
- thr->work_queue.head = wq->next;
- q = NULL;
- goto found;
- }
-
- for (q = thr->work_queue.head; q != NULL; q = q->next) {
- if (q->next == wq) {
- q->next = wq->next;
- goto found;
- }
- }
-
- return;
-
-found:
-
- if (thr->work_queue.tail == wq) {
- thr->work_queue.tail = q;
- }
-
- /* Move all queue's works to a thread work queue cache. */
-
- if (wq->tail != NULL) {
- wq->tail->next = thr->work_queue.cache.next;
- }
-
- if (wq->head != NULL) {
- thr->work_queue.cache.next = wq->head;
- }
-}
-
-
-/* Locked work queue operations. */
-
-void
-nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size)
-{
- nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t));
-
- if (chunk_size == 0) {
- chunk_size = nxt_work_queue_bucket_items;
+ if (work->next == NULL) {
+ wq->tail = NULL;
}
- lwq->cache.chunk_size = chunk_size;
+ *task = work->task;
- while (lwq->cache.next == NULL) {
- nxt_work_queue_allocate(&lwq->cache, NULL);
- }
-}
+ *obj = work->obj;
+ nxt_prefetch(*obj);
+ *data = work->data;
+ nxt_prefetch(*data);
-void
-nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
-{
- nxt_work_queue_chunk_t *chunk, *next;
+ work->next = wq->cache->next;
+ wq->cache->next = work;
- for (chunk = lwq->cache.chunk; chunk; chunk = next) {
- next = chunk->next;
- nxt_free(chunk);
- }
+ return work->handler;
}
/* Add a work to a locked work queue tail. */
void
-nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
+nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
{
- nxt_work_t *work;
-
nxt_thread_spin_lock(&lwq->lock);
- for ( ;; ) {
- work = lwq->cache.next;
-
- if (nxt_fast_path(work != NULL)) {
- lwq->cache.next = work->next;
-
- work->next = NULL;
- work->handler = handler;
- work->task = task;
- work->obj = obj;
- work->data = data;
-
- if (lwq->tail != NULL) {
- lwq->tail->next = work;
-
- } else {
- lwq->head = work;
- }
-
- lwq->tail = work;
+ if (lwq->tail != NULL) {
+ lwq->tail->next = work;
- break;
- }
-
- nxt_work_queue_allocate(&lwq->cache, &lwq->lock);
+ } else {
+ lwq->head = work;
}
+ lwq->tail = work;
+
nxt_thread_spin_unlock(&lwq->lock);
}
@@ -530,46 +193,36 @@ nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
void **obj, void **data)
{
+ nxt_work_t *work;
nxt_work_handler_t handler;
- nxt_thread_spin_lock(&lwq->lock);
-
- handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data);
-
- nxt_thread_spin_unlock(&lwq->lock);
+ handler = NULL;
- return handler;
-}
+ nxt_thread_spin_lock(&lwq->lock);
+ work = lwq->head;
-static nxt_work_handler_t
-nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
- void **obj, void **data)
-{
- nxt_work_t *work;
+ if (work != NULL) {
+ *task = work->task;
- work = lwq->head;
+ *obj = work->obj;
+ nxt_prefetch(*obj);
- if (work == NULL) {
- return NULL;
- }
+ *data = work->data;
+ nxt_prefetch(*data);
- *task = work->task;
- *obj = work->obj;
- nxt_prefetch(*obj);
- *data = work->data;
- nxt_prefetch(*data);
+ lwq->head = work->next;
- lwq->head = work->next;
+ if (work->next == NULL) {
+ lwq->tail = NULL;
+ }
- if (work->next == NULL) {
- lwq->tail = NULL;
+ handler = work->handler;
}
- work->next = lwq->cache.next;
- lwq->cache.next = work;
+ nxt_thread_spin_unlock(&lwq->lock);
- return work->handler;
+ return handler;
}
@@ -579,29 +232,23 @@ void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
nxt_work_queue_t *wq)
{
- void *obj, *data;
- nxt_task_t *task;
- nxt_work_handler_t handler;
+ nxt_work_t *work;
- /* Locked work queue head can be tested without a lock. */
+ nxt_thread_spin_lock(&lwq->lock);
- if (nxt_fast_path(lwq->head == NULL)) {
- return;
- }
+ work = lwq->head;
- nxt_thread_spin_lock(&lwq->lock);
+ lwq->head = NULL;
+ lwq->tail = NULL;
- for ( ;; ) {
- handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data);
+ nxt_thread_spin_unlock(&lwq->lock);
- if (handler == NULL) {
- break;
- }
+ while (work != NULL) {
+ work->task->thread = thr;
- task->thread = thr;
+ nxt_work_queue_add(wq, work->handler, work->task,
+ work->obj, work->data);
- nxt_thread_work_queue_add(thr, wq, handler, task, obj, data);
+ work = work->next;
}
-
- nxt_thread_spin_unlock(&lwq->lock);
}
diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h
index b37fe1a3..4d2422cc 100644
--- a/src/nxt_work_queue.h
+++ b/src/nxt_work_queue.h
@@ -65,7 +65,7 @@ typedef struct nxt_work_queue_s nxt_work_queue_t;
struct nxt_work_queue_s {
nxt_work_t *head;
nxt_work_t *tail;
- nxt_work_queue_t *next;
+ nxt_work_queue_cache_t *cache;
#if (NXT_DEBUG)
const char *name;
#endif
@@ -73,15 +73,6 @@ struct nxt_work_queue_s {
typedef struct {
- nxt_work_queue_t *head;
- nxt_work_queue_t *tail;
- nxt_work_queue_t main;
- nxt_work_queue_t last;
- nxt_work_queue_cache_t cache;
-} nxt_thread_work_queue_t;
-
-
-typedef struct {
nxt_thread_spinlock_t lock;
nxt_work_t *head;
nxt_work_t *tail;
@@ -89,34 +80,26 @@ typedef struct {
} nxt_locked_work_queue_t;
-NXT_EXPORT void nxt_thread_work_queue_create(nxt_thread_t *thr,
+NXT_EXPORT void nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache,
size_t chunk_size);
-NXT_EXPORT void nxt_thread_work_queue_destroy(nxt_thread_t *thr);
-NXT_EXPORT void nxt_thread_work_queue_add(nxt_thread_t *thr,
- nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task,
- void *obj, void *data);
-NXT_EXPORT void nxt_thread_work_queue_push(nxt_thread_t *thr,
- nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task,
- void *obj, void *data);
-NXT_EXPORT void nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq);
-NXT_EXPORT nxt_work_handler_t nxt_thread_work_queue_pop(nxt_thread_t *thr,
+NXT_EXPORT void nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache);
+
+NXT_EXPORT void nxt_work_queue_add(nxt_work_queue_t *wq,
+ nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT nxt_work_handler_t nxt_work_queue_pop(nxt_work_queue_t *wq,
nxt_task_t **task, void **obj, void **data);
-NXT_EXPORT void nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data);
-#define \
-nxt_thread_current_work_queue_add(thr, handler, task, obj, data) \
+#define nxt_work_set(_work, _handler, _task, _obj, _data) \
do { \
- nxt_thread_t *_thr = thr; \
+ nxt_work_t *work = _work; \
\
- nxt_thread_work_queue_add(_thr, _thr->work_queue.head, \
- handler, task, obj, data); \
+ work->handler = _handler; \
+ work->task = _task; \
+ work->obj = _obj; \
+ work->data = _data; \
} while (0)
-
-NXT_EXPORT void nxt_work_queue_destroy(nxt_work_queue_t *wq);
-
-
#if (NXT_DEBUG)
#define \
@@ -131,17 +114,8 @@ nxt_work_queue_name(_wq, _name)
#endif
-NXT_EXPORT void nxt_thread_last_work_queue_add(nxt_thread_t *thr,
- nxt_work_handler_t handler, void *obj, void *data);
-NXT_EXPORT nxt_work_handler_t nxt_thread_last_work_queue_pop(nxt_thread_t *thr,
- nxt_task_t **task, void **obj, void **data);
-
-
-NXT_EXPORT void nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq,
- size_t chunk_size);
-NXT_EXPORT void nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq);
NXT_EXPORT void nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
+ nxt_work_t *work);
NXT_EXPORT nxt_work_handler_t nxt_locked_work_queue_pop(
nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_locked_work_queue_move(nxt_thread_t *thr,