summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2018-08-06 17:27:33 +0300
committerMax Romanov <max.romanov@nginx.com>2018-08-06 17:27:33 +0300
commit1bb22d1e922c87d3c86c67bdce626767ee48fb5c (patch)
tree6e067a82b309c3a0d0f592f037f26d886a7f8c13 /src/nxt_unit.c
parentb6ce2da65c9c5229d744b2d964623b2d0f731ee9 (diff)
downloadunit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.gz
unit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.bz2
Unit application library.
Library now used in all language modules. Old 'nxt_app_*' code removed. See src/test/nxt_unit_app_test.c for usage sample.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c3630
1 file changed, 3630 insertions, 0 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
new file mode 100644
index 00000000..2694de7d
--- /dev/null
+++ b/src/nxt_unit.c
@@ -0,0 +1,3630 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <stdlib.h>
+
+#include "nxt_main.h"
+#include "nxt_port_memory_int.h"
+
+#include "nxt_unit.h"
+#include "nxt_unit_request.h"
+#include "nxt_unit_response.h"
+
+#if (NXT_HAVE_MEMFD_CREATE)
+#include <linux/memfd.h>
+#endif
+
+typedef struct nxt_unit_impl_s nxt_unit_impl_t;
+typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
+typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
+typedef struct nxt_unit_process_s nxt_unit_process_t;
+typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
+typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
+typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
+typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
+typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
+
+static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
+static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
+ nxt_unit_ctx_impl_t *ctx_impl, void *data);
+static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
+ nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
+static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ uint32_t stream);
+static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
+ nxt_unit_ctx_t *ctx);
+static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
+static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
+static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
+static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
+static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
+ nxt_unit_mmap_buf_t *mmap_buf, int last);
+static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
+ nxt_chunk_id_t *c, int n);
+static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
+static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
+static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ int fd);
+static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
+ nxt_unit_mmap_buf_t *mmap_buf);
+static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
+
+static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
+static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, int i);
+static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
+static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, uint32_t id);
+static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf);
+static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
+ uint32_t size);
+
+static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
+ pid_t pid);
+static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
+ pid_t pid, int remove);
+static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
+static int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
+static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_id_t *port_id, int *fd);
+
+static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
+ nxt_unit_port_id_t *new_port, int fd);
+
+static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
+ nxt_unit_process_t **process);
+static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process);
+
+static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
+ const void *oob, size_t oob_size);
+static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
+ void *oob, size_t oob_size);
+
+static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
+ nxt_unit_port_t *port);
+static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
+ nxt_unit_port_id_t *port_id, int remove);
+
+static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
+
+
+/*
+ * A buffer placed in a shared memory segment; wraps the public
+ * nxt_unit_buf_t and is linked into per-context or per-request queues.
+ */
+struct nxt_unit_mmap_buf_s {
+    nxt_unit_buf_t buf;
+
+    nxt_port_mmap_header_t *hdr;       /* segment header; used on release */
+    nxt_queue_link_t link;             /* free/incoming/outgoing queue link */
+    nxt_unit_port_id_t port_id;        /* NOTE(review): presumably the send
+                                          destination — confirm at send site */
+    nxt_unit_request_info_t *req;      /* owning request, if any */
+    nxt_unit_ctx_impl_t *ctx_impl;
+};
+
+
+/* A received port message split into its header and payload. */
+struct nxt_unit_recv_msg_s {
+    nxt_port_msg_t port_msg;           /* copy of the message header */
+
+    void *start;                       /* payload start (after the header) */
+    uint32_t size;                     /* payload size in bytes */
+
+    nxt_unit_process_t *process;       /* referenced sender process; holds a
+                                          use count that must be released */
+};
+
+
+/* Response lifecycle states of a request; compared with >= / < below,
+ * so the order is significant. */
+typedef enum {
+    NXT_UNIT_RS_START = 0,             /* no response initialized yet */
+    NXT_UNIT_RS_RESPONSE_INIT,         /* nxt_unit_response_init() done */
+    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,  /* piggyback content appended */
+    NXT_UNIT_RS_RESPONSE_SENT,         /* response headers sent */
+    NXT_UNIT_RS_DONE,
+} nxt_unit_req_state_t;
+
+
+/* Library-private part of a request; wraps the public request info. */
+struct nxt_unit_request_info_impl_s {
+    nxt_unit_request_info_t req;
+
+    nxt_unit_recv_msg_t recv_msg;      /* message that carried the request */
+    nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */
+    nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */
+
+    nxt_unit_req_state_t state;        /* response lifecycle state */
+
+    nxt_queue_link_t link;             /* in ctx free_req or active_req */
+
+    /* Application data area of lib->request_data_size bytes. */
+    char extra_data[];
+};
+
+
+/* Library-private part of a context; wraps the public nxt_unit_ctx_t. */
+struct nxt_unit_ctx_impl_s {
+    nxt_unit_ctx_t ctx;
+
+    nxt_unit_port_id_t read_port_id;   /* port this context reads from */
+    int read_port_fd;                  /* -1 until assigned */
+
+    nxt_queue_link_t link;             /* in lib->contexts */
+
+    nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */
+
+    /* of nxt_unit_request_info_impl_t */
+    nxt_queue_t free_req;
+
+    /* of nxt_unit_request_info_impl_t */
+    nxt_queue_t active_req;
+
+    /* Statically embedded buffers pre-seeded into free_buf. */
+    nxt_unit_mmap_buf_t ctx_buf[2];
+
+    /* Statically embedded request info pre-seeded into free_req. */
+    nxt_unit_request_info_impl_t req;
+};
+
+
+/* The library singleton; wraps the public nxt_unit_t. */
+struct nxt_unit_impl_s {
+    nxt_unit_t unit;
+    nxt_unit_callbacks_t callbacks;    /* user callbacks plus defaults */
+
+    uint32_t request_data_size;        /* per-request app data area size */
+
+    pthread_mutex_t mutex;             /* guards shared library state */
+
+    nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
+    nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
+
+    nxt_unit_port_id_t ready_port_id;
+
+    nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
+
+    pid_t pid;                         /* this process's pid */
+    int log_fd;                        /* fd for log output */
+    int online;
+
+    /* Embedded main context; additional contexts are linked in 'contexts'. */
+    nxt_unit_ctx_impl_t main_ctx;
+};
+
+
+/* Library-private part of a port; wraps the public nxt_unit_port_t. */
+struct nxt_unit_port_impl_s {
+    nxt_unit_port_t port;
+
+    nxt_queue_link_t link;             /* in owning process's port list */
+    nxt_unit_process_t *process;       /* process this port belongs to */
+};
+
+
+/* One mapped shared memory segment. */
+struct nxt_unit_mmap_s {
+    nxt_port_mmap_header_t *hdr;       /* header of the mapped segment */
+};
+
+
+/* Growable, mutex-protected array of shared memory segments. */
+struct nxt_unit_mmaps_s {
+    pthread_mutex_t mutex;
+    uint32_t size;                     /* number of elements in use */
+    uint32_t cap;                      /* allocated capacity of elts */
+    nxt_unit_mmap_t *elts;
+};
+
+
+/* A peer process with its ports and shared memory, reference counted. */
+struct nxt_unit_process_s {
+    pid_t pid;
+
+    nxt_queue_t ports;                 /* of nxt_unit_port_impl_t */
+
+    nxt_unit_mmaps_t incoming;         /* segments mapped from the peer */
+    nxt_unit_mmaps_t outgoing;         /* segments created toward the peer */
+
+    nxt_unit_impl_t *lib;
+
+    /* Reference count; adjusted via nxt_unit_process_use(). */
+    nxt_atomic_t use_count;
+
+    uint32_t next_port_id;             /* id for the next created port */
+};
+
+
+/* Explicitly using 32 bit types to avoid possible alignment. */
+/* Hash key for the port lvlhsh: (pid, port id) packed into 8 bytes. */
+typedef struct {
+    int32_t pid;
+    uint32_t id;
+} nxt_unit_port_hash_id_t;
+
+
+/*
+ * Library entry point: create the library singleton, register the ready
+ * and read ports (taken from 'init' or from the environment), and send
+ * the READY notification to the master process.
+ *
+ * Returns the main context, or NULL on error.
+ */
+nxt_unit_ctx_t *
+nxt_unit_init(nxt_unit_init_t *init)
+{
+    int rc;
+    uint32_t ready_stream;
+    nxt_unit_ctx_t *ctx;
+    nxt_unit_impl_t *lib;
+    nxt_unit_port_t ready_port, read_port;
+
+    lib = nxt_unit_create(init);
+    if (nxt_slow_path(lib == NULL)) {
+        return NULL;
+    }
+
+    /* Port information is either passed directly in 'init', or parsed
+     * from the NXT_UNIT_INIT_ENV environment variable. */
+    if (init->ready_port.id.pid != 0
+        && init->ready_stream != 0
+        && init->read_port.id.pid != 0)
+    {
+        ready_port = init->ready_port;
+        ready_stream = init->ready_stream;
+        read_port = init->read_port;
+        lib->log_fd = init->log_fd;
+
+        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
+                              ready_port.id.id);
+        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
+                              read_port.id.id);
+    } else {
+        rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
+                               &ready_stream);
+        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+            goto fail;
+        }
+    }
+
+    ctx = &lib->main_ctx.ctx;
+
+    /* nxt_slow_path() added for consistency with the checks below. */
+    rc = lib->callbacks.add_port(ctx, &ready_port);
+    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+        nxt_unit_alert(NULL, "failed to add ready_port");
+
+        goto fail;
+    }
+
+    rc = lib->callbacks.add_port(ctx, &read_port);
+    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+        nxt_unit_alert(NULL, "failed to add read_port");
+
+        goto fail;
+    }
+
+    lib->main_ctx.read_port_id = read_port.id;
+    lib->ready_port_id = ready_port.id;
+
+    rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
+    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+        nxt_unit_alert(NULL, "failed to send READY message");
+
+        goto fail;
+    }
+
+    return ctx;
+
+fail:
+
+    /* The mutex was initialized by nxt_unit_create(); destroy it before
+     * freeing the structure to avoid leaking mutex resources. */
+    pthread_mutex_destroy(&lib->mutex);
+
+    free(lib);
+
+    return NULL;
+}
+
+
+/*
+ * Allocate and initialize the library singleton: copy user callbacks,
+ * install defaults for the optional ones, and set up the main context.
+ * Returns NULL on error; on success the caller owns the allocation and
+ * the initialized mutex.
+ */
+static nxt_unit_impl_t *
+nxt_unit_create(nxt_unit_init_t *init)
+{
+    int rc;
+    nxt_unit_impl_t *lib;
+    nxt_unit_callbacks_t *cb;
+
+    /* request_handler is the only mandatory callback; validate it before
+     * allocating anything so failure paths need no cleanup. */
+    if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
+        nxt_unit_alert(NULL, "request_handler is NULL");
+
+        return NULL;
+    }
+
+    lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
+    if (nxt_slow_path(lib == NULL)) {
+        nxt_unit_alert(NULL, "failed to allocate unit struct");
+
+        return NULL;
+    }
+
+    rc = pthread_mutex_init(&lib->mutex, NULL);
+    if (nxt_slow_path(rc != 0)) {
+        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
+
+        free(lib);
+
+        return NULL;
+    }
+
+    lib->unit.data = init->data;
+    lib->callbacks = init->callbacks;
+
+    lib->request_data_size = init->request_data_size;
+
+    lib->processes.slot = NULL;
+    lib->ports.slot = NULL;
+
+    lib->pid = getpid();
+    lib->log_fd = STDERR_FILENO;
+    lib->online = 1;
+
+    nxt_queue_init(&lib->contexts);
+
+    nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
+
+    /* Fill in default implementations for the optional callbacks. */
+    cb = &lib->callbacks;
+
+    if (cb->add_port == NULL) {
+        cb->add_port = nxt_unit_add_port;
+    }
+
+    if (cb->remove_port == NULL) {
+        cb->remove_port = nxt_unit_remove_port;
+    }
+
+    if (cb->remove_pid == NULL) {
+        cb->remove_pid = nxt_unit_remove_pid;
+    }
+
+    if (cb->quit == NULL) {
+        cb->quit = nxt_unit_quit;
+    }
+
+    if (cb->port_send == NULL) {
+        cb->port_send = nxt_unit_port_send_default;
+    }
+
+    if (cb->port_recv == NULL) {
+        cb->port_recv = nxt_unit_port_recv_default;
+    }
+
+    return lib;
+}
+
+
+/* Initialize a context structure and register it with the library. */
+static void
+nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
+    void *data)
+{
+    /* Wire the public context to the library singleton. */
+    ctx_impl->ctx.unit = &lib->unit;
+    ctx_impl->ctx.data = data;
+
+    nxt_queue_init(&ctx_impl->free_buf);
+    nxt_queue_init(&ctx_impl->free_req);
+    nxt_queue_init(&ctx_impl->active_req);
+
+    /* Pre-seed the free lists with the statically embedded buffers and
+     * request info, so small workloads avoid heap allocation. */
+    nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link);
+    nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link);
+    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+
+    ctx_impl->req.req.unit = &lib->unit;
+    ctx_impl->req.req.ctx = &ctx_impl->ctx;
+
+    ctx_impl->read_port_fd = -1;
+
+    /* Register the context in the library-wide list. */
+    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
+}
+
+
+/*
+ * Parse the NXT_UNIT_INIT_ENV environment variable, which has the form
+ * "VERSION;stream;pid,id,fd;pid,id,fd;log_fd", into the ready port, the
+ * read port, the log fd and the ready stream id.
+ *
+ * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
+ */
+static int
+nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
+    int *log_fd, uint32_t *stream)
+{
+    int rc;
+    int ready_fd, read_fd;
+    char *unit_init, *version_end;
+    long version_length;
+    int64_t ready_pid, read_pid;
+    uint32_t ready_stream, ready_id, read_id;
+
+    unit_init = getenv(NXT_UNIT_INIT_ENV);
+    if (nxt_slow_path(unit_init == NULL)) {
+        nxt_unit_alert(NULL, "%s is not in the current environment",
+                       NXT_UNIT_INIT_ENV);
+
+        return NXT_UNIT_ERROR;
+    }
+
+    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
+
+    /* The library version must match the master process version exactly. */
+    version_length = nxt_length(NXT_VERSION);
+
+    version_end = strchr(unit_init, ';');
+    if (version_end == NULL
+        || version_end - unit_init != version_length
+        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
+    {
+        nxt_unit_alert(NULL, "version check error");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    /*
+     * Use the SCN* macros here: PRI* macros are printf conversions and
+     * are not guaranteed to be valid for sscanf().
+     */
+    rc = sscanf(version_end + 1,
+                "%"SCNu32";"
+                "%"SCNd64",%"SCNu32",%d;"
+                "%"SCNd64",%"SCNu32",%d;"
+                "%d",
+                &ready_stream,
+                &ready_pid, &ready_id, &ready_fd,
+                &read_pid, &read_id, &read_fd,
+                log_fd);
+
+    if (nxt_slow_path(rc != 8)) {
+        nxt_unit_alert(NULL, "failed to scan variables");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
+
+    /* The ready port is write-only from the application side. */
+    ready_port->in_fd = -1;
+    ready_port->out_fd = ready_fd;
+    ready_port->data = NULL;
+
+    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
+
+    /* The read port is read-only from the application side. */
+    read_port->in_fd = read_fd;
+    read_port->out_fd = -1;
+    read_port->data = NULL;
+
+    *stream = ready_stream;
+
+    return NXT_UNIT_OK;
+}
+
+
+/* Send the PROCESS_READY notification for 'stream' to 'port_id'. */
+static int
+nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+    uint32_t stream)
+{
+    ssize_t sent;
+    nxt_port_msg_t msg;
+    nxt_unit_impl_t *lib;
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+    /* A bare header message: no payload, no shared memory, no fragments. */
+    msg.stream = stream;
+    msg.pid = lib->pid;
+    msg.reply_port = 0;
+    msg.type = _NXT_PORT_MSG_PROCESS_READY;
+    msg.last = 1;
+    msg.mmap = 0;
+    msg.nf = 0;
+    msg.mf = 0;
+    msg.tracking = 0;
+
+    sent = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+
+    return (sent == sizeof(msg)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
+}
+
+
+/*
+ * Dispatch one message received from a port: parse the header and any
+ * passed file descriptor, read shared memory payload if present, and
+ * invoke the matching callback.  Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
+ */
+int
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+    void *buf, size_t buf_size, void *oob, size_t oob_size)
+{
+    int fd, rc;
+    pid_t pid;
+    nxt_queue_t incoming_buf;
+    struct cmsghdr *cm;
+    nxt_port_msg_t *port_msg;
+    nxt_unit_impl_t *lib;
+    nxt_unit_port_t new_port;
+    nxt_queue_link_t *lnk;
+    nxt_unit_request_t *r;
+    nxt_unit_mmap_buf_t *b;
+    nxt_unit_recv_msg_t recv_msg;
+    nxt_unit_callbacks_t *cb;
+    nxt_port_msg_new_port_t *new_port_msg;
+    nxt_unit_request_info_t *req;
+    nxt_unit_request_info_impl_t *req_impl;
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+    rc = NXT_UNIT_ERROR;
+    fd = -1;
+    recv_msg.process = NULL;
+    port_msg = buf;
+    cm = oob;
+
+    /*
+     * Initialize unconditionally: the cleanup path below iterates this
+     * queue, and several error exits are taken before any shared memory
+     * buffers could have been read into it.  Iterating an uninitialized
+     * queue there would be undefined behavior.
+     */
+    nxt_queue_init(&incoming_buf);
+
+    /* Extract a passed file descriptor from the ancillary data, if any. */
+    if (oob_size >= CMSG_SPACE(sizeof(int))
+        && cm->cmsg_len == CMSG_LEN(sizeof(int))
+        && cm->cmsg_level == SOL_SOCKET
+        && cm->cmsg_type == SCM_RIGHTS)
+    {
+        memcpy(&fd, CMSG_DATA(cm), sizeof(int));
+    }
+
+    if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
+        nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
+        goto fail;
+    }
+
+    recv_msg.port_msg = *port_msg;
+    recv_msg.start = port_msg + 1;
+    recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
+
+    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
+        nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
+                      port_msg->stream, (int) port_msg->type);
+        goto fail;
+    }
+
+    /* A zero result means the message was cancelled by tracking. */
+    if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
+        rc = NXT_UNIT_OK;
+
+        goto fail;
+    }
+
+    /* Fragmentation is unsupported. */
+    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
+        nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
+                      port_msg->stream, (int) port_msg->type);
+        goto fail;
+    }
+
+    if (port_msg->mmap) {
+        if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) {
+            goto fail;
+        }
+    }
+
+    cb = &lib->callbacks;
+
+    switch (port_msg->type) {
+
+    case _NXT_PORT_MSG_QUIT:
+        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
+
+        cb->quit(ctx);
+        rc = NXT_UNIT_OK;
+        break;
+
+    case _NXT_PORT_MSG_NEW_PORT:
+        if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) {
+            nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
+                          "invalid message size (%d)",
+                          port_msg->stream, (int) recv_msg.size);
+
+            goto fail;
+        }
+
+        new_port_msg = recv_msg.start;
+
+        nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
+                       port_msg->stream, (int) new_port_msg->pid,
+                       (int) new_port_msg->id, fd);
+
+        nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+                              new_port_msg->id);
+
+        new_port.in_fd = -1;
+        new_port.out_fd = fd;
+        new_port.data = NULL;
+
+        /* Ownership of the fd moves to the new port. */
+        fd = -1;
+
+        rc = cb->add_port(ctx, &new_port);
+        break;
+
+    case _NXT_PORT_MSG_CHANGE_FILE:
+        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
+                       port_msg->stream, fd);
+        break;
+
+    case _NXT_PORT_MSG_MMAP:
+        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd);
+        break;
+
+    case _NXT_PORT_MSG_DATA:
+        if (nxt_slow_path(port_msg->mmap == 0)) {
+            nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
+                          port_msg->stream);
+
+            goto fail;
+        }
+
+        if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) {
+            nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
+                          "%d expected", port_msg->stream, (int) recv_msg.size,
+                          (int) sizeof(nxt_unit_request_t));
+
+            goto fail;
+        }
+
+        req_impl = nxt_unit_request_info_get(ctx);
+        if (nxt_slow_path(req_impl == NULL)) {
+            nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
+                          port_msg->stream);
+
+            goto fail;
+        }
+
+        req = &req_impl->req;
+
+        req->request_port = *port_id;
+
+        nxt_unit_port_id_init(&req->response_port, port_msg->pid,
+                              port_msg->reply_port);
+
+        req->request = recv_msg.start;
+
+        lnk = nxt_queue_first(&incoming_buf);
+        b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+
+        req->request_buf = &b->buf;
+        req->response = NULL;
+        req->response_buf = NULL;
+
+        r = req->request;
+
+        req->content_length = r->content_length;
+
+        req->content_buf = req->request_buf;
+        req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
+
+        /* Move process to req_impl. */
+        req_impl->recv_msg = recv_msg;
+
+        recv_msg.process = NULL;
+
+        nxt_queue_init(&req_impl->outgoing_buf);
+        nxt_queue_init(&req_impl->incoming_buf);
+
+        nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
+        {
+            b->req = req;
+        } nxt_queue_loop;
+
+        /* Hand the shared memory buffers over to the request; reset
+         * incoming_buf so the cleanup below does not release them. */
+        nxt_queue_add(&req_impl->incoming_buf, &incoming_buf);
+        nxt_queue_init(&incoming_buf);
+
+        req->response_max_fields = 0;
+        req_impl->state = NXT_UNIT_RS_START;
+
+        nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream,
+                       (int) r->method_length, nxt_unit_sptr_get(&r->method),
+                       (int) r->target_length, nxt_unit_sptr_get(&r->target),
+                       (int) r->content_length);
+
+        cb->request_handler(req);
+
+        rc = NXT_UNIT_OK;
+        break;
+
+    case _NXT_PORT_MSG_REMOVE_PID:
+        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
+            nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
+                          "(%d != %d)", port_msg->stream, (int) recv_msg.size,
+                          (int) sizeof(pid));
+
+            goto fail;
+        }
+
+        memcpy(&pid, recv_msg.start, sizeof(pid));
+
+        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
+                       port_msg->stream, (int) pid);
+
+        cb->remove_pid(ctx, pid);
+
+        rc = NXT_UNIT_OK;
+        break;
+
+    default:
+        nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
+                       port_msg->stream, (int) port_msg->type);
+
+        goto fail;
+    }
+
+fail:
+
+    if (fd != -1) {
+        close(fd);
+    }
+
+    /*
+     * Release any shared memory buffers not handed over to a request.
+     * The queue is empty unless nxt_unit_mmap_read() filled it, so this
+     * loop is safe on every exit path — including "message too small",
+     * where port_msg->mmap itself must not be read.
+     */
+    nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
+    {
+        nxt_unit_mmap_release(b->hdr, b->buf.start,
+                              b->buf.end - b->buf.start);
+
+        nxt_unit_mmap_buf_release(b);
+    } nxt_queue_loop;
+
+    if (recv_msg.process != NULL) {
+        nxt_unit_process_use(ctx, recv_msg.process, -1);
+    }
+
+    return rc;
+}
+
+
+/*
+ * Obtain a request info object for the context: reuse one from the free
+ * list when possible, otherwise allocate.  Returns NULL on allocation
+ * failure.
+ */
+static nxt_unit_request_info_impl_t *
+nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
+{
+    nxt_unit_impl_t *lib;
+    nxt_queue_link_t *lnk;
+    nxt_unit_ctx_impl_t *ctx_impl;
+    nxt_unit_request_info_impl_t *req_impl;
+
+    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+    if (!nxt_queue_is_empty(&ctx_impl->free_req)) {
+        /* Reuse a request info object from the per-context free list;
+         * its unit/ctx fields were set when it was first created. */
+        lnk = nxt_queue_first(&ctx_impl->free_req);
+        nxt_queue_remove(lnk);
+
+        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
+
+    } else {
+        req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
+                          + lib->request_data_size);
+        if (nxt_slow_path(req_impl == NULL)) {
+            nxt_unit_warn(ctx, "request info allocation failed");
+
+            return NULL;
+        }
+
+        req_impl->req.unit = ctx->unit;
+        req_impl->req.ctx = ctx;
+    }
+
+    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
+
+    /* Expose the trailing application data area, if one was requested. */
+    if (lib->request_data_size != 0) {
+        req_impl->req.data = req_impl->extra_data;
+
+    } else {
+        req_impl->req.data = NULL;
+    }
+
+    return req_impl;
+}
+
+
+/*
+ * Return a finished request to the context free list: drop the process
+ * reference, free outgoing buffers, release incoming shared memory, and
+ * move the object from active_req to free_req.
+ */
+static void
+nxt_unit_request_info_release(nxt_unit_request_info_t *req)
+{
+    nxt_unit_mmap_buf_t *b;
+    nxt_unit_ctx_impl_t *ctx_impl;
+    nxt_unit_recv_msg_t *recv_msg;
+    nxt_unit_request_info_impl_t *req_impl;
+
+    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    req->response = NULL;
+    req->response_buf = NULL;
+
+    recv_msg = &req_impl->recv_msg;
+
+    /* Release the sender process reference taken when the request
+     * message was received. */
+    if (recv_msg->process != NULL) {
+        nxt_unit_process_use(req->ctx, recv_msg->process, -1);
+
+        recv_msg->process = NULL;
+    }
+
+    /* Unsent response buffers are freed ... */
+    nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) {
+
+        nxt_unit_buf_free(&b->buf);
+
+    } nxt_queue_loop;
+
+    /* ... while incoming buffers return their shared memory chunks. */
+    nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) {
+
+        nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start);
+        nxt_unit_mmap_buf_release(b);
+
+    } nxt_queue_loop;
+
+    nxt_queue_remove(&req_impl->link);
+
+    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
+}
+
+
+/* Unlink a request info object and free it unless it is the one
+ * statically embedded in the context. */
+static void
+nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
+{
+    nxt_unit_ctx_impl_t *ctx_impl;
+
+    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
+
+    nxt_queue_remove(&req_impl->link);
+
+    if (req_impl == &ctx_impl->req) {
+        /* The embedded request info is never heap-allocated. */
+        return;
+    }
+
+    free(req_impl);
+}
+
+
+/*
+ * Case-insensitive header field name hash.  Uses the same incremental
+ * formula and seed as nxt_http_parse.c so values match those computed
+ * by the router.
+ */
+uint16_t
+nxt_unit_field_hash(const char *name, size_t name_length)
+{
+    size_t i;
+    uint32_t hash;
+
+    hash = 159406;  /* Magic value copied from nxt_http_parse.c */
+
+    for (i = 0; i != name_length; i++) {
+        hash = (hash << 4) + hash + nxt_lowcase((u_char) name[i]);
+    }
+
+    /* Fold to 16 bits (implicit truncation on return). */
+    return (hash >> 16) ^ hash;
+}
+
+
+/*
+ * Split "host[:port]" into name and port parts, substituting
+ * "localhost" and "80" for missing or empty components.  The input is
+ * not copied; output pointers reference either 'host' or the static
+ * defaults.
+ */
+void
+nxt_unit_split_host(char *host, uint32_t host_length,
+    char **name, uint32_t *name_length, char **port, uint32_t *port_length)
+{
+    char *colon;
+
+    static char default_host[] = "localhost";
+    static char default_port[] = "80";
+
+    if (nxt_slow_path(host == NULL || host_length == 0)) {
+        /* Nothing to split: defaults for both parts. */
+        *name = default_host;
+        *name_length = nxt_length(default_host);
+
+        *port = default_port;
+        *port_length = nxt_length(default_port);
+
+        return;
+    }
+
+    colon = memchr(host, ':', host_length);
+
+    if (nxt_slow_path(colon == NULL)) {
+        /* "host" without a port part. */
+        *name = host;
+        *name_length = host_length;
+
+        *port = default_port;
+        *port_length = nxt_length(default_port);
+
+        return;
+    }
+
+    if (nxt_slow_path(colon == host)) {
+        /* ":port" with an empty host part. */
+        *name = default_host;
+        *name_length = nxt_length(default_host);
+
+    } else {
+        *name = host;
+        *name_length = colon - host;
+    }
+
+    colon++;
+
+    if (nxt_slow_path(colon == host + host_length)) {
+        /* "host:" with an empty port part. */
+        *port = default_port;
+        *port_length = nxt_length(default_port);
+
+    } else {
+        *port = colon;
+        *port_length = host_length - (colon - host);
+    }
+}
+
+
+/*
+ * Group header fields with equal hashes so duplicates become adjacent,
+ * preserving relative order, and record the indexes of the well-known
+ * fields (Host, Content-Length, Content-Type, Cookie) in the request.
+ *
+ * The fields are moved in place; the name/value offset adjustments
+ * below suggest the sptr offsets are relative to the field structure
+ * itself, so an entry moved by k slots has its offsets changed by
+ * k * sizeof(nxt_unit_field_t) — NOTE(review): confirm against the
+ * nxt_unit_sptr_t definition.
+ */
+void
+nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
+{
+    uint32_t i, j;
+    nxt_unit_field_t *fields, f;
+    nxt_unit_request_t *r;
+
+    nxt_unit_req_debug(req, "group_dup_fields");
+
+    r = req->request;
+    fields = r->fields;
+
+    for (i = 0; i < r->fields_count; i++) {
+
+        /* Remember where the well-known fields ended up. */
+        switch (fields[i].hash) {
+        case NXT_UNIT_HASH_HOST:
+            r->host_field = i;
+            break;
+
+        case NXT_UNIT_HASH_CONTENT_LENGTH:
+            r->content_length_field = i;
+            break;
+
+        case NXT_UNIT_HASH_CONTENT_TYPE:
+            r->content_type_field = i;
+            break;
+
+        case NXT_UNIT_HASH_COOKIE:
+            r->cookie_field = i;
+            break;
+        };
+
+        /* Pull every later field with the same hash up to position
+         * i + 1 by rotating the intermediate entries down one slot. */
+        for (j = i + 1; j < r->fields_count; j++) {
+            if (fields[i].hash != fields[j].hash) {
+                continue;
+            }
+
+            if (j == i + 1) {
+                /* Already adjacent; nothing to move. */
+                continue;
+            }
+
+            /* Take a copy of fields[j], pre-adjusting its offsets for
+             * the move to slot i + 1. */
+            f = fields[j];
+            f.name.offset += (j - (i + 1)) * sizeof(f);
+            f.value.offset += (j - (i + 1)) * sizeof(f);
+
+            /* Shift fields (i+1 .. j-1) one slot toward the end; each
+             * moves by one struct, so offsets shrink by sizeof(f). */
+            while (j > i + 1) {
+                fields[j] = fields[j - 1];
+                fields[j].name.offset -= sizeof(f);
+                fields[j].value.offset -= sizeof(f);
+                j--;
+            }
+
+            fields[j] = f;
+
+            /* The duplicate now sits right after i; advance past it. */
+            i++;
+        }
+    }
+}
+
+
+/*
+ * Initialize the response for a request: allocate (or reuse) a buffer
+ * large enough for the response header, 'max_fields_count' fields and
+ * 'max_fields_size' bytes of field data, and set the status.
+ *
+ * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.  Fails if the response has
+ * already been sent; re-initializing an unsent response is allowed.
+ */
+int
+nxt_unit_response_init(nxt_unit_request_info_t *req,
+    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
+{
+    uint32_t buf_size;
+    nxt_unit_buf_t *buf;
+    nxt_unit_request_info_impl_t *req_impl;
+
+    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
+        nxt_unit_req_warn(req, "init: response already sent");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
+                       (int) max_fields_count, (int) max_fields_size);
+
+    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
+        nxt_unit_req_debug(req, "duplicate response init");
+    }
+
+    /* Header struct, fields array, then raw field name/value data. */
+    buf_size = sizeof(nxt_unit_response_t)
+               + max_fields_count * sizeof(nxt_unit_field_t)
+               + max_fields_size;
+
+    if (nxt_slow_path(req->response_buf != NULL)) {
+        /* Reuse the existing buffer if it is big enough; otherwise
+         * discard it and allocate a fresh one below. */
+        buf = req->response_buf;
+
+        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
+            goto init_response;
+        }
+
+        nxt_unit_buf_free(buf);
+
+        req->response_buf = NULL;
+        req->response = NULL;
+        req->response_max_fields = 0;
+
+        req_impl->state = NXT_UNIT_RS_START;
+    }
+
+    buf = nxt_unit_response_buf_alloc(req, buf_size);
+    if (nxt_slow_path(buf == NULL)) {
+        return NXT_UNIT_ERROR;
+    }
+
+init_response:
+
+    /* Zero only the header; the fields array is filled on demand. */
+    memset(buf->start, 0, sizeof(nxt_unit_response_t));
+
+    req->response_buf = buf;
+
+    req->response = (nxt_unit_response_t *) buf->start;
+    req->response->status = status;
+
+    /* Field data is appended after the (reserved) fields array. */
+    buf->free = buf->start + sizeof(nxt_unit_response_t)
+                + max_fields_count * sizeof(nxt_unit_field_t);
+
+    req->response_max_fields = max_fields_count;
+    req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
+
+    return NXT_UNIT_OK;
+}
+
+
+/*
+ * Reallocate the response into a new buffer sized for the given field
+ * limits, copying the status, content length, non-skipped fields and
+ * any piggyback content from the current response.
+ *
+ * Returns NXT_UNIT_OK or NXT_UNIT_ERROR; on failure the current
+ * response is left untouched.
+ */
+int
+nxt_unit_response_realloc(nxt_unit_request_info_t *req,
+    uint32_t max_fields_count, uint32_t max_fields_size)
+{
+    char *p;
+    uint32_t i, buf_size;
+    nxt_unit_buf_t *buf;
+    nxt_unit_field_t *f, *src;
+    nxt_unit_response_t *resp;
+    nxt_unit_request_info_impl_t *req_impl;
+
+    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+        nxt_unit_req_warn(req, "realloc: response not init");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
+        nxt_unit_req_warn(req, "realloc: response already sent");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
+        nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    buf_size = sizeof(nxt_unit_response_t)
+               + max_fields_count * sizeof(nxt_unit_field_t)
+               + max_fields_size;
+
+    buf = nxt_unit_response_buf_alloc(req, buf_size);
+    if (nxt_slow_path(buf == NULL)) {
+        return NXT_UNIT_ERROR;
+    }
+
+    resp = (nxt_unit_response_t *) buf->start;
+
+    memset(resp, 0, sizeof(nxt_unit_response_t));
+
+    resp->status = req->response->status;
+    resp->content_length = req->response->content_length;
+
+    /*
+     * The data area starts after the response header AND the fields
+     * array, matching nxt_unit_response_init().  Omitting the header
+     * size here would make field data overlap the fields array tail.
+     */
+    p = buf->start + sizeof(nxt_unit_response_t)
+        + max_fields_count * sizeof(nxt_unit_field_t);
+    f = resp->fields;
+
+    for (i = 0; i < req->response->fields_count; i++) {
+        /* Copy from the old *response* fields (not the request's). */
+        src = req->response->fields + i;
+
+        if (nxt_slow_path(src->skip != 0)) {
+            continue;
+        }
+
+        if (nxt_slow_path(src->name_length + src->value_length
+                          > (uint32_t) (buf->end - p)))
+        {
+            goto fail;
+        }
+
+        nxt_unit_sptr_set(&f->name, p);
+        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
+
+        nxt_unit_sptr_set(&f->value, p);
+        p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
+
+        f->hash = src->hash;
+        f->skip = 0;
+        f->name_length = src->name_length;
+        f->value_length = src->value_length;
+
+        resp->fields_count++;
+        f++;
+    }
+
+    if (req->response->piggyback_content_length > 0) {
+        if (nxt_slow_path(req->response->piggyback_content_length
+                          > (uint32_t) (buf->end - p)))
+        {
+            goto fail;
+        }
+
+        resp->piggyback_content_length = req->response->piggyback_content_length;
+
+        nxt_unit_sptr_set(&resp->piggyback_content, p);
+        p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
+                       req->response->piggyback_content_length);
+    }
+
+    buf->free = p;
+
+    /* Switch over to the new buffer and free the old one. */
+    nxt_unit_buf_free(req->response_buf);
+
+    req->response = resp;
+    req->response_buf = buf;
+    req->response_max_fields = max_fields_count;
+
+    return NXT_UNIT_OK;
+
+fail:
+
+    nxt_unit_buf_free(buf);
+
+    return NXT_UNIT_ERROR;
+}
+
+
+/* Return non-zero once nxt_unit_response_init() has succeeded. */
+int
+nxt_unit_response_is_init(nxt_unit_request_info_t *req)
+{
+    nxt_unit_request_info_impl_t *impl;
+
+    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    if (impl->state >= NXT_UNIT_RS_RESPONSE_INIT) {
+        return 1;
+    }
+
+    return 0;
+}
+
+
+/*
+ * Append one header field to the response.  Allowed only between
+ * response init and send, limited by response_max_fields and the
+ * remaining space of the response buffer.
+ */
+int
+nxt_unit_response_add_field(nxt_unit_request_info_t *req,
+    const char *name, uint8_t name_length,
+    const char *value, uint32_t value_length)
+{
+    uint32_t required;
+    nxt_unit_buf_t *b;
+    nxt_unit_field_t *field;
+    nxt_unit_response_t *response;
+    nxt_unit_request_info_impl_t *impl;
+
+    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    if (nxt_slow_path(impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
+        nxt_unit_req_warn(req, "add_field: response not initialized or "
+                          "already sent");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    response = req->response;
+
+    if (nxt_slow_path(response->fields_count >= req->response_max_fields)) {
+        nxt_unit_req_warn(req, "add_field: too many response fields");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    b = req->response_buf;
+    required = name_length + value_length;
+
+    if (nxt_slow_path(required > (uint32_t) (b->end - b->free))) {
+        nxt_unit_req_warn(req, "add_field: response buffer overflow");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
+                       response->fields_count,
+                       (int) name_length, name,
+                       (int) value_length, value);
+
+    field = response->fields + response->fields_count;
+
+    /* Append the raw name and value bytes to the buffer data area and
+     * record their locations via nxt_unit_sptr_set(). */
+    nxt_unit_sptr_set(&field->name, b->free);
+    b->free = nxt_cpymem(b->free, name, name_length);
+
+    nxt_unit_sptr_set(&field->value, b->free);
+    b->free = nxt_cpymem(b->free, value, value_length);
+
+    field->hash = nxt_unit_field_hash(name, name_length);
+    field->skip = 0;
+    field->name_length = name_length;
+    field->value_length = value_length;
+
+    response->fields_count++;
+
+    return NXT_UNIT_OK;
+}
+
+
+/*
+ * Append piggyback body content to the response buffer.  Allowed only
+ * after response init and before the response is sent.
+ */
+int
+nxt_unit_response_add_content(nxt_unit_request_info_t *req,
+    const void* src, uint32_t size)
+{
+    nxt_unit_buf_t *b;
+    nxt_unit_response_t *response;
+    nxt_unit_request_info_impl_t *impl;
+
+    impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    if (nxt_slow_path(impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+        nxt_unit_req_warn(req, "add_content: response not initialized yet");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    if (nxt_slow_path(impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
+        nxt_unit_req_warn(req, "add_content: response already sent");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    b = req->response_buf;
+
+    if (nxt_slow_path(size > (uint32_t) (b->end - b->free))) {
+        nxt_unit_req_warn(req, "add_content: buffer overflow");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    response = req->response;
+
+    /* First chunk: remember where the piggyback content begins. */
+    if (response->piggyback_content_length == 0) {
+        nxt_unit_sptr_set(&response->piggyback_content, b->free);
+        impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
+    }
+
+    response->piggyback_content_length += size;
+
+    b->free = nxt_cpymem(b->free, src, size);
+
+    return NXT_UNIT_OK;
+}
+
+
+/*
+ * Send the initialized response buffer to the response port.  On
+ * success the request moves to the RESPONSE_SENT state and the buffer
+ * is released; on failure the response remains owned by the request.
+ */
+int
+nxt_unit_response_send(nxt_unit_request_info_t *req)
+{
+    int rc;
+    nxt_unit_mmap_buf_t *mmap_buf;
+    nxt_unit_request_info_impl_t *req_impl;
+
+    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+        nxt_unit_req_warn(req, "send: response is not initialized yet");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
+        nxt_unit_req_warn(req, "send: response already sent");
+
+        return NXT_UNIT_ERROR;
+    }
+
+    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
+                       req->response->fields_count,
+                       (int) (req->response_buf->free
+                              - req->response_buf->start));
+
+    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
+
+    /* Send on the stream the request arrived on; 'last' is 0 since more
+     * body buffers may follow. */
+    rc = nxt_unit_mmap_buf_send(req->ctx,
+                                req_impl->recv_msg.port_msg.stream,
+                                mmap_buf, 0);
+    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
+        /* Buffer ownership passed to the peer; detach and release the
+         * local wrapper. */
+        req->response = NULL;
+        req->response_buf = NULL;
+        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
+
+        nxt_unit_mmap_buf_release(mmap_buf);
+    }
+
+    return rc;
+}
+
+
+int
+nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
+{
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
+}
+
+
+nxt_unit_buf_t *
+nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
+{
+ int rc;
+ nxt_unit_process_t *process;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
+ nxt_unit_req_warn(req, "response_buf_alloc: "
+ "requested buffer (%"PRIu32") too big", size);
+
+ return NULL;
+ }
+
+ nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
+
+ mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
+ if (nxt_slow_path(mmap_buf == NULL)) {
+ return NULL;
+ }
+
+ mmap_buf->req = req;
+
+ nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link);
+
+ rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
+ size, mmap_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_mmap_buf_release(mmap_buf);
+
+ return NULL;
+ }
+
+ return &mmap_buf->buf;
+}
+
+
+static nxt_unit_process_t *
+nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ nxt_unit_impl_t *lib;
+
+ if (recv_msg->process != NULL) {
+ return recv_msg->process;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ if (recv_msg->process == NULL) {
+ nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
+ recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid);
+ }
+
+ return recv_msg->process;
+}
+
+
+static nxt_unit_mmap_buf_t *
+nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ if (nxt_queue_is_empty(&ctx_impl->free_buf)) {
+ mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
+ if (nxt_slow_path(mmap_buf == NULL)) {
+ nxt_unit_warn(ctx, "failed to allocate buf");
+ }
+
+ } else {
+ lnk = nxt_queue_first(&ctx_impl->free_buf);
+ nxt_queue_remove(lnk);
+
+ mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+ }
+
+ mmap_buf->ctx_impl = ctx_impl;
+
+ return mmap_buf;
+}
+
+
+static void
+nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ nxt_queue_remove(&mmap_buf->link);
+
+ nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link);
+}
+
+
+int
+nxt_unit_buf_send(nxt_unit_buf_t *buf)
+{
+ int rc;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
+
+ req = mmap_buf->req;
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_unit_req_debug(req, "buf_send: %d bytes",
+ (int) (buf->free - buf->start));
+
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_warn(req, "buf_send: response not initialized yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
+ nxt_unit_req_warn(req, "buf_send: headers not sent yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_fast_path(buf->free > buf->start)) {
+ rc = nxt_unit_mmap_buf_send(req->ctx,
+ req_impl->recv_msg.port_msg.stream,
+ mmap_buf, 0);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
+ }
+
+ nxt_unit_mmap_buf_release(mmap_buf);
+
+ return NXT_UNIT_OK;
+}
+
+
+static void
+nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
+{
+ int rc;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
+
+ req = mmap_buf->req;
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ rc = nxt_unit_mmap_buf_send(req->ctx,
+ req_impl->recv_msg.port_msg.stream,
+ mmap_buf, 1);
+
+ if (nxt_slow_path(rc == NXT_UNIT_OK)) {
+ nxt_unit_mmap_buf_release(mmap_buf);
+
+ nxt_unit_request_info_release(req);
+
+ } else {
+ nxt_unit_request_done(req, rc);
+ }
+}
+
+
+static int
+nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
+ nxt_unit_mmap_buf_t *mmap_buf, int last)
+{
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_mmap_msg_t mmap_msg;
+ } m;
+
+ u_char *end, *last_used, *first_free;
+ ssize_t res;
+ nxt_chunk_id_t first_free_chunk;
+ nxt_unit_buf_t *buf;
+ nxt_unit_impl_t *lib;
+ nxt_port_mmap_header_t *hdr;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ buf = &mmap_buf->buf;
+
+ m.mmap_msg.size = buf->free - buf->start;
+
+ m.msg.stream = stream;
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = 0;
+ m.msg.type = _NXT_PORT_MSG_DATA;
+ m.msg.last = last != 0;
+ m.msg.mmap = m.mmap_msg.size > 0;
+ m.msg.nf = 0;
+ m.msg.mf = 0;
+ m.msg.tracking = 0;
+
+ hdr = mmap_buf->hdr;
+
+ m.mmap_msg.mmap_id = hdr->id;
+ m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+
+ nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
+ stream,
+ (int) m.mmap_msg.mmap_id,
+ (int) m.mmap_msg.chunk_id,
+ (int) m.mmap_msg.size);
+
+ res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
+ m.mmap_msg.size > 0 ? sizeof(m)
+ : sizeof(m.msg),
+ NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
+ last_used = (u_char *) buf->free - 1;
+
+ first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
+ first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
+ end = (u_char *) buf->end;
+
+ nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
+
+ buf->end = (char *) first_free;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+void
+nxt_unit_buf_free(nxt_unit_buf_t *buf)
+{
+ nxt_unit_mmap_buf_t *mmap_buf;
+
+ mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
+
+ nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start);
+
+ nxt_unit_mmap_buf_release(mmap_buf);
+}
+
+
+nxt_unit_buf_t *
+nxt_unit_buf_next(nxt_unit_buf_t *buf)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
+ req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t,
+ req);
+
+ lnk = &mmap_buf->link;
+
+ if (lnk == nxt_queue_last(&req_impl->incoming_buf)) {
+ return NULL;
+ }
+
+ lnk = nxt_queue_next(lnk);
+ mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+
+ return &mmap_buf->buf;
+}
+
+
uint32_t
nxt_unit_buf_max(void)
{
    /* Largest payload that fits into a single shared-memory buffer. */
    return PORT_MMAP_DATA_SIZE;
}
+
+
uint32_t
nxt_unit_buf_min(void)
{
    /* Allocation granularity: one shared-memory chunk. */
    return PORT_MMAP_CHUNK_SIZE;
}
+
+
/*
 * Sends 'size' bytes starting at 'start' as response content.  Content
 * that still fits into the pending headers buffer is piggybacked onto it
 * and the headers are flushed; the remainder is streamed in shared-memory
 * buffers of at most PORT_MMAP_DATA_SIZE bytes each.
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    int                           rc;
    uint32_t                      part_size;
    const char                    *part_start;
    nxt_unit_process_t            *process;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);

        /* Stack-allocated wrapper: each buffer is sent before leaving scope. */
        rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
                                       part_size, &mmap_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
                                       part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req->ctx,
                                    req_impl->recv_msg.port_msg.stream,
                                    &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            /* Return the unsent chunks to the shared-memory pool. */
            nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
                                  mmap_buf.buf.end - mmap_buf.buf.start);

            return rc;
        }

        size -= part_size;
        part_start += part_size;
    }

    return NXT_UNIT_OK;
}
+
+
/*
 * Streams response content pulled from 'read_info->read()' until EOF.
 * Content is first packed into the not-yet-sent headers buffer (as
 * piggyback content), then into freshly allocated shared-memory buffers
 * of up to read_info->buf_size (capped at PORT_MMAP_DATA_SIZE) each.
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int             rc;
    ssize_t         n;
    nxt_unit_buf_t  *buf;

    /* Check if response is not sent yet. */
    if (nxt_slow_path(req->response_buf)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    /* Remaining content goes in separate shared-memory buffers. */
    while (!read_info->eof) {
        buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
                                                       PORT_MMAP_DATA_SIZE));
        if (nxt_slow_path(buf == NULL)) {
            nxt_unit_req_error(req, "Failed to allocate buf for content");

            return NXT_UNIT_ERROR;
        }

        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_buf_free(buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_buf_send(buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}
+
+
+ssize_t
+nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
+{
+ u_char *p;
+ size_t rest, copy, read;
+ nxt_unit_buf_t *buf;
+
+ p = dst;
+ rest = size;
+
+ buf = req->content_buf;
+
+ while (buf != NULL) {
+ copy = buf->end - buf->free;
+ copy = nxt_min(rest, copy);
+
+ p = nxt_cpymem(p, buf->free, copy);
+
+ buf->free += copy;
+ rest -= copy;
+
+ if (rest == 0) {
+ if (buf->end == buf->free) {
+ buf = nxt_unit_buf_next(buf);
+ }
+
+ break;
+ }
+
+ buf = nxt_unit_buf_next(buf);
+ }
+
+ req->content_buf = buf;
+
+ read = size - rest;
+
+ req->content_length -= read;
+
+ return read;
+}
+
+
/*
 * Finishes request processing with result 'rc'.  On success, a default
 * empty 200 response is synthesized if none was initialized, and any
 * pending headers buffer is flushed with the "last" flag.  Otherwise (or
 * if the response was already sent) a bare final/error port message is
 * emitted.  In all paths the request info is released.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    ssize_t                       res;
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    /* No response was started: synthesize an empty 200 text/plain one. */
    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain", nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends with "last" set and releases 'req' on both outcomes. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    /* Bare final message: DATA on success, RPC_ERROR otherwise. */
    msg.stream = req_impl->recv_msg.port_msg.stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = lib->callbacks.port_send(req->ctx, &req->response_port,
                                   &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_req_alert(req, "last message send failed: %s (%d)",
                           strerror(errno), errno);
    }

    nxt_unit_request_info_release(req);
}
+
+
/*
 * Finds 'n' consecutive free chunks in an outgoing mmap usable with
 * 'port_id' (marking them busy and storing the first chunk id in *c),
 * or creates a new mmap when no existing one has room.  Acquires and
 * releases process->outgoing.mutex internally.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
{
    int                     res, nchunks, i;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_port_mmap_header_t  *hdr;

    pthread_mutex_lock(&process->outgoing.mutex);

    mm_end = process->outgoing.elts + process->outgoing.size;

    for (mm = process->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /* Skip mmaps already bound to a different destination port. */
        if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to extend the run to 'n' consecutive chunks. */
            while (nchunks < n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run broken: release what was grabbed and rescan
                       past the busy chunk. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks == n) {
                goto unlock;
            }
        }
    }

    /* No room anywhere: allocate a fresh mmap starting at chunk 0. */
    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, process, port_id, n);

