diff options
Diffstat (limited to 'src/nxt_buf.c')
-rw-r--r-- | src/nxt_buf.c | 153 |
1 files changed, 135 insertions, 18 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c index cb0faf7c..826cd017 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -8,13 +8,18 @@ static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data); +static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data); + + +typedef struct { + nxt_work_t work; + nxt_event_engine_t *engine; +} nxt_buf_ts_t; void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size) { - b->size = NXT_BUF_MEM_SIZE; - b->mem.start = start; b->mem.pos = start; b->mem.free = start; @@ -27,26 +32,60 @@ nxt_buf_mem_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags) { nxt_buf_t *b; - b = nxt_mp_zalloc(mp, NXT_BUF_MEM_SIZE); + b = nxt_mp_alloc(mp, NXT_BUF_MEM_SIZE + size); if (nxt_slow_path(b == NULL)) { return NULL; } + nxt_memzero(b, NXT_BUF_MEM_SIZE); + b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_MEM_SIZE; if (size != 0) { - b->mem.start = nxt_mp_alloc(mp, size); - if (nxt_slow_path(b->mem.start == NULL)) { - return NULL; - } + b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + b->mem.end = b->mem.start + size; + } + + return b; +} + + +nxt_buf_t * +nxt_buf_mem_ts_alloc(nxt_task_t *task, nxt_mp_t *mp, size_t size) +{ + nxt_buf_t *b; + nxt_buf_ts_t *ts; + + b = nxt_mp_retain(mp, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t) + size); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + nxt_memzero(b, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t)); + b->data = mp; + b->completion_handler = nxt_buf_ts_completion; + b->is_ts = 1; + + if (size != 0) { + b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE + + sizeof(nxt_buf_ts_t)); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; b->mem.end = b->mem.start + size; } + ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + ts->engine = task->thread->engine; + + ts->work.handler = nxt_buf_ts_completion; + ts->work.task = task; + ts->work.obj = b; + ts->work.data = b->parent; + return b; } @@ -56,22 +95,19 @@ nxt_buf_file_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags) { nxt_buf_t *b; - b = nxt_mp_zalloc(mp, NXT_BUF_FILE_SIZE); + b = nxt_mp_alloc(mp, NXT_BUF_FILE_SIZE + size); if (nxt_slow_path(b == NULL)) { return NULL; } + nxt_memzero(b, NXT_BUF_FILE_SIZE); + b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_FILE_SIZE; nxt_buf_set_file(b); if (size != 0) { - b->mem.start = nxt_mp_alloc(mp, size); - if (nxt_slow_path(b->mem.start == NULL)) { - return NULL; - } - + b->mem.start = nxt_pointer_to(b, NXT_BUF_FILE_SIZE); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; b->mem.end = b->mem.start + size; @@ -91,7 +127,6 @@ nxt_buf_mmap_alloc(nxt_mp_t *mp, size_t size) if (nxt_fast_path(b != NULL)) { b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_MMAP_SIZE; nxt_buf_set_file(b); nxt_buf_set_mmap(b); @@ -112,7 +147,6 @@ nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags) if (nxt_fast_path(b != NULL)) { b->data = mp; b->completion_handler = nxt_buf_completion; - b->size = NXT_BUF_SYNC_SIZE; nxt_buf_set_sync(b); b->is_nobuf = ((flags & NXT_BUF_SYNC_NOBUF) != 0); @@ -166,8 +200,91 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "buf completion: %p %p", b, b->mem.start); +#if (NXT_DEBUG) + if (nxt_slow_path(data != b->parent)) { + nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", + data, b->parent); + nxt_abort(); + } +#endif + + mp = b->data; + nxt_mp_free(mp, b); + + if (parent != NULL) { + nxt_debug(task, "parent retain:%uD", parent->retain); + + parent->retain--; + + if (parent->retain == 0) { + parent->mem.pos = parent->mem.free; + + parent->completion_handler(task, parent, parent->parent); + } + } +} + + +nxt_int_t +nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_buf_ts_t *ts; + + b = obj; + +#if (NXT_DEBUG) + if (nxt_slow_path(b->is_ts == 0)) { + nxt_log_alert(task->log, "not a thread safe buf (%p) completed", b); + nxt_abort(); + } +#endif + + ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + + if (ts->engine != task->thread->engine) { + + nxt_debug(task, "buf ts: %p current engine is %p, expected %p", + b, task->thread->engine, ts->engine); + + ts->work.handler = b->completion_handler; + ts->work.obj = obj; + ts->work.data = data; + + nxt_event_engine_post(ts->engine, &ts->work); + + return 1; + } + + return 0; +} + + +static void +nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp; + nxt_buf_t *b, *parent; + + b = obj; + parent = data; + + if (nxt_buf_ts_handle(task, obj, data)) { + return; + } + + nxt_debug(task, "buf ts completion: %p %p", b, b->mem.start); + +#if (NXT_DEBUG) + if (nxt_slow_path(data != b->parent)) { + nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", + data, b->parent); + nxt_abort(); + } +#endif + mp = b->data; - nxt_buf_free(mp, b); + nxt_mp_release(mp, b); if (parent != NULL) { nxt_debug(task, "parent retain:%uD", parent->retain); |