diff options
author | Igor Sysoev <igor@sysoev.ru> | 2018-05-30 18:46:05 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2018-05-30 18:46:05 +0300 |
commit | ff6ca2a82c956807e28bb290efa70e791e1f8f8d (patch) | |
tree | 06ac66b0e5c8382d79b203fdf4d717b75f188160 | |
parent | e54fb892a44705c5d6f81e0351070e368a6b38f1 (diff) | |
download | unit-ff6ca2a82c956807e28bb290efa70e791e1f8f8d.tar.gz unit-ff6ca2a82c956807e28bb290efa70e791e1f8f8d.tar.bz2 |
Fixed keep-alive hanging after reconfiguration.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_conn.h | 4 | ||||
-rw-r--r-- | src/nxt_conn_accept.c | 11 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 118 | ||||
-rw-r--r-- | src/nxt_http.h | 5 | ||||
-rw-r--r-- | src/nxt_http_request.c | 7 | ||||
-rw-r--r-- | src/nxt_router.c | 90 | ||||
-rw-r--r-- | src/nxt_router.h | 3 |
7 files changed, 136 insertions, 102 deletions
diff --git a/src/nxt_conn.h b/src/nxt_conn.h index 92686703..534fe85f 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -106,6 +106,7 @@ typedef struct { uint32_t ready; uint32_t batch; + uint32_t count; /* An accept() interface is cached to minimize memory accesses. */ nxt_work_handler_t accept; @@ -156,8 +157,7 @@ struct nxt_conn_s { nxt_task_t task; nxt_log_t log; - /* STUB: socket.data should be used later. */ - void *joint; + nxt_listen_event_t *listen; nxt_sockaddr_t *remote; nxt_sockaddr_t *local; diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c index a2aaa940..4ad2d02f 100644 --- a/src/nxt_conn_accept.c +++ b/src/nxt_conn_accept.c @@ -45,6 +45,7 @@ nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) engine = task->thread->engine; lev->batch = engine->batch; + lev->count = 1; lev->socket.read_work_queue = &engine->accept_work_queue; lev->socket.read_handler = nxt_conn_listen_handler; @@ -194,19 +195,23 @@ nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); + c->listen = lev; + lev->count++; + c->socket.data = NULL; + c->read_work_queue = lev->work_queue; c->write_work_queue = lev->work_queue; if (lev->listen->read_after_accept) { //c->socket.read_ready = 1; -// lev->listen->handler(task, c, lev->socket.data); +// lev->listen->handler(task, c, lev); nxt_work_queue_add(c->read_work_queue, lev->listen->handler, - &c->task, c, lev->socket.data); + &c->task, c, lev); } else { nxt_work_queue_add(c->write_work_queue, lev->listen->handler, - &c->task, c, lev->socket.data); + &c->task, c, lev); } next = nxt_conn_accept_next(task, lev); diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 10099a2a..c0be75e2 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -16,7 +16,8 @@ static ssize_t nxt_h1p_conn_io_read_handler(nxt_conn_t *c); static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data); -static void nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, + void *data); static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p, nxt_http_request_t *r); static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task, @@ -40,19 +41,24 @@ static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task, nxt_http_proto_t proto); static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last); -static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto); +static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, + nxt_socket_conf_joint_t *joint); static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c); static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_h1p_conn_timeout_value(nxt_conn_t *c, + uintptr_t data); static void nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_h1p_conn_request_timeout_value(nxt_conn_t *c, + uintptr_t data); nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r); -static nxt_msec_t nxt_h1p_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static const nxt_conn_state_t nxt_h1p_idle_state; @@ -60,6 +66,7 @@ static const nxt_conn_state_t nxt_h1p_header_parse_state; static const nxt_conn_state_t nxt_h1p_read_body_state; static const nxt_conn_state_t nxt_h1p_send_state; static const nxt_conn_state_t nxt_h1p_keepalive_state; +static const nxt_conn_state_t nxt_h1p_close_state; const nxt_http_proto_body_read_t nxt_http_proto_body_read[3] = { @@ -144,19 +151,17 @@ nxt_http_conn_init(nxt_task_t *task, void *obj, void *data) nxt_conn_t *c; nxt_socket_conf_t *skcf; nxt_event_engine_t *engine; + nxt_listen_event_t *lev; nxt_socket_conf_joint_t *joint; c = obj; - joint = data; + lev = data; nxt_debug(task, "http conn init"); - c->joint = joint; - joint->count++; - + joint = lev->socket.data; skcf = joint->socket_conf; c->local = skcf->sockaddr; - c->socket.data = NULL; engine = task->thread->engine; c->read_work_queue = &engine->fast_work_queue; @@ -192,7 +197,7 @@ nxt_h1p_conn_io_read_handler(nxt_conn_t *c) nxt_buf_t *b; nxt_socket_conf_joint_t *joint; - joint = c->joint; + joint = c->listen->socket.data; size = joint->socket_conf->header_buffer_size; b = nxt_buf_mem_alloc(c->mem_pool, size, 0); @@ -257,15 +262,18 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) h1p->request = r; r->proto.h1 = h1p; - joint = c->joint; - r->socket_conf = joint->socket_conf; - r->remote = c->remote; ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); if (nxt_fast_path(ret == NXT_OK)) { - nxt_h1p_conn_header_parse(task, c, h1p); + joint = c->listen->socket.data; + joint->count++; + + r->conf = joint; + c->local = joint->socket_conf->sockaddr; + + nxt_h1p_conn_request_header_parse(task, c, h1p); return; } @@ -283,18 +291,18 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) static const nxt_conn_state_t nxt_h1p_header_parse_state nxt_aligned(64) = { - .ready_handler = nxt_h1p_conn_header_parse, + .ready_handler = nxt_h1p_conn_request_header_parse, .close_handler = nxt_h1p_conn_request_error, .error_handler = nxt_h1p_conn_request_error, .timer_handler = nxt_h1p_conn_request_timeout, - .timer_value = nxt_h1p_conn_timeout_value, + .timer_value = nxt_h1p_conn_request_timeout_value, .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), }; static void -nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data) +nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_conn_t *c; @@ -340,7 +348,7 @@ nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data) goto error; case NXT_AGAIN: - status = nxt_h1p_header_buffer_test(task, h1p, c, r->socket_conf); + status = nxt_h1p_header_buffer_test(task, h1p, c, r->conf->socket_conf); if (nxt_fast_path(status == NXT_OK)) { c->read_state = &nxt_h1p_header_parse_state; @@ -504,7 +512,7 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) goto ready; } - if (r->content_length_n > (nxt_off_t) r->socket_conf->max_body_size) { + if (r->content_length_n > (nxt_off_t) r->conf->socket_conf->max_body_size) { status = NXT_HTTP_PAYLOAD_TOO_LARGE; goto error; } @@ -575,7 +583,7 @@ static const nxt_conn_state_t nxt_h1p_read_body_state .error_handler = nxt_h1p_conn_request_error, .timer_handler = nxt_h1p_conn_request_timeout, - .timer_value = nxt_h1p_conn_timeout_value, + .timer_value = nxt_h1p_conn_request_timeout_value, .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), .timer_autoreset = 1, }; @@ -848,7 +856,7 @@ static const nxt_conn_state_t nxt_h1p_send_state .error_handler = nxt_h1p_conn_request_error, .timer_handler = nxt_h1p_conn_request_send_timeout, - .timer_value = nxt_h1p_conn_timeout_value, + .timer_value = nxt_h1p_conn_request_timeout_value, .timer_data = offsetof(nxt_socket_conf_t, send_timeout), .timer_autoreset = 1, }; @@ -995,7 +1003,8 @@ nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r, static void -nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto) +nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, + nxt_socket_conf_joint_t *joint) { nxt_conn_t *c; nxt_h1proto_t *h1p; @@ -1005,6 +1014,8 @@ nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto) h1p = proto.h1; h1p->request = NULL; + nxt_router_conf_release(task, joint); + c = h1p->conn; if (h1p->keepalive) { @@ -1119,6 +1130,17 @@ nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data) } +static nxt_msec_t +nxt_h1p_conn_timeout_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_socket_conf_joint_t *joint; + + joint = c->listen->socket.data; + + return nxt_value_at(nxt_msec_t, joint->socket_conf, data); +} + + static void nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c) { @@ -1126,12 +1148,44 @@ nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c) c->socket.data = NULL; - c->write_state = &nxt_router_conn_close_state; + c->write_state = &nxt_h1p_close_state; nxt_conn_close(task->thread->engine, c); } +static const nxt_conn_state_t nxt_h1p_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_conn_free, +}; + + +static void +nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_listen_event_t *lev; + nxt_event_engine_t *engine; + + c = obj; + + nxt_debug(task, "h1p conn free"); + + nxt_queue_remove(&c->link); + + engine = task->thread->engine; + + nxt_sockaddr_cache_free(engine, c); + + lev = c->listen; + + nxt_conn_free(task, c); + + nxt_router_listen_event_release(&engine->task, lev, NULL); +} + + static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data) { @@ -1204,19 +1258,19 @@ nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data) } -nxt_inline void -nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r) +static nxt_msec_t +nxt_h1p_conn_request_timeout_value(nxt_conn_t *c, uintptr_t data) { - r->state->error_handler(task, r, r->proto.h1); -} + nxt_h1proto_t *h1p; + h1p = c->socket.data; -static nxt_msec_t -nxt_h1p_conn_timeout_value(nxt_conn_t *c, uintptr_t data) -{ - nxt_socket_conf_joint_t *joint; + return nxt_value_at(nxt_msec_t, h1p->request->conf->socket_conf, data); +} - joint = c->joint; - return nxt_value_at(nxt_msec_t, joint->socket_conf, data); +nxt_inline void +nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r) +{ + r->state->error_handler(task, r, r->proto.h1); } diff --git a/src/nxt_http.h b/src/nxt_http.h index 57abb8e0..57c624d7 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -101,7 +101,7 @@ typedef struct { struct nxt_http_request_s { nxt_http_proto_t proto; - nxt_socket_conf_t *socket_conf; + nxt_socket_conf_joint_t *conf; nxt_mp_t *mem_pool; @@ -153,7 +153,7 @@ typedef nxt_off_t (*nxt_http_proto_body_bytes_sent_t)(nxt_task_t *task, typedef void (*nxt_http_proto_discard_t)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last); typedef void (*nxt_http_proto_close_t)(nxt_task_t *task, - nxt_http_proto_t proto); + nxt_http_proto_t proto, nxt_socket_conf_joint_t *joint); nxt_int_t nxt_http_init(nxt_task_t *task, nxt_runtime_t *rt); @@ -184,7 +184,6 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, extern nxt_lvlhsh_t nxt_response_fields_hash; -extern const nxt_conn_state_t nxt_router_conn_close_state; extern const nxt_http_proto_body_read_t nxt_http_proto_body_read[]; extern const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[]; diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 9fc4a0e1..d1a30f0e 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -445,6 +445,7 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) nxt_http_proto_t proto; nxt_http_request_t *r; nxt_http_proto_close_t handler; + nxt_socket_conf_joint_t *conf; nxt_router_access_log_t *access_log; r = obj; @@ -452,10 +453,12 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "http request close handler"); + conf = r->conf; + if (!r->logged) { r->logged = 1; - access_log = r->socket_conf->router_conf->access_log; + access_log = conf->socket_conf->router_conf->access_log; if (access_log != NULL) { access_log->handler(task, r, access_log); @@ -468,7 +471,7 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); if (proto.any != NULL) { - handler(task, proto); + handler(task, proto, conf); } } diff --git a/src/nxt_router.c b/src/nxt_router.c index 099aa339..fa9dd17f 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -208,8 +208,6 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data); static void nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf); -static void nxt_router_conf_release(nxt_task_t *task, - nxt_socket_conf_joint_t *joint); static void nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r, nxt_router_access_log_t *access_log); @@ -254,7 +252,6 @@ static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, static nxt_int_t nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); -static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data); @@ -2611,21 +2608,21 @@ nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) timer = obj; lev = nxt_timer_data(timer, nxt_listen_event_t, timer); - joint = lev->socket.data; nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, lev->socket.fd); nxt_queue_remove(&lev->link); + joint = lev->socket.data; + lev->socket.data = NULL; + /* 'task' refers to lev->task and we cannot use after nxt_free() */ task = &task->thread->engine->task; nxt_router_listen_socket_release(task, joint->socket_conf); - nxt_free(lev); - - nxt_router_conf_release(task, joint); + nxt_router_listen_event_release(task, lev, joint); } @@ -2656,13 +2653,36 @@ nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf) } -static void +void +nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, + nxt_socket_conf_joint_t *joint) +{ + nxt_event_engine_t *engine; + + nxt_debug(task, "listen event count: %D", lev->count); + + if (--lev->count == 0) { + nxt_free(lev); + } + + if (joint != NULL) { + nxt_router_conf_release(task, joint); + } + + engine = task->thread->engine; + + if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { + nxt_thread_exit(task->thread); + } +} + + +void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) { nxt_app_t *app; nxt_socket_conf_t *skcf; nxt_router_conf_t *rtcf; - nxt_event_engine_t *engine; nxt_thread_spinlock_t *lock; nxt_debug(task, "conf joint %p count: %D", joint, joint->count); @@ -2678,8 +2698,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) * section protected by the spinlock because its memory pool may * be already destroyed by another thread. */ - engine = joint->engine; - skcf = joint->socket_conf; app = skcf->application; rtcf = skcf->router_conf; @@ -2720,10 +2738,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_mp_destroy(rtcf->mem_pool); } - - if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { - nxt_thread_exit(task->thread); - } } @@ -4138,7 +4152,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar) nxt_req_conn_link_t *rc; r = ar->request; - app = r->socket_conf->application; + app = r->conf->socket_conf->application; if (app == NULL) { nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); @@ -4708,50 +4722,6 @@ fail: } -const nxt_conn_state_t nxt_router_conn_close_state - nxt_aligned(64) = -{ - .ready_handler = nxt_router_conn_free, -}; - - -static void -nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) -{ - nxt_socket_conf_joint_t *joint; - - joint = obj; - - nxt_router_conf_release(task, joint); -} - - -static void -nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - nxt_event_engine_t *engine; - nxt_socket_conf_joint_t *joint; - - c = obj; - - nxt_debug(task, "router conn close done"); - - nxt_queue_remove(&c->link); - - engine = task->thread->engine; - - nxt_sockaddr_cache_free(engine, c); - - joint = c->joint; - - nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, - &engine->task, joint, NULL); - - nxt_conn_free(task, c); -} - - static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { diff --git a/src/nxt_router.h b/src/nxt_router.h index 8f10229c..4d82cb31 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -182,6 +182,9 @@ void nxt_router_access_log_reopen_handler(nxt_task_t *task, void nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar); void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); +void nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, + nxt_socket_conf_joint_t *joint); +void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); #endif /* _NXT_ROUTER_H_INCLUDED_ */ |