unlock:

    pthread_mutex_unlock(&process->outgoing.mutex);

    return hdr;
}
+
+
+static nxt_unit_mmap_t *
+nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
+{
+ uint32_t cap;
+
+ cap = mmaps->cap;
+
+ if (cap == 0) {
+ cap = i + 1;
+ }
+
+ while (i + 1 > cap) {
+
+ if (cap < 16) {
+ cap = cap * 2;
+
+ } else {
+ cap = cap + cap / 2;
+ }
+ }
+
+ if (cap != mmaps->cap) {
+
+ mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
+ if (nxt_slow_path(mmaps->elts == NULL)) {
+ return NULL;
+ }
+
+ memset(mmaps->elts + mmaps->cap, 0,
+ sizeof(*mmaps->elts) * (cap - mmaps->cap));
+
+ mmaps->cap = cap;
+ }
+
+ if (i + 1 > mmaps->size) {
+ mmaps->size = i + 1;
+ }
+
+ return mmaps->elts + i;
+}
+
+
/*
 * Creates a new outgoing shared-memory segment for 'process', marks its
 * first 'n' chunks busy, and announces the segment's fd to the peer via
 * 'port_id'.  Must be called with process->outgoing.mutex held; the
 * mutex is temporarily released around the (potentially blocking) send.
 * Returns NULL on failure, rolling back the reserved array slot.
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
    nxt_unit_port_id_t *port_id, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    char                    name[64];
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = process->lib;

    mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());

    /* Platform-specific anonymous shared memory creation. */
