summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_cycle.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
commit029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch)
treef4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_cycle.c
parent059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff)
downloadunit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz
unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2
I/O operations refactoring.
Diffstat (limited to '')
-rw-r--r--src/nxt_cycle.c147
1 files changed, 100 insertions, 47 deletions
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c
index a39bb02a..81ad8ce2 100644
--- a/src/nxt_cycle.c
+++ b/src/nxt_cycle.c
@@ -11,9 +11,9 @@
#include <nxt_master_process.h>
-static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr,
+static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_task_t *task,
nxt_cycle_t *cycle);
-static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr,
+static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_task_t *task,
nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle);
@@ -39,11 +39,14 @@ static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr,
static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task,
nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle);
+static nxt_int_t nxt_cycle_app_listen_socket(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_log_files_create(nxt_cycle_t *cycle);
-static nxt_int_t nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle);
-static void nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle);
+static nxt_int_t nxt_cycle_listen_sockets_create(nxt_task_t *task,
+ nxt_cycle_t *cycle);
+static void nxt_cycle_listen_sockets_close(nxt_task_t *task,
+ nxt_cycle_t *cycle);
static void nxt_cycle_pid_file_delete(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_shm_zones_enable(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone);
@@ -106,7 +109,7 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous,
cycle->listen_sockets = listen_sockets;
if (previous == NULL) {
- ret = nxt_cycle_inherited_listen_sockets(thr, cycle);
+ ret = nxt_cycle_inherited_listen_sockets(task, cycle);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
@@ -166,7 +169,7 @@ fail:
static nxt_int_t
-nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle)
{
u_char *v, *p;
nxt_int_t type;
@@ -177,11 +180,10 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
v = (u_char *) getenv("NGINX");
if (v == NULL) {
- return nxt_cycle_systemd_listen_sockets(thr, cycle);
+ return nxt_cycle_systemd_listen_sockets(task, cycle);
}
- nxt_log_error(NXT_LOG_NOTICE, thr->log,
- "using inherited listen sockets: %s", v);
+ nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v);
inherited_sockets = nxt_array_create(cycle->mem_pool,
1, sizeof(nxt_listen_socket_t));
@@ -197,9 +199,9 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
s = nxt_int_parse(v, p - v);
if (nxt_slow_path(s < 0)) {
- nxt_log_emerg(thr->log, "invalid socket number "
- "\"%s\" in NGINX environment variable, "
- "ignoring the rest of the variable", v);
+ nxt_log(task, NXT_LOG_CRIT, "invalid socket number "
+ "\"%s\" in NGINX environment variable, "
+ "ignoring the rest of the variable", v);
return NXT_ERROR;
}
@@ -212,12 +214,12 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
ls->socket = s;
- ls->sockaddr = nxt_getsockname(cycle->mem_pool, s);
+ ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s);
if (nxt_slow_path(ls->sockaddr == NULL)) {
return NXT_ERROR;
}
- type = nxt_socket_getsockopt(s, SOL_SOCKET, SO_TYPE);
+ type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
if (nxt_slow_path(type == -1)) {
return NXT_ERROR;
}
@@ -231,7 +233,7 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
static nxt_int_t
-nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle)
{
u_char *nfd, *pid;
nxt_int_t n;
@@ -263,8 +265,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
return NXT_OK;
}
- nxt_log_error(NXT_LOG_NOTICE, thr->log,
- "using %s systemd listen sockets", n);
+ nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n);
inherited_sockets = nxt_array_create(cycle->mem_pool,
n, sizeof(nxt_listen_socket_t));
@@ -282,7 +283,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
ls->socket = s;
- ls->sockaddr = nxt_getsockname(cycle->mem_pool, s);
+ ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s);
if (nxt_slow_path(ls->sockaddr == NULL)) {
return NXT_ERROR;
}
@@ -652,7 +653,7 @@ nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
{
const nxt_event_interface_t *interface;
- if (thr->engine->batch == cycle->batch
+ if (thr->engine->batch0 == cycle->batch
&& nxt_strcmp(thr->engine->event.name, cycle->engine) == 0)
{
return NXT_OK;
@@ -877,7 +878,22 @@ nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- cycle->listen = sa;
+ cycle->app_listen = sa;
+
+ continue;
+ }
+
+ if (nxt_strcmp(p, "--upstream") == 0) {
+ if (*argv == NULL) {
+ nxt_log_emerg(thr->log,
+ "no argument for option \"--upstream\"");
+ return NXT_ERROR;
+ }
+
+ p = *argv++;
+
+ cycle->upstream.length = nxt_strlen(p);
+ cycle->upstream.start = (u_char *) p;
continue;
}
@@ -1039,7 +1055,7 @@ nxt_cycle_sockaddr_unix_parse(nxt_str_t *addr, nxt_mem_pool_t *mp,
#endif
- sa = nxt_sockaddr_alloc(mp, socklen);
+ sa = nxt_sockaddr_alloc(mp, socklen, addr->length);
if (nxt_slow_path(sa == NULL)) {
return NULL;
@@ -1206,8 +1222,8 @@ nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, nxt_mem_pool_t *mp,
}
}
- sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in));
-
+ sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
if (nxt_slow_path(sa == NULL)) {
return NULL;
}
@@ -1241,6 +1257,10 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
return NXT_ERROR;
}
+ if (nxt_cycle_app_listen_socket(cycle) != NXT_OK) {
+ return NXT_ERROR;
+ }
+
if (nxt_cycle_listen_socket(cycle) != NXT_OK) {
return NXT_ERROR;
}
@@ -1249,7 +1269,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- if (nxt_cycle_listen_sockets_create(cycle) != NXT_OK) {
+ if (nxt_cycle_listen_sockets_create(task, cycle) != NXT_OK) {
return NXT_ERROR;
}
@@ -1257,7 +1277,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- nxt_cycle_listen_sockets_close(cycle);
+ nxt_cycle_listen_sockets_close(task, cycle);
return NXT_OK;
}
@@ -1267,36 +1287,34 @@ static nxt_int_t
nxt_cycle_listen_socket(nxt_cycle_t *cycle)
{
nxt_sockaddr_t *sa;
-// nxt_work_queue_t *wq;
nxt_listen_socket_t *ls;
- if (cycle->listen == NULL) {
- sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in));
+ if (cycle->stream_listen == NULL) {
+ sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
if (sa == NULL) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
sa->u.sockaddr_in.sin_family = AF_INET;
- sa->u.sockaddr_in.sin_port = htons(8080);
+ sa->u.sockaddr_in.sin_port = htons(8000);
- cycle->listen = sa;
+ cycle->stream_listen = sa;
}
- if (nxt_sockaddr_text(cycle->mem_pool, cycle->listen, 1) != NXT_OK) {
- return NXT_ERROR;
- }
+ nxt_sockaddr_text(cycle->stream_listen);
- ls = nxt_cycle_listen_socket_add(cycle, cycle->listen);
+ ls = nxt_cycle_listen_socket_add(cycle, cycle->stream_listen);
if (ls == NULL) {
return NXT_ERROR;
}
ls->read_after_accept = 1;
-#if 0
ls->flags = NXT_NONBLOCK;
+#if 0
/* STUB */
wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t));
if (wq == NULL) {
@@ -1306,6 +1324,7 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle)
/**/
ls->work_queue = wq;
+#endif
ls->handler = nxt_stream_connection_init;
/*
@@ -1316,7 +1335,40 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle)
+ sizeof(nxt_event_conn_proxy_t)
+ sizeof(nxt_event_conn_t)
+ 4 * sizeof(nxt_buf_t);
-#endif
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_cycle_app_listen_socket(nxt_cycle_t *cycle)
+{
+ nxt_sockaddr_t *sa;
+// nxt_work_queue_t *wq;
+ nxt_listen_socket_t *ls;
+
+ if (cycle->app_listen == NULL) {
+ sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
+ if (sa == NULL) {
+ return NXT_ERROR;
+ }
+
+ sa->type = SOCK_STREAM;
+ sa->u.sockaddr_in.sin_family = AF_INET;
+ sa->u.sockaddr_in.sin_port = htons(8080);
+
+ cycle->app_listen = sa;
+ }
+
+ nxt_sockaddr_text(cycle->app_listen);
+
+ ls = nxt_cycle_listen_socket_add(cycle, cycle->app_listen);
+ if (ls == NULL) {
+ return NXT_ERROR;
+ }
+
+ ls->read_after_accept = 1;
return NXT_OK;
}
@@ -1335,16 +1387,15 @@ nxt_cycle_listen_socket_add(nxt_cycle_t *cycle, nxt_sockaddr_t *sa)
mp = cycle->mem_pool;
- ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, nxt_socklen(sa));
+ ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
+ sa->length);
if (ls->sockaddr == NULL) {
return NULL;
}
ls->sockaddr->type = sa->type;
- if (nxt_sockaddr_text(mp, ls->sockaddr, 1) != NXT_OK) {
- return NULL;
- }
+ nxt_sockaddr_text(ls->sockaddr);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;
@@ -1483,7 +1534,7 @@ nxt_cycle_log_files_create(nxt_cycle_t *cycle)
static nxt_int_t
-nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
+nxt_cycle_listen_sockets_create(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_uint_t c, p, ncurr, nprev;
nxt_listen_socket_t *curr, *prev;
@@ -1510,7 +1561,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
- if (nxt_listen_socket_update(&curr[c], &prev[p]) != NXT_OK) {
+ if (nxt_listen_socket_update(task, &curr[c], &prev[p]) != NXT_OK) {
return NXT_ERROR;
}
@@ -1518,7 +1569,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
}
}
- if (nxt_listen_socket_create(&curr[c], cycle->test_config) != NXT_OK) {
+ if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
return NXT_ERROR;
}
@@ -1532,7 +1583,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
static void
-nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle)
+nxt_cycle_listen_sockets_close(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_uint_t p, c, nprev, ncurr;
nxt_listen_socket_t *curr, *prev;
@@ -1555,7 +1606,7 @@ nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle)
}
}
- nxt_socket_close(prev[p].socket);
+ nxt_socket_close(task, prev[p].socket);
next:
@@ -1576,8 +1627,10 @@ nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle)
n = cycle->listen_sockets->nelts;
for (i = 0; i < n; i++) {
- if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) {
- return NXT_ERROR;
+ if (ls[i].flags == NXT_NONBLOCK) {
+ if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) {
+ return NXT_ERROR;
+ }
}
}