diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:09:59 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:09:59 +0300 |
commit | 029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch) | |
tree | f4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_event_conn.h | |
parent | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff) | |
download | unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2 |
I/O operations refactoring.
Diffstat (limited to 'src/nxt_event_conn.h')
-rw-r--r-- | src/nxt_event_conn.h | 69 |
1 files changed, 32 insertions, 37 deletions
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h index 3f907633..73415cfe 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_event_conn.h @@ -175,7 +175,7 @@ typedef struct { nxt_task_t task; uint32_t ready; - uint32_t batch; + uint32_t batch0; /* An accept() interface is cached to minimize memory accesses. */ nxt_work_handler_t accept; @@ -189,18 +189,6 @@ typedef struct { #define \ -nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \ - do { \ - if (thr->engine->batch != 0) { \ - nxt_work_queue_add(wq, handler, task, c, data); \ - \ - } else { \ - handler(task, c, data); \ - } \ - } while (0) - - -#define \ nxt_event_conn_timer_init(ev, c, wq) \ do { \ (ev)->work_queue = (wq); \ @@ -222,12 +210,12 @@ nxt_event_write_timer_conn(ev) \ #if (NXT_HAVE_UNIX_DOMAIN) #define \ -nxt_event_conn_tcp_nodelay_on(c) \ +nxt_event_conn_tcp_nodelay_on(task, c) \ do { \ nxt_int_t ret; \ \ if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ - ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \ + ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ TCP_NODELAY, 1); \ \ (c)->tcp_nodelay = (ret == NXT_OK); \ @@ -238,11 +226,11 @@ nxt_event_conn_tcp_nodelay_on(c) \ #else #define \ -nxt_event_conn_tcp_nodelay_on(c) \ +nxt_event_conn_tcp_nodelay_on(task, c) \ do { \ nxt_int_t ret; \ \ - ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \ + ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ TCP_NODELAY, 1); \ \ (c)->tcp_nodelay = (ret == NXT_OK); \ @@ -252,7 +240,7 @@ nxt_event_conn_tcp_nodelay_on(c) \ NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp, - nxt_log_t *log); + nxt_task_t *task); void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c); @@ -262,8 +250,7 @@ NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq); -NXT_EXPORT void nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c); -void nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data); +void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data); void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c); void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data); @@ -277,17 +264,23 @@ NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task, void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err); -NXT_EXPORT void nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_conn_wait(nxt_event_conn_t *c); + void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data); ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags); -NXT_EXPORT void nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); +ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); +ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, + nxt_iobuf_t *iob, nxt_uint_t niob); +ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, + size_t size); + size_t nxt_event_conn_write_limit(nxt_event_conn_t *c); nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, size_t sent); -void nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data); ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit); ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, @@ -301,29 +294,30 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c); -#define \ -nxt_event_conn_connect_enqueue(thr, task, c) \ - nxt_work_queue_add(&thr->engine->socket_work_queue, \ - nxt_event_conn_batch_socket, task, c, c->socket.data) +#define nxt_event_conn_connect(engine, c) \ + nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \ + c->socket.task, c, c->socket.data) -#define \ -nxt_event_conn_read_enqueue(thr, task, c) \ +#define nxt_event_conn_read(e, c) \ do { \ - c->socket.read_work_queue = &thr->engine->read_work_queue; \ + nxt_event_engine_t *engine = e; \ \ - nxt_work_queue_add(&thr->engine->read_work_queue, \ - c->io->read, task, c, c->socket.data); \ + c->socket.read_work_queue = &engine->read_work_queue; \ + \ + nxt_work_queue_add(&engine->read_work_queue, c->io->read, \ + c->socket.task, c, c->socket.data); \ } while (0) -#define \ -nxt_event_conn_write_enqueue(thr, task, c) \ +#define nxt_event_conn_write(e, c) \ do { \ - c->socket.write_work_queue = &thr->engine->write_work_queue; \ + nxt_event_engine_t *engine = e; \ + \ + c->socket.write_work_queue = &engine->write_work_queue; \ \ - nxt_work_queue_add(&thr->engine->write_work_queue, \ - c->io->write, task, c, c->socket.data); \ + nxt_work_queue_add(&engine->write_work_queue, c->io->write, \ + c->socket.task, c, c->socket.data); \ } while (0) @@ -353,6 +347,7 @@ typedef struct { uint8_t connected; /* 1 bit */ uint8_t delayed; /* 1 bit */ uint8_t retries; /* 8 bits */ + uint8_t retain; /* 2 bits */ nxt_work_handler_t completion_handler; } nxt_event_conn_proxy_t; |