#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        goto remove_fail;
    }

    nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);

#elif (NXT_HAVE_SHM_OPEN_ANON)

    fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
                       strerror(errno), errno);

        goto remove_fail;
    }

#elif (NXT_HAVE_SHM_OPEN)

    /* Just in case. */
    shm_unlink(name);

    fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
    if (nxt_slow_path(fd == -1)) {
        nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
                       strerror(errno), errno);

        goto remove_fail;
    }

    /* Unlink immediately: the segment lives on through the open fd. */
    if (nxt_slow_path(shm_unlink(name) == -1)) {
        nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
                      strerror(errno), errno);
    }

#else

#error No working shared memory implementation.

#endif

    if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
        nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All chunks start out free (bitmap of ones). */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = process->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = process->pid;
    hdr->sent_over = port_id->id;

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark as busy chunk followed the last available chunk. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /* Drop the lock while sending the fd to the peer. */
    pthread_mutex_unlock(&process->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port_id, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) process->pid);
    }

    close(fd);

    pthread_mutex_lock(&process->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    /* Roll back the array slot reserved by nxt_unit_mmap_at(). */
    process->outgoing.size--;

    return NULL;
}
+
+
/*
 * Passes the shared-memory segment's file descriptor to the peer over
 * 'port_id' as an SCM_RIGHTS ancillary message attached to an
 * _NXT_PORT_MSG_MMAP port message.
 */
