summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_buf.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_buf.c153
1 files changed, 135 insertions, 18 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c
index cb0faf7c..826cd017 100644
--- a/src/nxt_buf.c
+++ b/src/nxt_buf.c
@@ -8,13 +8,18 @@
static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data);
+static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data);
+
+
+typedef struct {
+ nxt_work_t work;
+ nxt_event_engine_t *engine;
+} nxt_buf_ts_t;
void
nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size)
{
- b->size = NXT_BUF_MEM_SIZE;
-
b->mem.start = start;
b->mem.pos = start;
b->mem.free = start;
@@ -27,26 +32,60 @@ nxt_buf_mem_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags)
{
nxt_buf_t *b;
- b = nxt_mp_zalloc(mp, NXT_BUF_MEM_SIZE);
+ b = nxt_mp_alloc(mp, NXT_BUF_MEM_SIZE + size);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
+ nxt_memzero(b, NXT_BUF_MEM_SIZE);
+
b->data = mp;
b->completion_handler = nxt_buf_completion;
- b->size = NXT_BUF_MEM_SIZE;
if (size != 0) {
- b->mem.start = nxt_mp_alloc(mp, size);
- if (nxt_slow_path(b->mem.start == NULL)) {
- return NULL;
- }
+ b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+ b->mem.end = b->mem.start + size;
+ }
+
+ return b;
+}
+
+
+nxt_buf_t *
+nxt_buf_mem_ts_alloc(nxt_task_t *task, nxt_mp_t *mp, size_t size)
+{
+ nxt_buf_t *b;
+ nxt_buf_ts_t *ts;
+
+ b = nxt_mp_retain(mp, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t) + size);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ nxt_memzero(b, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t));
+ b->data = mp;
+ b->completion_handler = nxt_buf_ts_completion;
+ b->is_ts = 1;
+
+ if (size != 0) {
+ b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE +
+ sizeof(nxt_buf_ts_t));
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + size;
}
+ ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
+ ts->engine = task->thread->engine;
+
+ ts->work.handler = nxt_buf_ts_completion;
+ ts->work.task = task;
+ ts->work.obj = b;
+ ts->work.data = b->parent;
+
return b;
}
@@ -56,22 +95,19 @@ nxt_buf_file_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags)
{
nxt_buf_t *b;
- b = nxt_mp_zalloc(mp, NXT_BUF_FILE_SIZE);
+ b = nxt_mp_alloc(mp, NXT_BUF_FILE_SIZE + size);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
+ nxt_memzero(b, NXT_BUF_FILE_SIZE);
+
b->data = mp;
b->completion_handler = nxt_buf_completion;
- b->size = NXT_BUF_FILE_SIZE;
nxt_buf_set_file(b);
if (size != 0) {
- b->mem.start = nxt_mp_alloc(mp, size);
- if (nxt_slow_path(b->mem.start == NULL)) {
- return NULL;
- }
-
+ b->mem.start = nxt_pointer_to(b, NXT_BUF_FILE_SIZE);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + size;
@@ -91,7 +127,6 @@ nxt_buf_mmap_alloc(nxt_mp_t *mp, size_t size)
if (nxt_fast_path(b != NULL)) {
b->data = mp;
b->completion_handler = nxt_buf_completion;
- b->size = NXT_BUF_MMAP_SIZE;
nxt_buf_set_file(b);
nxt_buf_set_mmap(b);
@@ -112,7 +147,6 @@ nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags)
if (nxt_fast_path(b != NULL)) {
b->data = mp;
b->completion_handler = nxt_buf_completion;
- b->size = NXT_BUF_SYNC_SIZE;
nxt_buf_set_sync(b);
b->is_nobuf = ((flags & NXT_BUF_SYNC_NOBUF) != 0);
@@ -166,8 +200,91 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
+#if (NXT_DEBUG)
+ if (nxt_slow_path(data != b->parent)) {
+ nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
+ data, b->parent);
+ nxt_abort();
+ }
+#endif
+
+ mp = b->data;
+ nxt_mp_free(mp, b);
+
+ if (parent != NULL) {
+ nxt_debug(task, "parent retain:%uD", parent->retain);
+
+ parent->retain--;
+
+ if (parent->retain == 0) {
+ parent->mem.pos = parent->mem.free;
+
+ parent->completion_handler(task, parent, parent->parent);
+ }
+ }
+}
+
+
+nxt_int_t
+nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_buf_ts_t *ts;
+
+ b = obj;
+
+#if (NXT_DEBUG)
+ if (nxt_slow_path(b->is_ts == 0)) {
+ nxt_log_alert(task->log, "not a thread safe buf (%p) completed", b);
+ nxt_abort();
+ }
+#endif
+
+ ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
+
+ if (ts->engine != task->thread->engine) {
+
+ nxt_debug(task, "buf ts: %p current engine is %p, expected %p",
+ b, task->thread->engine, ts->engine);
+
+ ts->work.handler = b->completion_handler;
+ ts->work.obj = obj;
+ ts->work.data = data;
+
+ nxt_event_engine_post(ts->engine, &ts->work);
+
+ return 1;
+ }
+
+ return 0;
+}
+
+
+static void
+nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_mp_t *mp;
+ nxt_buf_t *b, *parent;
+
+ b = obj;
+ parent = data;
+
+ if (nxt_buf_ts_handle(task, obj, data)) {
+ return;
+ }
+
+ nxt_debug(task, "buf ts completion: %p %p", b, b->mem.start);
+
+#if (NXT_DEBUG)
+ if (nxt_slow_path(data != b->parent)) {
+ nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
+ data, b->parent);
+ nxt_abort();
+ }
+#endif
+
mp = b->data;
- nxt_buf_free(mp, b);
+ nxt_mp_release(mp, b);
if (parent != NULL) {
nxt_debug(task, "parent retain:%uD", parent->retain);