summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_accept.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_event_conn_accept.c')
-rw-r--r--src/nxt_event_conn_accept.c72
1 files changed, 60 insertions, 12 deletions
diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c
index 5f0a72f0..ced2a3d8 100644
--- a/src/nxt_event_conn_accept.c
+++ b/src/nxt_event_conn_accept.c
@@ -32,6 +32,53 @@ static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj,
void *data);
+nxt_event_conn_listen_t *
+nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
+{
+ nxt_event_engine_t *engine;
+ nxt_event_conn_listen_t *cls;
+
+ cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t));
+
+ if (nxt_fast_path(cls != NULL)) {
+ cls->socket.fd = ls->socket;
+
+ engine = task->thread->engine;
+ cls->batch = engine->batch;
+
+ cls->socket.read_work_queue = &engine->accept_work_queue;
+ cls->socket.read_handler = nxt_event_conn_listen_handler;
+ cls->socket.error_handler = nxt_event_conn_listen_event_error;
+ cls->socket.log = &nxt_main_log;
+
+ cls->accept = engine->event.io->accept;
+
+ cls->listen = ls;
+ cls->work_queue = &engine->read_work_queue;
+
+ cls->timer.work_queue = &engine->fast_work_queue;
+ cls->timer.handler = nxt_event_conn_listen_timer_handler;
+ cls->timer.log = &nxt_main_log;
+
+ cls->task.thread = task->thread;
+ cls->task.log = &nxt_main_log;
+ cls->task.ident = nxt_task_next_ident();
+ cls->socket.task = &cls->task;
+ cls->timer.task = &cls->task;
+
+ if (nxt_event_conn_accept_alloc(task, cls) != NULL) {
+ nxt_fd_event_enable_accept(engine, &cls->socket);
+
+ nxt_queue_insert_head(&engine->listen_connections, &cls->link);
+ }
+
+ return cls;
+ }
+
+ return NULL;
+}
+
+
nxt_int_t
nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
{
@@ -97,13 +144,12 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls)
/* This allocation cannot fail. */
c = nxt_event_conn_create(mp, cls->socket.task);
- cls->socket.data = c;
+ cls->next = c;
c->socket.read_work_queue = cls->socket.read_work_queue;
c->socket.write_ready = 1;
+ c->listen = cls;
ls = cls->listen;
- c->listen = ls;
-
/* This allocation cannot fail. */
remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
c->remote = remote;
@@ -146,7 +192,7 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
nxt_event_conn_listen_t *cls;
cls = obj;
- c = data;
+ c = cls->next;
cls->ready--;
cls->socket.read_ready = (cls->ready != 0);
@@ -200,17 +246,19 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
- c->read_work_queue = c->listen->work_queue;
- c->write_work_queue = c->listen->work_queue;
+ c->read_work_queue = cls->work_queue;
+ c->write_work_queue = cls->work_queue;
- if (c->listen->read_after_accept) {
+ if (cls->listen->read_after_accept) {
//c->socket.read_ready = 1;
- c->listen->handler(task, c, NULL);
+// cls->listen->handler(task, c, cls->socket.data);
+ nxt_work_queue_add(c->read_work_queue, cls->listen->handler,
+ task, c, cls->socket.data);
} else {
- nxt_work_queue_add(c->write_work_queue, c->listen->handler,
- task, c, NULL);
+ nxt_work_queue_add(c->write_work_queue, cls->listen->handler,
+ task, c, cls->socket.data);
}
next = nxt_event_conn_accept_next(task, cls);
@@ -227,7 +275,7 @@ nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls)
{
nxt_event_conn_t *c;
- cls->socket.data = NULL;
+ cls->next = NULL;
do {
c = nxt_event_conn_accept_alloc(task, cls);
@@ -338,7 +386,7 @@ nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
ev = obj;
cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer);
- c = cls->socket.data;
+ c = cls->next;
if (c == NULL) {
c = nxt_event_conn_accept_next(task, cls);