static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_MMAP;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

#if (NXT_VALGRIND)
    memset(&cmsg, 0, sizeof(cmsg));
#endif

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
                                   &cmsg, sizeof(cmsg));
    if (nxt_slow_path(res != sizeof(msg))) {
        nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
                      (int) port_id->pid, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}
+
+
+static int
+nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
+ nxt_unit_port_id_t *port_id, uint32_t size,
+ nxt_unit_mmap_buf_t *mmap_buf)
+{
+ uint32_t nchunks;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_header_t *hdr;
+
+ nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
+
+ hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
+ if (nxt_slow_path(hdr == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ mmap_buf->hdr = hdr;
+ mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
+ mmap_buf->buf.free = mmap_buf->buf.start;
+ mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
+ mmap_buf->port_id = *port_id;
+
+ nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
+ (int) hdr->id, (int) c,
+ (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
+
+ return NXT_UNIT_OK;
+}
+
+
/*
 * Handles an _NXT_PORT_MSG_MMAP message: maps the received fd and
 * registers it in the sender process's incoming array at the id stored
 * in the segment header.  Validates that the header's src/dst pids
 * match the message sender and this process.
 */
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
    int                     rc;
    void                    *mem;
    struct stat             mmap_stat;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_unit_process_t      *process;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);

    pthread_mutex_lock(&lib->mutex);

    /* Takes a reference on the process; dropped at 'fail'. */
    process = nxt_unit_process_find(ctx, pid, 0);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
                      (int) pid, fd);

        return NXT_UNIT_ERROR;
    }

    rc = NXT_UNIT_ERROR;

    if (fstat(fd, &mmap_stat) == -1) {
        nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
                      strerror(errno), errno);

        goto fail;
    }

    mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
               MAP_SHARED, fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
                      strerror(errno), errno);

        goto fail;
    }

    hdr = mem;

    if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {

        nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
                      "detected: %d != %d or %d != %d", (int) hdr->src_pid,
                      (int) pid, (int) hdr->dst_pid, (int) lib->pid);

        /*
         * NOTE(review): unmapped with PORT_MMAP_SIZE although the mapping
         * was created with mmap_stat.st_size — presumably the protocol
         * guarantees segments are always PORT_MMAP_SIZE; confirm.
         */
        munmap(mem, PORT_MMAP_SIZE);

        goto fail;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");

        munmap(mem, PORT_MMAP_SIZE);

    } else {
        mm->hdr = hdr;

        /* 0xFFFF means "not bound to any particular port". */
        hdr->sent_over = 0xFFFFu;

        rc = NXT_UNIT_OK;
    }

    pthread_mutex_unlock(&process->incoming.mutex);

