summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn.h
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
commit16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch)
treee6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_event_conn.h
downloadunit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz
unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2
Initial version.
Diffstat (limited to 'src/nxt_event_conn.h')
-rw-r--r--src/nxt_event_conn.h382
1 files changed, 382 insertions, 0 deletions
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h
new file mode 100644
index 00000000..fe7d794a
--- /dev/null
+++ b/src/nxt_event_conn.h
@@ -0,0 +1,382 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_EVENT_CONN_H_INCLUDED_
+#define _NXT_EVENT_CONN_H_INCLUDED_
+
+
+typedef nxt_msec_t (*nxt_event_conn_timer_val_t)(nxt_event_conn_t *c,
+ uintptr_t data);
+
+
+#define NXT_EVENT_NO_BUF_PROCESS 0
+#define NXT_EVENT_BUF_PROCESS 1
+#define NXT_EVENT_BUF_COMPLETION 1
+
+#define NXT_EVENT_TIMER_AUTORESET 1
+#define NXT_EVENT_TIMER_NO_AUTORESET 0
+
+
+typedef struct {
+ uint8_t process_buffers;
+ uint8_t autoreset_timer;
+
+ nxt_work_handler_t ready_handler;
+ nxt_work_handler_t close_handler;
+ nxt_work_handler_t error_handler;
+
+ nxt_work_handler_t timer_handler;
+ nxt_event_conn_timer_val_t timer_value;
+ uintptr_t timer_data;
+} nxt_event_conn_state_t;
+
+
+typedef struct {
+ double average;
+ size_t limit;
+ size_t limit_after;
+ size_t max_limit;
+ nxt_msec_t last;
+} nxt_event_write_rate_t;
+
+
+typedef struct {
+
+ void (*connect)(nxt_thread_t *thr, void *obj,
+ void *data);
+
+ void (*accept)(nxt_thread_t *thr, void *obj,
+ void *data);
+
+ /*
+ * The read() with NULL c->read buffer waits readiness of a connection
+ * to avoid allocation of read buffer if the connection will time out
+ * or will be closed with error. The kqueue-specific read() can also
+ * detect case if a client did not sent anything and has just closed the
+ * connection without errors. In the latter case state's close_handler
+ * is called.
+ */
+ void (*read)(nxt_thread_t *thr, void *obj,
+ void *data);
+
+ ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b);
+
+ ssize_t (*recv)(nxt_event_conn_t *c, void *buf,
+ size_t size, nxt_uint_t flags);
+
+ /*
+ * The write() is an interface to write a buffer chain with a given rate
+ * limit. It calls write_chunk() in a cycle and handles write event timer.
+ */
+ void (*write)(nxt_thread_t *thr, void *obj,
+ void *data);
+
+ /*
+ * The write_chunk() interface writes a buffer chain with a given limit
+ * and toggles write event. SSL/TLS libraries' write_chunk() interface
+ * buffers data and calls the library specific send() interface to write
+ * the buffered data eventually.
+ */
+ ssize_t (*write_chunk)(nxt_thread_t *thr,
+ nxt_event_conn_t *c, nxt_buf_t *b,
+ size_t limit);
+
+ /*
+ * The sendbuf() is an interface for OS-specific sendfile
+ * implementations or simple writev().
+ */
+ ssize_t (*sendbuf)(nxt_event_conn_t *c, nxt_buf_t *b,
+ size_t limit);
+ /*
+ * The writev() is an interface to write several nxt_iobuf_t buffers.
+ */
+ ssize_t (*writev)(nxt_event_conn_t *c,
+ nxt_iobuf_t *iob, nxt_uint_t niob);
+ /*
+ * The send() is an interface to write a single buffer. SSL/TLS
+ * libraries' send() interface handles also the libraries' errors.
+ */
+ ssize_t (*send)(nxt_event_conn_t *c, void *buf,
+ size_t size);
+
+ void (*shutdown)(nxt_thread_t *thr, void *obj,
+ void *data);
+} nxt_event_conn_io_t;
+
+
+struct nxt_event_conn_s {
+ /*
+ * Must be the first field, since nxt_event_fd_t
+ * and nxt_event_conn_t are used interchangeably.
+ */
+ nxt_event_fd_t socket;
+
+ nxt_buf_t *read;
+ const nxt_event_conn_state_t *read_state;
+ nxt_work_queue_t *read_work_queue;
+ nxt_event_timer_t read_timer;
+
+ nxt_buf_t *write;
+ const nxt_event_conn_state_t *write_state;
+ nxt_work_queue_t *write_work_queue;
+ nxt_event_write_rate_t *rate;
+ nxt_event_timer_t write_timer;
+
+ nxt_off_t sent;
+ uint32_t max_chunk;
+ uint32_t nbytes;
+
+ nxt_event_conn_io_t *io;
+
+#if (NXT_SSLTLS || NXT_THREADS)
+ /* SunC does not support "zero-sized struct/union". */
+
+ union {
+#if (NXT_SSLTLS)
+ void *ssltls;
+#endif
+#if (NXT_THREADS)
+ nxt_thread_pool_t *thread_pool;
+#endif
+ } u;
+
+#endif
+
+ nxt_mem_pool_t *mem_pool;
+
+ nxt_log_t log;
+
+ nxt_listen_socket_t *listen;
+ nxt_sockaddr_t *remote;
+ nxt_sockaddr_t *local;
+ const char *action;
+
+ uint8_t peek;
+ uint8_t blocked; /* 1 bit */
+ uint8_t delayed; /* 1 bit */
+
+#define NXT_CONN_SENDFILE_OFF 0
+#define NXT_CONN_SENDFILE_ON 1
+#define NXT_CONN_SENDFILE_UNSET 3
+
+ uint8_t sendfile; /* 2 bits */
+ uint8_t tcp_nodelay; /* 1 bit */
+
+ nxt_queue_link_t link;
+};
+
+
+/*
+ * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t
+ * because nxt_listen_socket_t is one per process whilst each worker
+ * thread uses own nxt_event_conn_listen_t.
+ */
+typedef struct {
+ /* Must be the first field. */
+ nxt_event_fd_t socket;
+
+ uint32_t ready;
+ uint32_t batch;
+
+ /* An accept() interface is cached to minimize memory accesses. */
+ void (*accept)(nxt_thread_t *thr, void *obj,
+ void *data);
+
+ nxt_listen_socket_t *listen;
+
+ nxt_event_timer_t timer;
+
+ nxt_queue_link_t link;
+} nxt_event_conn_listen_t;
+
+
+#define \
+nxt_event_conn_io_handle(thr, wq, handler, c, data) \
+ do { \
+ if (thr->engine->batch != 0) { \
+ nxt_thread_work_queue_add(thr, wq, handler, c, data, thr->log); \
+ \
+ } else { \
+ handler(thr, c, data); \
+ } \
+ } while (0)
+
+
+#define \
+nxt_event_conn_timer_init(ev, c, wq) \
+ do { \
+ (ev)->work_queue = (wq); \
+ (ev)->log = &(c)->log; \
+ (ev)->precision = NXT_EVENT_TIMER_DEFAULT_PRECISION; \
+ nxt_event_timer_ident((ev), (c)->socket.fd); \
+ } while (0)
+
+
+#define \
+nxt_event_read_timer_conn(ev) \
+ nxt_event_timer_data(ev, nxt_event_conn_t, read_timer)
+
+
+#define \
+nxt_event_write_timer_conn(ev) \
+ nxt_event_timer_data(ev, nxt_event_conn_t, write_timer)
+
+
+#if (NXT_HAVE_UNIX_DOMAIN)
+
+#define \
+nxt_event_conn_tcp_nodelay_on(c) \
+ do { \
+ nxt_int_t ret; \
+ \
+ if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \
+ ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \
+ TCP_NODELAY, 1); \
+ \
+ (c)->tcp_nodelay = (ret == NXT_OK); \
+ } \
+ } while (0)
+
+
+#else
+
+#define \
+nxt_event_conn_tcp_nodelay_on(c) \
+ do { \
+ nxt_int_t ret; \
+ \
+ ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \
+ TCP_NODELAY, 1); \
+ \
+ (c)->tcp_nodelay = (ret == NXT_OK); \
+ } while (0)
+
+#endif
+
+
+NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp,
+ nxt_log_t *log);
+void nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj,
+ void *data);
+NXT_EXPORT void nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c);
+
+NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine,
+ nxt_event_conn_t *c, const nxt_event_conn_state_t *state,
+ nxt_event_timer_t *tev);
+NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c,
+ nxt_work_queue_t *wq);
+
+NXT_EXPORT void nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c);
+void nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj,
+ void *data);
+void nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj,
+ void *data);
+nxt_int_t nxt_event_conn_socket(nxt_thread_t *thr,
+ nxt_event_conn_t *c);
+void nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj,
+ void *data);
+void nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj,
+ void *data);
+
+NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_thread_t *thr,
+ nxt_listen_socket_t *ls);
+void nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj,
+ void *data);
+NXT_EXPORT void nxt_event_conn_accept(nxt_thread_t *thr,
+ nxt_event_conn_listen_t *cls, nxt_event_conn_t *c);
+void nxt_event_conn_accept_error(nxt_thread_t *thr,
+ nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err);
+
+NXT_EXPORT void nxt_event_conn_read(nxt_thread_t *thr, nxt_event_conn_t *c);
+void nxt_event_conn_io_read(nxt_thread_t *thr, void *obj,
+ void *data);
+ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b);
+ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf,
+ size_t size, nxt_uint_t flags);
+
+NXT_EXPORT void nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c);
+size_t nxt_event_conn_write_limit(nxt_event_conn_t *c);
+nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
+ nxt_event_conn_t *c, size_t sent);
+void nxt_event_conn_io_write(nxt_thread_t *thr, void *obj,
+ void *data);
+ssize_t nxt_event_conn_io_write_chunk(nxt_thread_t *thr,
+ nxt_event_conn_t *c, nxt_buf_t *b, size_t limit);
+ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c,
+ nxt_iobuf_t *iob, nxt_uint_t niob);
+ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf,
+ size_t size);
+
+NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_thread_t *thr,
+ nxt_event_conn_t *c);
+
+
+#define \
+nxt_event_conn_connect_enqueue(thr, c) \
+ nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \
+ nxt_event_conn_batch_socket, \
+ c, c->socket.data, c->socket.log)
+
+
+#define \
+nxt_event_conn_read_enqueue(thr, c) \
+ do { \
+ c->socket.read_work_queue = &thr->engine->read_work_queue; \
+ \
+ nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \
+ c->io->read, c, c->socket.data, \
+ c->socket.log); \
+ } while (0)
+
+
+#define \
+nxt_event_conn_write_enqueue(thr, c) \
+ do { \
+ c->socket.write_work_queue = &thr->engine->write_work_queue; \
+ \
+ nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \
+ c->io->write, c, c->socket.data, \
+ c->socket.log); \
+ } while (0)
+
+
+extern nxt_event_conn_io_t nxt_unix_event_conn_io;
+
+
+typedef struct {
+ /*
+ * Client and peer connections are not embedded because already
+ * existent connections can be switched to the event connection proxy.
+ */
+ nxt_event_conn_t *client;
+ nxt_event_conn_t *peer;
+ nxt_buf_t *client_buffer;
+ nxt_buf_t *peer_buffer;
+
+ size_t client_buffer_size;
+ size_t peer_buffer_size;
+
+ nxt_msec_t client_wait_timeout;
+ nxt_msec_t connect_timeout;
+ nxt_msec_t reconnect_timeout;
+ nxt_msec_t peer_wait_timeout;
+ nxt_msec_t client_write_timeout;
+ nxt_msec_t peer_write_timeout;
+
+ uint8_t connected; /* 1 bit */
+ uint8_t delayed; /* 1 bit */
+ uint8_t retries; /* 8 bits */
+
+ nxt_work_handler_t completion_handler;
+} nxt_event_conn_proxy_t;
+
+
+NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create(
+ nxt_event_conn_t *c);
+NXT_EXPORT void nxt_event_conn_proxy(nxt_event_conn_proxy_t *p);
+
+
+#endif /* _NXT_EVENT_CONN_H_INCLUDED_ */