summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_event_conn.h')
-rw-r--r--src/nxt_event_conn.h108
1 files changed, 46 insertions, 62 deletions
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h
index fe7d794a..c35dcf42 100644
--- a/src/nxt_event_conn.h
+++ b/src/nxt_event_conn.h
@@ -45,11 +45,8 @@ typedef struct {
typedef struct {
- void (*connect)(nxt_thread_t *thr, void *obj,
- void *data);
-
- void (*accept)(nxt_thread_t *thr, void *obj,
- void *data);
+ nxt_work_handler_t connect;
+ nxt_work_handler_t accept;
/*
* The read() with NULL c->read buffer waits readiness of a connection
@@ -59,8 +56,7 @@ typedef struct {
* connection without errors. In the latter case state's close_handler
* is called.
*/
- void (*read)(nxt_thread_t *thr, void *obj,
- void *data);
+ nxt_work_handler_t read;
ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b);
@@ -71,8 +67,7 @@ typedef struct {
* The write() is an interface to write a buffer chain with a given rate
* limit. It calls write_chunk() in a cycle and handles write event timer.
*/
- void (*write)(nxt_thread_t *thr, void *obj,
- void *data);
+ nxt_work_handler_t write;
/*
* The write_chunk() interface writes a buffer chain with a given limit
@@ -80,9 +75,8 @@ typedef struct {
* buffers data and calls the library specific send() interface to write
* the buffered data eventually.
*/
- ssize_t (*write_chunk)(nxt_thread_t *thr,
- nxt_event_conn_t *c, nxt_buf_t *b,
- size_t limit);
+ ssize_t (*write_chunk)(nxt_event_conn_t *c,
+ nxt_buf_t *b, size_t limit);
/*
* The sendbuf() is an interface for OS-specific sendfile
@@ -102,8 +96,7 @@ typedef struct {
ssize_t (*send)(nxt_event_conn_t *c, void *buf,
size_t size);
- void (*shutdown)(nxt_thread_t *thr, void *obj,
- void *data);
+ nxt_work_handler_t shutdown;
} nxt_event_conn_io_t;
@@ -147,6 +140,7 @@ struct nxt_event_conn_s {
nxt_mem_pool_t *mem_pool;
+ nxt_task_t task;
nxt_log_t log;
nxt_listen_socket_t *listen;
@@ -178,12 +172,13 @@ typedef struct {
/* Must be the first field. */
nxt_event_fd_t socket;
+ nxt_task_t task;
+
uint32_t ready;
uint32_t batch;
/* An accept() interface is cached to minimize memory accesses. */
- void (*accept)(nxt_thread_t *thr, void *obj,
- void *data);
+ nxt_work_handler_t accept;
nxt_listen_socket_t *listen;
@@ -194,13 +189,13 @@ typedef struct {
#define \
-nxt_event_conn_io_handle(thr, wq, handler, c, data) \
+nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \
do { \
if (thr->engine->batch != 0) { \
- nxt_thread_work_queue_add(thr, wq, handler, c, data, thr->log); \
+ nxt_thread_work_queue_add(thr, wq, handler, task, c, data); \
\
} else { \
- handler(thr, c, data); \
+ handler(task, c, data); \
} \
} while (0)
@@ -259,9 +254,8 @@ nxt_event_conn_tcp_nodelay_on(c) \
NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp,
nxt_log_t *log);
-void nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj,
- void *data);
-NXT_EXPORT void nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c);
+void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT void nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c);
NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine,
nxt_event_conn_t *c, const nxt_event_conn_state_t *state,
@@ -269,77 +263,66 @@ NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c,
nxt_work_queue_t *wq);
-NXT_EXPORT void nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c);
-void nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj,
- void *data);
-void nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj,
- void *data);
-nxt_int_t nxt_event_conn_socket(nxt_thread_t *thr,
- nxt_event_conn_t *c);
-void nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj,
- void *data);
-void nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj,
- void *data);
+NXT_EXPORT void nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data);
+void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data);
+nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data);
+void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data);
-NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_thread_t *thr,
+NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task,
nxt_listen_socket_t *ls);
-void nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj,
- void *data);
-NXT_EXPORT void nxt_event_conn_accept(nxt_thread_t *thr,
+void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task,
nxt_event_conn_listen_t *cls, nxt_event_conn_t *c);
-void nxt_event_conn_accept_error(nxt_thread_t *thr,
- nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err);
+void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls,
+ const char *accept_syscall, nxt_err_t err);
-NXT_EXPORT void nxt_event_conn_read(nxt_thread_t *thr, nxt_event_conn_t *c);
-void nxt_event_conn_io_read(nxt_thread_t *thr, void *obj,
- void *data);
+NXT_EXPORT void nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b);
ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf,
size_t size, nxt_uint_t flags);
-NXT_EXPORT void nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c);
+NXT_EXPORT void nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c);
size_t nxt_event_conn_write_limit(nxt_event_conn_t *c);
nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
nxt_event_conn_t *c, size_t sent);
-void nxt_event_conn_io_write(nxt_thread_t *thr, void *obj,
- void *data);
-ssize_t nxt_event_conn_io_write_chunk(nxt_thread_t *thr,
- nxt_event_conn_t *c, nxt_buf_t *b, size_t limit);
-ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c,
- nxt_iobuf_t *iob, nxt_uint_t niob);
-ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf,
- size_t size);
-
-NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_thread_t *thr,
+void nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data);
+ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b,
+ size_t limit);
+ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob,
+ nxt_uint_t niob);
+ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size);
+
+NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
nxt_event_conn_t *c);
#define \
-nxt_event_conn_connect_enqueue(thr, c) \
+nxt_event_conn_connect_enqueue(thr, task, c) \
nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \
nxt_event_conn_batch_socket, \
- c, c->socket.data, c->socket.log)
+ task, c, c->socket.data)
#define \
-nxt_event_conn_read_enqueue(thr, c) \
+nxt_event_conn_read_enqueue(thr, task, c) \
do { \
c->socket.read_work_queue = &thr->engine->read_work_queue; \
\
nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \
- c->io->read, c, c->socket.data, \
- c->socket.log); \
+ c->io->read, task, c, c->socket.data); \
} while (0)
#define \
-nxt_event_conn_write_enqueue(thr, c) \
+nxt_event_conn_write_enqueue(thr, task, c) \
do { \
c->socket.write_work_queue = &thr->engine->write_work_queue; \
\
nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \
- c->io->write, c, c->socket.data, \
- c->socket.log); \
+ c->io->write, task, c, c->socket.data); \
} while (0)
@@ -376,7 +359,8 @@ typedef struct {
NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create(
nxt_event_conn_t *c);
-NXT_EXPORT void nxt_event_conn_proxy(nxt_event_conn_proxy_t *p);
+NXT_EXPORT void nxt_event_conn_proxy(nxt_task_t *task,
+ nxt_event_conn_proxy_t *p);
#endif /* _NXT_EVENT_CONN_H_INCLUDED_ */