summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2018-08-06 17:27:33 +0300
committerMax Romanov <max.romanov@nginx.com>2018-08-06 17:27:33 +0300
commit1bb22d1e922c87d3c86c67bdce626767ee48fb5c (patch)
tree6e067a82b309c3a0d0f592f037f26d886a7f8c13 /src/nxt_router.c
parentb6ce2da65c9c5229d744b2d964623b2d0f731ee9 (diff)
downloadunit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.gz
unit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.bz2
Unit application library.
Library now used in all language modules. Old 'nxt_app_*' code removed. See src/test/nxt_unit_app_test.c for usage sample.
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c655
1 files changed, 295 insertions, 360 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index d9df48ec..4c25341f 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -8,6 +8,9 @@
#include <nxt_router.h>
#include <nxt_conf.h>
#include <nxt_http.h>
+#include <nxt_port_memory_int.h>
+#include <nxt_unit_request.h>
+#include <nxt_unit_response.h>
typedef struct {
@@ -241,16 +244,8 @@ static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
static void nxt_router_app_prepare_request(nxt_task_t *task,
nxt_req_app_link_t *ra);
-static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg);
-static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg);
-static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg);
-static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg);
-static nxt_int_t nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg);
+static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
+ nxt_port_t *port, const nxt_str_t *prefix);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
@@ -265,13 +260,15 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
static nxt_router_t *nxt_router;
+static const nxt_str_t http_prefix = nxt_string("HTTP_");
+static const nxt_str_t empty_prefix = nxt_string("");
-static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
- nxt_python_prepare_msg,
- nxt_php_prepare_msg,
- nxt_go_prepare_msg,
- nxt_perl_prepare_msg,
- nxt_ruby_prepare_msg,
+static const nxt_str_t *nxt_app_msg_prefix[] = {
+ &http_prefix,
+ &http_prefix,
+ &empty_prefix,
+ &http_prefix,
+ &http_prefix,
};
@@ -1459,7 +1456,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->live = 1;
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
- app->prepare_msg = nxt_app_prepare_msg[lang->type];
engine = task->thread->engine;
@@ -3145,6 +3141,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_http_request_t *r;
nxt_req_conn_link_t *rc;
nxt_app_parse_ctx_t *ar;
+ nxt_unit_response_t *resp;
b = msg->buf;
rc = data;
@@ -3204,11 +3201,48 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_http_request_send_body(task, r, NULL);
} else {
+ size_t b_size = nxt_buf_mem_used_size(&b->mem);
+
+ if (nxt_slow_path(b_size < sizeof(*resp))) {
+ goto fail;
+ }
+
+ resp = (void *) b->mem.pos;
+ if (nxt_slow_path(b_size < sizeof(*resp)
+ + resp->fields_count * sizeof(nxt_unit_field_t))) {
+ goto fail;
+ }
+
+ nxt_unit_field_t *f;
+ nxt_http_field_t *field;
+
+ for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
+ field = nxt_list_add(ar->resp_parser.fields);
+
+ if (nxt_slow_path(field == NULL)) {
+ goto fail;
+ }
+
+ field->hash = f->hash;
+ field->skip = f->skip;
+
+ field->name_length = f->name_length;
+ field->value_length = f->value_length;
+ field->name = nxt_unit_sptr_get(&f->name);
+ field->value = nxt_unit_sptr_get(&f->value);
+
+ nxt_debug(task, "header: %*s: %*s",
+ (size_t) field->name_length, field->name,
+ (size_t) field->value_length, field->value);
+ }
+ r->status = resp->status;
+
+/*
ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
if (nxt_slow_path(ret != NXT_DONE)) {
goto fail;
}
-
+*/
r->resp.fields = ar->resp_parser.fields;
ret = nxt_http_fields_process(r->resp.fields,
@@ -3217,6 +3251,14 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
goto fail;
}
+ if (resp->piggyback_content_length != 0) {
+ b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
+ b->mem.free = b->mem.pos + resp->piggyback_content_length;
+
+ } else {
+ b->mem.pos = b->mem.free;
+ }
+
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
@@ -4208,10 +4250,9 @@ static void
nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
{
uint32_t request_failed;
- nxt_buf_t *b;
+ nxt_buf_t *buf;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
- nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
nxt_assert(ra->app_port != NULL);
@@ -4236,32 +4277,30 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_process_connected_port_add(port->process, reply_port);
}
- wmsg.port = port;
- wmsg.write = NULL;
- wmsg.buf = &wmsg.write;
- wmsg.stream = ra->stream;
-
- res = port->app->prepare_msg(task, &ap->r, &wmsg);
+ buf = nxt_router_prepare_msg(task, &ap->r, port,
+ nxt_app_msg_prefix[port->app->type]);
- if (nxt_slow_path(res != NXT_OK)) {
+ if (nxt_slow_path(buf == NULL)) {
nxt_router_ra_error(ra, 500,
"Failed to prepare message for application");
goto release_port;
}
nxt_debug(task, "about to send %O bytes buffer to app process port %d",
- nxt_buf_used_size(wmsg.write),
- wmsg.port->socket.fd);
+ nxt_buf_used_size(buf),
+ port->socket.fd);
request_failed = 0;
- ra->msg_info.buf = wmsg.write;
- ra->msg_info.completion_handler = wmsg.write->completion_handler;
+ ra->msg_info.buf = buf;
+ ra->msg_info.completion_handler = buf->completion_handler;
- for (b = wmsg.write; b != NULL; b = b->next) {
- b->completion_handler = nxt_router_dummy_buf_completion;
+ for (; buf; buf = buf->next) {
+ buf->completion_handler = nxt_router_dummy_buf_completion;
}
+ buf = ra->msg_info.buf;
+
res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
ra->stream);
if (nxt_slow_path(res != NXT_OK)) {
@@ -4270,8 +4309,8 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
goto release_port;
}
- res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA,
- -1, ra->stream, reply_port->id, wmsg.write,
+ res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
+ -1, ra->stream, reply_port->id, buf,
&ra->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
@@ -4288,421 +4327,317 @@ release_port:
}
-static nxt_int_t
-nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg)
-{
- nxt_int_t rc;
- nxt_buf_t *b;
- nxt_http_field_t *field;
- nxt_app_request_header_t *h;
-
- static const nxt_str_t prefix = nxt_string("HTTP_");
- static const nxt_str_t eof = nxt_null_string;
-
- h = &r->header;
-
-#define RC(S) \
- do { \
- rc = (S); \
- if (nxt_slow_path(rc != NXT_OK)) { \
- goto fail; \
- } \
- } while(0)
-
-#define NXT_WRITE(N) \
- RC(nxt_app_msg_write_str(task, wmsg, N))
-
- /* TODO error handle, async mmap buffer assignment */
+struct nxt_fields_iter_s {
+ nxt_list_part_t *part;
+ nxt_http_field_t *field;
+};
- NXT_WRITE(&h->method);
- NXT_WRITE(&h->target);
+typedef struct nxt_fields_iter_s nxt_fields_iter_t;
- if (h->path.start == h->target.start) {
- NXT_WRITE(&eof);
- } else {
- NXT_WRITE(&h->path);
- }
-
- if (h->query.start != NULL) {
- RC(nxt_app_msg_write_size(task, wmsg,
- h->query.start - h->target.start + 1));
- } else {
- RC(nxt_app_msg_write_size(task, wmsg, 0));
+static nxt_http_field_t *
+nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
+{
+ if (part == NULL) {
+ return NULL;
}
- NXT_WRITE(&h->version);
-
- NXT_WRITE(&r->remote);
- NXT_WRITE(&r->local);
-
- NXT_WRITE(&h->host);
- NXT_WRITE(&h->content_type);
- NXT_WRITE(&h->content_length);
-
- nxt_list_each(field, h->fields) {
- RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name,
- field->name_length));
- RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
-
- } nxt_list_loop;
-
- /* end-of-headers mark */
- NXT_WRITE(&eof);
-
- RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
-
- for (b = r->body.buf; b != NULL; b = b->next) {
- RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
- nxt_buf_mem_used_size(&b->mem)));
+ while (part->nelts == 0) {
+ part = part->next;
+ if (part == NULL) {
+ return NULL;
+ }
}
-#undef NXT_WRITE
-#undef RC
-
- return NXT_OK;
-
-fail:
+ i->part = part;
+ i->field = nxt_list_data(i->part);
- return NXT_ERROR;
+ return i->field;
}
-static nxt_int_t
-nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg)
+static nxt_http_field_t *
+nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
{
- nxt_int_t rc;
- nxt_buf_t *b;
- nxt_http_field_t *field;
- nxt_app_request_header_t *h;
-
- static const nxt_str_t prefix = nxt_string("HTTP_");
- static const nxt_str_t eof = nxt_null_string;
-
- h = &r->header;
-
-#define RC(S) \
- do { \
- rc = (S); \
- if (nxt_slow_path(rc != NXT_OK)) { \
- goto fail; \
- } \
- } while(0)
-
-#define NXT_WRITE(N) \
- RC(nxt_app_msg_write_str(task, wmsg, N))
-
- /* TODO error handle, async mmap buffer assignment */
-
- NXT_WRITE(&h->method);
- NXT_WRITE(&h->target);
-
- if (h->path.start == h->target.start) {
- NXT_WRITE(&eof);
-
- } else {
- NXT_WRITE(&h->path);
- }
-
- if (h->query.start != NULL) {
- RC(nxt_app_msg_write_size(task, wmsg,
- h->query.start - h->target.start + 1));
- } else {
- RC(nxt_app_msg_write_size(task, wmsg, 0));
- }
-
- NXT_WRITE(&h->version);
-
- // PHP_SELF
- // SCRIPT_NAME
- // SCRIPT_FILENAME
- // DOCUMENT_ROOT
+ return nxt_fields_part_first(nxt_list_part(fields), i);
+}
- NXT_WRITE(&r->remote);
- NXT_WRITE(&r->local);
- NXT_WRITE(&h->host);
- NXT_WRITE(&h->cookie);
- NXT_WRITE(&h->content_type);
- NXT_WRITE(&h->content_length);
+static nxt_http_field_t *
+nxt_fields_next(nxt_fields_iter_t *i)
+{
+ nxt_http_field_t *end = nxt_list_data(i->part);
- RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
- RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+ end += i->part->nelts;
+ i->field++;
- for (b = r->body.buf; b != NULL; b = b->next) {
- RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
- nxt_buf_mem_used_size(&b->mem)));
+ if (i->field < end) {
+ return i->field;
}
- nxt_list_each(field, h->fields) {
- RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name,
- field->name_length));
- RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
-
- } nxt_list_loop;
-
- /* end-of-headers mark */
- NXT_WRITE(&eof);
-
-#undef NXT_WRITE
-#undef RC
-
- return NXT_OK;
-
-fail:
-
- return NXT_ERROR;
+ return nxt_fields_part_first(i->part->next, i);
}
-static nxt_int_t
-nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
+static nxt_buf_t *
+nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
+ nxt_port_t *port, const nxt_str_t *prefix)
{
- nxt_int_t rc;
- nxt_buf_t *b;
- nxt_http_field_t *field;
+ void *target_pos, *query_pos;
+ u_char *pos, *end, *p, c;
+ size_t fields_count, req_size, size, free_size;
+ size_t copy_size;
+ nxt_buf_t *b, *buf, *out, **tail;
+ nxt_http_field_t *field, *dup;
+ nxt_unit_field_t *dst_field;
+ nxt_fields_iter_t iter, dup_iter;
+ nxt_unit_request_t *req;
nxt_app_request_header_t *h;
- static const nxt_str_t eof = nxt_null_string;
-
h = &r->header;
-#define RC(S) \
- do { \
- rc = (S); \
- if (nxt_slow_path(rc != NXT_OK)) { \
- goto fail; \
- } \
- } while(0)
-
-#define NXT_WRITE(N) \
- RC(nxt_app_msg_write_str(task, wmsg, N))
-
- /* TODO error handle, async mmap buffer assignment */
-
- NXT_WRITE(&h->method);
- NXT_WRITE(&h->target);
-
- if (h->path.start == h->target.start) {
- NXT_WRITE(&eof);
-
- } else {
- NXT_WRITE(&h->path);
- }
-
- if (h->query.start != NULL) {
- RC(nxt_app_msg_write_size(task, wmsg,
- h->query.start - h->target.start + 1));
- } else {
- RC(nxt_app_msg_write_size(task, wmsg, 0));
- }
-
- NXT_WRITE(&h->version);
- NXT_WRITE(&r->remote);
-
- NXT_WRITE(&h->host);
- NXT_WRITE(&h->cookie);
- NXT_WRITE(&h->content_type);
- NXT_WRITE(&h->content_length);
+ req_size = sizeof(nxt_unit_request_t)
+ + h->method.length + 1
+ + h->version.length + 1
+ + r->remote.length + 1
+ + r->local.length + 1
+ + h->target.length + 1
+ + (h->path.start != h->target.start ? h->path.length + 1 : 0);
- RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
+ fields_count = 0;
nxt_list_each(field, h->fields) {
- RC(nxt_app_msg_write(task, wmsg, field->name, field->name_length));
- RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
+ fields_count++;
+ req_size += field->name_length + prefix->length + 1
+ + field->value_length + 1;
} nxt_list_loop;
- /* end-of-headers mark */
- NXT_WRITE(&eof);
+ req_size += fields_count * sizeof(nxt_unit_field_t);
- RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+ if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
+ nxt_alert(task, "headers to big to fit in shared memory (%d)",
+ (int) req_size);
- for (b = r->body.buf; b != NULL; b = b->next) {
- RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
- nxt_buf_mem_used_size(&b->mem)));
+ return NULL;
}
-#undef NXT_WRITE
-#undef RC
-
- return NXT_OK;
-
-fail:
-
- return NXT_ERROR;
-}
+ out = nxt_port_mmap_get_buf(task, port,
+ nxt_min(req_size + r->body.preread_size, PORT_MMAP_DATA_SIZE));
+ if (nxt_slow_path(out == NULL)) {
+ return NULL;
+ }
+ req = (nxt_unit_request_t *) out->mem.free;
+ out->mem.free += req_size;
-static nxt_int_t
-nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg)
-{
- nxt_int_t rc;
- nxt_str_t str;
- nxt_buf_t *b;
- nxt_http_field_t *field;
- nxt_app_request_header_t *h;
+ req->content_length = h->parsed_content_length;
- static const nxt_str_t prefix = nxt_string("HTTP_");
- static const nxt_str_t eof = nxt_null_string;
+ p = (u_char *) (req->fields + fields_count);
- h = &r->header;
+ nxt_debug(task, "fields_count=%d", (int) fields_count);
-#define RC(S) \
- do { \
- rc = (S); \
- if (nxt_slow_path(rc != NXT_OK)) { \
- goto fail; \
- } \
- } while(0)
+ req->method_length = h->method.length;
+ nxt_unit_sptr_set(&req->method, p);
+ p = nxt_cpymem(p, h->method.start, h->method.length);
+ *p++ = '\0';
-#define NXT_WRITE(N) \
- RC(nxt_app_msg_write_str(task, wmsg, N))
+ req->version_length = h->version.length;
+ nxt_unit_sptr_set(&req->version, p);
+ p = nxt_cpymem(p, h->version.start, h->version.length);
+ *p++ = '\0';
- /* TODO error handle, async mmap buffer assignment */
+ req->remote_length = r->remote.length;
+ nxt_unit_sptr_set(&req->remote, p);
+ p = nxt_cpymem(p, r->remote.start, r->remote.length);
+ *p++ = '\0';
- NXT_WRITE(&h->method);
- NXT_WRITE(&h->target);
+ req->local_length = r->local.length;
+ nxt_unit_sptr_set(&req->local, p);
+ p = nxt_cpymem(p, r->local.start, r->local.length);
+ *p++ = '\0';
- if (h->query.length) {
- str.start = h->target.start;
- str.length = (h->target.length - h->query.length) - 1;
+ target_pos = p;
+ req->target_length = h->target.length;
+ nxt_unit_sptr_set(&req->target, p);
+ p = nxt_cpymem(p, h->target.start, h->target.length);
+ *p++ = '\0';
- RC(nxt_app_msg_write_str(task, wmsg, &str));
+ req->path_length = h->path.length;
+ if (h->path.start == h->target.start) {
+ nxt_unit_sptr_set(&req->path, target_pos);
} else {
- NXT_WRITE(&eof);
+ nxt_unit_sptr_set(&req->path, p);
+ p = nxt_cpymem(p, h->path.start, h->path.length);
+ *p++ = '\0';
}
+ req->query_length = h->query.length;
if (h->query.start != NULL) {
- RC(nxt_app_msg_write_size(task, wmsg,
- h->query.start - h->target.start + 1));
+ query_pos = nxt_pointer_to(target_pos,
+ h->query.start - h->target.start);
+
+ nxt_unit_sptr_set(&req->query, query_pos);
+
} else {
- RC(nxt_app_msg_write_size(task, wmsg, 0));
+ req->query.offset = 0;
}
- NXT_WRITE(&h->version);
+ req->host_field = NXT_UNIT_NONE_FIELD;
+ req->content_length_field = NXT_UNIT_NONE_FIELD;
+ req->content_type_field = NXT_UNIT_NONE_FIELD;
+ req->cookie_field = NXT_UNIT_NONE_FIELD;
- NXT_WRITE(&r->remote);
- NXT_WRITE(&r->local);
+ dst_field = req->fields;
- NXT_WRITE(&h->host);
- NXT_WRITE(&h->content_type);
- NXT_WRITE(&h->content_length);
+ for (field = nxt_fields_first(h->fields, &iter);
+ field != NULL;
+ field = nxt_fields_next(&iter))
+ {
+ if (field->skip) {
+ continue;
+ }
- nxt_list_each(field, h->fields) {
- RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix,
- field->name, field->name_length));
- RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
- } nxt_list_loop;
+ dst_field->hash = field->hash;
+ dst_field->skip = 0;
+ dst_field->name_length = field->name_length + prefix->length;
+ dst_field->value_length = field->value_length;
- /* end-of-headers mark */
- NXT_WRITE(&eof);
+ if (field->value == h->host.start) {
+ req->host_field = dst_field - req->fields;
- RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+ } else if (field->value == h->content_length.start) {
+ req->content_length_field = dst_field - req->fields;
- for (b = r->body.buf; b != NULL; b = b->next) {
+ } else if (field->value == h->content_type.start) {
+ req->content_type_field = dst_field - req->fields;
- RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
- nxt_buf_mem_used_size(&b->mem)));
- }
+ } else if (field->value == h->cookie.start) {
+ req->cookie_field = dst_field - req->fields;
+ }
-#undef NXT_WRITE
-#undef RC
+ nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
+ (int) field->hash, (int) field->skip,
+ (int) field->name_length, field->name,
+ (int) field->value_length, field->value);
- return NXT_OK;
+ if (prefix->length != 0) {
+ nxt_unit_sptr_set(&dst_field->name, p);
+ p = nxt_cpymem(p, prefix->start, prefix->length);
-fail:
+ end = field->name + field->name_length;
+ for (pos = field->name; pos < end; pos++) {
+ c = *pos;
- return NXT_ERROR;
-}
+ if (c >= 'a' && c <= 'z') {
+ *p++ = (c & ~0x20);
+ continue;
+ }
+ if (c == '-') {
+ *p++ = '_';
+ continue;
+ }
-static nxt_int_t
-nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
- nxt_app_wmsg_t *wmsg)
-{
- nxt_int_t rc;
- nxt_str_t str;
- nxt_buf_t *b;
- nxt_http_field_t *field;
- nxt_app_request_header_t *h;
+ *p++ = c;
+ }
- static const nxt_str_t prefix = nxt_string("HTTP_");
- static const nxt_str_t eof = nxt_null_string;
+ } else {
+ nxt_unit_sptr_set(&dst_field->name, p);
+ p = nxt_cpymem(p, field->name, field->name_length);
+ }
- h = &r->header;
+ *p++ = '\0';
-#define RC(S) \
- do { \
- rc = (S); \
- if (nxt_slow_path(rc != NXT_OK)) { \
- goto fail; \
- } \
- } while(0)
+ nxt_unit_sptr_set(&dst_field->value, p);
+ p = nxt_cpymem(p, field->value, field->value_length);
-#define NXT_WRITE(N) \
- RC(nxt_app_msg_write_str(task, wmsg, N))
+ if (prefix->length != 0) {
+ dup_iter = iter;
- /* TODO error handle, async mmap buffer assignment */
+ for (dup = nxt_fields_next(&dup_iter);
+ dup != NULL;
+ dup = nxt_fields_next(&dup_iter))
+ {
+ if (dup->name_length != field->name_length
+ || dup->skip
+ || dup->hash != field->hash
+ || nxt_memcasecmp(dup->name, field->name, dup->name_length))
+ {
+ continue;
+ }
- NXT_WRITE(&h->method);
- NXT_WRITE(&h->target);
+ p = nxt_cpymem(p, ", ", 2);
+ p = nxt_cpymem(p, dup->value, dup->value_length);
- if (h->query.length) {
- str.start = h->target.start;
- str.length = (h->target.length - h->query.length) - 1;
+ dst_field->value_length += 2 + dup->value_length;
- RC(nxt_app_msg_write_str(task, wmsg, &str));
+ dup->skip = 1;
+ }
+ }
- } else {
- NXT_WRITE(&eof);
- }
+ *p++ = '\0';
- if (h->query.start != NULL) {
- RC(nxt_app_msg_write_size(task, wmsg,
- h->query.start - h->target.start + 1));
- } else {
- RC(nxt_app_msg_write_size(task, wmsg, 0));
+ dst_field++;
}
- NXT_WRITE(&h->version);
+ req->fields_count = dst_field - req->fields;
- NXT_WRITE(&r->remote);
- NXT_WRITE(&r->local);
+ nxt_unit_sptr_set(&req->preread_content, out->mem.free);
- NXT_WRITE(&h->host);
- NXT_WRITE(&h->content_type);
- NXT_WRITE(&h->content_length);
+ buf = out;
+ tail = &buf->next;
- nxt_list_each(field, h->fields) {
- RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix,
- field->name, field->name_length));
- RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length));
- } nxt_list_loop;
+ for (b = r->body.buf; b != NULL; b = b->next) {
+ size = nxt_buf_mem_used_size(&b->mem);
+ pos = b->mem.pos;
+
+ while (size > 0) {
+ if (buf == NULL) {
+ free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
+
+ buf = nxt_port_mmap_get_buf(task, port, free_size);
+ if (nxt_slow_path(buf == NULL)) {
+ while (out != NULL) {
+ buf = out->next;
+ out->completion_handler(task, out, out->parent);
+ out = buf;
+ }
+ return NULL;
+ }
- /* end-of-headers mark */
- NXT_WRITE(&eof);
+ *tail = buf;
+ tail = &buf->next;
- RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+ } else {
+ free_size = nxt_buf_mem_free_size(&buf->mem);
+ if (free_size < size
+ && nxt_port_mmap_increase_buf(task, buf, size, 1)
+ == NXT_OK)
+ {
+ free_size = nxt_buf_mem_free_size(&buf->mem);
+ }
+ }
- for (b = r->body.buf; b != NULL; b = b->next) {
+ if (free_size > 0) {
+ copy_size = nxt_min(free_size, size);
- RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
- nxt_buf_mem_used_size(&b->mem)));
- }
+ buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
-#undef NXT_WRITE
-#undef RC
+ size -= copy_size;
+ pos += copy_size;
- return NXT_OK;
+ if (size == 0) {
+ break;
+ }
+ }
-fail:
+ buf = NULL;
+ }
+ }
- return NXT_ERROR;
+ return out;
}