summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_job.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_job.c202
1 files changed, 202 insertions, 0 deletions
diff --git a/src/nxt_job.c b/src/nxt_job.c
new file mode 100644
index 00000000..2b7d8818
--- /dev/null
+++ b/src/nxt_job.c
@@ -0,0 +1,202 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+#if (NXT_THREADS)
+static void nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data);
+static void nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj,
+ void *data);
+#endif
+
+
+void *
+nxt_job_create(nxt_mem_pool_t *mp, size_t size)
+{
+ size_t cache_size;
+ nxt_job_t *job;
+
+ if (mp == NULL) {
+ mp = nxt_mem_pool_create(256);
+
+ if (nxt_slow_path(mp == NULL)) {
+ return NULL;
+ }
+
+ job = nxt_mem_zalloc(mp, size);
+ cache_size = 0;
+
+ } else {
+ job = nxt_mem_cache_zalloc0(mp, size);
+ cache_size = size;
+ }
+
+ if (nxt_fast_path(job != NULL)) {
+ job->cache_size = (uint16_t) cache_size;
+ job->mem_pool = mp;
+ nxt_job_set_name(job, "job");
+ }
+
+ /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
+ nxt_queue_self(&job->link);
+
+ return job;
+}
+
+
+void
+nxt_job_init(nxt_job_t *job, size_t size)
+{
+ nxt_memzero(job, size);
+
+ nxt_job_set_name(job, "job");
+
+ nxt_queue_self(&job->link);
+}
+
+
+void
+nxt_job_destroy(void *data)
+{
+ nxt_job_t *job;
+
+ job = data;
+
+ nxt_queue_remove(&job->link);
+
+ if (job->cache_size == 0) {
+
+ if (job->mem_pool != NULL) {
+ nxt_mem_pool_destroy(job->mem_pool);
+ }
+
+ } else {
+ nxt_mem_cache_free0(job->mem_pool, job, job->cache_size);
+ }
+}
+
+
+nxt_int_t
+nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
+{
+ nxt_mem_pool_cleanup_t *mpcl;
+
+ mpcl = nxt_mem_pool_cleanup(mp, 0);
+
+ if (nxt_fast_path(mpcl != NULL)) {
+ mpcl->handler = nxt_job_destroy;
+ mpcl->data = job;
+ return NXT_OK;
+ }
+
+ return NXT_ERROR;
+}
+
+
+/*
+ * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
+ * calls and to the "nxt_work_handler_t" are required by Sun C.
+ */
+
+void
+nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
+{
+ nxt_log_debug(thr->log, "%s start", job->name);
+
+#if (NXT_THREADS)
+
+ if (job->thread_pool != NULL) {
+ nxt_int_t ret;
+
+ job->engine = thr->engine;
+ ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
+ job, (void *) handler, job->log);
+ if (ret == NXT_OK) {
+ return;
+ }
+
+ handler = job->abort_handler;
+ }
+
+#endif
+
+ handler(thr, job, job->data);
+}
+
+
+#if (NXT_THREADS)
+
+/* A trampoline function is called by a thread pool thread. */
+
+static void
+nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_job_t *job;
+ nxt_work_handler_t handler;
+
+ job = obj;
+ handler = (nxt_work_handler_t) data;
+
+ nxt_log_debug(thr->log, "%s thread", job->name);
+
+ if (nxt_slow_path(job->cancel)) {
+ nxt_job_return(thr, job, job->abort_handler);
+
+ } else {
+ handler(thr, job, job->data);
+ }
+}
+
+#endif
+
+
+void
+nxt_job_return(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
+{
+ nxt_log_debug(thr->log, "%s return", job->name);
+
+#if (NXT_THREADS)
+
+ if (job->engine != NULL) {
+ /* A return function is called in thread pool thread context. */
+ nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
+ job, (void *) handler, job->log);
+ return;
+ }
+
+#endif
+
+ if (nxt_slow_path(job->cancel)) {
+ nxt_log_debug(thr->log, "%s cancellation", job->name);
+ handler = job->abort_handler;
+ }
+
+ nxt_thread_work_queue_push(thr, &thr->work_queue.main,
+ handler, job, job->data, thr->log);
+}
+
+
+#if (NXT_THREADS)
+
+static void
+nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_job_t *job;
+ nxt_work_handler_t handler;
+
+ job = obj;
+ handler = (nxt_work_handler_t) data;
+
+ if (nxt_slow_path(job->cancel)) {
+ nxt_log_debug(thr->log, "%s cancellation", job->name);
+ handler = job->abort_handler;
+ }
+
+ handler(thr, job, job->data);
+}
+
+#endif