diff options
-rw-r--r-- | src/nxt_buf.c | 153 | ||||
-rw-r--r-- | src/nxt_buf.h | 9 | ||||
-rw-r--r-- | src/nxt_buf_pool.c | 7 | ||||
-rw-r--r-- | src/nxt_log.h | 3 | ||||
-rw-r--r-- | src/nxt_port.c | 22 | ||||
-rw-r--r-- | src/nxt_port.h | 5 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 29 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 46 | ||||
-rw-r--r-- | src/nxt_router.c | 41 |
9 files changed, 236 insertions, 79 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c index cb0faf7c..826cd017 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -8,13 +8,18 @@ static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data); +static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data); + + +typedef struct { + nxt_work_t work; + nxt_event_engine_t *engine; +} nxt_buf_ts_t; void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size) { - b->size = NXT_BUF_MEM_SIZE; - b->mem.start = start; b->mem.pos = start; b->mem.free = start; @@ -27,26 +32,60 @@ nxt_buf_mem_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags) { nxt_buf_t *b; - b = nxt_mp_zalloc(mp, NXT_BUF_MEM_SIZE); + b = nxt_mp_alloc(mp, NXT_BUF_MEM_SIZE + size); if (nxt_slow_path(b == NULL)) { return NULL; } + nxt_memzero(b, NXT_BUF_MEM_SIZE); + b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_MEM_SIZE; if (size != 0) { - b->mem.start = nxt_mp_alloc(mp, size); - if (nxt_slow_path(b->mem.start == NULL)) { - return NULL; - } + b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + b->mem.end = b->mem.start + size; + } + + return b; +} + + +nxt_buf_t * +nxt_buf_mem_ts_alloc(nxt_task_t *task, nxt_mp_t *mp, size_t size) +{ + nxt_buf_t *b; + nxt_buf_ts_t *ts; + + b = nxt_mp_retain(mp, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t) + size); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + nxt_memzero(b, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t)); + b->data = mp; + b->completion_handler = nxt_buf_ts_completion; + b->is_ts = 1; + + if (size != 0) { + b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE + + sizeof(nxt_buf_ts_t)); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; b->mem.end = b->mem.start + size; } + ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + ts->engine = task->thread->engine; + + ts->work.handler = nxt_buf_ts_completion; + ts->work.task = task; + ts->work.obj = b; + ts->work.data = b->parent; + return b; } @@ -56,22 +95,19 @@ nxt_buf_file_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags) { nxt_buf_t *b; - b = nxt_mp_zalloc(mp, NXT_BUF_FILE_SIZE); + b = nxt_mp_alloc(mp, NXT_BUF_FILE_SIZE + size); if (nxt_slow_path(b == NULL)) { return NULL; } + nxt_memzero(b, NXT_BUF_FILE_SIZE); + b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_FILE_SIZE; nxt_buf_set_file(b); if (size != 0) { - b->mem.start = nxt_mp_alloc(mp, size); - if (nxt_slow_path(b->mem.start == NULL)) { - return NULL; - } - + b->mem.start = nxt_pointer_to(b, NXT_BUF_FILE_SIZE); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; b->mem.end = b->mem.start + size; @@ -91,7 +127,6 @@ nxt_buf_mmap_alloc(nxt_mp_t *mp, size_t size) if (nxt_fast_path(b != NULL)) { b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_MMAP_SIZE; nxt_buf_set_file(b); nxt_buf_set_mmap(b); @@ -112,7 +147,6 @@ nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags) if (nxt_fast_path(b != NULL)) { b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_SYNC_SIZE; nxt_buf_set_sync(b); b->is_nobuf = ((flags & NXT_BUF_SYNC_NOBUF) != 0); @@ -166,8 +200,91 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "buf completion: %p %p", b, b->mem.start); +#if (NXT_DEBUG) + if (nxt_slow_path(data != b->parent)) { + nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", + data, b->parent); + nxt_abort(); + } +#endif + + mp = b->data; + nxt_mp_free(mp, b); + + if (parent != NULL) { + nxt_debug(task, "parent retain:%uD", parent->retain); + + parent->retain--; + + if (parent->retain == 0) { + parent->mem.pos = parent->mem.free; + + parent->completion_handler(task, parent, parent->parent); + } + } +} + + +nxt_int_t +nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_buf_ts_t *ts; + + b = obj; + +#if (NXT_DEBUG) + if (nxt_slow_path(b->is_ts == 0)) { + nxt_log_alert(task->log, "not a thread safe buf (%p) completed", b); + nxt_abort(); + } +#endif + + ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + + if (ts->engine != task->thread->engine) { + + nxt_debug(task, "buf ts: %p current engine is %p, expected %p", + b, task->thread->engine, ts->engine); + + ts->work.handler = b->completion_handler; + ts->work.obj = obj; + ts->work.data = data; + + nxt_event_engine_post(ts->engine, &ts->work); + + return 1; + } + + return 0; +} + + +static void +nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp; + nxt_buf_t *b, *parent; + + b = obj; + parent = data; + + if (nxt_buf_ts_handle(task, obj, data)) { + return; + } + + nxt_debug(task, "buf ts completion: %p %p", b, b->mem.start); + +#if (NXT_DEBUG) + if (nxt_slow_path(data != b->parent)) { + nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", + data, b->parent); + nxt_abort(); + } +#endif + mp = b->data; - nxt_buf_free(mp, b); + nxt_mp_release(mp, b); if (parent != NULL) { nxt_debug(task, "parent retain:%uD", parent->retain); diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 43c0c453..ae96b24a 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -76,11 +76,6 @@ struct nxt_buf_s { nxt_buf_t *next; uint32_t retain; - /* - * Used by nxt_mem_cache_free() to return buffer - * in appropriate memory pool cache. - */ - uint8_t size; uint8_t is_file; /* 1 bit */ @@ -92,6 +87,7 @@ struct nxt_buf_s { uint16_t is_flush:1; uint16_t is_last:1; uint16_t is_port_mmap_sent:1; + uint16_t is_ts:1; nxt_buf_mem_t mem; @@ -245,11 +241,14 @@ nxt_buf_used_size(b) \ NXT_EXPORT void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size); NXT_EXPORT nxt_buf_t *nxt_buf_mem_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags); +NXT_EXPORT nxt_buf_t *nxt_buf_mem_ts_alloc(nxt_task_t *task, nxt_mp_t *mp, + size_t size); NXT_EXPORT nxt_buf_t *nxt_buf_file_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags); NXT_EXPORT nxt_buf_t *nxt_buf_mmap_alloc(nxt_mp_t *mp, size_t size); NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags); +NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data); #define \ nxt_buf_free(mp, b) \ diff --git a/src/nxt_buf_pool.c b/src/nxt_buf_pool.c index 7ef9def5..8259c60a 100644 --- a/src/nxt_buf_pool.c +++ b/src/nxt_buf_pool.c @@ -147,10 +147,6 @@ nxt_buf_pool_free(nxt_buf_pool_t *bp, nxt_buf_t *b) bp->current = NULL; } - if (!bp->mmap) { - nxt_mp_free(bp->mem_pool, b->mem.start); - } - nxt_buf_free(bp->mem_pool, b); return; @@ -176,15 +172,12 @@ nxt_buf_pool_free(nxt_buf_pool_t *bp, nxt_buf_t *b) void nxt_buf_pool_destroy(nxt_buf_pool_t *bp) { - u_char *p; nxt_buf_t *b; bp->destroy = 1; for (b = bp->free; b != NULL; b = b->next) { - p = b->mem.start; nxt_buf_free(bp->mem_pool, b); - nxt_mp_free(bp->mem_pool, p); } bp->free = b; /* NULL */ diff --git a/src/nxt_log.h b/src/nxt_log.h index e1b69442..74260576 100644 --- a/src/nxt_log.h +++ b/src/nxt_log.h @@ -122,7 +122,8 @@ nxt_log_debug(_log, ...) \ } while (0) -#define nxt_debug_alloc(...) +#define nxt_debug_alloc(_fmt, ARGS...) \ + nxt_thread_log_debug("%08p: " _fmt, mp, ##ARGS) #else diff --git a/src/nxt_port.c b/src/nxt_port.c index 91f0fe31..249a287c 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -10,8 +10,6 @@ static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, - void *data); void @@ -102,13 +100,10 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port) { - nxt_mp_t *mp; nxt_buf_t *b; nxt_port_msg_new_port_t *msg; - mp = port->mem_pool; - - b = nxt_buf_mem_alloc(mp, sizeof(nxt_port_data_t), 0); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, sizeof(nxt_port_data_t)); if (nxt_slow_path(b == NULL)) { return NXT_ERROR; } @@ -116,8 +111,6 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port) nxt_debug(task, "send port %FD to process %PI", new_port->pair[1], port->pid); - b->data = mp; - b->completion_handler = nxt_port_new_port_buf_completion; b->mem.free += sizeof(nxt_port_msg_new_port_t); msg = (nxt_port_msg_new_port_t *) b->mem.pos; @@ -133,19 +126,6 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port) } -static void -nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_mp_t *mp; - nxt_buf_t *b; - - b = obj; - mp = b->data; - - nxt_buf_free(mp, b); -} - - void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { diff --git a/src/nxt_port.h b/src/nxt_port.h index 66534cdc..927e0e2e 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -36,10 +36,13 @@ typedef struct { typedef struct { nxt_queue_link_t link; nxt_buf_t *buf; - nxt_mp_t *mem_pool; size_t share; nxt_fd_t fd; nxt_port_msg_t port_msg; + + nxt_work_t work; + nxt_event_engine_t *engine; + nxt_mp_t *mem_pool; } nxt_port_send_msg_t; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index ba39c9c1..cf69a758 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -35,6 +35,10 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; + if (nxt_buf_ts_handle(task, obj, data)) { + return; + } + b = obj; nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start); @@ -71,7 +75,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) c++; } - nxt_buf_free(mp, b); + nxt_mp_release(mp, b); } @@ -150,6 +154,10 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_buf_t *b; nxt_mp_t *mp; + if (nxt_buf_ts_handle(task, obj, data)) { + return; + } + b = obj; mp = b->data; fd = (nxt_fd_t) (intptr_t) data; @@ -166,7 +174,7 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_fd_close(fd); - nxt_buf_free(mp, b); + nxt_mp_release(mp, b); } @@ -252,13 +260,12 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port_mmap->hdr = mem; - b = nxt_buf_mem_alloc(port->mem_pool, 0, 0); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { goto remove_fail; } b->completion_handler = nxt_port_mmap_send_fd_buf_completion; - b->data = port->mem_pool; b->parent = (void *) (intptr_t) fd; /* Init segment header. */ @@ -396,24 +403,22 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) return NULL; } - b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; } - b->data = port->mem_pool; b->completion_handler = nxt_port_mmap_buf_completion; - b->size = NXT_BUF_PORT_MMAP_SIZE; - nxt_buf_set_port_mmap(b); hdr = nxt_port_mmap_get(task, port, &c, size); if (nxt_slow_path(hdr == NULL)) { - nxt_buf_free(port->mem_pool, b); + nxt_mp_release(port->mem_pool, b); return NULL; } b->parent = hdr; + b->mem.start = nxt_port_mmap_chunk_start(hdr, c); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; @@ -517,14 +522,12 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, return NULL; } - b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; } - b->data = port->mem_pool; b->completion_handler = nxt_port_mmap_buf_completion; - b->size = NXT_BUF_PORT_MMAP_SIZE; nxt_buf_set_port_mmap(b); @@ -559,7 +562,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); - b = nxt_buf_mem_alloc(port->mem_pool, bsize, 0); + b = nxt_buf_mem_ts_alloc(task, port->mem_pool, bsize); if (nxt_slow_path(b == NULL)) { return; } diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 3bf57479..73bb44bd 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -135,6 +135,37 @@ nxt_port_write_close(nxt_port_t *port) } +static void +nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_port_send_msg_t *msg; + + msg = obj; + engine = data; + +#if (NXT_DEBUG) + if (nxt_slow_path(data != msg->engine)) { + nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)", + data, msg->engine); + nxt_abort(); + } +#endif + + if (engine != task->thread->engine) { + + nxt_debug(task, "current thread is %PT, expected %PT", + task->thread->tid, engine->task.thread->tid); + + nxt_event_engine_post(engine, &msg->work); + + return; + } + + nxt_mp_release(msg->mem_pool, obj); +} + + nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) @@ -160,14 +191,25 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, } } - msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t)); + msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { return NXT_ERROR; } + msg->link.next = NULL; + msg->link.prev = NULL; + msg->buf = b; msg->fd = fd; msg->share = 0; + + msg->work.next = NULL; + msg->work.handler = nxt_port_release_send_msg; + msg->work.task = task; + msg->work.obj = msg; + msg->work.data = task->thread->engine; + + msg->engine = task->thread->engine; msg->mem_pool = port->mem_pool; msg->port_msg.stream = stream; @@ -292,7 +334,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } else { nxt_queue_remove(link); - nxt_mp_free(msg->mem_pool, msg); + nxt_port_release_send_msg(task, msg, msg->engine); } } else if (nxt_slow_path(n == NXT_ERROR)) { diff --git a/src/nxt_router.c b/src/nxt_router.c index f063d87d..8971fb5b 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -788,7 +788,6 @@ static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine) { - nxt_mp_t *mp; nxt_int_t ret; nxt_port_t *port; nxt_process_t *process; @@ -824,12 +823,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, return ret; } - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NXT_ERROR; - } - - port->mem_pool = mp; port->engine = 0; port->type = NXT_PROCESS_ROUTER; @@ -1441,6 +1434,8 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { + nxt_mp_t *port_mp; + nxt_int_t res; nxt_port_t *port, *c_port; nxt_req_id_t req_id; nxt_app_wmsg_t wmsg; @@ -1466,6 +1461,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, } while (nxt_event_engine_request_find(engine, req_id) != NULL); rc = nxt_conn_request_add(c, req_id); + if (nxt_slow_path(rc == NULL)) { // 500 Failed to allocate req->conn link nxt_log_alert(task->log, "failed to allocate req->conn link"); @@ -1476,11 +1472,20 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", req_id, c, engine); + port_mp = port->mem_pool; + port->mem_pool = c->mem_pool; + c_port = nxt_process_connected_port_find(port->process, engine->port->pid, engine->port->id); if (nxt_slow_path(c_port != engine->port)) { - (void) nxt_port_send_port(task, port, engine->port); + res = nxt_port_send_port(task, port, engine->port); + + if (nxt_slow_path(res != NXT_OK)) { + // 500 Failed to send reply port + nxt_log_alert(task->log, "failed to send reply port to application"); + } + nxt_process_connected_port_add(port->process, engine->port); } @@ -1489,14 +1494,26 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, wmsg.buf = &wmsg.write; wmsg.stream = req_id; - (void)nxt_app->prepare_msg(task, &ap->r, &wmsg); + res = nxt_app->prepare_msg(task, &ap->r, &wmsg); + + if (nxt_slow_path(res != NXT_OK)) { + // 500 Failed to prepare message + nxt_log_alert(task->log, "failed to prepare message for application"); + } nxt_debug(task, "about to send %d bytes buffer to worker port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); - (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, + res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, -1, req_id, engine->port->id, wmsg.write); + + if (nxt_slow_path(res != NXT_OK)) { + // 500 Failed to send message + nxt_log_alert(task->log, "failed to send message to application"); + } + + port->mem_pool = port_mp; } @@ -1597,7 +1614,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) } nxt_queue_loop; - nxt_mp_destroy(c->mem_pool); + nxt_queue_remove(&c->link); + + nxt_mp_release(c->mem_pool, c); } |