summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn.h
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
commit029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch)
treef4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_event_conn.h
parent059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff)
downloadunit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz
unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2
I/O operations refactoring.
Diffstat (limited to 'src/nxt_event_conn.h')
-rw-r--r--src/nxt_event_conn.h69
1 files changed, 32 insertions, 37 deletions
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h
index 3f907633..73415cfe 100644
--- a/src/nxt_event_conn.h
+++ b/src/nxt_event_conn.h
@@ -175,7 +175,7 @@ typedef struct {
nxt_task_t task;
uint32_t ready;
- uint32_t batch;
+ uint32_t batch0;
/* An accept() interface is cached to minimize memory accesses. */
nxt_work_handler_t accept;
@@ -189,18 +189,6 @@ typedef struct {
#define \
-nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \
- do { \
- if (thr->engine->batch != 0) { \
- nxt_work_queue_add(wq, handler, task, c, data); \
- \
- } else { \
- handler(task, c, data); \
- } \
- } while (0)
-
-
-#define \
nxt_event_conn_timer_init(ev, c, wq) \
do { \
(ev)->work_queue = (wq); \
@@ -222,12 +210,12 @@ nxt_event_write_timer_conn(ev) \
#if (NXT_HAVE_UNIX_DOMAIN)
#define \
-nxt_event_conn_tcp_nodelay_on(c) \
+nxt_event_conn_tcp_nodelay_on(task, c) \
do { \
nxt_int_t ret; \
\
if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \
- ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \
+ ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \
TCP_NODELAY, 1); \
\
(c)->tcp_nodelay = (ret == NXT_OK); \
@@ -238,11 +226,11 @@ nxt_event_conn_tcp_nodelay_on(c) \
#else
#define \
-nxt_event_conn_tcp_nodelay_on(c) \
+nxt_event_conn_tcp_nodelay_on(task, c) \
do { \
nxt_int_t ret; \
\
- ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \
+ ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \
TCP_NODELAY, 1); \
\
(c)->tcp_nodelay = (ret == NXT_OK); \
@@ -252,7 +240,7 @@ nxt_event_conn_tcp_nodelay_on(c) \
NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp,
- nxt_log_t *log);
+ nxt_task_t *task);
void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine,
nxt_event_conn_t *c);
@@ -262,8 +250,7 @@ NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c,
nxt_work_queue_t *wq);
-NXT_EXPORT void nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c);
-void nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data);
+void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c);
void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data);
@@ -277,17 +264,23 @@ NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task,
void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls,
const char *accept_syscall, nxt_err_t err);
-NXT_EXPORT void nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_conn_wait(nxt_event_conn_t *c);
+
void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b);
ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf,
size_t size, nxt_uint_t flags);
-NXT_EXPORT void nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
+ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
+ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
+ nxt_iobuf_t *iob, nxt_uint_t niob);
+ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
+ size_t size);
+
size_t nxt_event_conn_write_limit(nxt_event_conn_t *c);
nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
nxt_event_conn_t *c, size_t sent);
-void nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b,
size_t limit);
ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob,
@@ -301,29 +294,30 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
nxt_event_conn_t *c);
-#define \
-nxt_event_conn_connect_enqueue(thr, task, c) \
- nxt_work_queue_add(&thr->engine->socket_work_queue, \
- nxt_event_conn_batch_socket, task, c, c->socket.data)
+#define nxt_event_conn_connect(engine, c) \
+ nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \
+ c->socket.task, c, c->socket.data)
-#define \
-nxt_event_conn_read_enqueue(thr, task, c) \
+#define nxt_event_conn_read(e, c) \
do { \
- c->socket.read_work_queue = &thr->engine->read_work_queue; \
+ nxt_event_engine_t *engine = e; \
\
- nxt_work_queue_add(&thr->engine->read_work_queue, \
- c->io->read, task, c, c->socket.data); \
+ c->socket.read_work_queue = &engine->read_work_queue; \
+ \
+ nxt_work_queue_add(&engine->read_work_queue, c->io->read, \
+ c->socket.task, c, c->socket.data); \
} while (0)
-#define \
-nxt_event_conn_write_enqueue(thr, task, c) \
+#define nxt_event_conn_write(e, c) \
do { \
- c->socket.write_work_queue = &thr->engine->write_work_queue; \
+ nxt_event_engine_t *engine = e; \
+ \
+ c->socket.write_work_queue = &engine->write_work_queue; \
\
- nxt_work_queue_add(&thr->engine->write_work_queue, \
- c->io->write, task, c, c->socket.data); \
+ nxt_work_queue_add(&engine->write_work_queue, c->io->write, \
+ c->socket.task, c, c->socket.data); \
} while (0)
@@ -353,6 +347,7 @@ typedef struct {
uint8_t connected; /* 1 bit */
uint8_t delayed; /* 1 bit */
uint8_t retries; /* 8 bits */
+ uint8_t retain; /* 2 bits */
nxt_work_handler_t completion_handler;
} nxt_event_conn_proxy_t;