diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-07-14 17:17:15 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-07-14 17:17:15 +0300 |
commit | 668aabac3c492ad1325c09eacadbfdd9c60004b8 (patch) | |
tree | 8320479f58f609282d27ef87e7a0da53fca2b10c /src/nxt_router.c | |
parent | d669045b758effb22e1648a64f2219d0c1b01a9f (diff) | |
download | unit-668aabac3c492ad1325c09eacadbfdd9c60004b8.tar.gz unit-668aabac3c492ad1325c09eacadbfdd9c60004b8.tar.bz2 |
Router: using joint jobs to pass listening socket handlers to
worker engines.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 140 |
1 files changed, 65 insertions, 75 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index ab9bcf82..6f285c22 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -359,11 +359,11 @@ nxt_router_conf_new(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, static void nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) { - nxt_router_temp_conf_t *tmcf; + nxt_joint_job_t *job; - tmcf = obj; + job = obj; - nxt_router_conf_success(task, tmcf); + nxt_router_conf_success(task, job->tmcf); } @@ -948,7 +948,8 @@ nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, nxt_int_t ret; nxt_thread_spinlock_t *lock; - recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + recf->creating = nxt_array_create(tmcf->mem_pool, 4, + sizeof(nxt_joint_job_t)); if (nxt_slow_path(recf->creating == NULL)) { return NXT_ERROR; } @@ -988,7 +989,8 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, nxt_int_t ret; nxt_thread_spinlock_t *lock; - recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + recf->creating = nxt_array_create(tmcf->mem_pool, 4, + sizeof(nxt_joint_job_t)); if (nxt_slow_path(recf->creating == NULL)) { return NXT_ERROR; } @@ -1001,7 +1003,8 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, return ret; } - recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + recf->updating = nxt_array_create(tmcf->mem_pool, 4, + sizeof(nxt_joint_job_t)); if (nxt_slow_path(recf->updating == NULL)) { return NXT_ERROR; } @@ -1012,7 +1015,8 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, return ret; } - recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + recf->deleting = nxt_array_create(tmcf->mem_pool, 4, + sizeof(nxt_joint_job_t)); if (nxt_slow_path(recf->deleting == NULL)) { return NXT_ERROR; } @@ -1040,7 +1044,8 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, { nxt_int_t ret; - recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + recf->deleting = nxt_array_create(tmcf->mem_pool, + 4, sizeof(nxt_joint_job_t)); if (nxt_slow_path(recf->deleting == NULL)) { return NXT_ERROR; } @@ -1059,7 +1064,7 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler) { - nxt_work_t *work, *back; + nxt_joint_job_t *job; nxt_queue_link_t *qlk; nxt_socket_conf_joint_t *joint; @@ -1067,33 +1072,24 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf, qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { - back = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_work_t)); - if (nxt_slow_path(back == NULL)) { + job = nxt_array_add(array); + if (nxt_slow_path(job == NULL)) { return NXT_ERROR; } - back->next = NULL; - back->handler = nxt_router_conf_wait; - back->task = &tmcf->engine->task; - back->obj = tmcf; - back->data = NULL; - - work = nxt_array_add(array); - if (nxt_slow_path(work == NULL)) { - return NXT_ERROR; - } - - work->next = NULL; - work->handler = handler; - work->task = &recf->engine->task; - work->obj = back; + job->task = tmcf->engine->task; + job->work.next = NULL; + job->work.handler = handler; + job->work.task = &job->task; + job->work.obj = job; + job->tmcf = tmcf; joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t)); if (nxt_slow_path(joint == NULL)) { return NXT_ERROR; } - work->data = joint; + job->work.data = joint; joint->count = 1; joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -1126,34 +1122,25 @@ static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) { - nxt_work_t *work, *back; + nxt_joint_job_t *job; nxt_queue_link_t *qlk; for (qlk = nxt_queue_first(sockets); qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { - back = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_work_t)); - if (nxt_slow_path(back == NULL)) { + job = nxt_array_add(recf->deleting); + if (nxt_slow_path(job == NULL)) { return NXT_ERROR; } - back->next = NULL; - back->handler = nxt_router_conf_wait; - back->task = &tmcf->engine->task; - back->obj = tmcf; - back->data = NULL; - - work = nxt_array_add(recf->deleting); - if (nxt_slow_path(work == NULL)) { - return NXT_ERROR; - } - - work->next = NULL; - work->handler = nxt_router_listen_socket_delete; - work->task = &recf->engine->task; - work->obj = back; - work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + job->task = tmcf->engine->task; + job->work.next = NULL; + job->work.handler = nxt_router_listen_socket_delete; + job->work.task = &job->task; + job->work.obj = job; + job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + job->tmcf = tmcf; } return NXT_OK; @@ -1254,35 +1241,35 @@ static void nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) { - nxt_uint_t n; - nxt_work_t *work; + nxt_uint_t n; + nxt_joint_job_t *job; if (recf->creating != NULL) { - work = recf->creating->elts; + job = recf->creating->elts; for (n = recf->creating->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, work); - work++; + nxt_event_engine_post(recf->engine, &job->work); + job++; tmcf->count++; } } if (recf->updating != NULL) { - work = recf->updating->elts; + job = recf->updating->elts; for (n = recf->updating->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, work); - work++; + nxt_event_engine_post(recf->engine, &job->work); + job++; tmcf->count++; } } if (recf->deleting != NULL) { - work = recf->deleting->elts; + job = recf->deleting->elts; for (n = recf->deleting->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, work); - work++; + nxt_event_engine_post(recf->engine, &job->work); + job++; tmcf->count++; } } @@ -1355,13 +1342,12 @@ nxt_router_thread_start(void *data) static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) { - nxt_work_t *work; + nxt_joint_job_t *job; nxt_listen_event_t *listen; nxt_listen_socket_t *ls; - nxt_router_temp_conf_t *tmcf; nxt_socket_conf_joint_t *joint; - work = obj; + job = obj; joint = data; ls = &joint->socket_conf->listen; @@ -1374,8 +1360,10 @@ nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) listen->socket.data = joint; - tmcf = work->obj; - nxt_event_engine_post(tmcf->engine, work); + job->work.next = NULL; + job->work.handler = nxt_router_conf_wait; + + nxt_event_engine_post(job->tmcf->engine, &job->work); } @@ -1407,13 +1395,12 @@ nxt_router_listen_event(nxt_queue_t *listen_connections, static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) { - nxt_work_t *work; + nxt_joint_job_t *job; nxt_event_engine_t *engine; nxt_listen_event_t *listen; - nxt_router_temp_conf_t *tmcf; nxt_socket_conf_joint_t *joint, *old; - work = obj; + job = obj; joint = data; engine = task->thread->engine; @@ -1424,8 +1411,10 @@ nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) old = listen->socket.data; listen->socket.data = joint; - tmcf = work->obj; - nxt_event_engine_post(tmcf->engine, work); + job->work.next = NULL; + job->work.handler = nxt_router_conf_wait; + + nxt_event_engine_post(job->tmcf->engine, &job->work); nxt_router_conf_release(task, old); } @@ -1434,13 +1423,12 @@ nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) { - nxt_work_t *work; - nxt_socket_conf_t *skcf; - nxt_listen_event_t *listen; - nxt_event_engine_t *engine; - nxt_router_temp_conf_t *tmcf; + nxt_joint_job_t *job; + nxt_socket_conf_t *skcf; + nxt_listen_event_t *listen; + nxt_event_engine_t *engine; - work = obj; + job = obj; skcf = data; engine = task->thread->engine; @@ -1454,8 +1442,10 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) nxt_timer_add(engine, &listen->timer, 0); - tmcf = work->obj; - nxt_event_engine_post(tmcf->engine, work); + job->work.next = NULL; + job->work.handler = nxt_router_conf_wait; + + nxt_event_engine_post(job->tmcf->engine, &job->work); } |