fail:

    nxt_unit_process_use(ctx, process, -1);

    return rc;
}
+
+
+static void
+nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
+{
+ pthread_mutex_init(&mmaps->mutex, NULL);
+
+ mmaps->size = 0;
+ mmaps->cap = 0;
+ mmaps->elts = NULL;
+}
+
+
+static void
+nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
+{
+ long c;
+
+ c = nxt_atomic_fetch_add(&process->use_count, i);
+
+ if (i < 0 && c == -i) {
+ nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
+
+ nxt_unit_mmaps_destroy(&process->incoming);
+ nxt_unit_mmaps_destroy(&process->outgoing);
+
+ free(process);
+ }
+}
+
+
+static void
+nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
+{
+ nxt_unit_mmap_t *mm, *end;
+
+ if (mmaps->elts != NULL) {
+ end = mmaps->elts + mmaps->size;
+
+ for (mm = mmaps->elts; mm < end; mm++) {
+ munmap(mm->hdr, PORT_MMAP_SIZE);
+ }
+
+ free(mmaps->elts);
+ }
+
+ pthread_mutex_destroy(&mmaps->mutex);
+}
+
+
+static nxt_port_mmap_header_t *
+nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
+ uint32_t id)
+{
+ nxt_port_mmap_header_t *hdr;
+
+ if (nxt_fast_path(process->incoming.size > id)) {
+ hdr = process->incoming.elts[id].hdr;
+
+ } else {
+ hdr = NULL;
+ }
+
+ return hdr;
+}
+
+
/*
 * Consumes the leading nxt_port_mmap_tracking_msg_t of a tracked
 * message and atomically claims the tracking slot.  Returns non-zero
 * when the request should be processed, 0 when it was cancelled by the
 * router (or the message/mmap is invalid).
 */
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    int                           rc;
    nxt_chunk_id_t                c;
    nxt_unit_process_t            *process;
    nxt_port_mmap_header_t        *hdr;
    nxt_port_mmap_tracking_msg_t  *tracking_msg;

    if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
                      recv_msg->port_msg.stream, (int) recv_msg->size);

        return 0;
    }

    tracking_msg = recv_msg->start;

    /* Advance past the tracking header for subsequent parsing. */
    recv_msg->start = tracking_msg + 1;
    recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return 0;
    }

    pthread_mutex_lock(&process->incoming.mutex);

    hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
    if (nxt_slow_path(hdr == NULL)) {
        pthread_mutex_unlock(&process->incoming.mutex);

        nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
                      "invalid mmap id %d,%"PRIu32,
                      recv_msg->port_msg.stream,
                      (int) process->pid, tracking_msg->mmap_id);

        return 0;
    }

    /*
     * Claim the slot: if it still holds our stream number the request is
     * live; a zero result means the router already cancelled it.
     */
    c = tracking_msg->tracking_id;
    rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0);

    if (rc == 0) {
        nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
                       recv_msg->port_msg.stream);

        nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return rc;
}
+
+
/*
 * Parses a message whose payload is an array of nxt_port_mmap_msg_t
 * references into shared memory: wraps each referenced region in a
 * buffer appended to 'incoming_buf'.  The first region replaces
 * recv_msg->start/size so callers read the actual payload from shared
 * memory rather than the port message body.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_queue_t *incoming_buf)
{
    void                    *start;
    uint32_t                size;
    nxt_unit_process_t      *process;
    nxt_unit_mmap_buf_t     *b;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->port_msg.stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_msg_get_process(ctx, recv_msg);
    if (nxt_slow_path(process == NULL)) {
        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    pthread_mutex_lock(&process->incoming.mutex);

    for (; mmap_msg < end; mmap_msg++) {
        hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
        if (nxt_slow_path(hdr == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "invalid mmap id %d,%"PRIu32,
                          recv_msg->port_msg.stream,
                          (int) process->pid, mmap_msg->mmap_id);

            return NXT_UNIT_ERROR;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* First reference: redirect the message body into shared memory. */
        if (recv_msg->start == mmap_msg) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "move start %p -> %p",
                          recv_msg->port_msg.stream,
                          recv_msg->start, start);

            recv_msg->start = start;
            recv_msg->size = size;
        }

        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            pthread_mutex_unlock(&process->incoming.mutex);

            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
                          "failed to allocate buf",
                          recv_msg->port_msg.stream);

            /* Release this region's chunks; earlier ones stay queued. */
            nxt_unit_mmap_release(hdr, start, size);

            return NXT_UNIT_ERROR;
        }

        nxt_queue_insert_tail(incoming_buf, &b->link);

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)\n"
                       "%.*s",
                       recv_msg->port_msg.stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size,
                       (int) size, (char *) start);
    }

    pthread_mutex_unlock(&process->incoming.mutex);

    return NXT_UNIT_OK;
}
+
+
+static int
+nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
+{
+ u_char *p, *end;
+ nxt_chunk_id_t c;
+
+ memset(start, 0xA5, size);
+
+ p = start;
+ end = p + size;
+ c = nxt_port_mmap_chunk_id(hdr, p);
+
+ while (p < end) {
+ nxt_port_mmap_set_chunk_free(hdr->free_map, c);
+
+ p += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static nxt_int_t
+nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_process_t *process;
+
+ process = data;
+
+ if (lhq->key.length == sizeof(pid_t)
+ && *(pid_t *) lhq->key.start == process->pid)
+ {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+
/* Hash table operations for the pid -> process registry. */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
+
+
+static inline void
+nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
+{
+ lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
+ lhq->key.length = sizeof(*pid);
+ lhq->key.start = (u_char *) pid;
+ lhq->proto = &lvlhsh_processes_proto;
+}
+
+
+static nxt_unit_process_t *
+nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_lvlhsh_query_t lhq;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ nxt_unit_process_lhq_pid(&lhq, &pid);
+
+ if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
+ process = lhq.value;
+ nxt_unit_process_use(ctx, process, 1);
+
+ return process;
+ }
+
+ process = malloc(sizeof(nxt_unit_process_t));
+ if (nxt_slow_path(process == NULL)) {
+ nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
+
+ return NULL;
+ }
+
+ process->pid = pid;
+ process->use_count = 1;
+ process->next_port_id = 0;
+ process->lib = lib;
+
+ nxt_queue_init(&process->ports);
+
+ nxt_unit_mmaps_init(&process->incoming);
+ nxt_unit_mmaps_init(&process->outgoing);
+
+ lhq.replace = 0;
+ lhq.value = process;
+
+ switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
+
+ pthread_mutex_destroy(&process->outgoing.mutex);
+ pthread_mutex_destroy(&process->incoming.mutex);
+ free(process);
+ process = NULL;
+ break;
+ }
+
+ nxt_unit_process_use(ctx, process, 1);
+
+ return process;
+}
+
+
+static nxt_unit_process_t *
+nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_lvlhsh_query_t lhq;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ nxt_unit_process_lhq_pid(&lhq, &pid);
+
+ if (remove) {
+ rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
+
+ } else {
+ rc = nxt_lvlhsh_find(&lib->processes, &lhq);
+ }
+
+ if (rc == NXT_OK) {
+ process = lhq.value;
+
+ if (!remove) {
+ nxt_unit_process_use(ctx, process, 1);
+ }
+
+ return process;
+ }
+
+ return NULL;
+}
+
+
/*
 * Removes and returns an arbitrary process from the registry, or NULL
 * when the registry is empty.  Used to drain all processes on shutdown.
 */
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
    return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
}
+
+
+int
+nxt_unit_run(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_run_once(ctx);
+ }
+
+ return rc;
+}
+
+
/*
 * Receives one message on the context's read port (via the raw fd when
 * available, otherwise through the port_recv callback) and dispatches
 * it to nxt_unit_process_msg().
 */
