From f319220a6c4515f3d31e546b4f1f5de0b94aceb7 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 7 Jul 2017 16:01:34 +0300 Subject: Redirecting buffer completion handler to specific engine. There is a case in router where we use port in router connection thread. Buffers are allocated within connection memory pool which can be used only in this router thread. sendmsg() can be postponed into main router thread and completion handler will compare current engine and post itself to correct engine. --- src/nxt_port_memory.c | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) (limited to 'src/nxt_port_memory.c') diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index ba39c9c1..cf69a758 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -35,6 +35,10 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; + if (nxt_buf_ts_handle(task, obj, data)) { + return; + } + b = obj; nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start); @@ -71,7 +75,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) c++; } - nxt_buf_free(mp, b); + nxt_mp_release(mp, b); } @@ -150,6 +154,10 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_buf_t *b; nxt_mp_t *mp; + if (nxt_buf_ts_handle(task, obj, data)) { + return; + } + b = obj; mp = b->data; fd = (nxt_fd_t) (intptr_t) data; @@ -166,7 +174,7 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_fd_close(fd); - nxt_buf_free(mp, b); + nxt_mp_release(mp, b); } @@ -252,13 +260,12 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port_mmap->hdr = mem; - b = nxt_buf_mem_alloc(port->mem_pool, 0, 0); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { goto remove_fail; } b->completion_handler = nxt_port_mmap_send_fd_buf_completion; - b->data = port->mem_pool; b->parent = (void *) (intptr_t) fd; /* Init segment header. */ @@ -396,24 +403,22 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) return NULL; } - b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; } - b->data = port->mem_pool; b->completion_handler = nxt_port_mmap_buf_completion; - b->size = NXT_BUF_PORT_MMAP_SIZE; - nxt_buf_set_port_mmap(b); hdr = nxt_port_mmap_get(task, port, &c, size); if (nxt_slow_path(hdr == NULL)) { - nxt_buf_free(port->mem_pool, b); + nxt_mp_release(port->mem_pool, b); return NULL; } b->parent = hdr; + b->mem.start = nxt_port_mmap_chunk_start(hdr, c); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; @@ -517,14 +522,12 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, return NULL; } - b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; } - b->data = port->mem_pool; b->completion_handler = nxt_port_mmap_buf_completion; - b->size = NXT_BUF_PORT_MMAP_SIZE; nxt_buf_set_port_mmap(b); @@ -559,7 +562,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); - b = nxt_buf_mem_alloc(port->mem_pool, bsize, 0); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, bsize); if (nxt_slow_path(b == NULL)) { return; } -- cgit