diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-07-14 17:17:15 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-07-14 17:17:15 +0300 |
commit | 3ed35d725ac4e04cfd798f682a19a198e4bb9ac7 (patch) | |
tree | 8a75e92d3c0c15f8b198a545b53f94512bea0acc | |
parent | 668aabac3c492ad1325c09eacadbfdd9c60004b8 (diff) | |
download | unit-3ed35d725ac4e04cfd798f682a19a198e4bb9ac7.tar.gz unit-3ed35d725ac4e04cfd798f682a19a198e4bb9ac7.tar.bz2 |
Router: using joint job queues instead of arrays to pass
listening socket handlers to worker engines.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 123 | ||||
-rw-r--r-- | src/nxt_router.h | 4 |
2 files changed, 36 insertions, 91 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 6f285c22..b9d4cc58 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -67,9 +67,9 @@ static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); static void nxt_router_engine_socket_count(nxt_queue_t *sockets); -static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, - nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler); +static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, + nxt_work_handler_t handler); static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); @@ -81,8 +81,7 @@ static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); -static void nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, - nxt_router_engine_conf_t *recf); +static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); static void nxt_router_thread_start(void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, @@ -944,26 +943,17 @@ static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) { - nxt_mp_t *mp; nxt_int_t ret; nxt_thread_spinlock_t *lock; - recf->creating = nxt_array_create(tmcf->mem_pool, 4, - sizeof(nxt_joint_job_t)); - if (nxt_slow_path(recf->creating == NULL)) { - return NXT_ERROR; - } - - mp = tmcf->conf->mem_pool; - - ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating, - recf->creating, nxt_router_listen_socket_create); + ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, + nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating, - recf->creating, nxt_router_listen_socket_create); + ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, + nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -985,42 +975,21 @@ static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) { - nxt_mp_t *mp; nxt_int_t ret; nxt_thread_spinlock_t *lock; - recf->creating = nxt_array_create(tmcf->mem_pool, 4, - sizeof(nxt_joint_job_t)); - if (nxt_slow_path(recf->creating == NULL)) { - return NXT_ERROR; - } - - mp = tmcf->conf->mem_pool; - - ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating, - recf->creating, nxt_router_listen_socket_create); + ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, + nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - recf->updating = nxt_array_create(tmcf->mem_pool, 4, - sizeof(nxt_joint_job_t)); - if (nxt_slow_path(recf->updating == NULL)) { - return NXT_ERROR; - } - - ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating, - recf->updating, nxt_router_listen_socket_update); + ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, + nxt_router_listen_socket_update); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - recf->deleting = nxt_array_create(tmcf->mem_pool, 4, - sizeof(nxt_joint_job_t)); - if (nxt_slow_path(recf->deleting == NULL)) { - return NXT_ERROR; - } - ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -1044,12 +1013,6 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, { nxt_int_t ret; - recf->deleting = nxt_array_create(tmcf->mem_pool, - 4, sizeof(nxt_joint_job_t)); - if (nxt_slow_path(recf->deleting == NULL)) { - return NXT_ERROR; - } - ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -1060,8 +1023,8 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, static nxt_int_t -nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf, - nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, +nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_work_handler_t handler) { nxt_joint_job_t *job; @@ -1072,19 +1035,24 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf, qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { - job = nxt_array_add(array); + job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); if (nxt_slow_path(job == NULL)) { return NXT_ERROR; } + job->work.next = recf->jobs; + recf->jobs = &job->work; + job->task = tmcf->engine->task; - job->work.next = NULL; job->work.handler = handler; job->work.task = &job->task; job->work.obj = job; job->tmcf = tmcf; - joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t)); + tmcf->count++; + + joint = nxt_mp_alloc(tmcf->conf->mem_pool, + sizeof(nxt_socket_conf_joint_t)); if (nxt_slow_path(joint == NULL)) { return NXT_ERROR; } @@ -1129,18 +1097,22 @@ nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { - job = nxt_array_add(recf->deleting); + job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t)); if (nxt_slow_path(job == NULL)) { return NXT_ERROR; } + job->work.next = recf->jobs; + recf->jobs = &job->work; + job->task = tmcf->engine->task; - job->work.next = NULL; job->work.handler = nxt_router_listen_socket_delete; job->work.task = &job->task; job->work.obj = job; job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); job->tmcf = tmcf; + + tmcf->count++; } return NXT_OK; @@ -1231,47 +1203,22 @@ nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) recf = tmcf->engines->elts; for (n = tmcf->engines->nelts; n != 0; n--) { - nxt_router_engine_post(tmcf, recf); + nxt_router_engine_post(recf); recf++; } } static void -nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, - nxt_router_engine_conf_t *recf) +nxt_router_engine_post(nxt_router_engine_conf_t *recf) { - nxt_uint_t n; - nxt_joint_job_t *job; - - if (recf->creating != NULL) { - job = recf->creating->elts; + nxt_work_t *work, *next; - for (n = recf->creating->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, &job->work); - job++; - tmcf->count++; - } - } - - if (recf->updating != NULL) { - job = recf->updating->elts; - - for (n = recf->updating->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, &job->work); - job++; - tmcf->count++; - } - } - - if (recf->deleting != NULL) { - job = recf->deleting->elts; + for (work = recf->jobs; work != NULL; work = next) { + next = work->next; + work->next = NULL; - for (n = recf->deleting->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, &job->work); - job++; - tmcf->count++; - } + nxt_event_engine_post(recf->engine, work); } } diff --git a/src/nxt_router.h b/src/nxt_router.h index 6f7c8d61..5c96bdac 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -35,9 +35,7 @@ typedef struct { typedef struct { nxt_event_engine_t *engine; - nxt_array_t *creating; /* of nxt_joint_job_t */ - nxt_array_t *updating; /* of nxt_joint_job_t */ - nxt_array_t *deleting; /* of nxt_joint_job_t */ + nxt_work_t *jobs; } nxt_router_engine_conf_t; |