summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-07-14 17:17:15 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-07-14 17:17:15 +0300
commit3ed35d725ac4e04cfd798f682a19a198e4bb9ac7 (patch)
tree8a75e92d3c0c15f8b198a545b53f94512bea0acc
parent668aabac3c492ad1325c09eacadbfdd9c60004b8 (diff)
downloadunit-3ed35d725ac4e04cfd798f682a19a198e4bb9ac7.tar.gz
unit-3ed35d725ac4e04cfd798f682a19a198e4bb9ac7.tar.bz2
Router: using joint job queues instead of arrays to pass
listening socket handlers to worker engines.
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c123
-rw-r--r--src/nxt_router.h4
2 files changed, 36 insertions, 91 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 6f285c22..b9d4cc58 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -67,9 +67,9 @@ static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
-static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf,
- nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler);
+static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
+ nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
@@ -81,8 +81,7 @@ static void nxt_router_apps_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
-static void nxt_router_engine_post(nxt_router_temp_conf_t *tmcf,
- nxt_router_engine_conf_t *recf);
+static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
static void nxt_router_thread_start(void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
@@ -944,26 +943,17 @@ static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
- nxt_mp_t *mp;
nxt_int_t ret;
nxt_thread_spinlock_t *lock;
- recf->creating = nxt_array_create(tmcf->mem_pool, 4,
- sizeof(nxt_joint_job_t));
- if (nxt_slow_path(recf->creating == NULL)) {
- return NXT_ERROR;
- }
-
- mp = tmcf->conf->mem_pool;
-
- ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating,
- recf->creating, nxt_router_listen_socket_create);
+ ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
+ nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating,
- recf->creating, nxt_router_listen_socket_create);
+ ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
+ nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
@@ -985,42 +975,21 @@ static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
- nxt_mp_t *mp;
nxt_int_t ret;
nxt_thread_spinlock_t *lock;
- recf->creating = nxt_array_create(tmcf->mem_pool, 4,
- sizeof(nxt_joint_job_t));
- if (nxt_slow_path(recf->creating == NULL)) {
- return NXT_ERROR;
- }
-
- mp = tmcf->conf->mem_pool;
-
- ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating,
- recf->creating, nxt_router_listen_socket_create);
+ ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
+ nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- recf->updating = nxt_array_create(tmcf->mem_pool, 4,
- sizeof(nxt_joint_job_t));
- if (nxt_slow_path(recf->updating == NULL)) {
- return NXT_ERROR;
- }
-
- ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating,
- recf->updating, nxt_router_listen_socket_update);
+ ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
+ nxt_router_listen_socket_update);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- recf->deleting = nxt_array_create(tmcf->mem_pool, 4,
- sizeof(nxt_joint_job_t));
- if (nxt_slow_path(recf->deleting == NULL)) {
- return NXT_ERROR;
- }
-
ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
@@ -1044,12 +1013,6 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
{
nxt_int_t ret;
- recf->deleting = nxt_array_create(tmcf->mem_pool,
- 4, sizeof(nxt_joint_job_t));
- if (nxt_slow_path(recf->deleting == NULL)) {
- return NXT_ERROR;
- }
-
ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
@@ -1060,8 +1023,8 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
static nxt_int_t
-nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf,
- nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
+nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler)
{
nxt_joint_job_t *job;
@@ -1072,19 +1035,24 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf,
qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk))
{
- job = nxt_array_add(array);
+ job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) {
return NXT_ERROR;
}
+ job->work.next = recf->jobs;
+ recf->jobs = &job->work;
+
job->task = tmcf->engine->task;
- job->work.next = NULL;
job->work.handler = handler;
job->work.task = &job->task;
job->work.obj = job;
job->tmcf = tmcf;
- joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t));
+ tmcf->count++;
+
+ joint = nxt_mp_alloc(tmcf->conf->mem_pool,
+ sizeof(nxt_socket_conf_joint_t));
if (nxt_slow_path(joint == NULL)) {
return NXT_ERROR;
}
@@ -1129,18 +1097,22 @@ nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk))
{
- job = nxt_array_add(recf->deleting);
+ job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) {
return NXT_ERROR;
}
+ job->work.next = recf->jobs;
+ recf->jobs = &job->work;
+
job->task = tmcf->engine->task;
- job->work.next = NULL;
job->work.handler = nxt_router_listen_socket_delete;
job->work.task = &job->task;
job->work.obj = job;
job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
job->tmcf = tmcf;
+
+ tmcf->count++;
}
return NXT_OK;
@@ -1231,47 +1203,22 @@ nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
recf = tmcf->engines->elts;
for (n = tmcf->engines->nelts; n != 0; n--) {
- nxt_router_engine_post(tmcf, recf);
+ nxt_router_engine_post(recf);
recf++;
}
}
static void
-nxt_router_engine_post(nxt_router_temp_conf_t *tmcf,
- nxt_router_engine_conf_t *recf)
+nxt_router_engine_post(nxt_router_engine_conf_t *recf)
{
- nxt_uint_t n;
- nxt_joint_job_t *job;
-
- if (recf->creating != NULL) {
- job = recf->creating->elts;
+ nxt_work_t *work, *next;
- for (n = recf->creating->nelts; n != 0; n--) {
- nxt_event_engine_post(recf->engine, &job->work);
- job++;
- tmcf->count++;
- }
- }
-
- if (recf->updating != NULL) {
- job = recf->updating->elts;
-
- for (n = recf->updating->nelts; n != 0; n--) {
- nxt_event_engine_post(recf->engine, &job->work);
- job++;
- tmcf->count++;
- }
- }
-
- if (recf->deleting != NULL) {
- job = recf->deleting->elts;
+ for (work = recf->jobs; work != NULL; work = next) {
+ next = work->next;
+ work->next = NULL;
- for (n = recf->deleting->nelts; n != 0; n--) {
- nxt_event_engine_post(recf->engine, &job->work);
- job++;
- tmcf->count++;
- }
+ nxt_event_engine_post(recf->engine, work);
}
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 6f7c8d61..5c96bdac 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -35,9 +35,7 @@ typedef struct {
typedef struct {
nxt_event_engine_t *engine;
- nxt_array_t *creating; /* of nxt_joint_job_t */
- nxt_array_t *updating; /* of nxt_joint_job_t */
- nxt_array_t *deleting; /* of nxt_joint_job_t */
+ nxt_work_t *jobs;
} nxt_router_engine_conf_t;