diff options
-rw-r--r-- | src/nxt_port_socket.c | 12 | ||||
-rw-r--r-- | src/nxt_work_queue.c | 37 | ||||
-rw-r--r-- | src/nxt_work_queue.h | 2 |
3 files changed, 44 insertions, 7 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 883c45bb..56dfe755 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -235,6 +235,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) ssize_t n; nxt_port_t *port; struct iovec iov[NXT_IOBUF_MAX * 10]; + nxt_work_queue_t *wq; nxt_queue_link_t *link; nxt_port_method_t m; nxt_port_send_msg_t *msg; @@ -301,22 +302,19 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) goto fail; } + wq = &task->thread->engine->fast_work_queue; + if (msg->buf != plain_buf) { /* * Complete crafted mmap_msgs buf and restore msg->buf * for regular completion call. */ - nxt_port_mmap_completion(task, - port->socket.write_work_queue, - msg->buf); + nxt_port_mmap_completion(task, wq, msg->buf); msg->buf = plain_buf; } - msg->buf = nxt_sendbuf_completion(task, - port->socket.write_work_queue, - msg->buf, - plain_size); + msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size); if (msg->buf != NULL) { /* diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c index ecdc65ff..85f5cc3e 100644 --- a/src/nxt_work_queue.c +++ b/src/nxt_work_queue.c @@ -32,6 +32,39 @@ static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache); static nxt_uint_t nxt_work_queue_bucket_items = 409; +#if (NXT_DEBUG) + +nxt_inline void +nxt_work_queue_thread_assert(nxt_work_queue_t *wq) +{ + nxt_tid_t tid; + nxt_thread_t *thread; + + thread = nxt_thread(); + tid = nxt_thread_tid(thread); + + if (nxt_fast_path(wq->tid == tid)) { + return; + } + + if (nxt_slow_path(nxt_pid != wq->pid)) { + wq->pid = nxt_pid; + wq->tid = tid; + + return; + } + + nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid); + nxt_abort(); +} + +#else + +#define nxt_work_queue_thread_assert(wq) + +#endif + + void nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size) { @@ -109,6 +142,8 @@ nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler, { nxt_work_t *work; + nxt_work_queue_thread_assert(wq); + for ( ;; ) { work = wq->cache->next; @@ -144,6 +179,8 @@ nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj, { nxt_work_t *work; + nxt_work_queue_thread_assert(wq); + work = wq->head; wq->head = work->next; diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h index 90c344dc..9819882e 100644 --- a/src/nxt_work_queue.h +++ b/src/nxt_work_queue.h @@ -68,6 +68,8 @@ struct nxt_work_queue_s { nxt_work_queue_cache_t *cache; #if (NXT_DEBUG) const char *name; + int32_t pid; + nxt_tid_t tid; #endif }; |