diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:09:59 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:09:59 +0300 |
commit | 029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch) | |
tree | f4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_cycle.c | |
parent | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff) | |
download | unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2 |
I/O operations refactoring.
Diffstat (limited to 'src/nxt_cycle.c')
-rw-r--r-- | src/nxt_cycle.c | 147 |
1 files changed, 100 insertions, 47 deletions
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c index a39bb02a..81ad8ce2 100644 --- a/src/nxt_cycle.c +++ b/src/nxt_cycle.c @@ -11,9 +11,9 @@ #include <nxt_master_process.h> -static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, +static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, +static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle); @@ -39,11 +39,14 @@ static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle); +static nxt_int_t nxt_cycle_app_listen_socket(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_log_files_create(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle); -static void nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle); +static nxt_int_t nxt_cycle_listen_sockets_create(nxt_task_t *task, + nxt_cycle_t *cycle); +static void nxt_cycle_listen_sockets_close(nxt_task_t *task, + nxt_cycle_t *cycle); static void nxt_cycle_pid_file_delete(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_shm_zones_enable(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone); @@ -106,7 +109,7 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous, cycle->listen_sockets = listen_sockets; if (previous == NULL) { - ret = nxt_cycle_inherited_listen_sockets(thr, cycle); + ret = nxt_cycle_inherited_listen_sockets(task, cycle); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -166,7 +169,7 @@ fail: static nxt_int_t -nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle) { u_char *v, *p; nxt_int_t type; @@ -177,11 +180,10 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) v = (u_char *) getenv("NGINX"); if (v == NULL) { - return nxt_cycle_systemd_listen_sockets(thr, cycle); + return nxt_cycle_systemd_listen_sockets(task, cycle); } - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "using inherited listen sockets: %s", v); + nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); inherited_sockets = nxt_array_create(cycle->mem_pool, 1, sizeof(nxt_listen_socket_t)); @@ -197,9 +199,9 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) s = nxt_int_parse(v, p - v); if (nxt_slow_path(s < 0)) { - nxt_log_emerg(thr->log, "invalid socket number " - "\"%s\" in NGINX environment variable, " - "ignoring the rest of the variable", v); + nxt_log(task, NXT_LOG_CRIT, "invalid socket number " + "\"%s\" in NGINX environment variable, " + "ignoring the rest of the variable", v); return NXT_ERROR; } @@ -212,12 +214,12 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) ls->socket = s; - ls->sockaddr = nxt_getsockname(cycle->mem_pool, s); + ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s); if (nxt_slow_path(ls->sockaddr == NULL)) { return NXT_ERROR; } - type = nxt_socket_getsockopt(s, SOL_SOCKET, SO_TYPE); + type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); if (nxt_slow_path(type == -1)) { return NXT_ERROR; } @@ -231,7 +233,7 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) static nxt_int_t -nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle) { u_char *nfd, *pid; nxt_int_t n; @@ -263,8 +265,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) return NXT_OK; } - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "using %s systemd listen sockets", n); + nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); inherited_sockets = nxt_array_create(cycle->mem_pool, n, sizeof(nxt_listen_socket_t)); @@ -282,7 +283,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) ls->socket = s; - ls->sockaddr = nxt_getsockname(cycle->mem_pool, s); + ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s); if (nxt_slow_path(ls->sockaddr == NULL)) { return NXT_ERROR; } @@ -652,7 +653,7 @@ nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, { const nxt_event_interface_t *interface; - if (thr->engine->batch == cycle->batch + if (thr->engine->batch0 == cycle->batch && nxt_strcmp(thr->engine->event.name, cycle->engine) == 0) { return NXT_OK; @@ -877,7 +878,22 @@ nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle) return NXT_ERROR; } - cycle->listen = sa; + cycle->app_listen = sa; + + continue; + } + + if (nxt_strcmp(p, "--upstream") == 0) { + if (*argv == NULL) { + nxt_log_emerg(thr->log, + "no argument for option \"--upstream\""); + return NXT_ERROR; + } + + p = *argv++; + + cycle->upstream.length = nxt_strlen(p); + cycle->upstream.start = (u_char *) p; continue; } @@ -1039,7 +1055,7 @@ nxt_cycle_sockaddr_unix_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, #endif - sa = nxt_sockaddr_alloc(mp, socklen); + sa = nxt_sockaddr_alloc(mp, socklen, addr->length); if (nxt_slow_path(sa == NULL)) { return NULL; @@ -1206,8 +1222,8 @@ nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, } } - sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in)); - + sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); if (nxt_slow_path(sa == NULL)) { return NULL; } @@ -1241,6 +1257,10 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) return NXT_ERROR; } + if (nxt_cycle_app_listen_socket(cycle) != NXT_OK) { + return NXT_ERROR; + } + if (nxt_cycle_listen_socket(cycle) != NXT_OK) { return NXT_ERROR; } @@ -1249,7 +1269,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) return NXT_ERROR; } - if (nxt_cycle_listen_sockets_create(cycle) != NXT_OK) { + if (nxt_cycle_listen_sockets_create(task, cycle) != NXT_OK) { return NXT_ERROR; } @@ -1257,7 +1277,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) return NXT_ERROR; } - nxt_cycle_listen_sockets_close(cycle); + nxt_cycle_listen_sockets_close(task, cycle); return NXT_OK; } @@ -1267,36 +1287,34 @@ static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle) { nxt_sockaddr_t *sa; -// nxt_work_queue_t *wq; nxt_listen_socket_t *ls; - if (cycle->listen == NULL) { - sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in)); + if (cycle->stream_listen == NULL) { + sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); if (sa == NULL) { return NXT_ERROR; } sa->type = SOCK_STREAM; sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8080); + sa->u.sockaddr_in.sin_port = htons(8000); - cycle->listen = sa; + cycle->stream_listen = sa; } - if (nxt_sockaddr_text(cycle->mem_pool, cycle->listen, 1) != NXT_OK) { - return NXT_ERROR; - } + nxt_sockaddr_text(cycle->stream_listen); - ls = nxt_cycle_listen_socket_add(cycle, cycle->listen); + ls = nxt_cycle_listen_socket_add(cycle, cycle->stream_listen); if (ls == NULL) { return NXT_ERROR; } ls->read_after_accept = 1; -#if 0 ls->flags = NXT_NONBLOCK; +#if 0 /* STUB */ wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); if (wq == NULL) { @@ -1306,6 +1324,7 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle) /**/ ls->work_queue = wq; +#endif ls->handler = nxt_stream_connection_init; /* @@ -1316,7 +1335,40 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle) + sizeof(nxt_event_conn_proxy_t) + sizeof(nxt_event_conn_t) + 4 * sizeof(nxt_buf_t); -#endif + + return NXT_OK; +} + + +static nxt_int_t +nxt_cycle_app_listen_socket(nxt_cycle_t *cycle) +{ + nxt_sockaddr_t *sa; +// nxt_work_queue_t *wq; + nxt_listen_socket_t *ls; + + if (cycle->app_listen == NULL) { + sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); + if (sa == NULL) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + sa->u.sockaddr_in.sin_family = AF_INET; + sa->u.sockaddr_in.sin_port = htons(8080); + + cycle->app_listen = sa; + } + + nxt_sockaddr_text(cycle->app_listen); + + ls = nxt_cycle_listen_socket_add(cycle, cycle->app_listen); + if (ls == NULL) { + return NXT_ERROR; + } + + ls->read_after_accept = 1; return NXT_OK; } @@ -1335,16 +1387,15 @@ nxt_cycle_listen_socket_add(nxt_cycle_t *cycle, nxt_sockaddr_t *sa) mp = cycle->mem_pool; - ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, nxt_socklen(sa)); + ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, + sa->length); if (ls->sockaddr == NULL) { return NULL; } ls->sockaddr->type = sa->type; - if (nxt_sockaddr_text(mp, ls->sockaddr, 1) != NXT_OK) { - return NULL; - } + nxt_sockaddr_text(ls->sockaddr); ls->socket = -1; ls->backlog = NXT_LISTEN_BACKLOG; @@ -1483,7 +1534,7 @@ nxt_cycle_log_files_create(nxt_cycle_t *cycle) static nxt_int_t -nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) +nxt_cycle_listen_sockets_create(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_uint_t c, p, ncurr, nprev; nxt_listen_socket_t *curr, *prev; @@ -1510,7 +1561,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { - if (nxt_listen_socket_update(&curr[c], &prev[p]) != NXT_OK) { + if (nxt_listen_socket_update(task, &curr[c], &prev[p]) != NXT_OK) { return NXT_ERROR; } @@ -1518,7 +1569,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) } } - if (nxt_listen_socket_create(&curr[c], cycle->test_config) != NXT_OK) { + if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { return NXT_ERROR; } @@ -1532,7 +1583,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) static void -nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle) +nxt_cycle_listen_sockets_close(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_uint_t p, c, nprev, ncurr; nxt_listen_socket_t *curr, *prev; @@ -1555,7 +1606,7 @@ nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle) } } - nxt_socket_close(prev[p].socket); + nxt_socket_close(task, prev[p].socket); next: @@ -1576,8 +1627,10 @@ nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle) n = cycle->listen_sockets->nelts; for (i = 0; i < n; i++) { - if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { - return NXT_ERROR; + if (ls[i].flags == NXT_NONBLOCK) { + if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { + return NXT_ERROR; + } } } |