/*
* Copyright (C) NGINX, Inc.
*/
#include <stdlib.h>
#include "nxt_main.h"
#include "nxt_port_memory_int.h"
#include "nxt_unit.h"
#include "nxt_unit_request.h"
#include "nxt_unit_response.h"
#include "nxt_unit_websocket.h"
#include "nxt_websocket.h"
#if (NXT_HAVE_MEMFD_CREATE)
#include <linux/memfd.h>
#endif
#define NXT_UNIT_MAX_PLAIN_SIZE 1024
#define NXT_UNIT_LOCAL_BUF_SIZE \
(NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
typedef struct nxt_unit_impl_s nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
uint32_t stream);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
nxt_chunk_id_t *c, int *n, int min_n);
static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, int i);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd);
static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
nxt_unit_port_id_t *new_port, int fd);
static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port,
nxt_unit_process_t **process);
static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process);
static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
void *oob, size_t oob_size);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
nxt_unit_request_info_impl_t *req_impl);
static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
struct nxt_unit_mmap_buf_s {
nxt_unit_buf_t buf;
nxt_unit_mmap_buf_t *next;
nxt_unit_mmap_buf_t **prev;
nxt_port_mmap_header_t *hdr;
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_process_t *process;
char *free_ptr;
char *plain_ptr;
};
struct nxt_unit_recv_msg_s {
uint32_t stream;
nxt_pid_t pid;
nxt_port_id_t reply_port;
uint8_t last; /* 1 bit */
uint8_t mmap; /* 1 bit */
void *start;
uint32_t size;
int fd;
nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *incoming_buf;
};
typedef enum {
NXT_UNIT_RS_START = 0,
NXT_UNIT_RS_RESPONSE_INIT,
NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
NXT_UNIT_RS_RESPONSE_SENT,
NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
struct nxt_unit_request_info_impl_s {
nxt_unit_request_info_t req;
uint32_t stream;
nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *outgoing_buf;
nxt_unit_mmap_buf_t *incoming_buf;
nxt_unit_req_state_t state;
uint8_t websocket;
nxt_queue_link_t link;
char extra_data[];
};
struct nxt_unit_websocket_frame_impl_s {
nxt_unit_websocket_frame_t ws;
nxt_unit_mmap_buf_t *buf;
nxt_queue_link_t link;
nxt_unit_ctx_impl_t *ctx_impl;
};
struct nxt_unit_read_buf_s {
nxt_unit_read_buf_t *next;
ssize_t size;
char buf[16384];
char oob[256];
};
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
pthread_mutex_t mutex;
nxt_unit_port_id_t read_port_id;
int read_port_fd;
nxt_queue_link_t link;
nxt_unit_mmap_buf_t *free_buf;
/* of nxt_unit_request_info_impl_t */
nxt_queue_t free_req;
/* of nxt_unit_websocket_frame_impl_t */
nxt_queue_t free_ws;
/* of nxt_unit_request_info_impl_t */
nxt_queue_t active_req;
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
nxt_unit_read_buf_t *pending_read_head;
nxt_unit_read_buf_t **pending_read_tail;
nxt_unit_read_buf_t *free_read_buf;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
nxt_unit_request_info_impl_t req;
};
struct nxt_unit_impl_s {
nxt_unit_t unit;
nxt_unit_callbacks_t callbacks;
uint32_t request_data_size;
uint32_t shm_mmap_limit;
pthread_mutex_t mutex;
nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_port_id_t ready_port_id;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
pid_t pid;
int log_fd;
int online;
nxt_unit_ctx_impl_t main_ctx;
};
struct nxt_unit_port_impl_s {
nxt_unit_port_t port;
nxt_queue_link_t link;
nxt_unit_process_t *process;
};
struct nxt_unit_mmap_s {
nxt_port_mmap_header_t *hdr;
};
struct nxt_unit_mmaps_s {
pthread_mutex_t mutex;
uint32_t size;
uint32_t cap;
nxt_atomic_t allocated_chunks;
nxt_unit_mmap_t *elts;
};
struct nxt_unit_process_s {
pid_t pid;
nxt_queue_t ports;
nxt_unit_mmaps_t incoming;
nxt_unit_mmaps_t outgoing;
nxt_unit_impl_t *lib;
nxt_atomic_t use_count;
uint32_t next_port_id;
};
/* Explicitly using 32 bit types to avoid possible alignment. */
typedef struct {
int32_t pid;
uint32_t id;
} nxt_unit_port_hash_id_t;
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
int rc;
uint32_t ready_stream, shm_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
nxt_unit_port_t ready_port, read_port;
lib = nxt_unit_create(init);
if (nxt_slow_path(lib == NULL)) {
return NULL;
}
if (init->ready_port.id.pid != 0
&& init->ready_stream != 0
&& init->read_port.id.pid != 0)
{
ready_port = init->ready_port;
ready_stream = init->ready_stream;
read_port = init->read_port;
lib->log_fd = init->log_fd;
nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
ready_port.id.id);
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
read_port.id.id);
} else {
rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
&ready_stream, &shm_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
/ PORT_MMAP_DATA_SIZE;
}
if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
lib->shm_mmap_limit = 1;
}
lib->pid = read_port.id.pid;
ctx = &lib->main_ctx.ctx;
rc = lib->callbacks.add_port(ctx, &ready_port);
if (rc != NXT_UNIT_OK) {
nxt_unit_alert(NULL, "failed to add ready_port");
goto fail;
}
rc = lib->callbacks.add_port(ctx, &read_port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to add read_port");
goto fail;
}
lib->main_ctx.read_port_id = read_port.id;
lib->ready_port_id = ready_port.id;
rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
goto fail;
}
return ctx;
fail:
free(lib);
return NULL;
}
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
int rc;
nxt_unit_impl_t *lib;
nxt_unit_callbacks_t *cb;
lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
if (nxt_slow_path(lib == NULL)) {
nxt_unit_alert(NULL, "failed to allocate unit struct");
return NULL;
}
rc = pthread_mutex_init(&lib->mutex, NULL);
if (nxt_slow_path(rc != 0)) {
nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
goto fail;
}
lib->unit.data = init->data;
lib->callbacks = init->callbacks;
lib->request_data_size = init->request_data_size;
lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
/ PORT_MMAP_DATA_SIZE;
lib->processes.slot = NULL;
lib->ports.slot = NULL;
lib->log_fd = STDERR_FILENO;
lib->online = 1;
nxt_queue_init(&lib->contexts);
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
cb = &lib->callbacks;
if (cb->request_handler == NULL) {
nxt_unit_alert(NULL, "request_handler is NULL");
goto fail;
}
if (cb->add_port == NULL) {
cb->add_port = nxt_unit_add_port;
}
if (cb->remove_port == NULL) {
cb->remove_port = nxt_unit_remove_port;
}
if (cb->remove_pid == NULL) {
cb->remove_pid = nxt_unit_remove_pid;
}
if (cb->quit == NULL) {
cb->quit = nxt_unit_quit;
}
if (cb->port_send == NULL) {
cb->port_send = nxt_unit_port_send_default;
}
if (cb->port_recv == NULL) {
cb->port_recv = nxt_unit_port_recv_default;
}
return lib;
fail:
free(lib);
return NULL;
}
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
void *data)
{
int rc;
ctx_impl->ctx.data = data;
ctx_impl->ctx.unit = &lib->unit;
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
if (nxt_slow_path(rc != 0)) {
nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
return NXT_UNIT_ERROR;
}
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
ctx_impl->pending_read_head = NULL;
ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
ctx_impl->ctx_read_buf.next = NULL;
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
ctx_impl->read_port_fd = -1;
ctx_impl->requests.slot = 0;
return NXT_UNIT_OK;
}
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf)
{
mmap_buf->next = *head;
if (mmap_buf->next != NULL) {
mmap_buf->next->prev = &mmap_buf->next;
}
*head = mmap_buf;
mmap_buf->prev = head;
}
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_unit_mmap_buf_t *mmap_buf)
{
while (*prev != NULL) {
prev = &(*prev)->next;
}
nxt_unit_mmap_buf_insert(prev, mmap_buf);
}
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
nxt_unit_mmap_buf_t **prev;
prev = mmap_buf->prev;
if (mmap_buf->next != NULL) {
mmap_buf->next->prev = prev;
}
if (prev != NULL) {
*prev = mmap_buf->next;
}
}
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit)
{
int rc;
int ready_fd, read_fd;
char *unit_init, *version_end;
long version_length;
int64_t ready_pid, read_pid;
uint32_t ready_stream, ready_id, read_id;
unit_init = getenv(NXT_UNIT_INIT_ENV);
if (nxt_slow_path(unit_init == NULL)) {
nxt_unit_alert(NULL, "%s is not in the current environment",
NXT_UNIT_INIT_ENV);
return NXT_UNIT_ERROR;
}
nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
version_length = nxt_length(NXT_VERSION);
version_end = strchr(unit_init, ';');
if (version_end == NULL
|| version_end - unit_init != version_length
|| memcmp(unit_init, NXT_VERSION, version_length) != 0)
{
nxt_unit_alert(NULL, "version check error");
return NXT_UNIT_ERROR;
}
rc = sscanf(version_end + 1,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
"%d,%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
&read_pid, &read_id, &read_fd,
log_fd, shm_limit);
if (nxt_slow_path(rc != 9)) {
nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
return NXT_UNIT_ERROR;
}
nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
ready_port->in_fd = -1;
ready_port->out_fd = ready_fd;
ready_port->data = NULL;
nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
read_port->in_fd = read_fd;
read_port->out_fd = -1;
read_port->data = NULL;
*stream = ready_stream;
return NXT_UNIT_OK;
}
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
uint32_t stream)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = stream;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = _NXT_PORT_MSG_PROCESS_READY;
msg.last = 1;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
if (res != sizeof(msg)) {
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
}
int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
int rc;
pid_t pid;
struct cmsghdr *cm;
nxt_port_msg_t *port_msg;
nxt_unit_impl_t *lib;
nxt_unit_recv_msg_t recv_msg;
nxt_unit_callbacks_t *cb;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_ERROR;
recv_msg.fd = -1;
recv_msg.process = NULL;
port_msg = buf;
cm = oob;
if (oob_size >= CMSG_SPACE(sizeof(int))
&& cm->cmsg_len == CMSG_LEN(sizeof(int))
&& cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
}
recv_msg.incoming_buf = NULL;
if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
goto fail;
}
recv_msg.stream = port_msg->stream;
recv_msg.pid = port_msg->pid;
recv_msg.reply_port = port_msg->reply_port;
recv_msg.last = port_msg->last;
recv_msg.mmap = port_msg->mmap;
recv_msg.start = port_msg + 1;
recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
port_msg->stream, (int) port_msg->type);
goto fail;
}
if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
rc = NXT_UNIT_OK;
goto fail;
}
/* Fragmentation is unsupported. */
if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
port_msg->stream, (int) port_msg->type);
goto fail;
}
if (port_msg->mmap) {
if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
goto fail;
}
}
cb = &lib->callbacks;
switch (port_msg->type) {
case _NXT_PORT_MSG_QUIT:
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
cb->quit(ctx);
rc = NXT_UNIT_OK;
break;
case _NXT_PORT_MSG_NEW_PORT:
rc = nxt_unit_process_new_port(ctx, &recv_msg);
break;
case _NXT_PORT_MSG_CHANGE_FILE:
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
port_msg->stream, recv_msg.fd);
break;
case _NXT_PORT_MSG_MMAP:
if (nxt_slow_path(recv_msg.fd < 0)) {
nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
port_msg->stream, recv_msg.fd);
goto fail;
}
rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
break;
case _NXT_PORT_MSG_REQ_HEADERS:
rc = nxt_unit_process_req_headers(ctx, &recv_msg);
break;
case _NXT_PORT_MSG_WEBSOCKET:
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;
case _NXT_PORT_MSG_REMOVE_PID:
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
"(%d != %d)", port_msg->stream, (int) recv_msg.size,
(int) sizeof(pid));
goto fail;
}
memcpy(&pid, recv_msg.start, sizeof(pid));
nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
port_msg->stream, (int) pid);
cb->remove_pid(ctx, pid);
rc = NXT_UNIT_OK;
break;
case _NXT_PORT_MSG_SHM_ACK:
rc = nxt_unit_process_shm_ack(ctx);
break;
default:
nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
goto fail;
}
fail:
if (recv_msg.fd != -1) {
close(recv_msg.fd);
}
while (recv_msg.incoming_buf != NULL) {
nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
}
if (recv_msg.process != NULL) {
nxt_unit_process_use(ctx, recv_msg.process, -1);
}
return rc;
}
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
nxt_unit_impl_t *lib;
nxt_unit_port_t new_port;
nxt_port_msg_new_port_t *new_port_msg;
if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
"invalid message size (%d)",
recv_msg->stream, (int) recv_msg->size);
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(recv_msg->fd < 0)) {
nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
recv_msg->stream, recv_msg->fd);
return NXT_UNIT_ERROR;
}
new_port_msg = recv_msg->start;
nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd);
nb = 0;
if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
"failed: %s (%d)",
recv_msg->stream, recv_msg->fd, strerror(errno), errno);
return NXT_UNIT_ERROR;
}
nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
new_port_msg->id);
new_port.in_fd = -1;
new_port.out_fd = recv_msg->fd;
new_port.data = NULL;
recv_msg->fd = -1;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
return lib->callbacks.add_port(ctx, &new_port);
}
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
nxt_unit_impl_t *lib;
nxt_unit_request_t *r;
nxt_unit_mmap_buf_t *b;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
if (nxt_slow_path(recv_msg->mmap == 0)) {
nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
recv_msg->stream);
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
"%d expected", recv_msg->stream, (int) recv_msg->size,
(int) sizeof(nxt_unit_request_t));
return NXT_UNIT_ERROR;
}
req_impl = nxt_unit_request_info_get(ctx);
if (nxt_slow_path(req_impl == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
recv_msg->stream);
return NXT_UNIT_ERROR;
}
req = &req_impl->req;
nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
recv_msg->reply_port);
req->request = recv_msg->start;
b = recv_msg->incoming_buf;
req->request_buf = &b->buf;
req->response = NULL;
req->response_buf = NULL;
r = req->request;
req->content_length = r->content_length;
req->content_buf = req->request_buf;
req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
/* "Move" process reference to req_impl. */
req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
if (nxt_slow_path(req_impl->process == NULL)) {
return NXT_UNIT_ERROR;
}
recv_msg->process = NULL;
req_impl->stream = recv_msg->stream;
req_impl->outgoing_buf = NULL;
for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
b->req = req;
}
/* "Move" incoming buffer list to req_impl. */
req_impl->incoming_buf = recv_msg->incoming_buf;
req_impl->incoming_buf->prev = &req_impl->incoming_buf;
recv_msg->incoming_buf = NULL;
req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START;
req_impl->websocket = 0;
nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
(int) r->method_length, nxt_unit_sptr_get(&r->method),
(int) r->target_length, nxt_unit_sptr_get(&r->target),
(int) r->content_length);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->callbacks.request_handler(req);
return NXT_UNIT_OK;
}
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
nxt_unit_impl_t *lib;
nxt_unit_mmap_buf_t *b;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_callbacks_t *cb;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
recv_msg->last);
if (req_impl == NULL) {
return NXT_UNIT_OK;
}
req = &req_impl->req;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
cb = &lib->callbacks;
if (cb->websocket_handler && recv_msg->size >= 2) {
ws_impl = nxt_unit_websocket_frame_get(ctx);
if (nxt_slow_path(ws_impl == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
req_impl->stream);
return NXT_UNIT_ERROR;
}
ws_impl->ws.req = req;
ws_impl->buf = NULL;
if (recv_msg->mmap) {
for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
b->req = req;
}
/* "Move" incoming buffer list to ws_impl. */
ws_impl->buf = recv_msg->incoming_buf;
ws_impl->buf->prev = &ws_impl->buf;
recv_msg->incoming_buf = NULL;
b = ws_impl->buf;
} else {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
req_impl->stream);
nxt_unit_websocket_frame_release(&ws_impl->ws);
return NXT_UNIT_ERROR;
}
b->req = req;
b->buf.start = recv_msg->start;
b->buf.free = b->buf.start;
b->buf.end = b->buf.start + recv_msg->size;
nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
}
ws_impl->ws.header = (void *) b->buf.start;
ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
ws_impl->ws.header);
hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
if (ws_impl->ws.header->mask) {
ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
} else {
ws_impl->ws.mask = NULL;
}
b->buf.free += hsize;
ws_impl->ws.content_buf = &b->buf;
ws_impl->ws.content_length = ws_impl->ws.payload_len;
nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
"payload_len=%"PRIu64,
ws_impl->ws.header->opcode,
ws_impl->ws.payload_len);
cb->websocket_handler(&ws_impl->ws);
}
if (recv_msg->last) {
req_impl->websocket = 0;
if (cb->close_handler) {
nxt_unit_req_debug(req, "close_handler");
cb->close_handler(req);
} else {
nxt_unit_request_done(req, NXT_UNIT_ERROR);
}
}
return NXT_UNIT_OK;
}
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
nxt_unit_callbacks_t *cb;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
cb = &lib->callbacks;
if (cb->shm_ack_handler != NULL) {
cb->shm_ack_handler(ctx);
}
return NXT_UNIT_OK;
}
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
nxt_queue_link_t *lnk;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->free_req)) {
pthread_mutex_unlock(&ctx_impl->mutex);
req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
+ lib->request_data_size);
if (nxt_slow_path(req_impl == NULL)) {
return NULL;
}
req_impl->req.unit = ctx->unit;
req_impl->req.ctx = ctx;
pthread_mutex_lock(&ctx_impl->mutex);
} else {
lnk = nxt_queue_first(&ctx_impl->free_req);
nxt_queue_remove(lnk);
req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
}
nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
pthread_mutex_unlock(&ctx_impl->mutex);
req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
return req_impl;
}
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
req->response = NULL;
req->response_buf = NULL;
if (req_impl->websocket) {
nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
req_impl->websocket = 0;
}
while (req_impl->outgoing_buf != NULL) {
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
}
while (req_impl->incoming_buf != NULL) {
nxt_unit_mmap_buf_free(req_impl->incoming_buf);
}
/*
* Process release should go after buffers release to guarantee mmap
* existence.
*/
if (req_impl->process != NULL) {
nxt_unit_process_use(req->ctx, req_impl->process, -1);
req_impl->process = NULL;
}
pthread_mutex_lock(&ctx_impl->mutex);
nxt_queue_remove(&req_impl->link);
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
pthread_mutex_unlock(&ctx_impl->mutex);
req_impl->state = NXT_UNIT_RS_RELEASED;
}
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
nxt_queue_remove(&req_impl->link);
if (req_impl != &ctx_impl->req) {
free(req_impl);
}
}
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
nxt_queue_link_t *lnk;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
pthread_mutex_unlock(&ctx_impl->mutex);
ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
if (nxt_slow_path(ws_impl == NULL)) {
return NULL;
}
} else {
lnk = nxt_queue_first(&ctx_impl->free_ws);
nxt_queue_remove(lnk);
pthread_mutex_unlock(&ctx_impl->mutex);
ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
}
ws_impl->ctx_impl = ctx_impl;
return ws_impl;
}
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
nxt_unit_websocket_frame_impl_t *ws_impl;
ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
while (ws_impl->buf != NULL) {
nxt_unit_mmap_buf_free(ws_impl->buf);
}
ws->req = NULL;
pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}
static void
nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
{
nxt_queue_remove(&ws_impl->link);
free(ws_impl);
}
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
u_char ch;
uint32_t hash;
const char *p, *end;
hash = 159406; /* Magic value copied from nxt_http_parse.c */
end = name + name_length;
for (p = name; p < end; p++) {
ch = *p;
hash = (hash << 4) + hash + nxt_lowcase(ch);
}
hash = (hash >> 16) ^ hash;
return hash;
}
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
uint32_t i, j;
nxt_unit_field_t *fields, f;
nxt_unit_request_t *r;
nxt_unit_req_debug(req, "group_dup_fields");
r = req->request;
fields = r->fields;
for (i = 0; i < r->fields_count; i++) {
switch (fields[i].hash) {
case NXT_UNIT_HASH_CONTENT_LENGTH:
r->content_length_field = i;
break;
case NXT_UNIT_HASH_CONTENT_TYPE:
r->content_type_field = i;
break;
case NXT_UNIT_HASH_COOKIE:
r->cookie_field = i;
break;
};
for (j = i + 1; j < r->fields_count; j++) {
if (fields[i].hash != fields[j].hash) {
continue;
}
if (j == i + 1) {
continue;
}
f = fields[j];
f.name.offset += (j - (i + 1)) * sizeof(f);
f.value.offset += (j - (i + 1)) * sizeof(f);
while (j > i + 1) {
fields[j] = fields[j - 1];
fields[j].name.offset -= sizeof(f);
fields[j].value.offset -= sizeof(f);
j--;
}
fields[j] = f;
i++;
}
}
}
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
uint32_t buf_size;
nxt_unit_buf_t *buf;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
nxt_unit_req_warn(req, "init: response already sent");
return NXT_UNIT_ERROR;
}
nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
(int) max_fields_count, (int) max_fields_size);
if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_debug(req, "duplicate response init");
}
/*
* Each field name and value 0-terminated by libunit,
* this is the reason of '+ 2' below.
*/
buf_size = sizeof(nxt_unit_response_t)
+ max_fields_count * (sizeof(nxt_unit_field_t) + 2)
+ max_fields_size;
if (nxt_slow_path(req->response_buf != NULL)) {
buf = req->response_buf;
if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
goto init_response;
}
nxt_unit_buf_free(buf);
req->response_buf = NULL;
req->response = NULL;
req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START;
}
buf = nxt_unit_response_buf_alloc(req, buf_size);
if (nxt_slow_path(buf == NULL)) {
return NXT_UNIT_ERROR;
}
init_response:
memset(buf->start, 0, sizeof(nxt_unit_response_t));
req->response_buf = buf;
req->response = (nxt_unit_response_t *) buf->start;
req->response->status = status;
buf->free = buf->start + sizeof(nxt_unit_response_t)
+ max_fields_count * sizeof(nxt_unit_field_t);
req->response_max_fields = max_fields_count;
req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
return NXT_UNIT_OK;
}
int
nxt_unit_response_realloc(nxt_unit_request_info_t *req,
uint32_t max_fields_count, uint32_t max_fields_size)
{
char *p;
uint32_t i, buf_size;
nxt_unit_buf_t *buf;
nxt_unit_field_t *f, *src;
nxt_unit_response_t *resp;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "realloc: response not init");
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
nxt_unit_req_warn(req, "realloc: response already sent");
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
return NXT_UNIT_ERROR;
}
/*
* Each field name and value 0-terminated by libunit,
* this is the reason of '+ 2' below.
*/
buf_size = sizeof(nxt_unit_response_t)
+ max_fields_count * (sizeof(nxt_unit_field_t) + 2)
+ max_fields_size;
nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
buf = nxt_unit_response_buf_alloc(req, buf_size);
if (nxt_slow_path(buf == NULL)) {
nxt_unit_req_warn(req, "realloc: new buf allocation failed");
return NXT_UNIT_ERROR;
}
resp = (nxt_unit_response_t *) buf->start;
memset(resp, 0, sizeof(nxt_unit_response_t));
resp->status = req->response->status;
resp->content_length = req->response->content_length;
p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
f = resp->fields;
for (i = 0; i < req->response->fields_count; i++) {
src = req->response->fields + i;
if (nxt_slow_path(src->skip != 0)) {
continue;
}
if (nxt_slow_path(src->name_length + src->value_length + 2
> (uint32_t) (buf->end - p)))
{
nxt_unit_req_warn(req, "realloc: not enough space for field"
" #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
i, src, src->name_length, src->value_length);
goto fail;
}
nxt_unit_sptr_set(&f->name, p);
p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
*p++ = '\0';
nxt_unit_sptr_set(&f->value, p);
p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
*p++ = '\0';
f->hash = src->hash;
f->skip = 0;
f->name_length = src->name_length;
f->value_length = src->value_length;
resp->fields_count++;
f++;
}
if (req->response->piggyback_content_length > 0) {
if (nxt_slow_path(req->response->piggyback_content_length
> (uint32_t) (buf->end - p)))
{
nxt_unit_req_warn(req, "realloc: not enought space for content"
" #%"PRIu32", %"PRIu32" required",
i, req->response->piggyback_content_length);
goto fail;
}
resp->piggyback_content_length =
req->response->piggyback_content_length;
nxt_unit_sptr_set(&resp->piggyback_content, p);
p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
req->response->piggyback_content_length);
}
buf->free = p;
nxt_unit_buf_free(req->response_buf);
req->response = resp;
req->response_buf = buf;
req->response_max_fields = max_fields_count;
return NXT_UNIT_OK;
fail:
nxt_unit_buf_free(buf);
return NXT_UNIT_ERROR;
}
int
nxt_unit_response_is_init(nxt_unit_request_info_t *req)
{
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
}
int
nxt_unit_response_add_field(nxt_unit_request_info_t *req,
const char *name, uint8_t name_length,
const char *value, uint32_t value_length)
{
nxt_unit_buf_t *buf;
nxt_unit_field_t *f;
nxt_unit_response_t *resp;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "add_field: response not initialized or "
"already sent");
return NXT_UNIT_ERROR;
}
resp = req->response;
if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
nxt_unit_req_warn(req, "add_field: too many response fields");
return NXT_UNIT_ERROR;
}
buf = req->response_buf;
if (nxt_slow_path(name_length + value_length + 2
> (uint32_t) (buf->end - buf->free)))
{
nxt_unit_req_warn(req, "add_field: response buffer overflow");
return NXT_UNIT_ERROR;
}
nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
resp->fields_count,
(int) name_length, name,
(int) value_length, value);
f = resp->fields + resp->fields_count;
nxt_unit_sptr_set(&f->name, buf->free);
buf->free = nxt_cpymem(buf->free, name, name_length);
*buf->free++ = '\0';
nxt_unit_sptr_set(&f->value, buf->free);
buf->free = nxt_cpymem(buf->free, value, value_length);
*buf->free++ = '\0';
f->hash = nxt_unit_field_hash(name, name_length);
f->skip = 0;
f->name_length = name_length;
f->value_length = value_length;
resp->fields_count++;
return NXT_UNIT_OK;
}
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
const void* src, uint32_t size)
{
nxt_unit_buf_t *buf;
nxt_unit_response_t *resp;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "add_content: response not initialized yet");
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
nxt_unit_req_warn(req, "add_content: response already sent");
return NXT_UNIT_ERROR;
}
buf = req->response_buf;
if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
nxt_unit_req_warn(req, "add_content: buffer overflow");
return NXT_UNIT_ERROR;
}
resp = req->response;
if (resp->piggyback_content_length == 0) {
nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
}
resp->piggyback_content_length += size;
buf->free = nxt_cpymem(buf->free, src, size);
return NXT_UNIT_OK;
}
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
int rc;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "send: response is not initialized yet");
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
nxt_unit_req_warn(req, "send: response already sent");
return NXT_UNIT_ERROR;
}
if (req->request->websocket_handshake && req->response->status == 101) {
nxt_unit_response_upgrade(req);
}
nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
req->response->fields_count,
(int) (req->response_buf->free
- req->response_buf->start));
mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
req->response = NULL;
req->response_buf = NULL;
req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
nxt_unit_mmap_buf_free(mmap_buf);
}
return rc;
}
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
int rc;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
nxt_unit_req_warn(req, "response_buf_alloc: "
"requested buffer (%"PRIu32") too big", size);
return NULL;
}
nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
if (nxt_slow_path(mmap_buf == NULL)) {
nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
return NULL;
}
mmap_buf->req = req;
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, size, size, mmap_buf,
NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
return NULL;
}
return &mmap_buf->buf;
}
static nxt_unit_process_t *
nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
nxt_unit_impl_t *lib;
if (recv_msg->process != NULL) {
return recv_msg->process;
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&lib->mutex);
recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
pthread_mutex_unlock(&lib->mutex);
if (recv_msg->process == NULL) {
nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
recv_msg->stream, (int) recv_msg->pid);
}
return recv_msg->process;
}
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
if (ctx_impl->free_buf == NULL) {
pthread_mutex_unlock(&ctx_impl->mutex);
mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
if (nxt_slow_path(mmap_buf == NULL)) {
return NULL;
}
} else {
mmap_buf = ctx_impl->free_buf;
nxt_unit_mmap_buf_unlink(mmap_buf);
pthread_mutex_unlock(&ctx_impl->mutex);
}
mmap_buf->ctx_impl = ctx_impl;
mmap_buf->hdr = NULL;
mmap_buf->free_ptr = NULL;
return mmap_buf;
}
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
nxt_unit_mmap_buf_unlink(mmap_buf);
pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}
typedef struct {
size_t len;
const char *str;
} nxt_unit_str_t;
#define nxt_unit_str(str) { nxt_length(str), str }
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
return req->request->websocket_handshake;
}
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
int rc;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
if (nxt_slow_path(req_impl->websocket != 0)) {
nxt_unit_req_debug(req, "upgrade: already upgraded");
return NXT_UNIT_OK;
}
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
nxt_unit_req_warn(req, "upgrade: response already sent");
return NXT_UNIT_ERROR;
}
ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
return NXT_UNIT_ERROR;
}
req_impl->websocket = 1;
req->response->status = 101;
return NXT_UNIT_OK;
}
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
return req_impl->websocket;
}
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
return &req_impl->req;
}
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
int rc;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
req = mmap_buf->req;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
nxt_unit_req_debug(req, "buf_send: %d bytes",
(int) (buf->free - buf->start));
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "buf_send: response not initialized yet");
return NXT_UNIT_ERROR;
}
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
nxt_unit_req_warn(req, "buf_send: headers not sent yet");
return NXT_UNIT_ERROR;
}
if (nxt_fast_path(buf->free > buf->start)) {
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
}
nxt_unit_mmap_buf_free(mmap_buf);
return NXT_UNIT_OK;
}
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
int rc;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
req = mmap_buf->req;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
nxt_unit_mmap_buf_free(mmap_buf);
nxt_unit_request_info_release(req);
} else {
nxt_unit_request_done(req, rc);
}
}
static int
nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last)
{
struct {
nxt_port_msg_t msg;
nxt_port_mmap_msg_t mmap_msg;
} m;
int rc;
u_char *last_used, *first_free;
ssize_t res;
nxt_chunk_id_t first_free_chunk;
nxt_unit_buf_t *buf;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
buf = &mmap_buf->buf;
hdr = mmap_buf->hdr;
m.mmap_msg.size = buf->free - buf->start;
m.msg.stream = stream;
m.msg.pid = lib->pid;
m.msg.reply_port = 0;
m.msg.type = _NXT_PORT_MSG_DATA;
m.msg.last = last != 0;
m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
m.msg.nf = 0;
m.msg.mf = 0;
m.msg.tracking = 0;
rc = NXT_UNIT_ERROR;
if (m.msg.mmap) {
m.mmap_msg.mmap_id = hdr->id;
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
(u_char *) buf->start);
nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
stream,
(int) m.mmap_msg.mmap_id,
(int) m.mmap_msg.chunk_id,
(int) m.mmap_msg.size);
res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
goto free_buf;
}
last_used = (u_char *) buf->free - 1;
first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
buf->start = (char *) first_free;
buf->free = buf->start;
if (buf->end < buf->start) {
buf->end = buf->start;
}
} else {
buf->start = NULL;
buf->free = NULL;
buf->end = NULL;
mmap_buf->hdr = NULL;
}
nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
(int) m.mmap_msg.chunk_id - (int) first_free_chunk);
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
mmap_buf->process->pid,
mmap_buf->process->outgoing.allocated_chunks);
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
|| mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
{
nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
": no space reserved for message header", stream);
goto free_buf;
}
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
stream,
(int) (sizeof(m.msg) + m.mmap_msg.size));
res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
buf->start - sizeof(m.msg),
m.mmap_msg.size + sizeof(m.msg),
NULL, 0);
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
goto free_buf;
}
}
rc = NXT_UNIT_OK;
free_buf:
nxt_unit_free_outgoing_buf(mmap_buf);
return rc;
}
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
}
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
nxt_unit_free_outgoing_buf(mmap_buf);
nxt_unit_mmap_buf_release(mmap_buf);
}
static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
if (mmap_buf->hdr != NULL) {
nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
mmap_buf->process,
mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
mmap_buf->hdr = NULL;
return;
}
if (mmap_buf->free_ptr != NULL) {
free(mmap_buf->free_ptr);
mmap_buf->free_ptr = NULL;
}
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
return nxt_unit_read_buf_get_impl(ctx_impl);
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
nxt_unit_read_buf_t *rbuf;
if (ctx_impl->free_read_buf != NULL) {
rbuf = ctx_impl->free_read_buf;
ctx_impl->free_read_buf = rbuf->next;
pthread_mutex_unlock(&ctx_impl->mutex);
return rbuf;
}
pthread_mutex_unlock(&ctx_impl->mutex);
rbuf = malloc(sizeof(nxt_unit_read_buf_t));
return rbuf;
}
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf)
{
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
rbuf->next = ctx_impl->free_read_buf;
ctx_impl->free_read_buf = rbuf;
pthread_mutex_unlock(&ctx_impl->mutex);
}
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
nxt_unit_mmap_buf_t *mmap_buf;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
if (mmap_buf->next == NULL) {
return NULL;
}
return &mmap_buf->next->buf;
}
uint32_t
nxt_unit_buf_max(void)
{
return PORT_MMAP_DATA_SIZE;
}
uint32_t
nxt_unit_buf_min(void)
{
return PORT_MMAP_CHUNK_SIZE;
}
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
size_t size)
{
ssize_t res;
res = nxt_unit_response_write_nb(req, start, size, size);
return res < 0 ? -res : NXT_UNIT_OK;
}
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
size_t size, size_t min_size)
{
int rc;
ssize_t sent;
uint32_t part_size, min_part_size, buf_size;
const char *part_start;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
sent = 0;
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "write: response not initialized yet");
return -NXT_UNIT_ERROR;
}
/* Check if response is not send yet. */
if (nxt_slow_path(req->response_buf != NULL)) {
part_size = req->response_buf->end - req->response_buf->free;
part_size = nxt_min(size, part_size);
rc = nxt_unit_response_add_content(req, part_start, part_size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
}
rc = nxt_unit_response_send(req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
}
size -= part_size;
part_start += part_size;
sent += part_size;
min_size -= nxt_min(min_size, part_size);
}
while (size > 0) {
part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
min_part_size = nxt_min(min_size, part_size);
min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, part_size,
min_part_size, &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
}
buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
if (nxt_slow_path(buf_size == 0)) {
return sent;
}
part_size = nxt_min(buf_size, part_size);
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return -rc;
}
size -= part_size;
part_start += part_size;
sent += part_size;
min_size -= nxt_min(min_size, part_size);
}
return sent;
}
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_read_info_t *read_info)
{
int rc;
ssize_t n;
uint32_t buf_size;
nxt_unit_buf_t *buf;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
/* Check if response is not send yet. */
if (nxt_slow_path(req->response_buf)) {
/* Enable content in headers buf. */
rc = nxt_unit_response_add_content(req, "", 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to add piggyback content");
return rc;
}
buf = req->response_buf;
while (buf->end - buf->free > 0) {
n = read_info->read(read_info, buf->free, buf->end - buf->free);
if (nxt_slow_path(n < 0)) {
nxt_unit_req_error(req, "Read error");
return NXT_UNIT_ERROR;
}
/* Manually increase sizes. */
buf->free += n;
req->response->piggyback_content_length += n;
if (read_info->eof) {
break;
}
}
rc = nxt_unit_response_send(req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send headers with content");
return rc;
}
if (read_info->eof) {
return NXT_UNIT_OK;
}
}
while (!read_info->eof) {
nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
read_info->buf_size);
buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
buf_size, buf_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
buf = &mmap_buf.buf;
while (!read_info->eof && buf->end > buf->free) {
n = read_info->read(read_info, buf->free, buf->end - buf->free);
if (nxt_slow_path(n < 0)) {
nxt_unit_req_error(req, "Read error");
nxt_unit_free_outgoing_buf(&mmap_buf);
return NXT_UNIT_ERROR;
}
buf->free += n;
}
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
return rc;
}
}
return NXT_UNIT_OK;
}
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
return nxt_unit_buf_read(&req->content_buf, &req->content_length,
dst, size);
}
ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
{
char *p;
size_t l_size, b_size;
nxt_unit_buf_t *b;
if (req->content_length == 0) {
return 0;
}
l_size = 0;
b = req->content_buf;
while (b != NULL) {
b_size = b->end - b->free;
p = memchr(b->free, '\n', b_size);
if (p != NULL) {
p++;
l_size += p - b->free;
break;
}
l_size += b_size;
if (max_size <= l_size) {
break;
}
b = nxt_unit_buf_next(b);
}
return nxt_min(max_size, l_size);
}
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
u_char *p;
size_t rest, copy, read;
nxt_unit_buf_t *buf;
p = dst;
rest = size;
buf = *b;
while (buf != NULL) {
copy = buf->end - buf->free;
copy = nxt_min(rest, copy);
p = nxt_cpymem(p, buf->free, copy);
buf->free += copy;
rest -= copy;
if (rest == 0) {
if (buf->end == buf->free) {
buf = nxt_unit_buf_next(buf);
}
break;
}
buf = nxt_unit_buf_next(buf);
}
*b = buf;
read = size - rest;
*len -= read;
return read;
}
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
ssize_t res;
uint32_t size;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
nxt_unit_request_info_impl_t *req_impl;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
nxt_unit_req_debug(req, "done: %d", rc);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto skip_response_send;
}
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
size = nxt_length("Content-Type") + nxt_length("text/plain");
rc = nxt_unit_response_init(req, 200, 1, size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto skip_response_send;
}
rc = nxt_unit_response_add_field(req, "Content-Type",
nxt_length("Content-Type"),
"text/plain", nxt_length("text/plain"));
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto skip_response_send;
}
}
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
nxt_unit_buf_send_done(req->response_buf);
return;
}
skip_response_send:
lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
msg.stream = req_impl->stream;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
: _NXT_PORT_MSG_RPC_ERROR;
msg.last = 1;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
res = lib->callbacks.port_send(req->ctx, &req->response_port,
&msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
nxt_unit_req_alert(req, "last message send failed: %s (%d)",
strerror(errno), errno);
}
nxt_unit_request_info_release(req);
}
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
uint8_t last, const void *start, size_t size)
{
const struct iovec iov = { (void *) start, size };
return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
uint8_t last, const struct iovec *iov, int iovcnt)
{
int i, rc;
size_t l, copy;
uint32_t payload_len, buf_size, alloc_size;
const uint8_t *b;
nxt_unit_buf_t *buf;
nxt_unit_mmap_buf_t mmap_buf;
nxt_websocket_header_t *wh;
nxt_unit_request_info_impl_t *req_impl;
char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
payload_len = 0;
for (i = 0; i < iovcnt; i++) {
payload_len += iov[i].iov_len;
}
buf_size = 10 + payload_len;
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
buf = &mmap_buf.buf;
buf->start[0] = 0;
buf->start[1] = 0;
buf_size -= buf->end - buf->start;
wh = (void *) buf->free;
buf->free = nxt_websocket_frame_init(wh, payload_len);
wh->fin = last;
wh->opcode = opcode;
for (i = 0; i < iovcnt; i++) {
b = iov[i].iov_base;
l = iov[i].iov_len;
while (l > 0) {
copy = buf->end - buf->free;
copy = nxt_min(l, copy);
buf->free = nxt_cpymem(buf->free, b, copy);
b += copy;
l -= copy;
if (l > 0) {
if (nxt_fast_path(buf->free > buf->start)) {
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
&mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
}
alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
buf_size -= buf->end - buf->start;
}
}
}
if (buf->free > buf->start) {
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
&mmap_buf, 0);
}
return rc;
}
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
size_t size)
{
ssize_t res;
uint8_t *b;
uint64_t i, d;
res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
dst, size);
if (ws->mask == NULL) {
return res;
}
b = dst;
d = (ws->payload_len - ws->content_length - res) % 4;
for (i = 0; i < (uint64_t) res; i++) {
b[i] ^= ws->mask[ (i + d) % 4 ];
}
return res;
}
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
char *b;
size_t size;
nxt_unit_websocket_frame_impl_t *ws_impl;
ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
return NXT_UNIT_OK;
}
size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
b = malloc(size);
if (nxt_slow_path(b == NULL)) {
return NXT_UNIT_ERROR;
}
memcpy(b, ws_impl->buf->buf.start, size);
ws_impl->buf->buf.start = b;
ws_impl->buf->buf.free = b;
ws_impl->buf->buf.end = b + size;
ws_impl->buf->free_ptr = b;
return NXT_UNIT_OK;
}
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
nxt_unit_websocket_frame_release(ws);
}
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
{
int res, nchunks, i;
uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&process->outgoing.mutex);
retry:
outgoing_size = process->outgoing.size;
mm_end = process->outgoing.elts + outgoing_size;
for (mm = process->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) {
continue;
}
*c = 0;
while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
nchunks = 1;
while (nchunks < *n) {
res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
*c + nchunks);
if (res == 0) {
if (nchunks >= min_n) {
*n = nchunks;
goto unlock;
}
for (i = 0; i < nchunks; i++) {
nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
}
*c += nchunks + 1;
nchunks = 0;
break;
}
nchunks++;
}
if (nchunks >= min_n) {
*n = nchunks;
goto unlock;
}
}
hdr->oosm = 1;
}
if (outgoing_size >= lib->shm_mmap_limit) {
/* Cannot allocate more shared memory. */
pthread_mutex_unlock(&process->outgoing.mutex);
if (min_n == 0) {
*n = 0;
}
if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
>= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
{
/* Memory allocated by application, but not send to router. */
return NULL;
}
/* Notify router about OOSM condition. */
res = nxt_unit_send_oosm(ctx, port_id);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return NULL;
}
/* Return if caller can handle OOSM condition. Non-blocking mode. */
if (min_n == 0) {
return NULL;
}
nxt_unit_debug(ctx, "oosm: waiting for ACK");
res = nxt_unit_wait_shm_ack(ctx);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return NULL;
}
nxt_unit_debug(ctx, "oosm: retry");
pthread_mutex_lock(&process->outgoing.mutex);
goto retry;
}
*c = 0;
hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
unlock:
nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
process->pid,
process->outgoing.allocated_chunks);
pthread_mutex_unlock(&process->outgoing.mutex);
return hdr;
}
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = _NXT_PORT_MSG_OOSM;
msg.last = 0;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
(int) port_id->pid, strerror(errno), errno);
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
}
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
nxt_port_msg_t *port_msg;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
while (1) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
nxt_unit_read_buf(ctx, rbuf);
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf);
return NXT_UNIT_ERROR;
}
port_msg = (nxt_port_msg_t *) rbuf->buf;
if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
nxt_unit_read_buf_release(ctx, rbuf);
break;
}
pthread_mutex_lock(&ctx_impl->mutex);
*ctx_impl->pending_read_tail = rbuf;
ctx_impl->pending_read_tail = &rbuf->next;
rbuf->next = NULL;
pthread_mutex_unlock(&ctx_impl->mutex);
if (port_msg->type == _NXT_PORT_MSG_QUIT) {
nxt_unit_debug(ctx, "oosm: quit received");
return NXT_UNIT_ERROR;
}
}
return NXT_UNIT_OK;
}
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
uint32_t cap;
cap = mmaps->cap;
if (cap == 0) {
cap = i + 1;
}
while (i + 1 > cap) {
if (cap < 16) {
cap = cap * 2;
} else {
cap = cap + cap / 2;
}
}
if (cap != mmaps->cap) {
mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
if (nxt_slow_path(mmaps->elts == NULL)) {
return NULL;
}
memset(mmaps->elts + mmaps->cap, 0,
sizeof(*mmaps->elts) * (cap - mmaps->cap));
mmaps->cap = cap;
}
if (i + 1 > mmaps->size) {
mmaps->size = i + 1;
}
return mmaps->elts + i;
}
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, int n)
{
int i, fd, rc;
void *mem;
char name[64];
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
lib = process->lib;
mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size);
if (nxt_slow_path(mm == NULL)) {
nxt_unit_warn(ctx, "failed to add mmap to outgoing array");
return NULL;
}
snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
lib->pid, (void *) pthread_self());
#if (NXT_HAVE_MEMFD_CREATE)
fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
if (nxt_slow_path(fd == -1)) {
nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
strerror(errno), errno);
goto remove_fail;
}
nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
#elif (NXT_HAVE_SHM_OPEN_ANON)
fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
if (nxt_slow_path(fd == -1)) {
nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
strerror(errno), errno);
goto remove_fail;
}
#elif (NXT_HAVE_SHM_OPEN)
/* Just in case. */
shm_unlink(name);
fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
if (nxt_slow_path(fd == -1)) {
nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
strerror(errno), errno);
goto remove_fail;
}
if (nxt_slow_path(shm_unlink(name) == -1)) {
nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
strerror(errno), errno);
}
#else
#error No working shared memory implementation.
#endif
if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
strerror(errno), errno);
goto remove_fail;
}
mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
strerror(errno), errno);
goto remove_fail;
}
mm->hdr = mem;
hdr = mem;
memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
hdr->id = process->outgoing.size - 1;
hdr->src_pid = lib->pid;
hdr->dst_pid = process->pid;
hdr->sent_over = port_id->id;
/* Mark first n chunk(s) as busy */
for (i = 0; i < n; i++) {
nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
}
/* Mark as busy chunk followed the last available chunk. */
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
pthread_mutex_unlock(&process->outgoing.mutex);
rc = nxt_unit_send_mmap(ctx, port_id, fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
munmap(mem, PORT_MMAP_SIZE);
hdr = NULL;
} else {
nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
hdr->id, (int) lib->pid, (int) process->pid);
}
close(fd);
pthread_mutex_lock(&process->outgoing.mutex);
if (nxt_fast_path(hdr != NULL)) {
return hdr;
}
remove_fail:
process->outgoing.size--;
return NULL;
}
static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
union {
struct cmsghdr cm;
char space[CMSG_SPACE(sizeof(int))];
} cmsg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = _NXT_PORT_MSG_MMAP;
msg.last = 0;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
/*
* Fill all padding fields with 0.
* Code in Go 1.11 validate cmsghdr using padding field as part of len.
* See Cmsghdr definition and socketControlMessageHeaderAndData function.
*/
memset(&cmsg, 0, sizeof(cmsg));
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
/*
* memcpy() is used instead of simple
* *(int *) CMSG_DATA(&cmsg.cm) = fd;
* because GCC 4.4 with -O2/3/s optimization may issue a warning:
* dereferencing type-punned pointer will break strict-aliasing rules
*
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
* in the same simple assignment as in the code above.
*/
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
&cmsg, sizeof(cmsg));
if (nxt_slow_path(res != sizeof(msg))) {
nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
(int) port_id->pid, strerror(errno), errno);
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
}
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
int nchunks, min_nchunks;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
if (local_buf != NULL) {
mmap_buf->free_ptr = NULL;
mmap_buf->plain_ptr = local_buf;
} else {
mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
return NXT_UNIT_ERROR;
}
mmap_buf->plain_ptr = mmap_buf->free_ptr;
}
mmap_buf->hdr = NULL;
mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
mmap_buf->port_id = *port_id;
mmap_buf->process = process;
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
return NXT_UNIT_OK;
}
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
if (nxt_slow_path(hdr == NULL)) {
if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
mmap_buf->hdr = NULL;
mmap_buf->buf.start = NULL;
mmap_buf->buf.free = NULL;
mmap_buf->buf.end = NULL;
mmap_buf->free_ptr = NULL;
return NXT_UNIT_OK;
}
return NXT_UNIT_ERROR;
}
mmap_buf->hdr = hdr;
mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
mmap_buf->port_id = *port_id;
mmap_buf->process = process;
mmap_buf->free_ptr = NULL;
mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
(int) hdr->id, (int) c,
(int) (nchunks * PORT_MMAP_CHUNK_SIZE));
return NXT_UNIT_OK;
}
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
int rc;
void *mem;
struct stat mmap_stat;
nxt_unit_mmap_t *mm;
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
pthread_mutex_lock(&lib->mutex);
process = nxt_unit_process_find(ctx, pid, 0);
pthread_mutex_unlock(&lib->mutex);
if (nxt_slow_path(process == NULL)) {
nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d",
(int) pid, fd);
return NXT_UNIT_ERROR;
}
rc = NXT_UNIT_ERROR;
if (fstat(fd, &mmap_stat) == -1) {
nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
strerror(errno), errno);
goto fail;
}
mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)",
strerror(errno), errno);
goto fail;
}
hdr = mem;
if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
"detected: %d != %d or %d != %d", (int) hdr->src_pid,
(int) pid, (int) hdr->dst_pid, (int) lib->pid);
munmap(mem, PORT_MMAP_SIZE);
goto fail;
}
pthread_mutex_lock(&process->incoming.mutex);
mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
if (nxt_slow_path(mm == NULL)) {
nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array");
munmap(mem, PORT_MMAP_SIZE);
} else {
mm->hdr = hdr;
hdr->sent_over = 0xFFFFu;
rc = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
fail:
nxt_unit_process_use(ctx, process, -1);
return rc;
}
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
pthread_mutex_init(&mmaps->mutex, NULL);
mmaps->size = 0;
mmaps->cap = 0;
mmaps->elts = NULL;
mmaps->allocated_chunks = 0;
}
static void
nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i)
{
long c;
c = nxt_atomic_fetch_add(&process->use_count, i);
if (i < 0 && c == -i) {
nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid);
nxt_unit_mmaps_destroy(&process->incoming);
nxt_unit_mmaps_destroy(&process->outgoing);
free(process);
}
}
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
nxt_unit_mmap_t *mm, *end;
if (mmaps->elts != NULL) {
end = mmaps->elts + mmaps->size;
for (mm = mmaps->elts; mm < end; mm++) {
munmap(mm->hdr, PORT_MMAP_SIZE);
}
free(mmaps->elts);
}
pthread_mutex_destroy(&mmaps->mutex);
}
static nxt_port_mmap_header_t *
nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
uint32_t id)
{
nxt_port_mmap_header_t *hdr;
if (nxt_fast_path(process->incoming.size > id)) {
hdr = process->incoming.elts[id].hdr;
} else {
hdr = NULL;
}
return hdr;
}
static int
nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int rc;
nxt_chunk_id_t c;
nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_tracking_msg_t *tracking_msg;
if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
recv_msg->stream, (int) recv_msg->size);
return 0;
}
tracking_msg = recv_msg->start;
recv_msg->start = tracking_msg + 1;
recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
process = nxt_unit_msg_get_process(ctx, recv_msg);
if (nxt_slow_path(process == NULL)) {
return 0;
}
pthread_mutex_lock(&process->incoming.mutex);
hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
if (nxt_slow_path(hdr == NULL)) {
pthread_mutex_unlock(&process->incoming.mutex);
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
"invalid mmap id %d,%"PRIu32,
recv_msg->stream, (int) process->pid,
tracking_msg->mmap_id);
return 0;
}
c = tracking_msg->tracking_id;
rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
if (rc == 0) {
nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
recv_msg->stream);
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
}
pthread_mutex_unlock(&process->incoming.mutex);
return rc;
}
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
void *start;
uint32_t size;
nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
nxt_port_mmap_header_t *hdr;
if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
recv_msg->stream, (int) recv_msg->size);
return NXT_UNIT_ERROR;
}
process = nxt_unit_msg_get_process(ctx, recv_msg);
if (nxt_slow_path(process == NULL)) {
return NXT_UNIT_ERROR;
}
mmap_msg = recv_msg->start;
end = nxt_pointer_to(recv_msg->start, recv_msg->size);
incoming_tail = &recv_msg->incoming_buf;
for (; mmap_msg < end; mmap_msg++) {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
recv_msg->stream);
return NXT_UNIT_ERROR;
}
nxt_unit_mmap_buf_insert(incoming_tail, b);
incoming_tail = &b->next;
}
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
pthread_mutex_lock(&process->incoming.mutex);
for (; mmap_msg < end; mmap_msg++) {
hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
if (nxt_slow_path(hdr == NULL)) {
pthread_mutex_unlock(&process->incoming.mutex);
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
"invalid mmap id %d,%"PRIu32,
recv_msg->stream, (int) process->pid,
mmap_msg->mmap_id);
return NXT_UNIT_ERROR;
}
start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
size = mmap_msg->size;
if (recv_msg->start == mmap_msg) {
recv_msg->start = start;
recv_msg->size = size;
}
b->buf.start = start;
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
b->process = process;
b = b->next;
nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
recv_msg->stream,
start, (int) size,
(int) hdr->src_pid, (int) hdr->dst_pid,
(int) hdr->id, (int) mmap_msg->chunk_id,
(int) mmap_msg->size);
}
pthread_mutex_unlock(&process->incoming.mutex);
return NXT_UNIT_OK;
}
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
void *start, uint32_t size)
{
int freed_chunks;
u_char *p, *end;
nxt_chunk_id_t c;
nxt_unit_impl_t *lib;
memset(start, 0xA5, size);
p = start;
end = p + size;
c = nxt_port_mmap_chunk_id(hdr, p);
freed_chunks = 0;
while (p < end) {
nxt_port_mmap_set_chunk_free(hdr->free_map, c);
p += PORT_MMAP_CHUNK_SIZE;
c++;
freed_chunks++;
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (hdr->src_pid == lib->pid && freed_chunks != 0) {
nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
-freed_chunks);
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
process->pid,
process->outgoing.allocated_chunks);
}
if (hdr->dst_pid == lib->pid
&& freed_chunks != 0
&& nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
{
nxt_unit_send_shm_ack(ctx, hdr->src_pid);
}
}
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
nxt_unit_port_id_t port_id;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_port_id_init(&port_id, pid, 0);
msg.stream = 0;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = _NXT_PORT_MSG_SHM_ACK;
msg.last = 0;
msg.mmap = 0;
msg.nf = 0;
msg.mf = 0;
msg.tracking = 0;
res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
(int) port_id.pid, strerror(errno), errno);
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
}
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
nxt_process_t *process;
process = data;
if (lhq->key.length == sizeof(pid_t)
&& *(pid_t *) lhq->key.start == process->pid)
{
return NXT_OK;
}
return NXT_DECLINED;
}
static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_lvlhsh_pid_test,
nxt_lvlhsh_alloc,
nxt_lvlhsh_free,
};
static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
{
lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
lhq->key.length = sizeof(*pid);
lhq->key.start = (u_char *) pid;
lhq->proto = &lvlhsh_processes_proto;
}
static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_process_lhq_pid(&lhq, &pid);
if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
process = lhq.value;
nxt_unit_process_use(ctx, process, 1);
return process;
}
process = malloc(sizeof(nxt_unit_process_t));
if (nxt_slow_path(process == NULL)) {
nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid);
return NULL;
}
process->pid = pid;
process->use_count = 1;
process->next_port_id = 0;
process->lib = lib;
nxt_queue_init(&process->ports);
nxt_unit_mmaps_init(&process->incoming);
nxt_unit_mmaps_init(&process->outgoing);
lhq.replace = 0;
lhq.value = process;
switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
case NXT_OK:
break;
default:
nxt_unit_warn(ctx, "process %d insert failed", (int) pid);
pthread_mutex_destroy(&process->outgoing.mutex);
pthread_mutex_destroy(&process->incoming.mutex);
free(process);
process = NULL;
break;
}
nxt_unit_process_use(ctx, process, 1);
return process;
}
static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove)
{
int rc;
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_process_lhq_pid(&lhq, &pid);
if (remove) {
rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
} else {
rc = nxt_lvlhsh_find(&lib->processes, &lhq);
}
if (rc == NXT_OK) {
process = lhq.value;
if (!remove) {
nxt_unit_process_use(ctx, process, 1);
}
return process;
}
return NULL;
}
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
}
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
rc = nxt_unit_run_once(ctx);
}
return rc;
}
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
if (ctx_impl->pending_read_head != NULL) {
rbuf = ctx_impl->pending_read_head;
ctx_impl->pending_read_head = rbuf->next;
if (ctx_impl->pending_read_tail == &rbuf->next) {
ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
}
pthread_mutex_unlock(&ctx_impl->mutex);
} else {
rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
nxt_unit_read_buf(ctx, rbuf);
}
if (nxt_fast_path(rbuf->size > 0)) {
rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
rbuf->buf, rbuf->size,
rbuf->oob, sizeof(rbuf->oob));
#if (NXT_DEBUG)
memset(rbuf->buf, 0xAC, rbuf->size);
#endif
} else {
rc = NXT_UNIT_ERROR;
}
nxt_unit_read_buf_release(ctx, rbuf);
return rc;
}
static void
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
if (ctx_impl->read_port_fd != -1) {
rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
} else {
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
}
}
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_unit_ctx_impl_t *ctx_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
nxt_unit_ctx_free(&ctx_impl->ctx);
} nxt_queue_loop;
for ( ;; ) {
pthread_mutex_lock(&lib->mutex);
process = nxt_unit_process_pop_first(lib);
if (process == NULL) {
pthread_mutex_unlock(&lib->mutex);
break;
}
nxt_unit_remove_process(ctx, process);
}
pthread_mutex_destroy(&lib->mutex);
free(lib);
}
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
int rc, fd;
nxt_unit_impl_t *lib;
nxt_unit_port_id_t new_port_id;
nxt_unit_ctx_impl_t *new_ctx;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
if (nxt_slow_path(new_ctx == NULL)) {
nxt_unit_warn(ctx, "failed to allocate context");
return NULL;
}
rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
free(new_ctx);
return NULL;
}
rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
lib->callbacks.remove_port(ctx, &new_port_id);
close(fd);
free(new_ctx);
return NULL;
}
close(fd);
rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
lib->callbacks.remove_port(ctx, &new_port_id);
free(new_ctx);
return NULL;
}
new_ctx->read_port_id = new_port_id;
return &new_ctx->ctx;
}
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
nxt_unit_websocket_frame_impl_t *ws_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_queue_each(req_impl, &ctx_impl->active_req,
nxt_unit_request_info_impl_t, link)
{
nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
} nxt_queue_loop;
nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
while (ctx_impl->free_buf != NULL) {
mmap_buf = ctx_impl->free_buf;
nxt_unit_mmap_buf_unlink(mmap_buf);
free(mmap_buf);
}
nxt_queue_each(req_impl, &ctx_impl->free_req,
nxt_unit_request_info_impl_t, link)
{
nxt_unit_request_info_free(req_impl);
} nxt_queue_loop;
nxt_queue_each(ws_impl, &ctx_impl->free_ws,
nxt_unit_websocket_frame_impl_t, link)
{
nxt_unit_websocket_frame_free(ws_impl);
} nxt_queue_loop;
pthread_mutex_destroy(&ctx_impl->mutex);
nxt_queue_remove(&ctx_impl->link);
if (ctx_impl != &lib->main_ctx) {
free(ctx_impl);
}
}
/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET SOCK_DGRAM
#endif
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
nxt_unit_port_hash_id_t port_hash_id;
port_hash_id.pid = pid;
port_hash_id.id = id;
port_id->pid = pid;
port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
port_id->id = id;
}
int
nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
nxt_unit_port_id_t *port_id)
{
int rc, fd;
nxt_unit_impl_t *lib;
nxt_unit_port_id_t new_port_id;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = nxt_unit_create_port(ctx, &new_port_id, &fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd);
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
*port_id = new_port_id;
} else {
lib->callbacks.remove_port(ctx, &new_port_id);
}
close(fd);
return rc;
}
static int
nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd)
{
int rc, port_sockets[2];
nxt_unit_impl_t *lib;
nxt_unit_port_t new_port;
nxt_unit_process_t *process;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
if (nxt_slow_path(rc != 0)) {
nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
strerror(errno), errno);
return NXT_UNIT_ERROR;
}
nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
port_sockets[0], port_sockets[1]);
pthread_mutex_lock(&lib->mutex);
process = nxt_unit_process_get(ctx, lib->pid);
if (nxt_slow_path(process == NULL)) {
pthread_mutex_unlock(&lib->mutex);
close(port_sockets[0]);
close(port_sockets[1]);
return NXT_UNIT_ERROR;
}
nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
new_port.in_fd = port_sockets[0];
new_port.out_fd = -1;
new_port.data = NULL;
pthread_mutex_unlock(&lib->mutex);
nxt_unit_process_use(ctx, process, -1);
rc = lib->callbacks.add_port(ctx, &new_port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_warn(ctx, "create_port: add_port() failed");
close(port_sockets[0]);
close(port_sockets[1]);
return rc;
}
*port_id = new_port.id;
*fd = port_sockets[1];
return rc;
}
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
nxt_unit_port_id_t *new_port, int fd)
{
ssize_t res;
nxt_unit_impl_t *lib;
struct {
nxt_port_msg_t msg;
nxt_port_msg_new_port_t new_port;
} m;
union {
struct cmsghdr cm;
char space[CMSG_SPACE(sizeof(int))];
} cmsg;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
m.msg.stream = 0;
m.msg.pid = lib->pid;
m.msg.reply_port = 0;
m.msg.type = _NXT_PORT_MSG_NEW_PORT;
m.msg.last = 0;
m.msg.mmap = 0;
m.msg.nf = 0;
m.msg.mf = 0;
m.msg.tracking = 0;
m.new_port.id = new_port->id;
m.new_port.pid = new_port->pid;
m.new_port.type = NXT_PROCESS_WORKER;
m.new_port.max_size = 16 * 1024;
m.new_port.max_share = 64 * 1024;
memset(&cmsg, 0, sizeof(cmsg));
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
/*
* memcpy() is used instead of simple
* *(int *) CMSG_DATA(&cmsg.cm) = fd;
* because GCC 4.4 with -O2/3/s optimization may issue a warning:
* dereferencing type-punned pointer will break strict-aliasing rules
*
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
* in the same simple assignment as in the code above.
*/
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m),
&cmsg, sizeof(cmsg));
return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}
int
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_unit_port_impl_t *new_port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
pthread_mutex_lock(&lib->mutex);
process = nxt_unit_process_get(ctx, port->id.pid);
if (nxt_slow_path(process == NULL)) {
rc = NXT_UNIT_ERROR;
goto unlock;
}
if (port->id.id >= process->next_port_id) {
process->next_port_id = port->id.id + 1;
}
new_port = malloc(sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
rc = NXT_UNIT_ERROR;
goto unlock;
}
new_port->port = *port;
rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto unlock;
}
nxt_queue_insert_tail(&process->ports, &new_port->link);
rc = NXT_UNIT_OK;
new_port->process = process;
unlock:
pthread_mutex_unlock(&lib->mutex);
if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) {
nxt_unit_process_use(ctx, process, -1);
}
return rc;
}
void
nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
nxt_unit_find_remove_port(ctx, port_id, NULL);
}
void
nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
nxt_unit_port_t *r_port)
{
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&lib->mutex);
process = NULL;
nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process);
pthread_mutex_unlock(&lib->mutex);
if (nxt_slow_path(process != NULL)) {
nxt_unit_process_use(ctx, process, -1);
}
}
static void
nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
nxt_unit_port_t *r_port, nxt_unit_process_t **process)
{
nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
if (nxt_slow_path(port == NULL)) {
nxt_unit_debug(ctx, "remove_port: port %d,%d not found",
(int) port_id->pid, (int) port_id->id);
return;
}
nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p",
(int) port_id->pid, (int) port_id->id,
port->port.in_fd, port->port.out_fd, port->port.data);
if (port->port.in_fd != -1) {
close(port->port.in_fd);
}
if (port->port.out_fd != -1) {
close(port->port.out_fd);
}
if (port->process != NULL) {
nxt_queue_remove(&port->link);
}
if (process != NULL) {
*process = port->process;
}
if (r_port != NULL) {
*r_port = port->port;
}
free(port);
}
void
nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid)
{
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&lib->mutex);
process = nxt_unit_process_find(ctx, pid, 1);
if (nxt_slow_path(process == NULL)) {
nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid);
pthread_mutex_unlock(&lib->mutex);
return;
}
nxt_unit_remove_process(ctx, process);
}
static void
nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process)
{
nxt_queue_t ports;
nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
nxt_queue_init(&ports);
nxt_queue_add(&ports, &process->ports);
nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
nxt_unit_process_use(ctx, process, -1);
port->process = NULL;
/* Shortcut for default callback. */
if (lib->callbacks.remove_port == nxt_unit_remove_port) {
nxt_queue_remove(&port->link);
nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL);
}
} nxt_queue_loop;
pthread_mutex_unlock(&lib->mutex);
nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
nxt_queue_remove(&port->link);
lib->callbacks.remove_port(ctx, &port->port.id);
} nxt_queue_loop;
nxt_unit_process_use(ctx, process, -1);
}
void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->online = 0;
}
static ssize_t
nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
int fd;
nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&lib->mutex);
port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
if (nxt_fast_path(port != NULL)) {
fd = port->port.out_fd;
} else {
nxt_unit_warn(ctx, "port_send: port %d,%d not found",
(int) port_id->pid, (int) port_id->id);
fd = -1;
}
pthread_mutex_unlock(&lib->mutex);
if (nxt_slow_path(fd == -1)) {
if (port != NULL) {
nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1",
(int) port_id->pid, (int) port_id->id);
}
return -1;
}
nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d",
(int) port_id->pid, (int) port_id->id, fd);
return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size);
}
ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
ssize_t res;
struct iovec iov[1];
struct msghdr msg;
iov[0].iov_base = (void *) buf;
iov[0].iov_len = buf_size;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
msg.msg_control = (void *) oob;
msg.msg_controllen = oob_size;
res = sendmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
fd, (int) buf_size, strerror(errno), errno);
} else {
nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
(int) res);
}
return res;
}
static ssize_t
nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
int fd;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_port_impl_t *port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&lib->mutex);
port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
if (nxt_fast_path(port != NULL)) {
fd = port->port.in_fd;
} else {
nxt_unit_debug(ctx, "port_recv: port %d,%d not found",
(int) port_id->pid, (int) port_id->id);
fd = -1;
}
pthread_mutex_unlock(&lib->mutex);
if (nxt_slow_path(fd == -1)) {
return -1;
}
nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d",
(int) port_id->pid, (int) port_id->id, fd);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) {
ctx_impl->read_port_fd = fd;
}
return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size);
}
ssize_t
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
void *oob, size_t oob_size)
{
ssize_t res;
struct iovec iov[1];
struct msghdr msg;
iov[0].iov_base = buf;
iov[0].iov_len = buf_size;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
msg.msg_control = oob;
msg.msg_controllen = oob_size;
res = recvmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
fd, strerror(errno), errno);
} else {
nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
}
return res;
}
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
nxt_unit_port_t *port;
nxt_unit_port_hash_id_t *port_id;
port = data;
port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
&& port_id->pid == port->id.pid
&& port_id->id == port->id.id)
{
return NXT_OK;
}
return NXT_DECLINED;
}
static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_port_hash_test,
nxt_lvlhsh_alloc,
nxt_lvlhsh_free,
};
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
nxt_unit_port_hash_id_t *port_hash_id,
nxt_unit_port_id_t *port_id)
{
port_hash_id->pid = port_id->pid;
port_hash_id->id = port_id->id;
if (nxt_fast_path(port_id->hash != 0)) {
lhq->key_hash = port_id->hash;
} else {
lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
port_id->hash = lhq->key_hash;
nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
(int) port_id->pid, (int) port_id->id,
(int) port_id->hash);
}
lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
lhq->key.start = (u_char *) port_hash_id;
lhq->proto = &lvlhsh_ports_proto;
lhq->pool = NULL;
}
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
nxt_int_t res;
nxt_lvlhsh_query_t lhq;
nxt_unit_port_hash_id_t port_hash_id;
nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
lhq.replace = 0;
lhq.value = port;
res = nxt_lvlhsh_insert(port_hash, &lhq);
switch (res) {
case NXT_OK:
return NXT_UNIT_OK;
default:
return NXT_UNIT_ERROR;
}
}
static nxt_unit_port_impl_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
int remove)
{
nxt_int_t res;
nxt_lvlhsh_query_t lhq;
nxt_unit_port_hash_id_t port_hash_id;
nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
if (remove) {
res = nxt_lvlhsh_delete(port_hash, &lhq);
} else {
res = nxt_lvlhsh_find(port_hash, &lhq);
}
switch (res) {
case NXT_OK:
return lhq.value;
default:
return NULL;
}
}
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
return NXT_OK;
}
static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_request_hash_test,
nxt_lvlhsh_alloc,
nxt_lvlhsh_free,
};
static int
nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
nxt_unit_request_info_impl_t *req_impl)
{
uint32_t *stream;
nxt_int_t res;
nxt_lvlhsh_query_t lhq;
stream = &req_impl->stream;
lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
lhq.key.length = sizeof(*stream);
lhq.key.start = (u_char *) stream;
lhq.proto = &lvlhsh_requests_proto;
lhq.pool = NULL;
lhq.replace = 0;
lhq.value = req_impl;
res = nxt_lvlhsh_insert(request_hash, &lhq);
switch (res) {
case NXT_OK:
return NXT_UNIT_OK;
default:
return NXT_UNIT_ERROR;
}
}
static nxt_unit_request_info_impl_t *
nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
int remove)
{
nxt_int_t res;
nxt_lvlhsh_query_t lhq;
lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
lhq.key.length = sizeof(stream);
lhq.key.start = (u_char *) &stream;
lhq.proto = &lvlhsh_requests_proto;
lhq.pool = NULL;
if (remove) {
res = nxt_lvlhsh_delete(request_hash, &lhq);
} else {
res = nxt_lvlhsh_find(request_hash, &lhq);
}
switch (res) {
case NXT_OK:
return lhq.value;
default:
return NULL;
}
}
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
int log_fd, n;
char msg[NXT_MAX_ERROR_STR], *p, *end;
pid_t pid;
va_list ap;
nxt_unit_impl_t *lib;
if (nxt_fast_path(ctx != NULL)) {
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pid = lib->pid;
log_fd = lib->log_fd;
} else {
pid = getpid();
log_fd = STDERR_FILENO;
}
p = msg;
end = p + sizeof(msg) - 1;
p = nxt_unit_snprint_prefix(p, end, pid, level);
va_start(ap, fmt);
p += vsnprintf(p, end - p, fmt, ap);
va_end(ap);
if (nxt_slow_path(p > end)) {
memcpy(end - 5, "[...]", 5);
p = end;
}
*p++ = '\n';
n = write(log_fd, msg, p - msg);
if (nxt_slow_path(n < 0)) {
fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
}
}
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
int log_fd, n;
char msg[NXT_MAX_ERROR_STR], *p, *end;
pid_t pid;
va_list ap;
nxt_unit_impl_t *lib;
nxt_unit_request_info_impl_t *req_impl;
if (nxt_fast_path(req != NULL)) {
lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
pid = lib->pid;
log_fd = lib->log_fd;
} else {
pid = getpid();
log_fd = STDERR_FILENO;
}
p = msg;
end = p + sizeof(msg) - 1;
p = nxt_unit_snprint_prefix(p, end, pid, level);
if (nxt_fast_path(req != NULL)) {
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
}
va_start(ap, fmt);
p += vsnprintf(p, end - p, fmt, ap);
va_end(ap);
if (nxt_slow_path(p > end)) {
memcpy(end - 5, "[...]", 5);
p = end;
}
*p++ = '\n';
n = write(log_fd, msg, p - msg);
if (nxt_slow_path(n < 0)) {
fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
}
}
static const char * nxt_unit_log_levels[] = {
"alert",
"error",
"warn",
"notice",
"info",
"debug",
};
static char *
nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
{
struct tm tm;
struct timespec ts;
(void) clock_gettime(CLOCK_REALTIME, &ts);
#if (NXT_HAVE_LOCALTIME_R)
(void) localtime_r(&ts.tv_sec, &tm);
#else
tm = *localtime(&ts.tv_sec);
#endif
p += snprintf(p, end - p,
"%4d/%02d/%02d %02d:%02d:%02d.%03d ",
tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
tm.tm_hour, tm.tm_min, tm.tm_sec,
(int) ts.tv_nsec / 1000000);
p += snprintf(p, end - p,
"[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
(int) pid,
(uint64_t) (uintptr_t) nxt_thread_get_tid());
return p;
}
/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */
void *
nxt_memalign(size_t alignment, size_t size)
{
void *p;
nxt_err_t err;
err = posix_memalign(&p, alignment, size);
if (nxt_fast_path(err == 0)) {
return p;
}
return NULL;
}
#if (NXT_DEBUG)
void
nxt_free(void *p)
{
free(p);
}
#endif