static int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    char                 buf[4096];
    char                 oob[256];
    ssize_t              rsize;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /*
     * NOTE(review): only the first cmsghdr's worth of 'oob' is zeroed —
     * presumably enough to mark "no control data present"; confirm that
     * readers never inspect past an empty first header.
     */
    memset(oob, 0, sizeof(struct cmsghdr));

    if (ctx_impl->read_port_fd != -1) {
        rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
                                   buf, sizeof(buf),
                                   oob, sizeof(oob));
    } else {
        rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
                                         buf, sizeof(buf),
                                         oob, sizeof(oob));
    }

    if (nxt_fast_path(rsize > 0)) {
        rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
                                  oob, sizeof(oob));
    } else {
        rc = NXT_UNIT_ERROR;
    }

    return rc;
}
+
+
+void
+nxt_unit_done(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
+
+ nxt_unit_ctx_free(&ctx_impl->ctx);
+
+ } nxt_queue_loop;
+
+ for ( ;; ) {
+ process = nxt_unit_process_pop_first(lib);
+ if (process == NULL) {
+ break;
+ }
+
+ nxt_unit_remove_process(ctx, process);
+ }
+
+ free(lib);
+}
+
+
+nxt_unit_ctx_t *
+nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
+{
+ int rc, fd;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_id_t new_port_id;
+ nxt_unit_ctx_impl_t *new_ctx;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
+ if (nxt_slow_path(new_ctx == NULL)) {
+ nxt_unit_warn(ctx, "failed to allocate context");
+
+ return NULL;
+ }
+
+ rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ free(new_ctx);
+
+ return NULL;
+ }
+
+ rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ lib->callbacks.remove_port(ctx, &new_port_id);
+
+ close(fd);
+
+ free(new_ctx);
+
+ return NULL;
+ }
+
+ close(fd);
+
+ nxt_unit_ctx_init(lib, new_ctx, data);
+
+ new_ctx->read_port_id = new_port_id;
+
+ return &new_ctx->ctx;
+}
+
+
/*
 * Releases a context.  Requests still active on the context are forcibly
 * finished with NXT_UNIT_ERROR, cached buffers and pooled request
 * structures are freed, and the context is unlinked from the library.
 */
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Abort requests the application never completed. */
    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    /* ctx_buf[] is an array member of ctx_impl: unlink, do not free. */
    nxt_queue_remove(&ctx_impl->ctx_buf[0].link);
    nxt_queue_remove(&ctx_impl->ctx_buf[1].link);

    /* Free individually allocated cached buffers. */
    nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) {

        nxt_queue_remove(&mmap_buf->link);
        free(mmap_buf);

    } nxt_queue_loop;

    /* Free pooled (currently unused) request structures. */
    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_remove(&ctx_impl->link);

    /* The main context is embedded in nxt_unit_impl_t; never free it. */
    if (ctx_impl != &lib->main_ctx) {
        free(ctx_impl);
    }
}
+
+
+/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
+#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
+#define NXT_UNIX_SOCKET SOCK_SEQPACKET
+#else
+#define NXT_UNIX_SOCKET SOCK_DGRAM
+#endif
+
+
+void
+nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
+{
+ nxt_unit_port_hash_id_t port_hash_id;
+
+ port_hash_id.pid = pid;
+ port_hash_id.id = id;
+
+ port_id->pid = pid;
+ port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
+ port_id->id = id;
+}
+
+
+int
+nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
+ nxt_unit_port_id_t *port_id)
+{
+ int rc, fd;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_id_t new_port_id;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
+
+ rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
+
+ if (nxt_fast_path(rc == NXT_UNIT_OK)) {
+ *port_id = new_port_id;
+
+ } else {
+ lib->callbacks.remove_port(ctx, &new_port_id);
+ }
+
+ close(fd);
+
+ return rc;
+}
+
+
/*
 * Creates a new port for this process: makes a socketpair, registers
 * the receiving end (in_fd) via the add_port callback under a freshly
 * allocated port id, and returns the peer socket in *fd so it can be
 * transferred to another process.  Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
{
    int                 rc, port_sockets[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   port_sockets[0], port_sockets[1]);

    /* The mutex serializes process lookup and next_port_id allocation. */
    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(process == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        close(port_sockets[0]);
        close(port_sockets[1]);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);

    /* Only the receiving half belongs to this port; out_fd stays unset. */
    new_port.in_fd = port_sockets[0];
    new_port.out_fd = -1;
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    /* Release the reference presumably taken by process_get() — see callers. */
    nxt_unit_process_use(ctx, process, -1);

    rc = lib->callbacks.add_port(ctx, &new_port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_warn(ctx, "create_port: add_port() failed");

        close(port_sockets[0]);
        close(port_sockets[1]);

        return rc;
    }

    *port_id = new_port.id;
    *fd = port_sockets[1];

    return rc;
}
+
+
/*
 * Sends a "new port" announcement to the dst port.  The port's file
 * descriptor is transferred in an SCM_RIGHTS control message.
 * Returns NXT_UNIT_OK only when the whole payload was sent.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
    nxt_unit_port_id_t *new_port, int fd)
{
    ssize_t          res;
    nxt_unit_impl_t  *lib;

    /* Wire format: common message header followed by the port record. */
    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_new_port_t  new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    m.msg.stream = 0;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;
    m.msg.last = 0;
    m.msg.mmap = 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    m.new_port.id = new_port->id;
    m.new_port.pid = new_port->pid;
    m.new_port.type = NXT_PROCESS_WORKER;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

