diff options
Diffstat (limited to 'src/nxt_event_conn.h')
-rw-r--r-- | src/nxt_event_conn.h | 108 |
1 files changed, 46 insertions, 62 deletions
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h index fe7d794a..c35dcf42 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_event_conn.h @@ -45,11 +45,8 @@ typedef struct { typedef struct { - void (*connect)(nxt_thread_t *thr, void *obj, - void *data); - - void (*accept)(nxt_thread_t *thr, void *obj, - void *data); + nxt_work_handler_t connect; + nxt_work_handler_t accept; /* * The read() with NULL c->read buffer waits readiness of a connection @@ -59,8 +56,7 @@ typedef struct { * connection without errors. In the latter case state's close_handler * is called. */ - void (*read)(nxt_thread_t *thr, void *obj, - void *data); + nxt_work_handler_t read; ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b); @@ -71,8 +67,7 @@ typedef struct { * The write() is an interface to write a buffer chain with a given rate * limit. It calls write_chunk() in a cycle and handles write event timer. */ - void (*write)(nxt_thread_t *thr, void *obj, - void *data); + nxt_work_handler_t write; /* * The write_chunk() interface writes a buffer chain with a given limit @@ -80,9 +75,8 @@ typedef struct { * buffers data and calls the library specific send() interface to write * the buffered data eventually. */ - ssize_t (*write_chunk)(nxt_thread_t *thr, - nxt_event_conn_t *c, nxt_buf_t *b, - size_t limit); + ssize_t (*write_chunk)(nxt_event_conn_t *c, + nxt_buf_t *b, size_t limit); /* * The sendbuf() is an interface for OS-specific sendfile @@ -102,8 +96,7 @@ typedef struct { ssize_t (*send)(nxt_event_conn_t *c, void *buf, size_t size); - void (*shutdown)(nxt_thread_t *thr, void *obj, - void *data); + nxt_work_handler_t shutdown; } nxt_event_conn_io_t; @@ -147,6 +140,7 @@ struct nxt_event_conn_s { nxt_mem_pool_t *mem_pool; + nxt_task_t task; nxt_log_t log; nxt_listen_socket_t *listen; @@ -178,12 +172,13 @@ typedef struct { /* Must be the first field. */ nxt_event_fd_t socket; + nxt_task_t task; + uint32_t ready; uint32_t batch; /* An accept() interface is cached to minimize memory accesses. */ - void (*accept)(nxt_thread_t *thr, void *obj, - void *data); + nxt_work_handler_t accept; nxt_listen_socket_t *listen; @@ -194,13 +189,13 @@ typedef struct { #define \ -nxt_event_conn_io_handle(thr, wq, handler, c, data) \ +nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \ do { \ if (thr->engine->batch != 0) { \ - nxt_thread_work_queue_add(thr, wq, handler, c, data, thr->log); \ + nxt_thread_work_queue_add(thr, wq, handler, task, c, data); \ \ } else { \ - handler(thr, c, data); \ + handler(task, c, data); \ } \ } while (0) @@ -259,9 +254,8 @@ nxt_event_conn_tcp_nodelay_on(c) \ NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log); -void nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, - void *data); -NXT_EXPORT void nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c); +void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c); NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c, const nxt_event_conn_state_t *state, @@ -269,77 +263,66 @@ NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq); -NXT_EXPORT void nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c); -void nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, - void *data); -void nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, - void *data); -nxt_int_t nxt_event_conn_socket(nxt_thread_t *thr, - nxt_event_conn_t *c); -void nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, - void *data); -void nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, - void *data); +NXT_EXPORT void nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data); +void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); +nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data); +void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_thread_t *thr, +NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls); -void nxt_event_conn_io_accept(nxt_thread_t *thr, void *obj, - void *data); -NXT_EXPORT void nxt_event_conn_accept(nxt_thread_t *thr, +void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, nxt_event_conn_t *c); -void nxt_event_conn_accept_error(nxt_thread_t *thr, - nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err); +void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, + const char *accept_syscall, nxt_err_t err); -NXT_EXPORT void nxt_event_conn_read(nxt_thread_t *thr, nxt_event_conn_t *c); -void nxt_event_conn_io_read(nxt_thread_t *thr, void *obj, - void *data); +NXT_EXPORT void nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data); ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags); -NXT_EXPORT void nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c); +NXT_EXPORT void nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c); size_t nxt_event_conn_write_limit(nxt_event_conn_t *c); nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, size_t sent); -void nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, - void *data); -ssize_t nxt_event_conn_io_write_chunk(nxt_thread_t *thr, - nxt_event_conn_t *c, nxt_buf_t *b, size_t limit); -ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, - nxt_iobuf_t *iob, nxt_uint_t niob); -ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, - size_t size); - -NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_thread_t *thr, +void nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data); +ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, + size_t limit); +ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, + nxt_uint_t niob); +ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size); + +NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c); #define \ -nxt_event_conn_connect_enqueue(thr, c) \ +nxt_event_conn_connect_enqueue(thr, task, c) \ nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \ nxt_event_conn_batch_socket, \ - c, c->socket.data, c->socket.log) + task, c, c->socket.data) #define \ -nxt_event_conn_read_enqueue(thr, c) \ +nxt_event_conn_read_enqueue(thr, task, c) \ do { \ c->socket.read_work_queue = &thr->engine->read_work_queue; \ \ nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \ - c->io->read, c, c->socket.data, \ - c->socket.log); \ + c->io->read, task, c, c->socket.data); \ } while (0) #define \ -nxt_event_conn_write_enqueue(thr, c) \ +nxt_event_conn_write_enqueue(thr, task, c) \ do { \ c->socket.write_work_queue = &thr->engine->write_work_queue; \ \ nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \ - c->io->write, c, c->socket.data, \ - c->socket.log); \ + c->io->write, task, c, c->socket.data); \ } while (0) @@ -376,7 +359,8 @@ typedef struct { NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create( nxt_event_conn_t *c); -NXT_EXPORT void nxt_event_conn_proxy(nxt_event_conn_proxy_t *p); +NXT_EXPORT void nxt_event_conn_proxy(nxt_task_t *task, + nxt_event_conn_proxy_t *p); #endif /* _NXT_EVENT_CONN_H_INCLUDED_ */ |