diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_conn_connect.c | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c new file mode 100644 index 00000000..f614dcda --- /dev/null +++ b/src/nxt_event_conn_connect.c @@ -0,0 +1,213 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +void +nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c) +{ + void *data; + + data = c->socket.data; + + if (thr->engine->batch != 0) { + nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, + nxt_event_conn_batch_socket, c, data, + c->socket.log); + return; + } + + if (nxt_event_conn_socket(thr, c) == NXT_OK) { + c->io->connect(thr, c, data); + return; + } + + c->write_state->error_handler(thr, c, data); +} + + +void +nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_work_handler_t handler; + + c = obj; + + if (nxt_event_conn_socket(thr, c) == NXT_OK) { + c->socket.write_work_queue = c->write_work_queue; + handler = c->io->connect; + + } else { + handler = c->write_state->error_handler; + } + + nxt_thread_work_queue_add(thr, &thr->engine->connect_work_queue, + handler, c, data, thr->log); +} + + +void +nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_work_handler_t handler; + const nxt_event_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (nxt_socket_connect(c->socket.fd, c->remote)) { + + case NXT_OK: + c->socket.write_ready = 1; + handler = state->ready_handler; + break; + + case NXT_AGAIN: + c->socket.write_handler = nxt_event_conn_connect_test; + c->socket.error_handler = state->error_handler; + + nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + + nxt_event_fd_enable_write(thr->engine, &c->socket); + return; + + case NXT_DECLINED: + handler = state->close_handler; + break; + + default: /* NXT_ERROR */ + handler = state->error_handler; + break; + } + + nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); +} + + +nxt_int_t +nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c) +{ + nxt_uint_t family; + nxt_socket_t s; + + nxt_log_debug(thr->log, "event conn socket"); + + family = c->remote->u.sockaddr.sa_family; + + s = nxt_socket_create(family, c->remote->type, 0, NXT_NONBLOCK); + + if (nxt_slow_path(s == -1)) { + return NXT_ERROR; + } + + c->sendfile = 1; + +#if (NXT_HAVE_UNIX_DOMAIN && NXT_SOLARIS) + + if (family == AF_UNIX) { + /* Solaris AF_UNIX does not support sendfilev(). */ + c->sendfile = 0; + } + +#endif + + c->socket.fd = s; + nxt_event_timer_ident(&c->read_timer, s); + nxt_event_timer_ident(&c->write_timer, s); + + if (c->local != NULL) { + if (nxt_slow_path(nxt_socket_bind(s, c->local, 0) != NXT_OK)) { + nxt_socket_close(s); + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +void +nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data) +{ + int ret, err; + socklen_t len; + nxt_event_conn_t *c; + + c = obj; + + nxt_log_debug(thr->log, "event connect test fd:%d", c->socket.fd); + + nxt_event_fd_block_write(thr->engine, &c->socket); + + if (c->write_state->autoreset_timer) { + nxt_event_timer_disable(&c->write_timer); + } + + err = 0; + len = sizeof(int); + + /* + * Linux and BSDs return 0 and store a pending error in the err argument; + * Solaris returns -1 and sets the errno. + */ + + ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len); + + if (nxt_slow_path(ret == -1)) { + err = nxt_errno; + } + + if (err == 0) { + nxt_event_conn_io_handle(thr, c->write_work_queue, + c->write_state->ready_handler, c, data); + return; + } + + c->socket.error = err; + + nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), thr->log, + "connect(%d, %*s) failed %E", + c->socket.fd, c->remote->text_len, c->remote->text, err); + + nxt_event_conn_connect_error(thr, c, data); +} + + +void +nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_work_handler_t handler; + const nxt_event_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (c->socket.error) { + + case NXT_ECONNREFUSED: +#if (NXT_LINUX) + case NXT_EAGAIN: + /* + * Linux returns EAGAIN instead of ECONNREFUSED + * for UNIX sockets if a listen queue is full. + */ +#endif + handler = state->close_handler; + break; + + default: + handler = state->error_handler; + break; + } + + nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); +} |