#if (NXT_VALGRIND)
    /* Silence valgrind reports about sending uninitialized cmsg padding. */
    memset(&cmsg, 0, sizeof(cmsg));
#endif

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     * *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     * dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));

    res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
                                   &cmsg, sizeof(cmsg));

    return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}
+
+
+int
+nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_unit_port_impl_t *new_port;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
+ port->id.pid, port->id.id,
+ port->in_fd, port->out_fd);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ process = nxt_unit_process_get(ctx, port->id.pid);
+ if (nxt_slow_path(process == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ goto unlock;
+ }
+
+ if (port->id.id >= process->next_port_id) {
+ process->next_port_id = port->id.id + 1;
+ }
+
+ new_port = malloc(sizeof(nxt_unit_port_impl_t));
+ if (nxt_slow_path(new_port == NULL)) {
+ rc = NXT_UNIT_ERROR;
+ goto unlock;
+ }
+
+ new_port->port = *port;
+
+ rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto unlock;
+ }
+
+ nxt_queue_insert_tail(&process->ports, &new_port->link);
+
+ rc = NXT_UNIT_OK;
+
+ new_port->process = process;
+
+unlock:
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
+ nxt_unit_process_use(ctx, process, -1);
+ }
+
+ return rc;
+}
+
+
/*
 * Default remove_port callback: removes the port without returning a
 * copy of the removed port structure.
 */
void
nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    nxt_unit_find_remove_port(ctx, port_id, NULL);
}
+
+
/*
 * Removes the port identified by port_id under the library mutex.
 * If r_port is non-NULL, a copy of the removed port structure is
 * stored there (note: its fds are closed by then).  The port's
 * process reference, if any, is released after unlocking.
 */
void
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    process = NULL;

    nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);

    pthread_mutex_unlock(&lib->mutex);

    /* Drop the removed port's process reference outside the lock. */
    if (nxt_slow_path(process != NULL)) {
        nxt_unit_process_use(ctx, process, -1);
    }
}
+
+
/*
 * Removes a port from the hash and destroys its wrapper.  Must be
 * called with lib->mutex held ("unsafe").  The owning process (if any)
 * is handed back via *process so the caller can release the port's
 * process reference after unlocking.  The port's descriptors are
 * closed BEFORE the optional copy into *r_port, so the copied fds are
 * no longer valid.
 */
