diff options
author | Max Romanov <max.romanov@nginx.com> | 2018-08-06 17:27:33 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2018-08-06 17:27:33 +0300 |
commit | 1bb22d1e922c87d3c86c67bdce626767ee48fb5c (patch) | |
tree | 6e067a82b309c3a0d0f592f037f26d886a7f8c13 /src/nxt_router.c | |
parent | b6ce2da65c9c5229d744b2d964623b2d0f731ee9 (diff) | |
download | unit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.gz unit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.bz2 |
Unit application library.
Library now used in all language modules.
Old 'nxt_app_*' code removed.
See src/test/nxt_unit_app_test.c for usage sample.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 655 |
1 files changed, 295 insertions, 360 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index d9df48ec..4c25341f 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -8,6 +8,9 @@ #include <nxt_router.h> #include <nxt_conf.h> #include <nxt_http.h> +#include <nxt_port_memory_int.h> +#include <nxt_unit_request.h> +#include <nxt_unit_response.h> typedef struct { @@ -241,16 +244,8 @@ static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra); -static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg); -static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg); -static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg); -static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg); -static nxt_int_t nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg); +static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_port_t *port, const nxt_str_t *prefix); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, @@ -265,13 +260,15 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static nxt_router_t *nxt_router; +static const nxt_str_t http_prefix = nxt_string("HTTP_"); +static const nxt_str_t empty_prefix = nxt_string(""); -static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { - nxt_python_prepare_msg, - nxt_php_prepare_msg, - nxt_go_prepare_msg, - nxt_perl_prepare_msg, - nxt_ruby_prepare_msg, +static const nxt_str_t *nxt_app_msg_prefix[] = { + &http_prefix, + &http_prefix, + &empty_prefix, + &http_prefix, + &http_prefix, }; @@ -1459,7 +1456,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->live = 1; app->max_pending_responses = 2; app->max_requests = apcf.requests; - app->prepare_msg = nxt_app_prepare_msg[lang->type]; engine = task->thread->engine; @@ -3145,6 +3141,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_http_request_t *r; nxt_req_conn_link_t *rc; nxt_app_parse_ctx_t *ar; + nxt_unit_response_t *resp; b = msg->buf; rc = data; @@ -3204,11 +3201,48 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_http_request_send_body(task, r, NULL); } else { + size_t b_size = nxt_buf_mem_used_size(&b->mem); + + if (nxt_slow_path(b_size < sizeof(*resp))) { + goto fail; + } + + resp = (void *) b->mem.pos; + if (nxt_slow_path(b_size < sizeof(*resp) + + resp->fields_count * sizeof(nxt_unit_field_t))) { + goto fail; + } + + nxt_unit_field_t *f; + nxt_http_field_t *field; + + for (f = resp->fields; f < resp->fields + resp->fields_count; f++) { + field = nxt_list_add(ar->resp_parser.fields); + + if (nxt_slow_path(field == NULL)) { + goto fail; + } + + field->hash = f->hash; + field->skip = f->skip; + + field->name_length = f->name_length; + field->value_length = f->value_length; + field->name = nxt_unit_sptr_get(&f->name); + field->value = nxt_unit_sptr_get(&f->value); + + nxt_debug(task, "header: %*s: %*s", + (size_t) field->name_length, field->name, + (size_t) field->value_length, field->value); + } + r->status = resp->status; + +/* ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem); if (nxt_slow_path(ret != NXT_DONE)) { goto fail; } - +*/ r->resp.fields = ar->resp_parser.fields; ret = nxt_http_fields_process(r->resp.fields, @@ -3217,6 +3251,14 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, goto fail; } + if (resp->piggyback_content_length != 0) { + b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content); + b->mem.free = b->mem.pos + resp->piggyback_content_length; + + } else { + b->mem.pos = b->mem.free; + } + if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); @@ -4208,10 +4250,9 @@ static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) { uint32_t request_failed; - nxt_buf_t *b; + nxt_buf_t *buf; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; - nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; nxt_assert(ra->app_port != NULL); @@ -4236,32 +4277,30 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_process_connected_port_add(port->process, reply_port); } - wmsg.port = port; - wmsg.write = NULL; - wmsg.buf = &wmsg.write; - wmsg.stream = ra->stream; - - res = port->app->prepare_msg(task, &ap->r, &wmsg); + buf = nxt_router_prepare_msg(task, &ap->r, port, + nxt_app_msg_prefix[port->app->type]); - if (nxt_slow_path(res != NXT_OK)) { + if (nxt_slow_path(buf == NULL)) { nxt_router_ra_error(ra, 500, "Failed to prepare message for application"); goto release_port; } nxt_debug(task, "about to send %O bytes buffer to app process port %d", - nxt_buf_used_size(wmsg.write), - wmsg.port->socket.fd); + nxt_buf_used_size(buf), + port->socket.fd); request_failed = 0; - ra->msg_info.buf = wmsg.write; - ra->msg_info.completion_handler = wmsg.write->completion_handler; + ra->msg_info.buf = buf; + ra->msg_info.completion_handler = buf->completion_handler; - for (b = wmsg.write; b != NULL; b = b->next) { - b->completion_handler = nxt_router_dummy_buf_completion; + for (; buf; buf = buf->next) { + buf->completion_handler = nxt_router_dummy_buf_completion; } + buf = ra->msg_info.buf; + res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, ra->stream); if (nxt_slow_path(res != NXT_OK)) { @@ -4270,8 +4309,8 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) goto release_port; } - res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, ra->stream, reply_port->id, wmsg.write, + res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA, + -1, ra->stream, reply_port->id, buf, &ra->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { @@ -4288,421 +4327,317 @@ release_port: } -static nxt_int_t -nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg) -{ - nxt_int_t rc; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; - - static const nxt_str_t prefix = nxt_string("HTTP_"); - static const nxt_str_t eof = nxt_null_string; - - h = &r->header; - -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) - -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) - - /* TODO error handle, async mmap buffer assignment */ +struct nxt_fields_iter_s { + nxt_list_part_t *part; + nxt_http_field_t *field; +}; - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); +typedef struct nxt_fields_iter_s nxt_fields_iter_t; - if (h->path.start == h->target.start) { - NXT_WRITE(&eof); - } else { - NXT_WRITE(&h->path); - } - - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); +static nxt_http_field_t * +nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i) +{ + if (part == NULL) { + return NULL; } - NXT_WRITE(&h->version); - - NXT_WRITE(&r->remote); - NXT_WRITE(&r->local); - - NXT_WRITE(&h->host); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); - - nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name, - field->name_length)); - RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); - - } nxt_list_loop; - - /* end-of-headers mark */ - NXT_WRITE(&eof); - - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); - - for (b = r->body.buf; b != NULL; b = b->next) { - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); + while (part->nelts == 0) { + part = part->next; + if (part == NULL) { + return NULL; + } } -#undef NXT_WRITE -#undef RC - - return NXT_OK; - -fail: + i->part = part; + i->field = nxt_list_data(i->part); - return NXT_ERROR; + return i->field; } -static nxt_int_t -nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg) +static nxt_http_field_t * +nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i) { - nxt_int_t rc; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; - - static const nxt_str_t prefix = nxt_string("HTTP_"); - static const nxt_str_t eof = nxt_null_string; - - h = &r->header; - -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) - -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) - - /* TODO error handle, async mmap buffer assignment */ - - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); - - if (h->path.start == h->target.start) { - NXT_WRITE(&eof); - - } else { - NXT_WRITE(&h->path); - } - - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); - } - - NXT_WRITE(&h->version); - - // PHP_SELF - // SCRIPT_NAME - // SCRIPT_FILENAME - // DOCUMENT_ROOT + return nxt_fields_part_first(nxt_list_part(fields), i); +} - NXT_WRITE(&r->remote); - NXT_WRITE(&r->local); - NXT_WRITE(&h->host); - NXT_WRITE(&h->cookie); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); +static nxt_http_field_t * +nxt_fields_next(nxt_fields_iter_t *i) +{ + nxt_http_field_t *end = nxt_list_data(i->part); - RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + end += i->part->nelts; + i->field++; - for (b = r->body.buf; b != NULL; b = b->next) { - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); + if (i->field < end) { + return i->field; } - nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, field->name, - field->name_length)); - RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); - - } nxt_list_loop; - - /* end-of-headers mark */ - NXT_WRITE(&eof); - -#undef NXT_WRITE -#undef RC - - return NXT_OK; - -fail: - - return NXT_ERROR; + return nxt_fields_part_first(i->part->next, i); } -static nxt_int_t -nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) +static nxt_buf_t * +nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_port_t *port, const nxt_str_t *prefix) { - nxt_int_t rc; - nxt_buf_t *b; - nxt_http_field_t *field; + void *target_pos, *query_pos; + u_char *pos, *end, *p, c; + size_t fields_count, req_size, size, free_size; + size_t copy_size; + nxt_buf_t *b, *buf, *out, **tail; + nxt_http_field_t *field, *dup; + nxt_unit_field_t *dst_field; + nxt_fields_iter_t iter, dup_iter; + nxt_unit_request_t *req; nxt_app_request_header_t *h; - static const nxt_str_t eof = nxt_null_string; - h = &r->header; -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) - -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) - - /* TODO error handle, async mmap buffer assignment */ - - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); - - if (h->path.start == h->target.start) { - NXT_WRITE(&eof); - - } else { - NXT_WRITE(&h->path); - } - - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); - } - - NXT_WRITE(&h->version); - NXT_WRITE(&r->remote); - - NXT_WRITE(&h->host); - NXT_WRITE(&h->cookie); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); + req_size = sizeof(nxt_unit_request_t) + + h->method.length + 1 + + h->version.length + 1 + + r->remote.length + 1 + + r->local.length + 1 + + h->target.length + 1 + + (h->path.start != h->target.start ? h->path.length + 1 : 0); - RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); + fields_count = 0; nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write(task, wmsg, field->name, field->name_length)); - RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); + fields_count++; + req_size += field->name_length + prefix->length + 1 + + field->value_length + 1; } nxt_list_loop; - /* end-of-headers mark */ - NXT_WRITE(&eof); + req_size += fields_count * sizeof(nxt_unit_field_t); - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) { + nxt_alert(task, "headers to big to fit in shared memory (%d)", + (int) req_size); - for (b = r->body.buf; b != NULL; b = b->next) { - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); + return NULL; } -#undef NXT_WRITE -#undef RC - - return NXT_OK; - -fail: - - return NXT_ERROR; -} + out = nxt_port_mmap_get_buf(task, port, + nxt_min(req_size + r->body.preread_size, PORT_MMAP_DATA_SIZE)); + if (nxt_slow_path(out == NULL)) { + return NULL; + } + req = (nxt_unit_request_t *) out->mem.free; + out->mem.free += req_size; -static nxt_int_t -nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg) -{ - nxt_int_t rc; - nxt_str_t str; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; + req->content_length = h->parsed_content_length; - static const nxt_str_t prefix = nxt_string("HTTP_"); - static const nxt_str_t eof = nxt_null_string; + p = (u_char *) (req->fields + fields_count); - h = &r->header; + nxt_debug(task, "fields_count=%d", (int) fields_count); -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) + req->method_length = h->method.length; + nxt_unit_sptr_set(&req->method, p); + p = nxt_cpymem(p, h->method.start, h->method.length); + *p++ = '\0'; -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) + req->version_length = h->version.length; + nxt_unit_sptr_set(&req->version, p); + p = nxt_cpymem(p, h->version.start, h->version.length); + *p++ = '\0'; - /* TODO error handle, async mmap buffer assignment */ + req->remote_length = r->remote.length; + nxt_unit_sptr_set(&req->remote, p); + p = nxt_cpymem(p, r->remote.start, r->remote.length); + *p++ = '\0'; - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); + req->local_length = r->local.length; + nxt_unit_sptr_set(&req->local, p); + p = nxt_cpymem(p, r->local.start, r->local.length); + *p++ = '\0'; - if (h->query.length) { - str.start = h->target.start; - str.length = (h->target.length - h->query.length) - 1; + target_pos = p; + req->target_length = h->target.length; + nxt_unit_sptr_set(&req->target, p); + p = nxt_cpymem(p, h->target.start, h->target.length); + *p++ = '\0'; - RC(nxt_app_msg_write_str(task, wmsg, &str)); + req->path_length = h->path.length; + if (h->path.start == h->target.start) { + nxt_unit_sptr_set(&req->path, target_pos); } else { - NXT_WRITE(&eof); + nxt_unit_sptr_set(&req->path, p); + p = nxt_cpymem(p, h->path.start, h->path.length); + *p++ = '\0'; } + req->query_length = h->query.length; if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); + query_pos = nxt_pointer_to(target_pos, + h->query.start - h->target.start); + + nxt_unit_sptr_set(&req->query, query_pos); + } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); + req->query.offset = 0; } - NXT_WRITE(&h->version); + req->host_field = NXT_UNIT_NONE_FIELD; + req->content_length_field = NXT_UNIT_NONE_FIELD; + req->content_type_field = NXT_UNIT_NONE_FIELD; + req->cookie_field = NXT_UNIT_NONE_FIELD; - NXT_WRITE(&r->remote); - NXT_WRITE(&r->local); + dst_field = req->fields; - NXT_WRITE(&h->host); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); + for (field = nxt_fields_first(h->fields, &iter); + field != NULL; + field = nxt_fields_next(&iter)) + { + if (field->skip) { + continue; + } - nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, - field->name, field->name_length)); - RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); - } nxt_list_loop; + dst_field->hash = field->hash; + dst_field->skip = 0; + dst_field->name_length = field->name_length + prefix->length; + dst_field->value_length = field->value_length; - /* end-of-headers mark */ - NXT_WRITE(&eof); + if (field->value == h->host.start) { + req->host_field = dst_field - req->fields; - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + } else if (field->value == h->content_length.start) { + req->content_length_field = dst_field - req->fields; - for (b = r->body.buf; b != NULL; b = b->next) { + } else if (field->value == h->content_type.start) { + req->content_type_field = dst_field - req->fields; - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); - } + } else if (field->value == h->cookie.start) { + req->cookie_field = dst_field - req->fields; + } -#undef NXT_WRITE -#undef RC + nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p", + (int) field->hash, (int) field->skip, + (int) field->name_length, field->name, + (int) field->value_length, field->value); - return NXT_OK; + if (prefix->length != 0) { + nxt_unit_sptr_set(&dst_field->name, p); + p = nxt_cpymem(p, prefix->start, prefix->length); -fail: + end = field->name + field->name_length; + for (pos = field->name; pos < end; pos++) { + c = *pos; - return NXT_ERROR; -} + if (c >= 'a' && c <= 'z') { + *p++ = (c & ~0x20); + continue; + } + if (c == '-') { + *p++ = '_'; + continue; + } -static nxt_int_t -nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg) -{ - nxt_int_t rc; - nxt_str_t str; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; + *p++ = c; + } - static const nxt_str_t prefix = nxt_string("HTTP_"); - static const nxt_str_t eof = nxt_null_string; + } else { + nxt_unit_sptr_set(&dst_field->name, p); + p = nxt_cpymem(p, field->name, field->name_length); + } - h = &r->header; + *p++ = '\0'; -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) + nxt_unit_sptr_set(&dst_field->value, p); + p = nxt_cpymem(p, field->value, field->value_length); -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) + if (prefix->length != 0) { + dup_iter = iter; - /* TODO error handle, async mmap buffer assignment */ + for (dup = nxt_fields_next(&dup_iter); + dup != NULL; + dup = nxt_fields_next(&dup_iter)) + { + if (dup->name_length != field->name_length + || dup->skip + || dup->hash != field->hash + || nxt_memcasecmp(dup->name, field->name, dup->name_length)) + { + continue; + } - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); + p = nxt_cpymem(p, ", ", 2); + p = nxt_cpymem(p, dup->value, dup->value_length); - if (h->query.length) { - str.start = h->target.start; - str.length = (h->target.length - h->query.length) - 1; + dst_field->value_length += 2 + dup->value_length; - RC(nxt_app_msg_write_str(task, wmsg, &str)); + dup->skip = 1; + } + } - } else { - NXT_WRITE(&eof); - } + *p++ = '\0'; - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); + dst_field++; } - NXT_WRITE(&h->version); + req->fields_count = dst_field - req->fields; - NXT_WRITE(&r->remote); - NXT_WRITE(&r->local); + nxt_unit_sptr_set(&req->preread_content, out->mem.free); - NXT_WRITE(&h->host); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); + buf = out; + tail = &buf->next; - nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, - field->name, field->name_length)); - RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); - } nxt_list_loop; + for (b = r->body.buf; b != NULL; b = b->next) { + size = nxt_buf_mem_used_size(&b->mem); + pos = b->mem.pos; + + while (size > 0) { + if (buf == NULL) { + free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); + + buf = nxt_port_mmap_get_buf(task, port, free_size); + if (nxt_slow_path(buf == NULL)) { + while (out != NULL) { + buf = out->next; + out->completion_handler(task, out, out->parent); + out = buf; + } + return NULL; + } - /* end-of-headers mark */ - NXT_WRITE(&eof); + *tail = buf; + tail = &buf->next; - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + } else { + free_size = nxt_buf_mem_free_size(&buf->mem); + if (free_size < size + && nxt_port_mmap_increase_buf(task, buf, size, 1) + == NXT_OK) + { + free_size = nxt_buf_mem_free_size(&buf->mem); + } + } - for (b = r->body.buf; b != NULL; b = b->next) { + if (free_size > 0) { + copy_size = nxt_min(free_size, size); - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); - } + buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size); -#undef NXT_WRITE -#undef RC + size -= copy_size; + pos += copy_size; - return NXT_OK; + if (size == 0) { + break; + } + } -fail: + buf = NULL; + } + } - return NXT_ERROR; + return out; } |