summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c3179
1 files changed, 2247 insertions, 932 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 9f6eab95..6b7d631d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -7,6 +7,8 @@
#include "nxt_main.h"
#include "nxt_port_memory_int.h"
+#include "nxt_port_queue.h"
+#include "nxt_app_queue.h"
#include "nxt_unit.h"
#include "nxt_unit_request.h"
@@ -38,20 +40,30 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
+nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
+nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
+nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
+nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
- nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
- uint32_t *shm_limit);
-static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- uint32_t stream);
+ nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
+ int *log_fd, uint32_t *stream, uint32_t *shm_limit);
+static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
+ int queue_fd);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id);
+static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -63,11 +75,9 @@ static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
-static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
-static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
+static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
@@ -81,70 +91,97 @@ static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
- nxt_chunk_id_t *c, int *n, int min_n);
-static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+ nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
+static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
-static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ nxt_unit_port_t *port, int n);
+static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
+static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
+ nxt_unit_port_t *port, uint32_t size,
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
-static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, int i);
+nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
+nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
-static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, uint32_t id);
-static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
+ nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
+ nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
-static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
+static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
pid_t pid);
-static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
+static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
-static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
- nxt_unit_read_buf_t *rbuf);
-static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, int *fd);
-
-static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *new_port, int fd);
-
-static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
- nxt_unit_process_t **process);
-static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
+static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
+static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
+nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
+static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
+static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
+
+static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
+ nxt_unit_port_t *port, int queue_fd);
+
+nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
+nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
+static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port, void *queue);
+static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
+ nxt_unit_port_id_t *port_id);
+static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
+ nxt_unit_port_id_t *port_id);
+static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
+static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
-
-static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
+static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
+static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
-static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
- nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
- void *oob, size_t oob_size);
+static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size);
+static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
+ nxt_unit_read_buf_t *src);
+static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
+nxt_inline int nxt_unit_close(int fd);
+static int nxt_unit_fd_blocking(int fd);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
-static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
+static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
-static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
- nxt_unit_request_info_impl_t *req_impl);
-static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
- nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
+static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
+ nxt_unit_request_info_t *req);
+static nxt_unit_request_info_t *nxt_unit_request_hash_find(
+ nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
@@ -156,10 +193,8 @@ struct nxt_unit_mmap_buf_s {
nxt_unit_mmap_buf_t **prev;
nxt_port_mmap_header_t *hdr;
- nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_process_t *process;
char *free_ptr;
char *plain_ptr;
};
@@ -176,8 +211,7 @@ struct nxt_unit_recv_msg_s {
void *start;
uint32_t size;
- int fd;
- nxt_unit_process_t *process;
+ int fd[2];
nxt_unit_mmap_buf_t *incoming_buf;
};
@@ -197,15 +231,17 @@ struct nxt_unit_request_info_impl_s {
uint32_t stream;
- nxt_unit_process_t *process;
-
nxt_unit_mmap_buf_t *outgoing_buf;
nxt_unit_mmap_buf_t *incoming_buf;
nxt_unit_req_state_t state;
uint8_t websocket;
+ uint8_t in_hash;
+ /* for nxt_unit_ctx_impl_t.free_req or active_req */
nxt_queue_link_t link;
+ /* for nxt_unit_port_impl_t.awaiting_req */
+ nxt_queue_link_t port_wait_link;
char extra_data[];
};
@@ -223,7 +259,8 @@ struct nxt_unit_websocket_frame_impl_s {
struct nxt_unit_read_buf_s {
- nxt_unit_read_buf_t *next;
+ nxt_queue_link_t link;
+ nxt_unit_ctx_impl_t *ctx_impl;
ssize_t size;
char buf[16384];
char oob[256];
@@ -233,10 +270,12 @@ struct nxt_unit_read_buf_s {
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
+ nxt_atomic_t use_count;
+ nxt_atomic_t wait_items;
+
pthread_mutex_t mutex;
- nxt_unit_port_id_t read_port_id;
- int read_port_fd;
+ nxt_unit_port_t *read_port;
nxt_queue_link_t link;
@@ -254,9 +293,14 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
- nxt_unit_read_buf_t *pending_read_head;
- nxt_unit_read_buf_t **pending_read_tail;
- nxt_unit_read_buf_t *free_read_buf;
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t ready_req;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t pending_rbuf;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t free_rbuf;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -265,10 +309,29 @@ struct nxt_unit_ctx_impl_s {
};
+struct nxt_unit_mmap_s {
+ nxt_port_mmap_header_t *hdr;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t awaiting_rbuf;
+};
+
+
+struct nxt_unit_mmaps_s {
+ pthread_mutex_t mutex;
+ uint32_t size;
+ uint32_t cap;
+ nxt_atomic_t allocated_chunks;
+ nxt_unit_mmap_t *elts;
+};
+
+
struct nxt_unit_impl_s {
nxt_unit_t unit;
nxt_unit_callbacks_t callbacks;
+ nxt_atomic_t use_count;
+
uint32_t request_data_size;
uint32_t shm_mmap_limit;
@@ -277,10 +340,14 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
- nxt_unit_port_id_t ready_port_id;
+ nxt_unit_port_t *router_port;
+ nxt_unit_port_t *shared_port;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
+ nxt_unit_mmaps_t incoming;
+ nxt_unit_mmaps_t outgoing;
+
pid_t pid;
int log_fd;
int online;
@@ -292,32 +359,28 @@ struct nxt_unit_impl_s {
struct nxt_unit_port_impl_s {
nxt_unit_port_t port;
+ nxt_atomic_t use_count;
+
+ /* for nxt_unit_process_t.ports */
nxt_queue_link_t link;
nxt_unit_process_t *process;
-};
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t awaiting_req;
-struct nxt_unit_mmap_s {
- nxt_port_mmap_header_t *hdr;
-};
+ int ready;
+ void *queue;
-struct nxt_unit_mmaps_s {
- pthread_mutex_t mutex;
- uint32_t size;
- uint32_t cap;
- nxt_atomic_t allocated_chunks;
- nxt_unit_mmap_t *elts;
+ int from_socket;
+ nxt_unit_read_buf_t *socket_rbuf;
};
struct nxt_unit_process_s {
pid_t pid;
- nxt_queue_t ports;
-
- nxt_unit_mmaps_t incoming;
- nxt_unit_mmaps_t outgoing;
+ nxt_queue_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_impl_t *lib;
@@ -337,34 +400,41 @@ typedef struct {
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
- int rc;
+ int rc, queue_fd;
+ void *mem;
uint32_t ready_stream, shm_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
- nxt_unit_port_t ready_port, read_port;
+ nxt_unit_port_t ready_port, router_port, read_port;
lib = nxt_unit_create(init);
if (nxt_slow_path(lib == NULL)) {
return NULL;
}
+ queue_fd = -1;
+ mem = MAP_FAILED;
+
if (init->ready_port.id.pid != 0
&& init->ready_stream != 0
&& init->read_port.id.pid != 0)
{
ready_port = init->ready_port;
ready_stream = init->ready_stream;
+ router_port = init->router_port;
read_port = init->read_port;
lib->log_fd = init->log_fd;
nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
ready_port.id.id);
+ nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
+ router_port.id.id);
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
read_port.id.id);
} else {
- rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
- &ready_stream, &shm_limit);
+ rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
+ &lib->log_fd, &ready_stream, &shm_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
@@ -378,37 +448,77 @@ nxt_unit_init(nxt_unit_init_t *init)
}
lib->pid = read_port.id.pid;
+
ctx = &lib->main_ctx.ctx;
- rc = lib->callbacks.add_port(ctx, &ready_port);
- if (rc != NXT_UNIT_OK) {
- nxt_unit_alert(NULL, "failed to add ready_port");
+ rc = nxt_unit_fd_blocking(router_port.out_fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
+
+ lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
+ if (nxt_slow_path(lib->router_port == NULL)) {
+ nxt_unit_alert(NULL, "failed to add router_port");
+
+ goto fail;
+ }
+ queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(queue_fd == -1)) {
goto fail;
}
- rc = lib->callbacks.add_port(ctx, &read_port);
+ mem = mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
+ strerror(errno), errno);
+
+ goto fail;
+ }
+
+ nxt_port_queue_init(mem);
+
+ rc = nxt_unit_fd_blocking(read_port.in_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
+
+ lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
+ if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
nxt_unit_alert(NULL, "failed to add read_port");
goto fail;
}
- lib->main_ctx.read_port_id = read_port.id;
- lib->ready_port_id = ready_port.id;
+ rc = nxt_unit_fd_blocking(ready_port.out_fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
- rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
+ rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
goto fail;
}
+ nxt_unit_close(ready_port.out_fd);
+ nxt_unit_close(queue_fd);
+
return ctx;
fail:
- free(lib);
+ if (mem != MAP_FAILED) {
+ munmap(mem, sizeof(nxt_port_queue_t));
+ }
+
+ if (queue_fd != -1) {
+ nxt_unit_close(queue_fd);
+ }
+
+ nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL;
}
@@ -450,8 +560,13 @@ nxt_unit_create(nxt_unit_init_t *init)
nxt_queue_init(&lib->contexts);
+ lib->use_count = 0;
+ lib->router_port = NULL;
+ lib->shared_port = NULL;
+
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ pthread_mutex_destroy(&lib->mutex);
goto fail;
}
@@ -460,32 +575,12 @@ nxt_unit_create(nxt_unit_init_t *init)
if (cb->request_handler == NULL) {
nxt_unit_alert(NULL, "request_handler is NULL");
+ pthread_mutex_destroy(&lib->mutex);
goto fail;
}
- if (cb->add_port == NULL) {
- cb->add_port = nxt_unit_add_port;
- }
-
- if (cb->remove_port == NULL) {
- cb->remove_port = nxt_unit_remove_port;
- }
-
- if (cb->remove_pid == NULL) {
- cb->remove_pid = nxt_unit_remove_pid;
- }
-
- if (cb->quit == NULL) {
- cb->quit = nxt_unit_quit;
- }
-
- if (cb->port_send == NULL) {
- cb->port_send = nxt_unit_port_send_default;
- }
-
- if (cb->port_recv == NULL) {
- cb->port_recv = nxt_unit_port_recv_default;
- }
+ nxt_unit_mmaps_init(&lib->incoming);
+ nxt_unit_mmaps_init(&lib->outgoing);
return lib;
@@ -506,8 +601,6 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
ctx_impl->ctx.data = data;
ctx_impl->ctx.unit = &lib->unit;
- nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
-
rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
if (nxt_slow_path(rc != 0)) {
nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
@@ -515,25 +608,33 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
return NXT_UNIT_ERROR;
}
+ nxt_unit_lib_use(lib);
+
+ nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
+
+ ctx_impl->use_count = 1;
+ ctx_impl->wait_items = 0;
+
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
+ nxt_queue_init(&ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->free_rbuf);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+ nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
- ctx_impl->pending_read_head = NULL;
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
- ctx_impl->ctx_read_buf.next = NULL;
+ ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
- ctx_impl->read_port_fd = -1;
+ ctx_impl->read_port = NULL;
ctx_impl->requests.slot = 0;
return NXT_UNIT_OK;
@@ -541,6 +642,80 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_inline void
+nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
+}
+
+
+nxt_inline void
+nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
+{
+ long c;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
+
+ if (c == 1) {
+ nxt_unit_ctx_free(ctx_impl);
+ }
+}
+
+
+nxt_inline void
+nxt_unit_lib_use(nxt_unit_impl_t *lib)
+{
+ nxt_atomic_fetch_add(&lib->use_count, 1);
+}
+
+
+nxt_inline void
+nxt_unit_lib_release(nxt_unit_impl_t *lib)
+{
+ long c;
+ nxt_unit_process_t *process;
+
+ c = nxt_atomic_fetch_add(&lib->use_count, -1);
+
+ if (c == 1) {
+ for ( ;; ) {
+ pthread_mutex_lock(&lib->mutex);
+
+ process = nxt_unit_process_pop_first(lib);
+ if (process == NULL) {
+ pthread_mutex_unlock(&lib->mutex);
+
+ break;
+ }
+
+ nxt_unit_remove_process(lib, process);
+ }
+
+ pthread_mutex_destroy(&lib->mutex);
+
+ if (nxt_fast_path(lib->router_port != NULL)) {
+ nxt_unit_port_release(lib->router_port);
+ }
+
+ if (nxt_fast_path(lib->shared_port != NULL)) {
+ nxt_unit_port_release(lib->shared_port);
+ }
+
+ nxt_unit_mmaps_destroy(&lib->incoming);
+ nxt_unit_mmaps_destroy(&lib->outgoing);
+
+ free(lib);
+ }
+}
+
+
+nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf)
{
@@ -585,15 +760,16 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
static int
-nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
- int *log_fd, uint32_t *stream, uint32_t *shm_limit)
+nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
+ nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
+ uint32_t *shm_limit)
{
int rc;
- int ready_fd, read_fd;
+ int ready_fd, router_fd, read_fd;
char *unit_init, *version_end;
long version_length;
- int64_t ready_pid, read_pid;
- uint32_t ready_stream, ready_id, read_id;
+ int64_t ready_pid, router_pid, read_pid;
+ uint32_t ready_stream, router_id, ready_id, read_id;
unit_init = getenv(NXT_UNIT_INIT_ENV);
if (nxt_slow_path(unit_init == NULL)) {
@@ -621,13 +797,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
+ "%"PRId64",%"PRIu32",%d;"
"%d,%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
+ &router_pid, &router_id, &router_fd,
&read_pid, &read_id, &read_fd,
log_fd, shm_limit);
- if (nxt_slow_path(rc != 9)) {
+ if (nxt_slow_path(rc != 12)) {
nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
return NXT_UNIT_ERROR;
@@ -639,6 +817,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
ready_port->out_fd = ready_fd;
ready_port->data = NULL;
+ nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
+
+ router_port->in_fd = -1;
+ router_port->out_fd = router_fd;
+ router_port->data = NULL;
+
nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
read_port->in_fd = read_fd;
@@ -652,13 +836,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
static int
-nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- uint32_t stream)
+nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
+ union {
+ struct cmsghdr cm;
+ char space[CMSG_SPACE(sizeof(int))];
+ } cmsg;
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = stream;
@@ -671,7 +859,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ memset(&cmsg, 0, sizeof(cmsg));
+
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_level = SOL_SOCKET;
+ cmsg.cm.cmsg_type = SCM_RIGHTS;
+
+ /*
+ * memcpy() is used instead of simple
+ * *(int *) CMSG_DATA(&cmsg.cm) = fd;
+ * because GCC 4.4 with -O2/3/s optimization may issue a warning:
+ * dereferencing type-punned pointer will break strict-aliasing rules
+ *
+ * Fortunately, GCC with -O1 compiles this nxt_memcpy()
+ * in the same simple assignment as in the code above.
+ */
+ memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
+
+ res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
+ &cmsg, sizeof(cmsg));
if (res != sizeof(msg)) {
return NXT_UNIT_ERROR;
}
@@ -680,41 +886,56 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
}
-int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+static int
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
- int rc;
- pid_t pid;
- struct cmsghdr *cm;
- nxt_port_msg_t *port_msg;
- nxt_unit_impl_t *lib;
- nxt_unit_recv_msg_t recv_msg;
- nxt_unit_callbacks_t *cb;
+ int rc;
+ pid_t pid;
+ struct cmsghdr *cm;
+ nxt_port_msg_t *port_msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_recv_msg_t recv_msg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- rc = NXT_UNIT_ERROR;
- recv_msg.fd = -1;
- recv_msg.process = NULL;
- port_msg = buf;
- cm = oob;
-
- if (oob_size >= CMSG_SPACE(sizeof(int))
- && cm->cmsg_len == CMSG_LEN(sizeof(int))
- && cm->cmsg_level == SOL_SOCKET
+ recv_msg.fd[0] = -1;
+ recv_msg.fd[1] = -1;
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+ cm = (struct cmsghdr *) rbuf->oob;
+
+ if (cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
- memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
+ if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
+ memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
+ }
+
+ if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
+ memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
+ }
}
recv_msg.incoming_buf = NULL;
- if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
- nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
- goto fail;
+ if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ if (nxt_slow_path(rbuf->size == 0)) {
+ nxt_unit_debug(ctx, "read port closed");
+
+ nxt_unit_quit(ctx);
+ rc = NXT_UNIT_OK;
+ goto done;
+ }
+
+ nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
+
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
+ nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
+ port_msg->stream, (int) port_msg->type,
+ recv_msg.fd[0], recv_msg.fd[1]);
+
recv_msg.stream = port_msg->stream;
recv_msg.pid = port_msg->pid;
recv_msg.reply_port = port_msg->reply_port;
@@ -722,41 +943,42 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
recv_msg.mmap = port_msg->mmap;
recv_msg.start = port_msg + 1;
- recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
+ recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
- nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
- port_msg->stream, (int) port_msg->type);
- goto fail;
- }
-
- if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
- rc = NXT_UNIT_OK;
-
- goto fail;
+ nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
+ port_msg->stream, (int) port_msg->type);
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
/* Fragmentation is unsupported. */
if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
- nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
- port_msg->stream, (int) port_msg->type);
- goto fail;
+ nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
+ port_msg->stream, (int) port_msg->type);
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
if (port_msg->mmap) {
- if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
- goto fail;
+ rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
+
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd[0] = -1;
+ recv_msg.fd[1] = -1;
+ }
+
+ goto done;
}
}
- cb = &lib->callbacks;
-
switch (port_msg->type) {
case _NXT_PORT_MSG_QUIT:
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
- cb->quit(ctx);
+ nxt_unit_quit(ctx);
rc = NXT_UNIT_OK;
break;
@@ -766,45 +988,52 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
case _NXT_PORT_MSG_CHANGE_FILE:
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
- port_msg->stream, recv_msg.fd);
+ port_msg->stream, recv_msg.fd[0]);
- if (dup2(recv_msg.fd, lib->log_fd) == -1) {
+ if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
- port_msg->stream, recv_msg.fd, lib->log_fd,
+ port_msg->stream, recv_msg.fd[0], lib->log_fd,
strerror(errno), errno);
- goto fail;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
rc = NXT_UNIT_OK;
break;
case _NXT_PORT_MSG_MMAP:
- if (nxt_slow_path(recv_msg.fd < 0)) {
+ if (nxt_slow_path(recv_msg.fd[0] < 0)) {
nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
- port_msg->stream, recv_msg.fd);
+ port_msg->stream, recv_msg.fd[0]);
- goto fail;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
- rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
+ rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
break;
case _NXT_PORT_MSG_REQ_HEADERS:
rc = nxt_unit_process_req_headers(ctx, &recv_msg);
break;
+ case _NXT_PORT_MSG_REQ_BODY:
+ rc = nxt_unit_process_req_body(ctx, &recv_msg);
+ break;
+
case _NXT_PORT_MSG_WEBSOCKET:
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;
case _NXT_PORT_MSG_REMOVE_PID:
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
- nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
- "(%d != %d)", port_msg->stream, (int) recv_msg.size,
- (int) sizeof(pid));
+ nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
+ "(%d != %d)", port_msg->stream, (int) recv_msg.size,
+ (int) sizeof(pid));
- goto fail;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
memcpy(&pid, recv_msg.start, sizeof(pid));
@@ -812,7 +1041,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
port_msg->stream, (int) pid);
- cb->remove_pid(ctx, pid);
+ nxt_unit_remove_pid(lib, pid);
rc = NXT_UNIT_OK;
break;
@@ -825,21 +1054,29 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
- goto fail;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
-fail:
+done:
- if (recv_msg.fd != -1) {
- close(recv_msg.fd);
+ if (recv_msg.fd[0] != -1) {
+ nxt_unit_close(recv_msg.fd[0]);
+ }
+
+ if (recv_msg.fd[1] != -1) {
+ nxt_unit_close(recv_msg.fd[1]);
}
while (recv_msg.incoming_buf != NULL) {
nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
}
- if (recv_msg.process != NULL) {
- nxt_unit_process_use(ctx, recv_msg.process, -1);
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+#if (NXT_DEBUG)
+ memset(rbuf->buf, 0xAC, rbuf->size);
+#endif
+ nxt_unit_read_buf_release(ctx, rbuf);
}
return rc;
@@ -849,9 +1086,9 @@ fail:
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
- int nb;
+ void *mem;
nxt_unit_impl_t *lib;
- nxt_unit_port_t new_port;
+ nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
@@ -862,48 +1099,80 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(recv_msg->fd < 0)) {
+ if (nxt_slow_path(recv_msg->fd[0] < 0)) {
nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
- recv_msg->stream, recv_msg->fd);
+ recv_msg->stream, recv_msg->fd[0]);
return NXT_UNIT_ERROR;
}
new_port_msg = recv_msg->start;
- nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
+ nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
recv_msg->stream, (int) new_port_msg->pid,
- (int) new_port_msg->id, recv_msg->fd);
+ (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
- nb = 0;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
- nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
- "failed: %s (%d)",
- recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
- return NXT_UNIT_ERROR;
+ new_port.in_fd = recv_msg->fd[0];
+ new_port.out_fd = -1;
+
+ mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd[1], 0);
+
+ } else {
+ if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
+ != NXT_UNIT_OK))
+ {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+ new_port_msg->id);
+
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd[0];
+
+ mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd[1], 0);
}
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
+ strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
- new_port.in_fd = -1;
- new_port.out_fd = recv_msg->fd;
new_port.data = NULL;
- recv_msg->fd = -1;
+ recv_msg->fd[0] = -1;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ port = nxt_unit_add_port(ctx, &new_port, mem);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ lib->shared_port = port;
+
+ } else {
+ nxt_unit_port_release(port);
+ }
- return lib->callbacks.add_port(ctx, &new_port);
+ return NXT_UNIT_OK;
}
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
+ int res;
nxt_unit_impl_t *lib;
+ nxt_unit_port_id_t port_id;
nxt_unit_request_t *r;
nxt_unit_mmap_buf_t *b;
nxt_unit_request_info_t *req;
@@ -934,9 +1203,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req = &req_impl->req;
- nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
- recv_msg->reply_port);
-
req->request = recv_msg->start;
b = recv_msg->incoming_buf;
@@ -952,14 +1218,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req->content_buf = req->request_buf;
req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
- /* "Move" process reference to req_impl. */
- req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(req_impl->process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
- recv_msg->process = NULL;
-
req_impl->stream = recv_msg->stream;
req_impl->outgoing_buf = NULL;
@@ -973,12 +1231,13 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req_impl->incoming_buf->prev = &req_impl->incoming_buf;
recv_msg->incoming_buf = NULL;
- req->content_fd = recv_msg->fd;
- recv_msg->fd = -1;
+ req->content_fd = recv_msg->fd[0];
+ recv_msg->fd[0] = -1;
req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START;
req_impl->websocket = 0;
+ req_impl->in_hash = 0;
nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
(int) r->method_length,
@@ -987,9 +1246,247 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(char *) nxt_unit_sptr_get(&r->target),
(int) r->content_length);
+ nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
+
+ res = nxt_unit_request_check_response_port(req, &port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (req->content_length
+ > (uint64_t) (req->content_buf->end - req->content_buf->free))
+ {
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ /*
+ * If application have separate data handler, we may start
+ * request processing and process data when it is arrived.
+ */
+ if (lib->callbacks.data_handler == NULL) {
+ return NXT_UNIT_OK;
+ }
+ }
+
+ lib->callbacks.request_handler(req);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ uint64_t l;
+ nxt_unit_impl_t *lib;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_request_info_t *req;
+
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ if (req == NULL) {
+ return NXT_UNIT_OK;
+ }
+
+ l = req->content_buf->end - req->content_buf->free;
+
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
+ l += b->buf.end - b->buf.free;
+ }
+
+ if (recv_msg->incoming_buf != NULL) {
+ b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
+
+ /* "Move" incoming buffer list to req_impl. */
+ nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
+ recv_msg->incoming_buf = NULL;
+ }
+
+ req->content_fd = recv_msg->fd[0];
+ recv_msg->fd[0] = -1;
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- lib->callbacks.request_handler(req);
+ if (lib->callbacks.data_handler != NULL) {
+ lib->callbacks.data_handler(req);
+
+ return NXT_UNIT_OK;
+ }
+
+ if (req->content_fd != -1 || l == req->content_length) {
+ lib->callbacks.request_handler(req);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id)
+{
+ int res;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ ctx = req->ctx;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ if (nxt_fast_path(port != NULL)) {
+ req->response_port = port;
+
+ if (nxt_fast_path(port_impl->ready)) {
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
+ (int) port->id.pid, (int) port->id.id);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "check_response_port: "
+ "port{%d,%d} already requested",
+ (int) port->id.pid, (int) port->id.id);
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+ }
+
+ port_impl = malloc(sizeof(nxt_unit_port_impl_t));
+ if (nxt_slow_path(port_impl == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
+ (int) sizeof(nxt_unit_port_impl_t));
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ port = &port_impl->port;
+
+ port->id = *port_id;
+ port->in_fd = -1;
+ port->out_fd = -1;
+ port->data = NULL;
+
+ res = nxt_unit_port_hash_add(&lib->ports, port);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
+ port->id.pid, port->id.id);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ process = nxt_unit_process_find(lib, port_id->pid, 0);
+ if (nxt_slow_path(process == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: process %d not found",
+ port->id.pid);
+
+ nxt_unit_port_hash_find(&lib->ports, port_id, 1);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_queue_insert_tail(&process->ports, &port_impl->link);
+
+ port_impl->process = process;
+ port_impl->queue = NULL;
+ port_impl->from_socket = 0;
+ port_impl->socket_rbuf = NULL;
+
+ nxt_queue_init(&port_impl->awaiting_req);
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
+
+ port_impl->use_count = 2;
+ port_impl->ready = 0;
+
+ req->response_port = port;
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ res = nxt_unit_get_port(ctx, port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
+nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.stream = req_impl->stream;
+ msg.pid = lib->pid;
+ msg.reply_port = ctx_impl->read_port->id.id;
+ msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
+
+ res = nxt_unit_port_send(req->ctx, req->response_port,
+ &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ return NXT_UNIT_ERROR;
+ }
return NXT_UNIT_OK;
}
@@ -1001,21 +1498,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
size_t hsize;
nxt_unit_impl_t *lib;
nxt_unit_mmap_buf_t *b;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_callbacks_t *cb;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
-
- req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
- recv_msg->last);
- if (req_impl == NULL) {
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ if (nxt_slow_path(req == NULL)) {
return NXT_UNIT_OK;
}
- req = &req_impl->req;
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
cb = &lib->callbacks;
@@ -1181,12 +1674,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response = NULL;
req->response_buf = NULL;
- if (req_impl->websocket) {
- nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
-
- req_impl->websocket = 0;
+ if (req_impl->in_hash) {
+ nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
}
+ req_impl->websocket = 0;
+
while (req_impl->outgoing_buf != NULL) {
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
}
@@ -1196,19 +1689,15 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
}
if (req->content_fd != -1) {
- close(req->content_fd);
+ nxt_unit_close(req->content_fd);
req->content_fd = -1;
}
- /*
- * Process release should go after buffers release to guarantee mmap
- * existence.
- */
- if (req_impl->process != NULL) {
- nxt_unit_process_use(req->ctx, req_impl->process, -1);
+ if (req->response_port != NULL) {
+ nxt_unit_port_release(req->response_port);
- req_impl->process = NULL;
+ req->response_port = NULL;
}
pthread_mutex_lock(&ctx_impl->mutex);
@@ -1729,7 +2218,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
req->response = NULL;
req->response_buf = NULL;
@@ -1782,8 +2271,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, size, size, mmap_buf,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
+ size, size, mmap_buf,
NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1795,32 +2284,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
}
-static nxt_unit_process_t *
-nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
-{
- nxt_unit_impl_t *lib;
-
- if (recv_msg->process != NULL) {
- return recv_msg->process;
- }
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (recv_msg->process == NULL) {
- nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
- recv_msg->stream, (int) recv_msg->pid);
- }
-
- return recv_msg->process;
-}
-
-
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
@@ -1869,15 +2332,6 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
}
-typedef struct {
- size_t len;
- const char *str;
-} nxt_unit_str_t;
-
-
-#define nxt_unit_str(str) { nxt_length(str), str }
-
-
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
@@ -1889,7 +2343,6 @@ int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
int rc;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
@@ -1912,9 +2365,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
return NXT_UNIT_ERROR;
}
- ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
-
- rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
+ rc = nxt_unit_request_hash_add(req->ctx, req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
@@ -1980,7 +2431,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
}
if (nxt_fast_path(buf->free > buf->start)) {
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -1995,17 +2446,15 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
- int rc;
- nxt_unit_mmap_buf_t *mmap_buf;
- nxt_unit_request_info_t *req;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_t *req;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
req = mmap_buf->req;
- req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
+ rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
nxt_unit_mmap_buf_free(mmap_buf);
@@ -2018,7 +2467,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
static int
-nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
+nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
nxt_unit_mmap_buf_t *mmap_buf, int last)
{
struct {
@@ -2026,22 +2475,24 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_port_mmap_msg_t mmap_msg;
} m;
- int rc;
- u_char *last_used, *first_free;
- ssize_t res;
- nxt_chunk_id_t first_free_chunk;
- nxt_unit_buf_t *buf;
- nxt_unit_impl_t *lib;
- nxt_port_mmap_header_t *hdr;
+ int rc;
+ u_char *last_used, *first_free;
+ ssize_t res;
+ nxt_chunk_id_t first_free_chunk;
+ nxt_unit_buf_t *buf;
+ nxt_unit_impl_t *lib;
+ nxt_port_mmap_header_t *hdr;
+ nxt_unit_request_info_impl_t *req_impl;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
buf = &mmap_buf->buf;
hdr = mmap_buf->hdr;
m.mmap_msg.size = buf->free - buf->start;
- m.msg.stream = stream;
+ m.msg.stream = req_impl->stream;
m.msg.pid = lib->pid;
m.msg.reply_port = 0;
m.msg.type = _NXT_PORT_MSG_DATA;
@@ -2058,14 +2509,14 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
(u_char *) buf->start);
- nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
- stream,
+ nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
+ req_impl->stream,
(int) m.mmap_msg.mmap_id,
(int) m.mmap_msg.chunk_id,
(int) m.mmap_msg.size);
- res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
- NULL, 0);
+ res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
+ NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
goto free_buf;
}
@@ -2091,33 +2542,34 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
mmap_buf->hdr = NULL;
}
- nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
(int) m.mmap_msg.chunk_id - (int) first_free_chunk);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- mmap_buf->process->pid,
- (int) mmap_buf->process->outgoing.allocated_chunks);
+ nxt_unit_debug(req->ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
|| mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
{
- nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
- ": no space reserved for message header", stream);
+ nxt_unit_alert(req->ctx,
+ "#%"PRIu32": failed to send plain memory buffer"
+ ": no space reserved for message header",
+ req_impl->stream);
goto free_buf;
}
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
- nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
- stream,
+ nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
+ req_impl->stream,
(int) (sizeof(m.msg) + m.mmap_msg.size));
- res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
- buf->start - sizeof(m.msg),
- m.mmap_msg.size + sizeof(m.msg),
- NULL, 0);
+ res = nxt_unit_port_send(req->ctx, req->response_port,
+ buf->start - sizeof(m.msg),
+ m.mmap_msg.size + sizeof(m.msg),
+ NULL, 0);
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
goto free_buf;
}
@@ -2154,7 +2606,6 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
if (mmap_buf->hdr != NULL) {
nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
- mmap_buf->process,
mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
@@ -2175,33 +2626,43 @@ static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
- return nxt_unit_read_buf_get_impl(ctx_impl);
+ rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ return rbuf;
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
+ nxt_queue_link_t *link;
nxt_unit_read_buf_t *rbuf;
- if (ctx_impl->free_read_buf != NULL) {
- rbuf = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
+ link = nxt_queue_first(&ctx_impl->free_rbuf);
+ nxt_queue_remove(link);
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
return rbuf;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
-
rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+ if (nxt_fast_path(rbuf != NULL)) {
+ rbuf->ctx_impl = ctx_impl;
+ }
+
return rbuf;
}
@@ -2216,8 +2677,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
pthread_mutex_lock(&ctx_impl->mutex);
- rbuf->next = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf;
+ nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
}
@@ -2276,13 +2736,15 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
nxt_unit_request_info_impl_t *req_impl;
char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+ nxt_unit_req_debug(req, "write: %d", (int) size);
+
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
sent = 0;
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
- nxt_unit_req_warn(req, "write: response not initialized yet");
+ nxt_unit_req_alert(req, "write: response not initialized yet");
return -NXT_UNIT_ERROR;
}
@@ -2314,8 +2776,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
min_part_size = nxt_min(min_size, part_size);
min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, part_size,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
min_part_size, &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
@@ -2330,7 +2791,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
}
@@ -2360,8 +2821,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_alert(req, "write: response not initialized yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
/* Check if response is not send yet. */
- if (nxt_slow_path(req->response_buf)) {
+ if (nxt_slow_path(req->response_buf != NULL)) {
/* Enable content in headers buf. */
rc = nxt_unit_response_add_content(req, "", 0);
@@ -2408,8 +2875,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
buf_size, buf_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -2431,7 +2897,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
buf->free += n;
}
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
@@ -2451,9 +2917,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
dst, size);
+ nxt_unit_req_debug(req, "read: %d", (int) buf_res);
+
if (buf_res < (ssize_t) size && req->content_fd != -1) {
res = read(req->content_fd, dst, size);
- if (res < 0) {
+ if (nxt_slow_path(res < 0)) {
nxt_unit_req_alert(req, "failed to read content: %s (%d)",
strerror(errno), errno);
@@ -2461,7 +2929,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
}
if (res < (ssize_t) size) {
- close(req->content_fd);
+ nxt_unit_close(req->content_fd);
req->content_fd = -1;
}
@@ -2561,7 +3029,6 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
mmap_buf->buf.start = mmap_buf->free_ptr;
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->process = NULL;
res = read(req->content_fd, mmap_buf->free_ptr, size);
if (res < 0) {
@@ -2574,7 +3041,7 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
}
if (res < (ssize_t) size) {
- close(req->content_fd);
+ nxt_unit_close(req->content_fd);
req->content_fd = -1;
}
@@ -2689,8 +3156,8 @@ skip_response_send:
msg.mf = 0;
msg.tracking = 0;
- (void) lib->callbacks.port_send(req->ctx, &req->response_port,
- &msg, sizeof(msg), NULL, 0);
+ (void) nxt_unit_port_send(req->ctx, req->response_port,
+ &msg, sizeof(msg), NULL, 0);
nxt_unit_request_info_release(req);
}
@@ -2710,17 +3177,14 @@ int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
uint8_t last, const struct iovec *iov, int iovcnt)
{
- int i, rc;
- size_t l, copy;
- uint32_t payload_len, buf_size, alloc_size;
- const uint8_t *b;
- nxt_unit_buf_t *buf;
- nxt_unit_mmap_buf_t mmap_buf;
- nxt_websocket_header_t *wh;
- nxt_unit_request_info_impl_t *req_impl;
- char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
-
- req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ int i, rc;
+ size_t l, copy;
+ uint32_t payload_len, buf_size, alloc_size;
+ const uint8_t *b;
+ nxt_unit_buf_t *buf;
+ nxt_unit_mmap_buf_t mmap_buf;
+ nxt_websocket_header_t *wh;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
payload_len = 0;
@@ -2731,8 +3195,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
buf_size = 10 + payload_len;
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -2766,8 +3229,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
if (l > 0) {
if (nxt_fast_path(buf->free > buf->start)) {
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
- &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2776,8 +3238,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port,
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -2790,8 +3251,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
}
if (buf->free > buf->start) {
- rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
- &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
}
return rc;
@@ -2864,8 +3324,8 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
static nxt_port_mmap_header_t *
-nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
+nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_chunk_id_t *c, int *n, int min_n)
{
int res, nchunks, i;
uint32_t outgoing_size;
@@ -2875,18 +3335,18 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
retry:
- outgoing_size = process->outgoing.size;
+ outgoing_size = lib->outgoing.size;
- mm_end = process->outgoing.elts + outgoing_size;
+ mm_end = lib->outgoing.elts + outgoing_size;
- for (mm = process->outgoing.elts; mm < mm_end; mm++) {
+ for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
+ if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) {
continue;
}
@@ -2930,13 +3390,13 @@ retry:
if (outgoing_size >= lib->shm_mmap_limit) {
/* Cannot allocate more shared memory. */
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
if (min_n == 0) {
*n = 0;
}
- if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
+ if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
>= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
{
/* Memory allocated by application, but not send to router. */
@@ -2945,7 +3405,7 @@ retry:
/* Notify router about OOSM condition. */
- res = nxt_unit_send_oosm(ctx, port_id);
+ res = nxt_unit_send_oosm(ctx, port);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return NULL;
}
@@ -2965,30 +3425,29 @@ retry:
nxt_unit_debug(ctx, "oosm: retry");
- pthread_mutex_lock(&process->outgoing.mutex);
+ pthread_mutex_lock(&lib->outgoing.mutex);
goto retry;
}
*c = 0;
- hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
+ hdr = nxt_unit_new_mmap(ctx, port, *n);
unlock:
- nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- process->pid,
- (int) process->outgoing.allocated_chunks);
+ nxt_unit_debug(ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
- pthread_mutex_unlock(&process->outgoing.mutex);
+ pthread_mutex_unlock(&lib->outgoing.mutex);
return hdr;
}
static int
-nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
ssize_t res;
nxt_port_msg_t msg;
@@ -3006,7 +3465,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3018,7 +3477,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
- nxt_port_msg_t *port_msg;
+ int res;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
@@ -3030,31 +3489,25 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- nxt_unit_read_buf(ctx, rbuf);
-
- if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_ERROR) {
nxt_unit_read_buf_release(ctx, rbuf);
return NXT_UNIT_ERROR;
}
- port_msg = (nxt_port_msg_t *) rbuf->buf;
-
- if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
+ if (nxt_unit_is_shm_ack(rbuf)) {
nxt_unit_read_buf_release(ctx, rbuf);
-
break;
}
pthread_mutex_lock(&ctx_impl->mutex);
- *ctx_impl->pending_read_tail = rbuf;
- ctx_impl->pending_read_tail = &rbuf->next;
- rbuf->next = NULL;
+ nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
- if (port_msg->type == _NXT_PORT_MSG_QUIT) {
+ if (nxt_unit_is_quit(rbuf)) {
nxt_unit_debug(ctx, "oosm: quit received");
return NXT_UNIT_ERROR;
@@ -3068,7 +3521,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
- uint32_t cap;
+ uint32_t cap, n;
+ nxt_unit_mmap_t *e;
+
+ if (nxt_fast_path(mmaps->size > i)) {
+ return mmaps->elts + i;
+ }
cap = mmaps->cap;
@@ -3088,13 +3546,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
if (cap != mmaps->cap) {
- mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
- if (nxt_slow_path(mmaps->elts == NULL)) {
+ e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
+ if (nxt_slow_path(e == NULL)) {
return NULL;
}
- memset(mmaps->elts + mmaps->cap, 0,
- sizeof(*mmaps->elts) * (cap - mmaps->cap));
+ mmaps->elts = e;
+
+ for (n = mmaps->cap; n < cap; n++) {
+ e = mmaps->elts + n;
+
+ e->hdr = NULL;
+ nxt_queue_init(&e->awaiting_rbuf);
+ }
mmaps->cap = cap;
}
@@ -3108,27 +3572,100 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
static nxt_port_mmap_header_t *
-nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, int n)
+nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
int i, fd, rc;
void *mem;
- char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
- lib = process->lib;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
+ mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
if (nxt_slow_path(mm == NULL)) {
- nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
+ nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
return NULL;
}
+ fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
+ if (nxt_slow_path(fd == -1)) {
+ goto remove_fail;
+ }
+
+ mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
+ strerror(errno), errno);
+
+ nxt_unit_close(fd);
+
+ goto remove_fail;
+ }
+
+ mm->hdr = mem;
+ hdr = mem;
+
+ memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
+ memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
+
+ hdr->id = lib->outgoing.size - 1;
+ hdr->src_pid = lib->pid;
+ hdr->dst_pid = port->id.pid;
+ hdr->sent_over = port->id.id;
+
+ /* Mark first n chunk(s) as busy */
+ for (i = 0; i < n; i++) {
+ nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
+ }
+
+ /* Mark as busy chunk followed the last available chunk. */
+ nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
+ nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
+
+ pthread_mutex_unlock(&lib->outgoing.mutex);
+
+ rc = nxt_unit_send_mmap(ctx, port, fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ munmap(mem, PORT_MMAP_SIZE);
+ hdr = NULL;
+
+ } else {
+ nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
+ hdr->id, (int) lib->pid, (int) port->id.pid);
+ }
+
+ nxt_unit_close(fd);
+
+ pthread_mutex_lock(&lib->outgoing.mutex);
+
+ if (nxt_fast_path(hdr != NULL)) {
+ return hdr;
+ }
+
+remove_fail:
+
+ lib->outgoing.size--;
+
+ return NULL;
+}
+
+
+static int
+nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
+{
+ int fd;
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
+ char name[64];
+
snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
lib->pid, (void *) pthread_self());
+#endif
#if (NXT_HAVE_MEMFD_CREATE)
@@ -3137,7 +3674,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
strerror(errno), errno);
- goto remove_fail;
+ return -1;
}
nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
@@ -3149,7 +3686,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
strerror(errno), errno);
- goto remove_fail;
+ return -1;
}
#elif (NXT_HAVE_SHM_OPEN)
@@ -3162,12 +3699,12 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
strerror(errno), errno);
- goto remove_fail;
+ return -1;
}
if (nxt_slow_path(shm_unlink(name) == -1)) {
- nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
+ strerror(errno), errno);
}
#else
@@ -3176,71 +3713,21 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
#endif
- if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
+ if (nxt_slow_path(ftruncate(fd, size) == -1)) {
nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
strerror(errno), errno);
- goto remove_fail;
- }
-
- mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (nxt_slow_path(mem == MAP_FAILED)) {
- nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
- strerror(errno), errno);
-
- goto remove_fail;
- }
-
- mm->hdr = mem;
- hdr = mem;
-
- memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
- memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
-
- hdr->id = process->outgoing.size - 1;
- hdr->src_pid = lib->pid;
- hdr->dst_pid = process->pid;
- hdr->sent_over = port_id->id;
+ nxt_unit_close(fd);
- /* Mark first n chunk(s) as busy */
- for (i = 0; i < n; i++) {
- nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
- }
-
- /* Mark as busy chunk followed the last available chunk. */
- nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
- nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
-
- pthread_mutex_unlock(&process->outgoing.mutex);
-
- rc = nxt_unit_send_mmap(ctx, port_id, fd);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- munmap(mem, PORT_MMAP_SIZE);
- hdr = NULL;
-
- } else {
- nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
- hdr->id, (int) lib->pid, (int) process->pid);
- }
-
- close(fd);
-
- pthread_mutex_lock(&process->outgoing.mutex);
-
- if (nxt_fast_path(hdr != NULL)) {
- return hdr;
+ return -1;
}
-remove_fail:
-
- process->outgoing.size--;
-
- return NULL;
+ return fd;
}
static int
-nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
+nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
ssize_t res;
nxt_port_msg_t msg;
@@ -3284,8 +3771,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
*/
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
- res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
- &cmsg, sizeof(cmsg));
+ res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
+ &cmsg, sizeof(cmsg));
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3295,8 +3782,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
static int
-nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
+nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ uint32_t size, uint32_t min_size,
nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
int nchunks, min_nchunks;
@@ -3321,8 +3808,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
- mmap_buf->port_id = *port_id;
- mmap_buf->process = process;
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
@@ -3333,7 +3818,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
- hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
+ hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
if (nxt_slow_path(hdr == NULL)) {
if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
mmap_buf->hdr = NULL;
@@ -3352,8 +3837,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- mmap_buf->port_id = *port_id;
- mmap_buf->process = process;
mmap_buf->free_ptr = NULL;
mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
@@ -3368,83 +3851,87 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
- int rc;
- void *mem;
- struct stat mmap_stat;
- nxt_unit_mmap_t *mm;
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
- nxt_port_mmap_header_t *hdr;
+ int rc;
+ void *mem;
+ nxt_queue_t awaiting_rbuf;
+ struct stat mmap_stat;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
- pthread_mutex_lock(&lib->mutex);
-
- process = nxt_unit_process_find(ctx, pid, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(process == NULL)) {
- nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
- (int) pid, fd);
-
- return NXT_UNIT_ERROR;
- }
-
- rc = NXT_UNIT_ERROR;
-
if (fstat(fd, &mmap_stat) == -1) {
- nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
+ strerror(errno), errno);
- goto fail;
+ return NXT_UNIT_ERROR;
}
mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
- nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
- strerror(errno), errno);
+ nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
+ strerror(errno), errno);
- goto fail;
+ return NXT_UNIT_ERROR;
}
hdr = mem;
- if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
+ if (nxt_slow_path(hdr->src_pid != pid)) {
- nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
- "detected: %d != %d or %d != %d", (int) hdr->src_pid,
- (int) pid, (int) hdr->dst_pid, (int) lib->pid);
+ nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
+ "detected: %d != %d or %d != %d", (int) hdr->src_pid,
+ (int) pid, (int) hdr->dst_pid, (int) lib->pid);
munmap(mem, PORT_MMAP_SIZE);
- goto fail;
+ return NXT_UNIT_ERROR;
}
- pthread_mutex_lock(&process->incoming.mutex);
+ nxt_queue_init(&awaiting_rbuf);
+
+ pthread_mutex_lock(&lib->incoming.mutex);
- mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
+ mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
if (nxt_slow_path(mm == NULL)) {
- nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
+ nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
munmap(mem, PORT_MMAP_SIZE);
+ rc = NXT_UNIT_ERROR;
+
} else {
mm->hdr = hdr;
hdr->sent_over = 0xFFFFu;
+ nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
+ nxt_queue_init(&mm->awaiting_rbuf);
+
rc = NXT_UNIT_OK;
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&lib->incoming.mutex);
-fail:
+ nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
- nxt_unit_process_use(ctx, process, -1);
+ ctx_impl = rbuf->ctx_impl;
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
return rc;
}
@@ -3462,18 +3949,22 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
}
-static void
-nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
+nxt_inline void
+nxt_unit_process_use(nxt_unit_process_t *process)
{
- long c;
+ nxt_atomic_fetch_add(&process->use_count, 1);
+}
+
- c = nxt_atomic_fetch_add(&process->use_count, i);
+nxt_inline void
+nxt_unit_process_release(nxt_unit_process_t *process)
+{
+ long c;
- if (i < 0 && c == -i) {
- nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
+ c = nxt_atomic_fetch_add(&process->use_count, -1);
- nxt_unit_mmaps_destroy(&process->incoming);
- nxt_unit_mmaps_destroy(&process->outgoing);
+ if (c == 1) {
+ nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
free(process);
}
@@ -3499,85 +3990,62 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
}
-static nxt_port_mmap_header_t *
-nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- uint32_t id)
+static int
+nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
+ pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
+ nxt_unit_read_buf_t *rbuf)
{
- nxt_port_mmap_header_t *hdr;
-
- if (nxt_fast_path(process->incoming.size > id)) {
- hdr = process->incoming.elts[id].hdr;
-
- } else {
- hdr = NULL;
- }
+ int res, need_rbuf;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_ctx_impl_t *ctx_impl;
- return hdr;
-}
+ mm = nxt_unit_mmap_at(mmaps, id);
+ if (nxt_slow_path(mm == NULL)) {
+ nxt_unit_alert(ctx, "failed to allocate mmap");
+ pthread_mutex_unlock(&mmaps->mutex);
-static int
-nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
-{
- int rc;
- nxt_chunk_id_t c;
- nxt_unit_process_t *process;
- nxt_port_mmap_header_t *hdr;
- nxt_port_mmap_tracking_msg_t *tracking_msg;
+ *hdr = NULL;
- if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
- recv_msg->stream, (int) recv_msg->size);
-
- return 0;
+ return NXT_UNIT_ERROR;
}
- tracking_msg = recv_msg->start;
+ *hdr = mm->hdr;
- recv_msg->start = tracking_msg + 1;
- recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
-
- process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return 0;
+ if (nxt_fast_path(*hdr != NULL)) {
+ return NXT_UNIT_OK;
}
- pthread_mutex_lock(&process->incoming.mutex);
-
- hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
+ need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- tracking_msg->mmap_id);
+ nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
- return 0;
- }
+ pthread_mutex_unlock(&mmaps->mutex);
- c = tracking_msg->tracking_id;
- rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (rc == 0) {
- nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
- recv_msg->stream);
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
- nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
+ if (need_rbuf) {
+ res = nxt_unit_get_mmap(ctx, pid, id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
}
- pthread_mutex_unlock(&process->incoming.mutex);
-
- return rc;
+ return NXT_UNIT_AGAIN;
}
static int
-nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
{
+ int res;
void *start;
uint32_t size;
- nxt_unit_process_t *process;
+ nxt_unit_impl_t *lib;
+ nxt_unit_mmaps_t *mmaps;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
nxt_port_mmap_header_t *hdr;
@@ -3589,22 +4057,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- process = nxt_unit_msg_get_process(ctx, recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
mmap_msg = recv_msg->start;
end = nxt_pointer_to(recv_msg->start, recv_msg->size);
incoming_tail = &recv_msg->incoming_buf;
+ /* Allocating buffer structures. */
for (; mmap_msg < end; mmap_msg++) {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
recv_msg->stream);
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
+
return NXT_UNIT_ERROR;
}
@@ -3615,19 +4083,23 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
- pthread_mutex_lock(&process->incoming.mutex);
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ mmaps = &lib->incoming;
+
+ pthread_mutex_lock(&mmaps->mutex);
for (; mmap_msg < end; mmap_msg++) {
- hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
+ res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
+ recv_msg->pid, mmap_msg->mmap_id,
+ &hdr, rbuf);
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- mmap_msg->mmap_id);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
- return NXT_UNIT_ERROR;
+ return res;
}
start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
@@ -3642,7 +4114,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
- b->process = process;
b = b->next;
@@ -3654,15 +4125,48 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(int) mmap_msg->size);
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_mmap_t get_mmap;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_MMAP;
+
+ m.get_mmap.id = id;
+
+ nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
return NXT_UNIT_OK;
}
static void
-nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
+nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
void *start, uint32_t size)
{
int freed_chunks;
@@ -3688,12 +4192,10 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (hdr->src_pid == lib->pid && freed_chunks != 0) {
- nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
- -freed_chunks);
+ nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
- nxt_unit_debug(ctx, "process %d allocated_chunks %d",
- process->pid,
- (int) process->outgoing.allocated_chunks);
+ nxt_unit_debug(ctx, "allocated_chunks %d",
+ (int) lib->outgoing.allocated_chunks);
}
if (hdr->dst_pid == lib->pid
@@ -3708,15 +4210,12 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
- ssize_t res;
- nxt_port_msg_t msg;
- nxt_unit_impl_t *lib;
- nxt_unit_port_id_t port_id;
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- nxt_unit_port_id_init(&port_id, pid, 0);
-
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
@@ -3727,7 +4226,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3772,40 +4271,34 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
static nxt_unit_process_t *
-nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
+nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
{
- nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
nxt_unit_process_lhq_pid(&lhq, &pid);
if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
process = lhq.value;
- nxt_unit_process_use(ctx, process, 1);
+ nxt_unit_process_use(process);
return process;
}
process = malloc(sizeof(nxt_unit_process_t));
if (nxt_slow_path(process == NULL)) {
- nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
+ nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
return NULL;
}
process->pid = pid;
- process->use_count = 1;
+ process->use_count = 2;
process->next_port_id = 0;
process->lib = lib;
nxt_queue_init(&process->ports);
- nxt_unit_mmaps_init(&process->incoming);
- nxt_unit_mmaps_init(&process->outgoing);
-
lhq.replace = 0;
lhq.value = process;
@@ -3815,31 +4308,23 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
break;
default:
- nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
+ nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
- pthread_mutex_destroy(&process->outgoing.mutex);
- pthread_mutex_destroy(&process->incoming.mutex);
free(process);
process = NULL;
break;
}
- nxt_unit_process_use(ctx, process, 1);
-
return process;
}
static nxt_unit_process_t *
-nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
+nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
{
int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
nxt_unit_process_lhq_pid(&lhq, &pid);
if (remove) {
@@ -3850,13 +4335,11 @@ nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
}
if (rc == NXT_OK) {
- process = lhq.value;
-
if (!remove) {
- nxt_unit_process_use(ctx, process, 1);
+ nxt_unit_process_use(lhq.value);
}
- return process;
+ return lhq.value;
}
return NULL;
@@ -3876,17 +4359,21 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
int rc;
nxt_unit_impl_t *lib;
+ nxt_unit_ctx_use(ctx);
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_run_once(ctx);
+ rc = nxt_unit_run_once_impl(ctx);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
}
+ nxt_unit_ctx_release(ctx);
+
return rc;
}
@@ -3894,174 +4381,578 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
+ int rc;
+
+ nxt_unit_ctx_use(ctx);
+
+ rc = nxt_unit_run_once_impl(ctx);
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+static int
+nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
+{
int rc;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_read_buf(ctx, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+
+ return rc;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_process_ready_req(ctx);
+
+ return rc;
+}
+
+
+static int
+nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+{
+ int nevents, res, err;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ struct pollfd fds[2];
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- pthread_mutex_lock(&ctx_impl->mutex);
+ if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
- if (ctx_impl->pending_read_head != NULL) {
- rbuf = ctx_impl->pending_read_head;
- ctx_impl->pending_read_head = rbuf->next;
+ return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ }
+
+ port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
+ port);
- if (ctx_impl->pending_read_tail == &rbuf->next) {
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
+retry:
+
+ if (port_impl->from_socket == 0) {
+ res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ if (nxt_unit_is_read_socket(rbuf)) {
+ port_impl->from_socket++;
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
+ (int) ctx_impl->read_port->id.pid,
+ (int) ctx_impl->read_port->id.id,
+ port_impl->from_socket);
+
+ } else {
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
+ (int) ctx_impl->read_port->id.pid,
+ (int) ctx_impl->read_port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
}
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ return NXT_UNIT_OK;
+ }
- } else {
- rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
- if (nxt_slow_path(rbuf == NULL)) {
- return NXT_UNIT_ERROR;
+ fds[0].fd = ctx_impl->read_port->in_fd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+
+ fds[1].fd = lib->shared_port->in_fd;
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
+
+ nevents = poll(fds, 2, -1);
+ if (nxt_slow_path(nevents == -1)) {
+ err = errno;
+
+ if (err == EINTR) {
+ goto retry;
}
- nxt_unit_read_buf(ctx, rbuf);
+ nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
+ fds[0].fd, fds[1].fd, strerror(err), err);
+
+ rbuf->size = -1;
+
+ return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
- if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
- rbuf->buf, rbuf->size,
- rbuf->oob, sizeof(rbuf->oob));
+ nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
+ fds[0].fd, fds[1].fd, nevents, fds[0].revents,
+ fds[1].revents);
-#if (NXT_DEBUG)
- memset(rbuf->buf, 0xAC, rbuf->size);
-#endif
+ if ((fds[0].revents & POLLIN) != 0) {
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (res == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
- } else {
- rc = NXT_UNIT_ERROR;
+ return res;
+ }
+
+ if ((fds[1].revents & POLLIN) != 0) {
+ res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
+ if (res == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ return res;
}
- nxt_unit_read_buf_release(ctx, rbuf);
+ nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
+ fds[0].fd, fds[1].fd, nevents, fds[0].revents,
+ fds[1].revents);
+
+ return NXT_UNIT_ERROR;
+}
+
+
+static int
+nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_queue_t pending_rbuf;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ nxt_queue_init(&pending_rbuf);
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ rc = NXT_UNIT_OK;
+
+ nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
+
+ if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ } nxt_queue_loop;
return rc;
}
static void
-nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
+{
+ int res;
+ nxt_queue_t ready_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_init(&ready_req);
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return;
+ }
+
+ nxt_queue_add(&ready_req, &ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->ready_req);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_queue_each(req_impl, &ready_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+
+ req = &req_impl->req;
+
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+
+ if (req->content_length
+ > (uint64_t) (req->content_buf->end - req->content_buf->free))
+ {
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+
+ /*
+ * If application have separate data handler, we may start
+ * request processing and process data when it is arrived.
+ */
+ if (lib->callbacks.data_handler == NULL) {
+ continue;
+ }
+ }
+
+ lib->callbacks.request_handler(&req_impl->req);
+
+ } nxt_queue_loop;
+}
+
+
+int
+nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
+ int rc;
nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ rc = NXT_UNIT_OK;
- if (ctx_impl->read_port_fd != -1) {
- rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
+ while (nxt_fast_path(lib->online)) {
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ break;
+ }
- } else {
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ retry:
- rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
+ rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ if (rc == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+
+ nxt_unit_process_ready_req(ctx);
}
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
}
-void
-nxt_unit_done(nxt_unit_ctx_t *ctx)
+nxt_inline int
+nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
+{
+ if (nxt_fast_path(rbuf->size == 1)) {
+ return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
+ }
+
+ return 0;
+}
+
+
+nxt_inline int
+nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
+{
+ nxt_port_msg_t *port_msg;
+
+ if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ return port_msg->type == _NXT_PORT_MSG_QUIT;
+ }
+
+ return 0;
+}
+
+
+int
+nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
+{
+ int rc;
nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
- nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ rc = NXT_UNIT_OK;
- nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
+ while (nxt_fast_path(lib->online)) {
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ break;
+ }
- nxt_unit_ctx_free(&ctx_impl->ctx);
+ retry:
- } nxt_queue_loop;
+ rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
+ if (rc == NXT_UNIT_AGAIN) {
+ goto retry;
+ }
- for ( ;; ) {
- pthread_mutex_lock(&lib->mutex);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ break;
+ }
- process = nxt_unit_process_pop_first(lib);
- if (process == NULL) {
- pthread_mutex_unlock(&lib->mutex);
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
- nxt_unit_remove_process(ctx, process);
+ nxt_unit_process_ready_req(ctx);
}
- pthread_mutex_destroy(&lib->mutex);
+ nxt_unit_ctx_release(ctx);
- free(lib);
+ return rc;
+}
+
+
+int
+nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+
+ nxt_unit_ctx_use(ctx);
+
+ rc = nxt_unit_process_port_msg_impl(ctx, port);
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+static int
+nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_read_buf_t *rbuf;
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+retry:
+
+ if (port == lib->shared_port) {
+ rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
+
+ } else {
+ rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
+ }
+
+ if (rc != NXT_UNIT_OK) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ return rc;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_process_ready_req(ctx);
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (lib->online) {
+ goto retry;
+ }
+
+ return rc;
+}
+
+
+void
+nxt_unit_done(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_release(ctx);
}
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
- int rc, fd;
- nxt_unit_impl_t *lib;
- nxt_unit_port_id_t new_port_id;
- nxt_unit_ctx_impl_t *new_ctx;
+ int rc, queue_fd;
+ void *mem;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_ctx_impl_t *new_ctx;
+ nxt_unit_port_impl_t *port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
if (nxt_slow_path(new_ctx == NULL)) {
- nxt_unit_warn(ctx, "failed to allocate context");
+ nxt_unit_alert(ctx, "failed to allocate context");
return NULL;
}
- rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
+ rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- free(new_ctx);
+ free(new_ctx);
- return NULL;
+ return NULL;
}
- rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- lib->callbacks.remove_port(ctx, &new_port_id);
+ queue_fd = -1;
- close(fd);
+ port = nxt_unit_create_port(ctx);
+ if (nxt_slow_path(port == NULL)) {
+ goto fail;
+ }
- free(new_ctx);
+ new_ctx->read_port = port;
- return NULL;
+ queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(queue_fd == -1)) {
+ goto fail;
}
- close(fd);
+ mem = mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
+ strerror(errno), errno);
- rc = nxt_unit_ctx_init(lib, new_ctx, data);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- lib->callbacks.remove_port(ctx, &new_port_id);
+ goto fail;
+ }
- free(new_ctx);
+ nxt_port_queue_init(mem);
- return NULL;
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ port_impl->queue = mem;
+
+ rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
}
- new_ctx->read_port_id = new_port_id;
+ nxt_unit_close(queue_fd);
return &new_ctx->ctx;
+
+fail:
+
+ if (queue_fd != -1) {
+ nxt_unit_close(queue_fd);
+ }
+
+ nxt_unit_ctx_release(&new_ctx->ctx);
+
+ return NULL;
}
-void
-nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
+static void
+nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
nxt_queue_each(req_impl, &ctx_impl->active_req,
nxt_unit_request_info_impl_t, link)
@@ -4099,9 +4990,16 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
nxt_queue_remove(&ctx_impl->link);
+ if (nxt_fast_path(ctx_impl->read_port != NULL)) {
+ nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
+ nxt_unit_port_release(ctx_impl->read_port);
+ }
+
if (ctx_impl != &lib->main_ctx) {
free(ctx_impl);
}
+
+ nxt_unit_lib_release(lib);
}
@@ -4127,42 +5025,12 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
}
-int
-nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *port_id)
-{
- int rc, fd;
- nxt_unit_impl_t *lib;
- nxt_unit_port_id_t new_port_id;
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
- }
-
- rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
-
- if (nxt_fast_path(rc == NXT_UNIT_OK)) {
- *port_id = new_port_id;
-
- } else {
- lib->callbacks.remove_port(ctx, &new_port_id);
- }
-
- close(fd);
-
- return rc;
-}
-
-
-static int
-nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
+static nxt_unit_port_t *
+nxt_unit_create_port(nxt_unit_ctx_t *ctx)
{
int rc, port_sockets[2];
nxt_unit_impl_t *lib;
- nxt_unit_port_t new_port;
+ nxt_unit_port_t new_port, *port;
nxt_unit_process_t *process;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4172,7 +5040,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
strerror(errno), errno);
- return NXT_UNIT_ERROR;
+ return NULL;
}
nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
@@ -4180,49 +5048,43 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_get(ctx, lib->pid);
+ process = nxt_unit_process_get(lib, lib->pid);
if (nxt_slow_path(process == NULL)) {
pthread_mutex_unlock(&lib->mutex);
- close(port_sockets[0]);
- close(port_sockets[1]);
+ nxt_unit_close(port_sockets[0]);
+ nxt_unit_close(port_sockets[1]);
- return NXT_UNIT_ERROR;
+ return NULL;
}
nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
new_port.in_fd = port_sockets[0];
- new_port.out_fd = -1;
+ new_port.out_fd = port_sockets[1];
new_port.data = NULL;
pthread_mutex_unlock(&lib->mutex);
- nxt_unit_process_use(ctx, process, -1);
-
- rc = lib->callbacks.add_port(ctx, &new_port);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_warn(ctx, "create_port: add_port() failed");
-
- close(port_sockets[0]);
- close(port_sockets[1]);
+ nxt_unit_process_release(process);
- return rc;
+ port = nxt_unit_add_port(ctx, &new_port, NULL);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_unit_close(port_sockets[0]);
+ nxt_unit_close(port_sockets[1]);
}
- *port_id = new_port.id;
- *fd = port_sockets[1];
-
- return rc;
+ return port;
}
static int
-nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *new_port, int fd)
+nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
+ nxt_unit_port_t *port, int queue_fd)
{
ssize_t res;
nxt_unit_impl_t *lib;
+ int fds[2] = { port->out_fd, queue_fd };
struct {
nxt_port_msg_t msg;
@@ -4231,7 +5093,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
union {
struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int))];
+ char space[CMSG_SPACE(sizeof(int) * 2)];
} cmsg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4246,15 +5108,15 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
m.msg.mf = 0;
m.msg.tracking = 0;
- m.new_port.id = new_port->id;
- m.new_port.pid = new_port->pid;
+ m.new_port.id = port->id.id;
+ m.new_port.pid = port->id.pid;
m.new_port.type = NXT_PROCESS_APP;
m.new_port.max_size = 16 * 1024;
m.new_port.max_share = 64 * 1024;
memset(&cmsg, 0, sizeof(cmsg));
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
@@ -4267,22 +5129,74 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
* in the same simple assignment as in the code above.
*/
- memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
+ memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
- res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
- &cmsg, sizeof(cmsg));
+ res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
- return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
+ return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}
-int
-nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port, *old_port;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ nxt_atomic_fetch_add(&port_impl->use_count, 1);
+}
+
+
+nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
+{
+ long c;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
+
+ if (c == 1) {
+ nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
+ (int) port->id.pid, (int) port->id.id,
+ port->in_fd, port->out_fd);
+
+ nxt_unit_process_release(port_impl->process);
+
+ if (port->in_fd != -1) {
+ nxt_unit_close(port->in_fd);
+
+ port->in_fd = -1;
+ }
+
+ if (port->out_fd != -1) {
+ nxt_unit_close(port->out_fd);
+
+ port->out_fd = -1;
+ }
+
+ if (port_impl->queue != NULL) {
+ munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
+ ? sizeof(nxt_app_queue_t)
+ : sizeof(nxt_port_queue_t));
+ }
+
+ free(port_impl);
+ }
+}
+
+
+static nxt_unit_port_t *
+nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
+{
+ int rc;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4291,32 +5205,91 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
if (nxt_slow_path(old_port != NULL)) {
- nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
- port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
+ nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
+ "in_fd %d out_fd %d queue %p",
+ port->id.pid, port->id.id,
+ port->in_fd, port->out_fd, queue);
+
+ if (old_port->data == NULL) {
+ old_port->data = port->data;
+ port->data = NULL;
+ }
+
+ if (old_port->in_fd == -1) {
+ old_port->in_fd = port->in_fd;
+ port->in_fd = -1;
+ }
if (port->in_fd != -1) {
- close(port->in_fd);
+ nxt_unit_close(port->in_fd);
port->in_fd = -1;
}
+ if (old_port->out_fd == -1) {
+ old_port->out_fd = port->out_fd;
+ port->out_fd = -1;
+ }
+
if (port->out_fd != -1) {
- close(port->out_fd);
+ nxt_unit_close(port->out_fd);
port->out_fd = -1;
}
+ *port = *old_port;
+
+ nxt_queue_init(&awaiting_req);
+
+ old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
+
+ if (old_port_impl->queue == NULL) {
+ old_port_impl->queue = queue;
+ }
+
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
+
+ old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+
pthread_mutex_unlock(&lib->mutex);
- return NXT_UNIT_OK;
+ if (lib->callbacks.add_port != NULL
+ && (port->in_fd != -1 || port->out_fd != -1))
+ {
+ lib->callbacks.add_port(ctx, old_port);
+ }
+
+ nxt_queue_each(req_impl, &awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
+ return old_port;
}
- nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
+ new_port = NULL;
+
+ nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
port->id.pid, port->id.id,
- port->in_fd, port->out_fd);
+ port->in_fd, port->out_fd, queue);
- process = nxt_unit_process_get(ctx, port->id.pid);
+ process = nxt_unit_process_get(lib, port->id.pid);
if (nxt_slow_path(process == NULL)) {
- rc = NXT_UNIT_ERROR;
goto unlock;
}
@@ -4326,7 +5299,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port = malloc(sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
- rc = NXT_UNIT_ERROR;
+ nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
+ port->id.pid, port->id.id);
+
goto unlock;
}
@@ -4337,149 +5312,131 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
port->id.pid, port->id.id);
+ free(new_port);
+
+ new_port = NULL;
+
goto unlock;
}
nxt_queue_insert_tail(&process->ports, &new_port->link);
- rc = NXT_UNIT_OK;
-
+ new_port->use_count = 2;
new_port->process = process;
+ new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
+ new_port->queue = queue;
+ new_port->from_socket = 0;
+ new_port->socket_rbuf = NULL;
+
+ nxt_queue_init(&new_port->awaiting_req);
+
+ process = NULL;
unlock:
pthread_mutex_unlock(&lib->mutex);
- if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
- nxt_unit_process_use(ctx, process, -1);
+ if (nxt_slow_path(process != NULL)) {
+ nxt_unit_process_release(process);
}
- return rc;
-}
-
+ if (lib->callbacks.add_port != NULL
+ && new_port != NULL
+ && (port->in_fd != -1 || port->out_fd != -1))
+ {
+ lib->callbacks.add_port(ctx, &new_port->port);
+ }
-void
-nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
-{
- nxt_unit_find_remove_port(ctx, port_id, NULL);
+ return &new_port->port;
}
-void
-nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t *r_port)
+static void
+nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ nxt_unit_port_t *port;
+ nxt_unit_port_impl_t *port_impl;
pthread_mutex_lock(&lib->mutex);
- process = NULL;
+ port = nxt_unit_remove_port_unsafe(lib, port_id);
- nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
+ if (nxt_fast_path(port != NULL)) {
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ nxt_queue_remove(&port_impl->link);
+ }
pthread_mutex_unlock(&lib->mutex);
- if (nxt_slow_path(process != NULL)) {
- nxt_unit_process_use(ctx, process, -1);
+ if (lib->callbacks.remove_port != NULL && port != NULL) {
+ lib->callbacks.remove_port(&lib->unit, port);
+ }
+
+ if (nxt_fast_path(port != NULL)) {
+ nxt_unit_port_release(port);
}
}
-static void
-nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t *r_port, nxt_unit_process_t **process)
+static nxt_unit_port_t *
+nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
- nxt_unit_impl_t *lib;
- nxt_unit_port_impl_t *port;
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ nxt_unit_port_t *port;
port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
if (nxt_slow_path(port == NULL)) {
- nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
+ nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
(int) port_id->pid, (int) port_id->id);
- return;
+ return NULL;
}
- nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
+ nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
(int) port_id->pid, (int) port_id->id,
- port->port.in_fd, port->port.out_fd, port->port.data);
-
- if (port->port.in_fd != -1) {
- close(port->port.in_fd);
- }
-
- if (port->port.out_fd != -1) {
- close(port->port.out_fd);
- }
-
- if (port->process != NULL) {
- nxt_queue_remove(&port->link);
- }
-
- if (process != NULL) {
- *process = port->process;
- }
+ port->in_fd, port->out_fd, port->data);
- if (r_port != NULL) {
- *r_port = port->port;
- }
-
- free(port);
+ return port;
}
-void
-nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
+static void
+nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
{
- nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_find(ctx, pid, 1);
+ process = nxt_unit_process_find(lib, pid, 1);
if (nxt_slow_path(process == NULL)) {
- nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
+ nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
pthread_mutex_unlock(&lib->mutex);
return;
}
- nxt_unit_remove_process(ctx, process);
+ nxt_unit_remove_process(lib, process);
+
+ if (lib->callbacks.remove_pid != NULL) {
+ lib->callbacks.remove_pid(&lib->unit, pid);
+ }
}
static void
-nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
+nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
{
nxt_queue_t ports;
- nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
nxt_queue_init(&ports);
nxt_queue_add(&ports, &process->ports);
nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
- nxt_unit_process_use(ctx, process, -1);
- port->process = NULL;
-
- /* Shortcut for default callback. */
- if (lib->callbacks.remove_port == nxt_unit_remove_port) {
- nxt_queue_remove(&port->link);
-
- nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
- }
+ nxt_unit_remove_port_unsafe(lib, &port->port.id);
} nxt_queue_loop;
@@ -4489,70 +5446,168 @@ nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
nxt_queue_remove(&port->link);
- lib->callbacks.remove_port(ctx, &port->port.id);
+ if (lib->callbacks.remove_port != NULL) {
+ lib->callbacks.remove_port(&lib->unit, &port->port);
+ }
+
+ nxt_unit_port_release(&port->port);
} nxt_queue_loop;
- nxt_unit_process_use(ctx, process, -1);
+ nxt_unit_process_release(process);
}
-void
+static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- lib->online = 0;
+ if (lib->online) {
+ lib->online = 0;
+
+ if (lib->callbacks.quit != NULL) {
+ lib->callbacks.quit(ctx);
+ }
+ }
+}
+
+
+static int
+nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_port_t get_port;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_PORT;
+
+ m.get_port.id = port_id->id;
+ m.get_port.pid = port_id->pid;
+
+ nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
+ (int) port_id->id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
}
static ssize_t
-nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
- int fd;
+ int notify;
+ ssize_t ret;
+ nxt_int_t rc;
+ nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
- nxt_unit_port_impl_t *port;
+ nxt_unit_port_impl_t *port_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- pthread_mutex_lock(&lib->mutex);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ if (port_impl->queue != NULL && oob_size == 0
+ && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
+ {
+ rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
+ (int) port->id.pid, (int) port->id.id);
- port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+ return -1;
+ }
- if (nxt_fast_path(port != NULL)) {
- fd = port->port.out_fd;
+ nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) buf_size, notify);
- } else {
- nxt_unit_warn(ctx, "port_send: port %d,%d not found",
- (int) port_id->pid, (int) port_id->id);
- fd = -1;
+ if (notify) {
+ memcpy(&msg, buf, sizeof(nxt_port_msg_t));
+
+ msg.type = _NXT_PORT_MSG_READ_QUEUE;
+
+ if (lib->callbacks.port_send == NULL) {
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
+ sizeof(nxt_port_msg_t), NULL, 0);
+
+ nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+
+ } else {
+ ret = lib->callbacks.port_send(ctx, port, &msg,
+ sizeof(nxt_port_msg_t), NULL, 0);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+ }
+
+ }
+
+ return buf_size;
}
- pthread_mutex_unlock(&lib->mutex);
+ if (port_impl->queue != NULL) {
+ msg.type = _NXT_PORT_MSG_READ_SOCKET;
- if (nxt_slow_path(fd == -1)) {
- if (port != NULL) {
- nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
- (int) port_id->pid, (int) port_id->id);
+ rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
+ (int) port->id.pid, (int) port->id.id);
+
+ return -1;
}
- return -1;
+ nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
+ (int) port->id.pid, (int) port->id.id, notify);
}
- nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
- (int) port_id->pid, (int) port_id->id, fd);
+ if (lib->callbacks.port_send != NULL) {
+ ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
+ oob, oob_size);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+
+ } else {
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
+ oob, oob_size);
+
+ nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) ret);
+ }
- return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
+ return ret;
}
-ssize_t
-nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
+static ssize_t
+nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
+ int err;
ssize_t res;
struct iovec iov[1];
struct msghdr msg;
@@ -4573,7 +5628,9 @@ retry:
res = sendmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
@@ -4582,7 +5639,7 @@ retry:
* implementation.
*/
nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
- fd, (int) buf_size, strerror(errno), errno);
+ fd, (int) buf_size, strerror(err), err);
} else {
nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
@@ -4593,88 +5650,310 @@ retry:
}
-static ssize_t
-nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+static int
+nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
{
- int fd;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_port_impl_t *port;
+ int res, read;
+ nxt_unit_port_impl_t *port_impl;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
- pthread_mutex_lock(&lib->mutex);
+ read = 0;
- port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+retry:
- if (nxt_fast_path(port != NULL)) {
- fd = port->port.in_fd;
+ if (port_impl->from_socket > 0) {
+ if (port_impl->socket_rbuf != NULL
+ && port_impl->socket_rbuf->size > 0)
+ {
+ port_impl->from_socket--;
+
+ nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
+ port_impl->socket_rbuf->size = 0;
+
+ nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
} else {
- nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
- (int) port_id->pid, (int) port_id->id);
- fd = -1;
+ res = nxt_unit_port_queue_recv(port, rbuf);
+
+ if (res == NXT_UNIT_OK) {
+ if (nxt_unit_is_read_socket(rbuf)) {
+ port_impl->from_socket++;
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
+ (int) port->id.pid, (int) port->id.id,
+ port_impl->from_socket);
+
+ goto retry;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+ }
}
- pthread_mutex_unlock(&lib->mutex);
+ if (read) {
+ return NXT_UNIT_AGAIN;
+ }
- if (nxt_slow_path(fd == -1)) {
- return -1;
+ res = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
}
- nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
- (int) port_id->pid, (int) port_id->id, fd);
+ read = 1;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ if (port_impl->from_socket) {
+ nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
+ }
+
+ goto retry;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ if (res == NXT_UNIT_AGAIN) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ if (port_impl->from_socket > 0) {
+ port_impl->from_socket--;
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
+ (int) port->id.pid, (int) port->id.id,
+ (int) rbuf->size);
+
+ if (port_impl->socket_rbuf == NULL) {
+ port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
+
+ if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ port_impl->socket_rbuf->size = 0;
+ }
+
+ if (port_impl->socket_rbuf->size > 0) {
+ nxt_unit_alert(ctx, "too many port socket messages");
+
+ return NXT_UNIT_ERROR;
+ }
- if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
- ctx_impl->read_port_fd = fd;
+ nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ goto retry;
+}
+
+
+nxt_inline void
+nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
+{
+ memcpy(dst->buf, src->buf, src->size);
+ dst->size = src->size;
+ memcpy(dst->oob, src->oob, sizeof(src->oob));
+}
+
+
+static int
+nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res;
+
+retry:
+
+ res = nxt_unit_app_queue_recv(port, rbuf);
+
+ if (res == NXT_UNIT_AGAIN) {
+ res = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ goto retry;
+ }
}
- return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
+ return res;
}
-ssize_t
-nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
- void *oob, size_t oob_size)
+static int
+nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
{
- ssize_t res;
- struct iovec iov[1];
- struct msghdr msg;
+ int fd, err;
+ struct iovec iov[1];
+ struct msghdr msg;
+ nxt_unit_impl_t *lib;
- iov[0].iov_base = buf;
- iov[0].iov_len = buf_size;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (lib->callbacks.port_recv != NULL) {
+ rbuf->size = lib->callbacks.port_recv(ctx, port,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+
+ nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
+ (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
+
+ if (nxt_slow_path(rbuf->size < 0)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+ }
+
+ iov[0].iov_base = rbuf->buf;
+ iov[0].iov_len = sizeof(rbuf->buf);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
- msg.msg_control = oob;
- msg.msg_controllen = oob_size;
+ msg.msg_control = rbuf->oob;
+ msg.msg_controllen = sizeof(rbuf->oob);
+
+ fd = port->in_fd;
retry:
- res = recvmsg(fd, &msg, 0);
+ rbuf->size = recvmsg(fd, &msg, 0);
- if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ if (nxt_slow_path(rbuf->size == -1)) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
+ if (err == EAGAIN) {
+ nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
+ fd, strerror(err), err);
+
+ return NXT_UNIT_AGAIN;
+ }
+
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
+ fd, strerror(err), err);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
+
+ return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+{
+ uint32_t cookie;
+ nxt_port_msg_t *port_msg;
+ nxt_app_queue_t *queue;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+ queue = port_impl->queue;
+
+retry:
+
+ rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
+
+ nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
+
+ if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
+
+ goto retry;
+ }
+
+ return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
+}
+
+
+nxt_inline int
+nxt_unit_close(int fd)
+{
+ int res;
+
+ res = close(fd);
+
+ if (nxt_slow_path(res == -1)) {
+ nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
fd, strerror(errno), errno);
} else {
- nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
+ nxt_unit_debug(NULL, "close(%d): %d", fd, res);
}
return res;
}
+static int
+nxt_unit_fd_blocking(int fd)
+{
+ int nb;
+
+ nb = 0;
+
+ if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
+ fd, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -4755,7 +6034,7 @@ nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
}
-static nxt_unit_port_impl_t *
+static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
int remove)
{
@@ -4775,6 +6054,10 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
switch (res) {
case NXT_OK:
+ if (!remove) {
+ nxt_unit_port_use(lhq.value);
+ }
+
return lhq.value;
default:
@@ -4799,12 +6082,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
static int
-nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
- nxt_unit_request_info_impl_t *req_impl)
+nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
+ nxt_unit_request_info_t *req)
{
- uint32_t *stream;
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ uint32_t *stream;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+ if (req_impl->in_hash) {
+ return NXT_UNIT_OK;
+ }
stream = &req_impl->stream;
@@ -4816,11 +6106,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
lhq.replace = 0;
lhq.value = req_impl;
- res = nxt_lvlhsh_insert(request_hash, &lhq);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
switch (res) {
case NXT_OK:
+ req_impl->in_hash = 1;
return NXT_UNIT_OK;
default:
@@ -4829,12 +6126,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
}
-static nxt_unit_request_info_impl_t *
-nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
- int remove)
+static nxt_unit_request_info_t *
+nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
{
- nxt_int_t res;
- nxt_lvlhsh_query_t lhq;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
lhq.key.length = sizeof(stream);
@@ -4842,16 +6140,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
lhq.proto = &lvlhsh_requests_proto;
lhq.pool = NULL;
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
if (remove) {
- res = nxt_lvlhsh_delete(request_hash, &lhq);
+ res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
} else {
- res = nxt_lvlhsh_find(request_hash, &lhq);
+ res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
}
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
switch (res) {
case NXT_OK:
+ req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
+ req);
+ req_impl->in_hash = 0;
+
return lhq.value;
default:
@@ -4977,11 +6285,18 @@ nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
tm = *localtime(&ts.tv_sec);
#endif
+#if (NXT_DEBUG)
p += snprintf(p, end - p,
"%4d/%02d/%02d %02d:%02d:%02d.%03d ",
tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
tm.tm_hour, tm.tm_min, tm.tm_sec,
(int) ts.tv_nsec / 1000000);
+#else
+ p += snprintf(p, end - p,
+ "%4d/%02d/%02d %02d:%02d:%02d ",
+ tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
+ tm.tm_hour, tm.tm_min, tm.tm_sec);
+#endif
p += snprintf(p, end - p,
"[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],