static void
nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    nxt_unit_port_t *r_port, nxt_unit_process_t **process)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Lookup with remove == 1: the hash entry is deleted as well. */
    port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);

        return;
    }

    nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   port->port.in_fd, port->port.out_fd, port->port.data);

    if (port->port.in_fd != -1) {
        close(port->port.in_fd);
    }

    if (port->port.out_fd != -1) {
        close(port->port.out_fd);
    }

    /* A NULL process means the port was already detached from its list. */
    if (port->process != NULL) {
        nxt_queue_remove(&port->link);
    }

    if (process != NULL) {
        *process = port->process;
    }

    if (r_port != NULL) {
        *r_port = port->port;
    }

    free(port);
}
+
+
/*
 * Removes the process with the given pid together with all of its
 * ports.  Takes lib->mutex; on the success path the mutex is released
 * inside nxt_unit_remove_process(), which is why there is no unlock
 * at the end of this function.
 */
void
nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(ctx, pid, 1);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    /* NB: releases lib->mutex internally. */
    nxt_unit_remove_process(ctx, process);
}
+
+
/*
 * Destroys a process and all ports linked to it.  Must be entered with
 * lib->mutex held; the mutex is UNLOCKED inside this function once the
 * port list has been detached.  Consumes the caller's process
 * reference.
 */
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Move the whole port list to a local queue while still locked. */
    nxt_queue_init(&ports);

    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        /* Each port owns one process reference (see nxt_unit_add_port). */
        nxt_unit_process_use(ctx, process, -1);
        port->process = NULL;

        /* Shortcut for default callback. */
        if (lib->callbacks.remove_port == nxt_unit_remove_port) {
            nxt_queue_remove(&port->link);

            nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
        }

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    /* Non-default remove_port callbacks run without the mutex held. */
    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        lib->callbacks.remove_port(ctx, &port->port.id);

    } nxt_queue_loop;

    /* Drop the reference passed in by the caller. */
    nxt_unit_process_use(ctx, process, -1);
}
+
+
+void
+nxt_unit_quit(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ lib->online = 0;
+}
+
+
+static ssize_t
+nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size)
+{
+ int fd;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_impl_t *port;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+
+ if (nxt_fast_path(port != NULL)) {
+ fd = port->port.out_fd;
+
+ } else {
+ nxt_unit_warn(ctx, "port_send: port %d,%d not found",
+ (int) port_id->pid, (int) port_id->id);
+ fd = -1;
+ }
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ if (nxt_slow_path(fd == -1)) {
+ if (port != NULL) {
+ nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
+ (int) port_id->pid, (int) port_id->id);
+ }
+
+ return -1;
+ }
+
+ nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
+ (int) port_id->pid, (int) port_id->id, fd);
+
+ return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
+}
+
+
+ssize_t
+nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size)
+{
+ ssize_t res;
+ struct iovec iov[1];
+ struct msghdr msg;
+
+ iov[0].iov_base = (void *) buf;
+ iov[0].iov_len = buf_size;
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+ msg.msg_flags = 0;
+ msg.msg_control = (void *) oob;
+ msg.msg_controllen = oob_size;
+
+ res = sendmsg(fd, &msg, 0);
+
+ if (nxt_slow_path(res == -1)) {
+ nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
+ fd, (int) buf_size, strerror(errno), errno);
+
+ } else {
+ nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
+ (int) res);
+ }
+
+ return res;
+}
+
+
/*
 * Default port_recv callback: resolves port_id to its in_fd under the
 * library mutex and reads one message from it.  When the queried id is
 * the context's own read port (compared by pointer identity), the fd
 * is cached in ctx_impl->read_port_fd so later reads can bypass the
 * hash lookup entirely (see nxt_unit_run_once()).
 */
static ssize_t
nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
    void *buf, size_t buf_size, void *oob, size_t oob_size)
{
    int                   fd;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);

    if (nxt_fast_path(port != NULL)) {
        fd = port->port.in_fd;

    } else {
        nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
                       (int) port_id->pid, (int) port_id->id);
        fd = -1;
    }

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(fd == -1)) {
        return -1;
    }

    nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
                   (int) port_id->pid, (int) port_id->id, fd);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* Pointer comparison is intentional: only the context's own id matches. */
    if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
        ctx_impl->read_port_fd = fd;
    }

    return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
}
+
+
+ssize_t
+nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
+ void *oob, size_t oob_size)
+{
+ ssize_t res;
+ struct iovec iov[1];
+ struct msghdr msg;
+
+ iov[0].iov_base = buf;
+ iov[0].iov_len = buf_size;
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+ msg.msg_flags = 0;
+ msg.msg_control = oob;
+ msg.msg_controllen = oob_size;
+
+ res = recvmsg(fd, &msg, 0);
+
+ if (nxt_slow_path(res == -1)) {
+ nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
+ fd, strerror(errno), errno);
+
+ } else {
+ nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
+ }
+
+ return res;
+}
+
+
+static nxt_int_t
+nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_unit_port_t *port;
+ nxt_unit_port_hash_id_t *port_id;
+
+ port = data;
+ port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
+
+ if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
+ && port_id->pid == port->id.pid
+ && port_id->id == port->id.id)
+ {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+
/*
 * lvlhsh operations for the library port hash; aligned(64), presumably
 * to keep the table within a single cache line — TODO confirm.
 */
static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
+
+
/*
 * Fills an lvlhsh query for the given port id.  Side effect: when
 * port_id->hash is zero, it is computed with murmur2 over the packed
 * key and cached back into port_id so subsequent queries reuse it.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        /* Cache the computed hash for future queries. */
        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int) port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}
+
+
+static int
+nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
+{
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_port_hash_id_t port_hash_id;
+
+ nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
+ lhq.replace = 0;
+ lhq.value = port;
+
+ res = nxt_lvlhsh_insert(port_hash, &lhq);
+
+ switch (res) {
+
+ case NXT_OK:
+ return NXT_UNIT_OK;
+
+ default:
+ return NXT_UNIT_ERROR;
+ }
+}
+
+
+static nxt_unit_port_impl_t *
+nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
+ int remove)
+{
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+ nxt_unit_port_hash_id_t port_hash_id;
+
+ nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
+
+ if (remove) {
+ res = nxt_lvlhsh_delete(port_hash, &lhq);
+
+ } else {
+ res = nxt_lvlhsh_find(port_hash, &lhq);
+ }
+
+ switch (res) {
+
+ case NXT_OK:
+ return lhq.value;
+
+ default:
+ return NULL;
+ }
+}
+
+
+void
+nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
+{
+ int log_fd, n;
+ char msg[NXT_MAX_ERROR_STR], *p, *end;
+ pid_t pid;
+ va_list ap;
+ nxt_unit_impl_t *lib;
+
+ if (nxt_fast_path(ctx != NULL)) {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ pid = lib->pid;
+ log_fd = lib->log_fd;
+
+ } else {
+ pid = getpid();
+ log_fd = STDERR_FILENO;
+ }
+
+ p = msg;
+ end = p + sizeof(msg) - 1;
+
+ p = nxt_unit_snprint_prefix(p, end, pid, level);
+
+ va_start(ap, fmt);
+ p += vsnprintf(p, end - p, fmt, ap);
+ va_end(ap);
+
+ if (nxt_slow_path(p > end)) {
+ memcpy(end - 5, "[...]", 5);
+ p = end;
+ }
+
+ *p++ = '\n';
+
+ n = write(log_fd, msg, p - msg);
+ if (nxt_slow_path(n < 0)) {
+ fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
+ }
+}
+
+
+void
+nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
+{
+ int log_fd, n;
+ char msg[NXT_MAX_ERROR_STR], *p, *end;
+ pid_t pid;
+ va_list ap;
+ nxt_unit_impl_t *lib;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ if (nxt_fast_path(req != NULL)) {
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+
+ pid = lib->pid;
+ log_fd = lib->log_fd;
+
+ } else {
+ pid = getpid();
+ log_fd = STDERR_FILENO;
+ }
+
+ p = msg;
+ end = p + sizeof(msg) - 1;
+
+ p = nxt_unit_snprint_prefix(p, end, pid, level);
+
+ if (nxt_fast_path(req != NULL)) {
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ p += snprintf(p, end - p,
+ "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream);
+ }
+
+ va_start(ap, fmt);
+ p += vsnprintf(p, end - p, fmt, ap);
+ va_end(ap);
+
+ if (nxt_slow_path(p > end)) {
+ memcpy(end - 5, "[...]", 5);
+ p = end;
+ }
+
+ *p++ = '\n';
+
+ n = write(log_fd, msg, p - msg);
+ if (nxt_slow_path(n < 0)) {
+ fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
+ }
+}
+
+
/*
 * Level names indexed by the numeric level passed to nxt_unit_log();
 * the order presumably mirrors the NXT_UNIT_LOG_* constants — TODO
 * confirm against nxt_unit.h.
 */
static const char * nxt_unit_log_levels[] = {
    "alert",
    "error",
    "warn",
    "notice",
    "info",
    "debug",
};
+
+
+static char *
+nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
+{
+ struct tm tm;
+ struct timespec ts;
+
+ (void) clock_gettime(CLOCK_REALTIME, &ts);
+
+#if (NXT_HAVE_LOCALTIME_R)
+ (void) localtime_r(&ts.tv_sec, &tm);
+#else
+ tm = *localtime(&ts.tv_sec);
+#endif
+
+ p += snprintf(p, end - p,
+ "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
+ tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
+ tm.tm_hour, tm.tm_min, tm.tm_sec,
+ (int) ts.tv_nsec / 1000000);
+
+ p += snprintf(p, end - p,
+ "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
+ (int) pid,
+ (uint64_t) (uintptr_t) nxt_thread_get_tid());
+
+ return p;
+}
+
+
+/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */
+
/*
 * Aligned allocation wrapper over posix_memalign(); returns NULL on
 * failure instead of an error code.
 */
void *
nxt_memalign(size_t alignment, size_t size)
{
    void  *p;

    if (nxt_fast_path(posix_memalign(&p, alignment, size) == 0)) {
        return p;
    }

    return NULL;
}
+
#if (NXT_DEBUG)

/* Debug-build wrapper over free(), pairing with nxt_memalign(). */
void
nxt_free(void *p)
{
    free(p);
}

#endif