diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_event_conn.h | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to 'src/nxt_event_conn.h')
-rw-r--r-- | src/nxt_event_conn.h | 382 |
1 files changed, 382 insertions, 0 deletions
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h new file mode 100644 index 00000000..fe7d794a --- /dev/null +++ b/src/nxt_event_conn.h @@ -0,0 +1,382 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_EVENT_CONN_H_INCLUDED_ +#define _NXT_EVENT_CONN_H_INCLUDED_ + + +typedef nxt_msec_t (*nxt_event_conn_timer_val_t)(nxt_event_conn_t *c, + uintptr_t data); + + +#define NXT_EVENT_NO_BUF_PROCESS 0 +#define NXT_EVENT_BUF_PROCESS 1 +#define NXT_EVENT_BUF_COMPLETION 1 + +#define NXT_EVENT_TIMER_AUTORESET 1 +#define NXT_EVENT_TIMER_NO_AUTORESET 0 + + +typedef struct { + uint8_t process_buffers; + uint8_t autoreset_timer; + + nxt_work_handler_t ready_handler; + nxt_work_handler_t close_handler; + nxt_work_handler_t error_handler; + + nxt_work_handler_t timer_handler; + nxt_event_conn_timer_val_t timer_value; + uintptr_t timer_data; +} nxt_event_conn_state_t; + + +typedef struct { + double average; + size_t limit; + size_t limit_after; + size_t max_limit; + nxt_msec_t last; +} nxt_event_write_rate_t; + + +typedef struct { + + void (*connect)(nxt_thread_t *thr, void *obj, + void *data); + + void (*accept)(nxt_thread_t *thr, void *obj, + void *data); + + /* + * The read() with NULL c->read buffer waits readiness of a connection + * to avoid allocation of read buffer if the connection will time out + * or will be closed with error. The kqueue-specific read() can also + * detect case if a client did not sent anything and has just closed the + * connection without errors. In the latter case state's close_handler + * is called. + */ + void (*read)(nxt_thread_t *thr, void *obj, + void *data); + + ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b); + + ssize_t (*recv)(nxt_event_conn_t *c, void *buf, + size_t size, nxt_uint_t flags); + + /* + * The write() is an interface to write a buffer chain with a given rate + * limit. It calls write_chunk() in a cycle and handles write event timer. + */ + void (*write)(nxt_thread_t *thr, void *obj, + void *data); + + /* + * The write_chunk() interface writes a buffer chain with a given limit + * and toggles write event. SSL/TLS libraries' write_chunk() interface + * buffers data and calls the library specific send() interface to write + * the buffered data eventually. + */ + ssize_t (*write_chunk)(nxt_thread_t *thr, + nxt_event_conn_t *c, nxt_buf_t *b, + size_t limit); + + /* + * The sendbuf() is an interface for OS-specific sendfile + * implementations or simple writev(). + */ + ssize_t (*sendbuf)(nxt_event_conn_t *c, nxt_buf_t *b, + size_t limit); + /* + * The writev() is an interface to write several nxt_iobuf_t buffers. + */ + ssize_t (*writev)(nxt_event_conn_t *c, + nxt_iobuf_t *iob, nxt_uint_t niob); + /* + * The send() is an interface to write a single buffer. SSL/TLS + * libraries' send() interface handles also the libraries' errors. + */ + ssize_t (*send)(nxt_event_conn_t *c, void *buf, + size_t size); + + void (*shutdown)(nxt_thread_t *thr, void *obj, + void *data); +} nxt_event_conn_io_t; + + +struct nxt_event_conn_s { + /* + * Must be the first field, since nxt_event_fd_t + * and nxt_event_conn_t are used interchangeably. + */ + nxt_event_fd_t socket; + + nxt_buf_t *read; + const nxt_event_conn_state_t *read_state; + nxt_work_queue_t *read_work_queue; + nxt_event_timer_t read_timer; + + nxt_buf_t *write; + const nxt_event_conn_state_t *write_state; + nxt_work_queue_t *write_work_queue; + nxt_event_write_rate_t *rate; + nxt_event_timer_t write_timer; + + nxt_off_t sent; + uint32_t max_chunk; + uint32_t nbytes; + + nxt_event_conn_io_t *io; + +#if (NXT_SSLTLS || NXT_THREADS) + /* SunC does not support "zero-sized struct/union". */ + + union { +#if (NXT_SSLTLS) + void *ssltls; +#endif +#if (NXT_THREADS) + nxt_thread_pool_t *thread_pool; +#endif + } u; + +#endif + + nxt_mem_pool_t *mem_pool; + + nxt_log_t log; + + nxt_listen_socket_t *listen; + nxt_sockaddr_t *remote; + nxt_sockaddr_t *local; + const char *action; + + uint8_t peek; + uint8_t blocked; /* 1 bit */ + uint8_t delayed; /* 1 bit */ + +#define NXT_CONN_SENDFILE_OFF 0 +#define NXT_CONN_SENDFILE_ON 1 +#define NXT_CONN_SENDFILE_UNSET 3 + + uint8_t sendfile; /* 2 bits */ + uint8_t tcp_nodelay; /* 1 bit */ + + nxt_queue_link_t link; +}; + + +/* + * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t + * because nxt_listen_socket_t is one per process whilst each worker + * thread uses own nxt_event_conn_listen_t. + */ +typedef struct { + /* Must be the first field. */ + nxt_event_fd_t socket; + + uint32_t ready; + uint32_t batch; + + /* An accept() interface is cached to minimize memory accesses. */ + void (*accept)(nxt_thread_t *thr, void *obj, + void *data); + + nxt_listen_socket_t *listen; + + nxt_event_timer_t timer; + + nxt_queue_link_t link; +} nxt_event_conn_listen_t; + + +#define \ +nxt_event_conn_io_handle(thr, wq, handler, c, data) \ + do { \ + if (thr->engine->batch != 0) { \ + nxt_thread_work_queue_add(thr, wq, handler, c, data, thr->log); \ + \ + } else { \ + handler(thr, c, data); \ + } \ + } while (0) + + +#define \ +nxt_event_conn_timer_init(ev, c, wq) \ + do { \ + (ev)->work_queue = (wq); \ + (ev)->log = &(c)->log; \ + (ev)->precision = NXT_EVENT_TIMER_DEFAULT_PRECISION; \ + nxt_event_timer_ident((ev), (c)->socket.fd); \ + } while (0) + + +#define \ +nxt_event_read_timer_conn(ev) \ + nxt_event_timer_data(ev, nxt_event_conn_t, read_timer) + + +#define \ +nxt_event_write_timer_conn(ev) \ + nxt_event_timer_data(ev, nxt_event_conn_t, write_timer) + + +#if (NXT_HAVE_UNIX_DOMAIN) + +#define \ +nxt_event_conn_tcp_nodelay_on(c) \ + do { \ + nxt_int_t ret; \ + \ + if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ + ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \ + TCP_NODELAY, 1); \ + \ + (c)->tcp_nodelay = (ret == NXT_OK); \ + } \ + } while (0) + + +#else + +#define \ +nxt_event_conn_tcp_nodelay_on(c) \ + do { \ + nxt_int_t ret; \ + \ + ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \ + TCP_NODELAY, 1); \ + \ + (c)->tcp_nodelay = (ret == NXT_OK); \ + } while (0) + +#endif + + +NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp, + nxt_log_t *log); +void nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, + void *data); +NXT_EXPORT void nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c); + +NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, + nxt_event_conn_t *c, const nxt_event_conn_state_t *state, + nxt_event_timer_t *tev); +NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, + nxt_work_queue_t *wq); + +NXT_EXPORT void nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c); +void nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, + void *data); +void nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, + void *data); +nxt_int_t nxt_event_conn_socket(nxt_thread_t *thr, + nxt_event_conn_t *c); +void nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, + void *data); +void nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, + void *data); + +NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_thread_t *thr, + nxt_listen_socket_t *ls); +void nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj, + void *data); +NXT_EXPORT void nxt_event_conn_accept(nxt_thread_t *thr, + nxt_event_conn_listen_t *cls, nxt_event_conn_t *c); +void nxt_event_conn_accept_error(nxt_thread_t *thr, + nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err); + +NXT_EXPORT void nxt_event_conn_read(nxt_thread_t *thr, nxt_event_conn_t *c); +void nxt_event_conn_io_read(nxt_thread_t *thr, void *obj, + void *data); +ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); +ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, + size_t size, nxt_uint_t flags); + +NXT_EXPORT void nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c); +size_t nxt_event_conn_write_limit(nxt_event_conn_t *c); +nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, + nxt_event_conn_t *c, size_t sent); +void nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, + void *data); +ssize_t nxt_event_conn_io_write_chunk(nxt_thread_t *thr, + nxt_event_conn_t *c, nxt_buf_t *b, size_t limit); +ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, + nxt_iobuf_t *iob, nxt_uint_t niob); +ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, + size_t size); + +NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_thread_t *thr, + nxt_event_conn_t *c); + + +#define \ +nxt_event_conn_connect_enqueue(thr, c) \ + nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \ + nxt_event_conn_batch_socket, \ + c, c->socket.data, c->socket.log) + + +#define \ +nxt_event_conn_read_enqueue(thr, c) \ + do { \ + c->socket.read_work_queue = &thr->engine->read_work_queue; \ + \ + nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \ + c->io->read, c, c->socket.data, \ + c->socket.log); \ + } while (0) + + +#define \ +nxt_event_conn_write_enqueue(thr, c) \ + do { \ + c->socket.write_work_queue = &thr->engine->write_work_queue; \ + \ + nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \ + c->io->write, c, c->socket.data, \ + c->socket.log); \ + } while (0) + + +extern nxt_event_conn_io_t nxt_unix_event_conn_io; + + +typedef struct { + /* + * Client and peer connections are not embedded because already + * existent connections can be switched to the event connection proxy. + */ + nxt_event_conn_t *client; + nxt_event_conn_t *peer; + nxt_buf_t *client_buffer; + nxt_buf_t *peer_buffer; + + size_t client_buffer_size; + size_t peer_buffer_size; + + nxt_msec_t client_wait_timeout; + nxt_msec_t connect_timeout; + nxt_msec_t reconnect_timeout; + nxt_msec_t peer_wait_timeout; + nxt_msec_t client_write_timeout; + nxt_msec_t peer_write_timeout; + + uint8_t connected; /* 1 bit */ + uint8_t delayed; /* 1 bit */ + uint8_t retries; /* 8 bits */ + + nxt_work_handler_t completion_handler; +} nxt_event_conn_proxy_t; + + +NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create( + nxt_event_conn_t *c); +NXT_EXPORT void nxt_event_conn_proxy(nxt_event_conn_proxy_t *p); + + +#endif /* _NXT_EVENT_CONN_H_INCLUDED_ */ |