summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port_socket.c12
-rw-r--r--src/nxt_work_queue.c37
-rw-r--r--src/nxt_work_queue.h2
3 files changed, 44 insertions, 7 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 883c45bb..56dfe755 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -235,6 +235,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
ssize_t n;
nxt_port_t *port;
struct iovec iov[NXT_IOBUF_MAX * 10];
+ nxt_work_queue_t *wq;
nxt_queue_link_t *link;
nxt_port_method_t m;
nxt_port_send_msg_t *msg;
@@ -301,22 +302,19 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
goto fail;
}
+ wq = &task->thread->engine->fast_work_queue;
+
if (msg->buf != plain_buf) {
/*
* Complete crafted mmap_msgs buf and restore msg->buf
* for regular completion call.
*/
- nxt_port_mmap_completion(task,
- port->socket.write_work_queue,
- msg->buf);
+ nxt_port_mmap_completion(task, wq, msg->buf);
msg->buf = plain_buf;
}
- msg->buf = nxt_sendbuf_completion(task,
- port->socket.write_work_queue,
- msg->buf,
- plain_size);
+ msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
if (msg->buf != NULL) {
/*
diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c
index ecdc65ff..85f5cc3e 100644
--- a/src/nxt_work_queue.c
+++ b/src/nxt_work_queue.c
@@ -32,6 +32,39 @@ static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
static nxt_uint_t nxt_work_queue_bucket_items = 409;
+#if (NXT_DEBUG)
+
+nxt_inline void
+nxt_work_queue_thread_assert(nxt_work_queue_t *wq)
+{
+ nxt_tid_t tid;
+ nxt_thread_t *thread;
+
+ thread = nxt_thread();
+ tid = nxt_thread_tid(thread);
+
+ if (nxt_fast_path(wq->tid == tid)) {
+ return;
+ }
+
+ if (nxt_slow_path(nxt_pid != wq->pid)) {
+ wq->pid = nxt_pid;
+ wq->tid = tid;
+
+ return;
+ }
+
+ nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid);
+ nxt_abort();
+}
+
+#else
+
+#define nxt_work_queue_thread_assert(wq)
+
+#endif
+
+
void
nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{
@@ -109,6 +142,8 @@ nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
{
nxt_work_t *work;
+ nxt_work_queue_thread_assert(wq);
+
for ( ;; ) {
work = wq->cache->next;
@@ -144,6 +179,8 @@ nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
{
nxt_work_t *work;
+ nxt_work_queue_thread_assert(wq);
+
work = wq->head;
wq->head = work->next;
diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h
index 90c344dc..9819882e 100644
--- a/src/nxt_work_queue.h
+++ b/src/nxt_work_queue.h
@@ -68,6 +68,8 @@ struct nxt_work_queue_s {
nxt_work_queue_cache_t *cache;
#if (NXT_DEBUG)
const char *name;
+ int32_t pid;
+ nxt_tid_t tid;
#endif
};