diff options
author | Andrei Belov <defan@nginx.com> | 2020-03-12 18:40:48 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2020-03-12 18:40:48 +0300 |
commit | 4b7ca39903178e20ec7381205694cb01f0dec6bc (patch) | |
tree | 51afb9c7003b5927183e7ddecd766eb19e421233 /src | |
parent | 8414897527ed1616ea39a0cae4d1b8ee170d5cb8 (diff) | |
parent | b3c8a7b33a29208e75dfe4f670cf81dac7b99ccc (diff) | |
download | unit-4b7ca39903178e20ec7381205694cb01f0dec6bc.tar.gz unit-4b7ca39903178e20ec7381205694cb01f0dec6bc.tar.bz2 |
Merged with the default branch.1.16.0-1
Diffstat (limited to 'src')
-rw-r--r-- | src/java/nginx/unit/Context.java | 2 | ||||
-rw-r--r-- | src/java/nxt_jni_InputStream.c | 28 | ||||
-rw-r--r-- | src/nodejs/unit-http/unit.cpp | 3 | ||||
-rw-r--r-- | src/nxt_conf.c | 72 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 202 | ||||
-rw-r--r-- | src/nxt_conn_write.c | 96 | ||||
-rw-r--r-- | src/nxt_controller.c | 7 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 182 | ||||
-rw-r--r-- | src/nxt_http.h | 19 | ||||
-rw-r--r-- | src/nxt_http_proxy.c | 124 | ||||
-rw-r--r-- | src/nxt_http_request.c | 27 | ||||
-rw-r--r-- | src/nxt_http_route.c | 150 | ||||
-rw-r--r-- | src/nxt_http_static.c | 13 | ||||
-rw-r--r-- | src/nxt_kqueue_engine.c | 4 | ||||
-rw-r--r-- | src/nxt_main.h | 8 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 429 | ||||
-rw-r--r-- | src/nxt_python_wsgi.c | 147 | ||||
-rw-r--r-- | src/nxt_router.c | 116 | ||||
-rw-r--r-- | src/nxt_router.h | 11 | ||||
-rw-r--r-- | src/nxt_runtime.c | 18 | ||||
-rw-r--r-- | src/nxt_runtime.h | 1 | ||||
-rw-r--r-- | src/nxt_unit.c | 158 | ||||
-rw-r--r-- | src/nxt_unit.h | 4 | ||||
-rw-r--r-- | src/nxt_upstream.c | 135 | ||||
-rw-r--r-- | src/nxt_upstream.h | 80 | ||||
-rw-r--r-- | src/nxt_upstream_round_robin.c | 257 | ||||
-rw-r--r-- | src/ruby/nxt_ruby_stream_io.c | 28 |
27 files changed, 1815 insertions, 506 deletions
diff --git a/src/java/nginx/unit/Context.java b/src/java/nginx/unit/Context.java index 6fcd6018..5f7ec22f 100644 --- a/src/java/nginx/unit/Context.java +++ b/src/java/nginx/unit/Context.java @@ -1517,7 +1517,7 @@ public class Context implements ServletContext, InitParams || ci.isAnnotation() || ci.isAbstract()) { - return; + continue; } trace("loadInitializer: handles class: " + ci.getName()); diff --git a/src/java/nxt_jni_InputStream.c b/src/java/nxt_jni_InputStream.c index b96ff742..3b74b0c1 100644 --- a/src/java/nxt_jni_InputStream.c +++ b/src/java/nxt_jni_InputStream.c @@ -90,40 +90,20 @@ static jint JNICALL nxt_java_InputStream_readLine(JNIEnv *env, jclass cls, jlong req_info_ptr, jarray out, jint off, jint len) { - char *p; - jint size, b_size; uint8_t *data; ssize_t res; - nxt_unit_buf_t *b; nxt_unit_request_info_t *req; req = nxt_jlong2ptr(req_info_ptr); - size = 0; - - for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) { - b_size = b->end - b->free; - p = memchr(b->free, '\n', b_size); - - if (p != NULL) { - p++; - size += p - b->free; - break; - } + data = (*env)->GetPrimitiveArrayCritical(env, out, NULL); - size += b_size; + res = nxt_unit_request_readline_size(req, len); - if (size >= len) { - break; - } + if (res > 0) { + res = nxt_unit_request_read(req, data + off, res); } - len = len < size ? len : size; - - data = (*env)->GetPrimitiveArrayCritical(env, out, NULL); - - res = nxt_unit_request_read(req, data + off, len); - nxt_unit_req_debug(req, "readLine '%.*s'", res, (char *) data + off); (*env)->ReleasePrimitiveArrayCritical(env, out, data, 0); diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 64e076c1..975174d4 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -812,8 +812,7 @@ Unit::response_write(napi_env env, napi_callback_info info) /* TODO: will work only for utf8 content-type */ if (req->response_buf != NULL - && (req->response_buf->end - req->response_buf->free) - >= buf_len) + && req->response_buf->end >= req->response_buf->free + buf_len) { buf = req->response_buf; diff --git a/src/nxt_conf.c b/src/nxt_conf.c index 43820d2a..2a952c35 100644 --- a/src/nxt_conf.c +++ b/src/nxt_conf.c @@ -1244,15 +1244,74 @@ nxt_conf_json_parse(nxt_mp_t *mp, u_char *start, u_char *end, static u_char * nxt_conf_json_skip_space(u_char *start, u_char *end) { - u_char *p; + u_char *p, ch; + + enum { + sw_normal = 0, + sw_after_slash, + sw_single_comment, + sw_multi_comment, + sw_after_asterisk, + } state; + + state = sw_normal; for (p = start; nxt_fast_path(p != end); p++) { + ch = *p; + + switch (state) { + + case sw_normal: + switch (ch) { + case ' ': + case '\t': + case '\n': + case '\r': + continue; + case '/': + state = sw_after_slash; + continue; + } + + break; + + case sw_after_slash: + switch (ch) { + case '/': + state = sw_single_comment; + continue; + case '*': + state = sw_multi_comment; + continue; + } + + p--; + break; + + case sw_single_comment: + if (ch == '\n') { + state = sw_normal; + } - switch (*p) { - case ' ': - case '\t': - case '\r': - case '\n': + continue; + + case sw_multi_comment: + if (ch == '*') { + state = sw_after_asterisk; + } + + continue; + + case sw_after_asterisk: + switch (ch) { + case '/': + state = sw_normal; + continue; + case '*': + continue; + } + + state = sw_multi_comment; continue; } @@ -1346,6 +1405,7 @@ nxt_conf_json_parse_value(nxt_mp_t *mp, nxt_conf_value_t *value, u_char *start, case '{': case '[': case '"': + case '/': return p; } } diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 5a1f7839..3a3654bd 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -110,6 +110,12 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); +static nxt_int_t nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, + nxt_str_t *name, nxt_conf_value_t *value); +static nxt_int_t nxt_conf_vldt_server(nxt_conf_validation_t *vldt, + nxt_str_t *name, nxt_conf_value_t *value); +static nxt_int_t nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); @@ -176,11 +182,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[] = { NULL, NULL }, + { nxt_string("body_buffer_size"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + { nxt_string("max_body_size"), NXT_CONF_VLDT_INTEGER, NULL, NULL }, + { nxt_string("body_temp_path"), + NXT_CONF_VLDT_STRING, + NULL, + NULL }, + { nxt_string("websocket"), NXT_CONF_VLDT_OBJECT, &nxt_conf_vldt_object, @@ -226,6 +242,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_root_members[] = { &nxt_conf_vldt_object_iterator, (void *) &nxt_conf_vldt_app }, + { nxt_string("upstreams"), + NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_object_iterator, + (void *) &nxt_conf_vldt_upstream }, + { nxt_string("access_log"), NXT_CONF_VLDT_STRING, NULL, @@ -323,17 +344,32 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { }; -static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = { +static nxt_conf_vldt_object_t nxt_conf_vldt_pass_action_members[] = { { nxt_string("pass"), NXT_CONF_VLDT_STRING, &nxt_conf_vldt_pass, NULL }, + NXT_CONF_VLDT_END +}; + + +static nxt_conf_vldt_object_t nxt_conf_vldt_share_action_members[] = { { nxt_string("share"), NXT_CONF_VLDT_STRING, NULL, NULL }, + { nxt_string("fallback"), + NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_action, + NULL }, + + NXT_CONF_VLDT_END +}; + + +static nxt_conf_vldt_object_t nxt_conf_vldt_proxy_action_members[] = { { nxt_string("proxy"), NXT_CONF_VLDT_STRING, &nxt_conf_vldt_proxy, @@ -667,6 +703,26 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { }; +static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_members[] = { + { nxt_string("servers"), + NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_object_iterator, + (void *) &nxt_conf_vldt_server }, + + NXT_CONF_VLDT_END +}; + + +static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_server_members[] = { + { nxt_string("weight"), + NXT_CONF_VLDT_INTEGER, + &nxt_conf_vldt_server_weight, + NULL }, + + NXT_CONF_VLDT_END +}; + + nxt_int_t nxt_conf_validate(nxt_conf_validation_t *vldt) { @@ -912,30 +968,45 @@ static nxt_int_t nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { - nxt_int_t ret; - nxt_conf_value_t *pass_value, *share_value, *proxy_value; + nxt_uint_t i; + nxt_conf_value_t *action; + nxt_conf_vldt_object_t *members; + + static struct { + nxt_str_t name; + nxt_conf_vldt_object_t *members; + + } actions[] = { + { nxt_string("pass"), nxt_conf_vldt_pass_action_members }, + { nxt_string("share"), nxt_conf_vldt_share_action_members }, + { nxt_string("proxy"), nxt_conf_vldt_proxy_action_members }, + }; - static nxt_str_t pass_str = nxt_string("pass"); - static nxt_str_t share_str = nxt_string("share"); - static nxt_str_t proxy_str = nxt_string("proxy"); + members = NULL; - ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members); + for (i = 0; i < nxt_nitems(actions); i++) { + action = nxt_conf_get_object_member(value, &actions[i].name, NULL); - if (ret != NXT_OK) { - return ret; - } + if (action == NULL) { + continue; + } - pass_value = nxt_conf_get_object_member(value, &pass_str, NULL); - share_value = nxt_conf_get_object_member(value, &share_str, NULL); - proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL); + if (members != NULL) { + return nxt_conf_vldt_error(vldt, "The \"action\" object must have " + "just one of \"pass\", \"share\" or " + "\"proxy\" options set."); + } - if (pass_value == NULL && share_value == NULL && proxy_value == NULL) { + members = actions[i].members; + } + + if (members == NULL) { return nxt_conf_vldt_error(vldt, "The \"action\" object must have " - "either \"pass\" or \"share\" or " + "either \"pass\", \"share\", or " "\"proxy\" option set."); } - return NXT_OK; + return nxt_conf_vldt_object(vldt, value, members); } @@ -987,6 +1058,27 @@ nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, return NXT_OK; } + if (nxt_str_eq(&first, "upstreams", 9)) { + + if (second.length == 0) { + goto error; + } + + value = nxt_conf_get_object_member(vldt->conf, &first, NULL); + + if (nxt_slow_path(value == NULL)) { + goto error; + } + + value = nxt_conf_get_object_member(value, &second, NULL); + + if (nxt_slow_path(value == NULL)) { + goto error; + } + + return NXT_OK; + } + if (nxt_str_eq(&first, "routes", 6)) { value = nxt_conf_get_object_member(vldt->conf, &first, NULL); @@ -1871,3 +1963,81 @@ nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value) return NXT_OK; } + + +static nxt_int_t +nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, nxt_str_t *name, + nxt_conf_value_t *value) +{ + nxt_int_t ret; + nxt_conf_value_t *conf; + + static nxt_str_t servers = nxt_string("servers"); + + ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); + + if (ret != NXT_OK) { + return ret; + } + + ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_upstream_members); + + if (ret != NXT_OK) { + return ret; + } + + conf = nxt_conf_get_object_member(value, &servers, NULL); + if (conf == NULL) { + return nxt_conf_vldt_error(vldt, "The \"%V\" upstream must contain " + "\"servers\" object value.", name); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_conf_vldt_server(nxt_conf_validation_t *vldt, nxt_str_t *name, + nxt_conf_value_t *value) +{ + nxt_int_t ret; + nxt_sockaddr_t *sa; + + ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); + + if (ret != NXT_OK) { + return ret; + } + + sa = nxt_sockaddr_parse(vldt->pool, name); + + if (sa == NULL) { + return nxt_conf_vldt_error(vldt, "The \"%V\" is not valid " + "server address.", name); + } + + return nxt_conf_vldt_object(vldt, value, + nxt_conf_vldt_upstream_server_members); +} + + +static nxt_int_t +nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + int64_t int_value; + + int_value = nxt_conf_get_integer(value); + + if (int_value <= 0) { + return nxt_conf_vldt_error(vldt, "The \"weight\" number must be " + "greater than 0."); + } + + if (int_value > NXT_INT32_T_MAX) { + return nxt_conf_vldt_error(vldt, "The \"weight\" number must " + "not exceed %d.", NXT_INT32_T_MAX); + } + + return NXT_OK; +} diff --git a/src/nxt_conn_write.c b/src/nxt_conn_write.c index 298d8f75..d7a6a8da 100644 --- a/src/nxt_conn_write.c +++ b/src/nxt_conn_write.c @@ -9,6 +9,8 @@ static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data); +static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb); +static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size); void @@ -170,10 +172,104 @@ nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) return 0; } + if (niov == 0 && nxt_buf_is_file(sb->buf)) { + return nxt_conn_io_sendfile(task, sb); + } + return nxt_conn_io_writev(task, sb, iov, niov); } +static ssize_t +nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb) +{ + size_t size; + ssize_t n; + nxt_buf_t *b; + nxt_err_t err; + + b = sb->buf; + + for ( ;; ) { + size = b->file_end - b->file_pos; + + n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size); + + err = (n == -1) ? nxt_errno : 0; + + nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z", + b->file->fd, sb->socket, b->file_pos, size, n); + + if (n > 0) { + if (n < (ssize_t) size) { + sb->ready = 0; + } + + return n; + } + + if (nxt_slow_path(n == 0)) { + nxt_alert(task, "sendfile() reported that file was truncated at %O", + b->file_pos); + + return NXT_ERROR; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "sendfile() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "sendfile() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "sendfile(%FD, %d, @%O, %uz) failed %E", + b->file->fd, sb->socket, b->file_pos, size, err); + + return NXT_ERROR; + } + } +} + + +static ssize_t +nxt_sendfile(int fd, int s, off_t pos, size_t size) +{ + ssize_t res; + +#ifdef NXT_HAVE_MACOSX_SENDFILE + off_t sent = size; + + int rc = sendfile(fd, s, pos, &sent, NULL, 0); + + res = (rc == 0 || sent > 0) ? sent : -1; +#endif + +#ifdef NXT_HAVE_FREEBSD_SENDFILE + off_t sent = 0; + + int rc = sendfile(fd, s, pos, size, NULL, &sent, 0); + + res = (rc == 0 || sent > 0) ? sent : -1; +#endif + +#ifdef NXT_HAVE_LINUX_SENDFILE + res = sendfile(s, fd, &pos, size); +#endif + + return res; +} + + ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, nxt_uint_t niov) diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 86ba1246..cc1ed534 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -989,6 +989,13 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); + /* Skip UTF-8 BOM. */ + if (nxt_buf_mem_used_size(mbuf) >= 3 + && nxt_memcmp(mbuf->pos, "\xEF\xBB\xBF", 3) == 0) + { + mbuf->pos += 3; + } + value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); if (value == NULL) { diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 8ce57893..35918bd8 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -6,6 +6,7 @@ #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_upstream.h> #include <nxt_h1proto.h> #include <nxt_websocket.h> #include <nxt_websocket_header.h> @@ -816,12 +817,16 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data) static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) { - size_t size, body_length; + size_t size, body_length, body_buffer_size, body_rest; + ssize_t res; + nxt_str_t *tmp_path, tmp_name; nxt_buf_t *in, *b; nxt_conn_t *c; nxt_h1proto_t *h1p; nxt_http_status_t status; + static const nxt_str_t tmp_name_pattern = nxt_string("/req-XXXXXXXX"); + h1p = r->proto.h1; nxt_debug(task, "h1p request body read %O te:%d", @@ -846,43 +851,97 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) goto ready; } - if (r->content_length_n > (nxt_off_t) r->conf->socket_conf->max_body_size) { - status = NXT_HTTP_PAYLOAD_TOO_LARGE; + body_length = (size_t) r->content_length_n; + + body_buffer_size = nxt_min(r->conf->socket_conf->body_buffer_size, + body_length); + + if (body_length > body_buffer_size) { + tmp_path = &r->conf->socket_conf->body_temp_path; + + tmp_name.length = tmp_path->length + tmp_name_pattern.length; + + b = nxt_buf_file_alloc(r->mem_pool, + body_buffer_size + sizeof(nxt_file_t) + + tmp_name.length + 1, 0); + + } else { + /* This initialization required for CentOS 6, gcc 4.4.7. */ + tmp_path = NULL; + tmp_name.length = 0; + + b = nxt_buf_mem_alloc(r->mem_pool, body_buffer_size, 0); + } + + if (nxt_slow_path(b == NULL)) { + status = NXT_HTTP_INTERNAL_SERVER_ERROR; goto error; } - body_length = (size_t) r->content_length_n; + r->body = b; - b = r->body; + if (body_length > body_buffer_size) { + tmp_name.start = nxt_pointer_to(b->mem.start, sizeof(nxt_file_t)); + + memcpy(tmp_name.start, tmp_path->start, tmp_path->length); + memcpy(tmp_name.start + tmp_path->length, tmp_name_pattern.start, + tmp_name_pattern.length); + tmp_name.start[tmp_name.length] = '\0'; + + b->file = (nxt_file_t *) b->mem.start; + nxt_memzero(b->file, sizeof(nxt_file_t)); + b->file->fd = -1; + b->file->size = body_length; + + b->mem.start += sizeof(nxt_file_t) + tmp_name.length + 1; + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + + b->file->fd = mkstemp((char *) tmp_name.start); + if (nxt_slow_path(b->file->fd == -1)) { + nxt_log(task, NXT_LOG_ERR, "mkstemp() failed %E", nxt_errno); - if (b == NULL) { - b = nxt_buf_mem_alloc(r->mem_pool, body_length, 0); - if (nxt_slow_path(b == NULL)) { status = NXT_HTTP_INTERNAL_SERVER_ERROR; goto error; } - r->body = b; + nxt_debug(task, "create body tmp file \"%V\", %d", + &tmp_name, b->file->fd); + + unlink((char *) tmp_name.start); } + body_rest = body_length; + in = h1p->conn->read; size = nxt_buf_mem_used_size(&in->mem); if (size != 0) { - if (size > body_length) { - size = body_length; + size = nxt_min(size, body_length); + + if (nxt_buf_is_file(b)) { + res = nxt_fd_write(b->file->fd, in->mem.pos, size); + if (nxt_slow_path(res < (ssize_t) size)) { + status = NXT_HTTP_INTERNAL_SERVER_ERROR; + goto error; + } + + b->file_end += size; + + } else { + size = nxt_min(body_buffer_size, size); + b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size); + body_buffer_size -= size; } - b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size); in->mem.pos += size; + body_rest -= size; } - size = nxt_buf_mem_free_size(&b->mem); + nxt_debug(task, "h1p body rest: %uz", body_rest); - nxt_debug(task, "h1p body rest: %uz", size); - - if (size != 0) { + if (body_rest != 0) { in->next = h1p->buffers; h1p->buffers = in; h1p->nbuffers++; @@ -895,6 +954,13 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r) return; } + if (nxt_buf_is_file(b)) { + b->mem.start = NULL; + b->mem.end = NULL; + b->mem.pos = NULL; + b->mem.free = NULL; + } + ready: r->state->ready_handler(task, r, NULL); @@ -926,7 +992,9 @@ static const nxt_conn_state_t nxt_h1p_read_body_state static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data) { - size_t size; + size_t size, body_rest; + ssize_t res; + nxt_buf_t *b; nxt_conn_t *c; nxt_h1proto_t *h1p; nxt_http_request_t *r; @@ -937,18 +1005,59 @@ nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "h1p conn request body read"); - size = nxt_buf_mem_free_size(&c->read->mem); - - nxt_debug(task, "h1p body rest: %uz", size); + r = h1p->request; engine = task->thread->engine; - if (size != 0) { + b = c->read; + + if (nxt_buf_is_file(b)) { + body_rest = b->file->size - b->file_end; + + size = nxt_buf_mem_used_size(&b->mem); + size = nxt_min(size, body_rest); + + res = nxt_fd_write(b->file->fd, b->mem.pos, size); + if (nxt_slow_path(res < (ssize_t) size)) { + nxt_h1p_request_error(task, h1p, r); + return; + } + + b->file_end += size; + body_rest -= res; + + b->mem.pos += size; + + if (b->mem.pos == b->mem.free) { + if (body_rest >= (size_t) nxt_buf_mem_size(&b->mem)) { + b->mem.free = b->mem.start; + + } else { + /* This required to avoid reading next request. */ + b->mem.free = b->mem.end - body_rest; + } + + b->mem.pos = b->mem.free; + } + + } else { + body_rest = nxt_buf_mem_free_size(&c->read->mem); + } + + nxt_debug(task, "h1p body rest: %uz", body_rest); + + if (body_rest != 0) { nxt_conn_read(engine, c); } else { + if (nxt_buf_is_file(b)) { + b->mem.start = NULL; + b->mem.end = NULL; + b->mem.pos = NULL; + b->mem.free = NULL; + } + c->read = NULL; - r = h1p->request; r->state->ready_handler(task, r, NULL); } @@ -2004,7 +2113,7 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer) c->read_timer.task = task; c->write_timer.task = task; c->socket.data = peer; - c->remote = peer->sockaddr; + c->remote = peer->server->sockaddr; c->socket.write_ready = 1; c->write_state = &nxt_h1p_peer_connect_state; @@ -2144,7 +2253,13 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) c->write_state = &nxt_h1p_peer_header_send_state; if (r->body != NULL) { - body = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + if (nxt_buf_is_file(r->body)) { + body = nxt_buf_file_alloc(r->mem_pool, 0, 0); + + } else { + body = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + } + if (nxt_slow_path(body == NULL)) { r->state->error_handler(task, r, peer); return; @@ -2152,8 +2267,15 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) header->next = body; - body->mem = r->body->mem; - size += nxt_buf_mem_used_size(&body->mem); + if (nxt_buf_is_file(r->body)) { + body->file = r->body->file; + body->file_end = r->body->file_end; + + } else { + body->mem = r->body->mem; + } + + size += nxt_buf_used_size(body); // nxt_mp_retain(r->mem_pool); } @@ -2209,13 +2331,13 @@ nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data) c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write); - if (c->write == NULL) { - r = peer->request; - r->state->ready_handler(task, r, peer); + if (c->write != NULL) { + nxt_conn_write(engine, c); return; } - nxt_conn_write(engine, c); + r = peer->request; + r->state->ready_handler(task, r, peer); } diff --git a/src/nxt_http.h b/src/nxt_http.h index 030d77a7..0e0694e5 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -106,10 +106,12 @@ typedef struct { } nxt_http_response_t; +typedef struct nxt_upstream_server_s nxt_upstream_server_t; + typedef struct { nxt_http_proto_t proto; nxt_http_request_t *request; - nxt_sockaddr_t *sockaddr; + nxt_upstream_server_t *server; nxt_list_t *fields; nxt_buf_t *body; nxt_off_t remainder; @@ -178,7 +180,6 @@ struct nxt_http_request_s { typedef struct nxt_http_route_s nxt_http_route_t; -typedef struct nxt_http_upstream_s nxt_http_upstream_t; struct nxt_http_action_s { @@ -187,8 +188,10 @@ struct nxt_http_action_s { nxt_http_action_t *action); union { nxt_http_route_t *route; - nxt_http_upstream_t *upstream; nxt_app_t *application; + nxt_http_action_t *fallback; + nxt_upstream_t *upstream; + uint32_t upstream_number; } u; nxt_str_t name; @@ -274,6 +277,11 @@ nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task, void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes); void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action); +nxt_int_t nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *conf); +nxt_int_t nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t ***upstream_joint); + nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash); @@ -284,6 +292,11 @@ nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash, nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); +void nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name, + nxt_http_action_t *action); +nxt_http_action_t *nxt_upstream_proxy_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_upstream_t *upstream); + nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action); nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c index 7f4eeff2..893e9303 100644 --- a/src/nxt_http_proxy.c +++ b/src/nxt_http_proxy.c @@ -6,23 +6,21 @@ #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_upstream.h> -typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, - nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); - - -struct nxt_http_upstream_s { - uint32_t current; - uint32_t n; - uint8_t protocol; - nxt_http_upstream_connect_t connect; - nxt_sockaddr_t *sockaddr[1]; +struct nxt_upstream_proxy_s { + nxt_sockaddr_t *sockaddr; + uint8_t protocol; }; -static void nxt_http_upstream_connect(nxt_task_t *task, - nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); +static void nxt_http_proxy_server_get(nxt_task_t *task, + nxt_upstream_server_t *us); +static void nxt_http_proxy_upstream_ready(nxt_task_t *task, + nxt_upstream_server_t *us); +static void nxt_http_proxy_upstream_error(nxt_task_t *task, + nxt_upstream_server_t *us); static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); @@ -43,12 +41,24 @@ static const nxt_http_request_state_t nxt_http_proxy_header_read_state; static const nxt_http_request_state_t nxt_http_proxy_read_state; +static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = { + .get = nxt_http_proxy_server_get, +}; + + +static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = { + .ready = nxt_http_proxy_upstream_ready, + .error = nxt_http_proxy_upstream_error, +}; + + nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) { - nxt_str_t name; - nxt_sockaddr_t *sa; - nxt_http_upstream_t *upstream; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_upstream_t *up; + nxt_upstream_proxy_t *proxy; sa = NULL; name = action->name; @@ -66,18 +76,25 @@ nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) } if (sa != NULL) { - upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); - if (nxt_slow_path(upstream == NULL)) { + up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); + if (nxt_slow_path(up == NULL)) { + return NXT_ERROR; + } + + up->name.length = sa->length; + up->name.start = nxt_sockaddr_start(sa); + up->proto = &nxt_upstream_simple_proto; + + proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t)); + if (nxt_slow_path(proxy == NULL)) { return NXT_ERROR; } - upstream->current = 0; - upstream->n = 1; - upstream->protocol = NXT_HTTP_PROTO_H1; - upstream->connect = nxt_http_upstream_connect; - upstream->sockaddr[0] = sa; + proxy->sockaddr = sa; + proxy->protocol = NXT_HTTP_PROTO_H1; + up->type.proxy = proxy; - action->u.upstream = upstream; + action->u.upstream = up; action->handler = nxt_http_proxy_handler; } @@ -89,7 +106,22 @@ static nxt_http_action_t * nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - nxt_http_peer_t *peer; + return nxt_upstream_proxy_handler(task, r, action->u.upstream); +} + + +nxt_http_action_t * +nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_upstream_t *upstream) +{ + nxt_http_peer_t *peer; + nxt_upstream_server_t *us; + + us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t)); + if (nxt_slow_path(us == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); if (nxt_slow_path(peer == NULL)) { @@ -102,18 +134,39 @@ nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_mp_retain(r->mem_pool); - action->u.upstream->connect(task, action->u.upstream, peer); + us->state = &nxt_upstream_proxy_state; + us->peer.http = peer; + peer->server = us; + + us->upstream = upstream; + upstream->proto->get(task, us); return NULL; } static void -nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, - nxt_http_peer_t *peer) +nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us) { - peer->protocol = upstream->protocol; - peer->sockaddr = upstream->sockaddr[0]; + nxt_upstream_proxy_t *proxy; + + proxy = us->upstream->type.proxy; + + us->sockaddr = proxy->sockaddr; + us->protocol = proxy->protocol; + + us->state->ready(task, us); +} + + +static void +nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us) +{ + nxt_http_peer_t *peer; + + peer = us->peer.http; + + peer->protocol = us->protocol; peer->request->state = &nxt_http_proxy_header_send_state; @@ -121,6 +174,19 @@ nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, } +static void +nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us) +{ + nxt_http_request_t *r; + + r = us->peer.http->request; + + nxt_mp_release(r->mem_pool); + + nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY); +} + + static const nxt_http_request_state_t nxt_http_proxy_header_send_state nxt_aligned(64) = { diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 14c75dab..72aaa290 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -186,7 +186,7 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_off_t n; + nxt_off_t n, max_body_size; nxt_http_request_t *r; r = ctx; @@ -198,6 +198,13 @@ nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, if (nxt_fast_path(n >= 0)) { r->content_length_n = n; + + max_body_size = r->conf->socket_conf->max_body_size; + + if (nxt_slow_path(n > max_body_size)) { + return NXT_HTTP_PAYLOAD_TOO_LARGE; + } + return NXT_OK; } } @@ -319,18 +326,8 @@ nxt_http_action_t * nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - nxt_event_engine_t *engine; - nxt_debug(task, "http application handler"); - nxt_mp_retain(r->mem_pool); - - engine = task->thread->engine; - r->timer.task = &engine->task; - r->timer.work_queue = &engine->fast_work_queue; - r->timer.log = engine->task.log; - r->timer.bias = NXT_TIMER_DEFAULT_BIAS; - /* * TODO: need an application flag to get local address * required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go. @@ -572,6 +569,14 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) r->proto.any = NULL; + if (r->body != NULL && nxt_buf_is_file(r->body) + && r->body->file->fd != -1) + { + nxt_fd_close(r->body->file->fd); + + r->body->file->fd = -1; + } + if (nxt_fast_path(proto.any != NULL)) { protocol = r->protocol; diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index ef9593b7..d7f20bcb 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -43,6 +43,7 @@ typedef struct { nxt_conf_value_t *pass; nxt_conf_value_t *share; nxt_conf_value_t *proxy; + nxt_conf_value_t *fallback; } nxt_http_route_action_conf_t; @@ -175,7 +176,7 @@ static nxt_http_route_t *nxt_http_route_create(nxt_task_t *task, static nxt_http_route_match_t *nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv); static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, - nxt_conf_value_t *cv, nxt_http_route_match_t *match); + nxt_conf_value_t *cv, nxt_http_action_t *action); static nxt_http_route_table_t *nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object, nxt_bool_t case_sensitive); @@ -191,6 +192,7 @@ static nxt_http_route_rule_t *nxt_http_route_rule_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv, nxt_bool_t case_sensitive, nxt_http_route_pattern_case_t pattern_case); static int nxt_http_pattern_compare(const void *one, const void *two); +static int nxt_http_addr_pattern_compare(const void *one, const void *two); static nxt_int_t nxt_http_route_pattern_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv, nxt_http_route_pattern_t *pattern, nxt_http_route_pattern_case_t pattern_case); @@ -201,8 +203,8 @@ static void nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route); static void nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action); -static nxt_http_route_t *nxt_http_route_find(nxt_http_routes_t *routes, - nxt_str_t *name); +static void nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name, + nxt_http_action_t *action); static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes); static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task, @@ -407,7 +409,7 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, uint32_t n; nxt_mp_t *mp; nxt_int_t ret; - nxt_conf_value_t *match_conf; + nxt_conf_value_t *match_conf, *action_conf; nxt_http_route_test_t *test; nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; @@ -416,6 +418,7 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_match_conf_t mtcf; static nxt_str_t match_path = nxt_string("/match"); + static nxt_str_t action_path = nxt_string("/action"); match_conf = nxt_conf_get_path(cv, &match_path); @@ -433,7 +436,12 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, match->action.handler = NULL; match->items = n; - ret = nxt_http_route_action_create(tmcf, cv, match); + action_conf = nxt_conf_get_path(cv, &action_path); + if (nxt_slow_path(action_conf == NULL)) { + return NULL; + } + + ret = nxt_http_route_action_create(tmcf, action_conf, &match->action); if (nxt_slow_path(ret != NXT_OK)) { return NULL; } @@ -579,30 +587,27 @@ static nxt_conf_map_t nxt_http_route_action_conf[] = { NXT_CONF_MAP_PTR, offsetof(nxt_http_route_action_conf_t, proxy) }, + { + nxt_string("fallback"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, fallback) + }, }; static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, - nxt_http_route_match_t *match) + nxt_http_action_t *action) { nxt_mp_t *mp; nxt_int_t ret; nxt_str_t name, *string; - nxt_conf_value_t *conf, *action_conf; + nxt_conf_value_t *conf; nxt_http_route_action_conf_t accf; - static nxt_str_t action_path = nxt_string("/action"); - - action_conf = nxt_conf_get_path(cv, &action_path); - if (action_conf == NULL) { - return NXT_ERROR; - } - nxt_memzero(&accf, sizeof(accf)); - ret = nxt_conf_map_object(tmcf->mem_pool, - action_conf, nxt_http_route_action_conf, + ret = nxt_conf_map_object(tmcf->mem_pool, cv, nxt_http_route_action_conf, nxt_nitems(nxt_http_route_action_conf), &accf); if (ret != NXT_OK) { return ret; @@ -612,7 +617,7 @@ nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, if (accf.share != NULL) { conf = accf.share; - match->action.handler = nxt_http_static_handler; + action->handler = nxt_http_static_handler; } else if (accf.proxy != NULL) { conf = accf.proxy; @@ -622,13 +627,23 @@ nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, mp = tmcf->router_conf->mem_pool; - string = nxt_str_dup(mp, &match->action.name, &name); + string = nxt_str_dup(mp, &action->name, &name); if (nxt_slow_path(string == NULL)) { return NXT_ERROR; } + if (accf.fallback != NULL) { + action->u.fallback = nxt_mp_zalloc(mp, sizeof(nxt_http_action_t)); + if (nxt_slow_path(action->u.fallback == NULL)) { + return NXT_ERROR; + } + + return nxt_http_route_action_create(tmcf, accf.fallback, + action->u.fallback); + } + if (accf.proxy != NULL) { - return nxt_http_proxy_create(mp, &match->action); + return nxt_http_proxy_create(mp, action); } return NXT_OK; @@ -867,6 +882,12 @@ nxt_http_route_addr_rule_create(nxt_task_t *task, nxt_mp_t *mp, } } + if (n > 1) { + nxt_qsort(addr_rule->addr_pattern, addr_rule->items, + sizeof(nxt_http_route_addr_pattern_t), + nxt_http_addr_pattern_compare); + } + return addr_rule; } @@ -890,6 +911,18 @@ nxt_http_pattern_compare(const void *one, const void *two) } +static int +nxt_http_addr_pattern_compare(const void *one, const void *two) +{ + const nxt_http_route_addr_pattern_t *p1, *p2; + + p1 = one; + p2 = two; + + return (p2->base.negative - p1->base.negative); +} + + static nxt_int_t nxt_http_route_pattern_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv, nxt_http_route_pattern_t *pattern, @@ -1043,18 +1076,13 @@ static void nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route) { - nxt_http_action_t *action; nxt_http_route_match_t **match, **end; match = &route->match[0]; end = match + route->items; while (match < end) { - action = &(*match)->action; - - if (action->handler == NULL) { - nxt_http_action_resolve(task, tmcf, &(*match)->action); - } + nxt_http_action_resolve(task, tmcf, &(*match)->action); match++; } @@ -1067,16 +1095,30 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, { nxt_str_t name; + if (action->handler != NULL) { + if (action->handler == nxt_http_static_handler + && action->u.fallback != NULL) + { + nxt_http_action_resolve(task, tmcf, action->u.fallback); + } + + return; + } + name = action->name; if (nxt_str_start(&name, "applications/", 13)) { name.length -= 13; name.start += 13; - action->u.application = nxt_router_listener_application(tmcf, &name); + nxt_router_listener_application(tmcf, &name, action); nxt_router_app_use(task, action->u.application, 1); - action->handler = nxt_http_application_handler; + } else if (nxt_str_start(&name, "upstreams/", 10)) { + name.length -= 10; + name.start += 10; + + nxt_upstream_find(tmcf->router_conf->upstreams, &name, action); } else if (nxt_str_start(&name, "routes", 6)) { @@ -1089,15 +1131,14 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, name.start += 7; } - action->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name); - - action->handler = nxt_http_route_handler; + nxt_http_route_find(tmcf->router_conf->routes, &name, action); } } -static nxt_http_route_t * -nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name) +static void +nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name, + nxt_http_action_t *action) { nxt_http_route_t **route, **end; @@ -1106,13 +1147,14 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name) while (route < end) { if (nxt_strstr_eq(&(*route)->name, name)) { - return *route; + action->u.route = *route; + action->handler = nxt_http_route_handler; + + return; } route++; } - - return NULL; } @@ -1153,11 +1195,9 @@ nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, action->name = *name; - action->u.application = nxt_router_listener_application(tmcf, name); + nxt_router_listener_application(tmcf, name, action); nxt_router_app_use(task, action->u.application, 1); - action->handler = nxt_http_application_handler; - return action; } @@ -1201,6 +1241,13 @@ nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action) { if (action->handler == nxt_http_application_handler) { nxt_router_app_use(task, action->u.application, -1); + return; + } + + if (action->handler == nxt_http_static_handler + && action->u.fallback != NULL) + { + nxt_http_action_cleanup(task, action->u.fallback); } } @@ -1501,19 +1548,34 @@ static nxt_int_t nxt_http_route_addr_rule(nxt_http_request_t *r, nxt_http_route_addr_rule_t *addr_rule, nxt_sockaddr_t *sa) { - uint32_t i, n; + uint32_t n; + nxt_bool_t matches; nxt_http_route_addr_pattern_t *p; n = addr_rule->items; + p = &addr_rule->addr_pattern[0] - 1; - for (i = 0; i < n; i++) { - p = &addr_rule->addr_pattern[i]; - if (nxt_http_route_addr_pattern_match(p, sa)) { + do { + p++; + n--; + + matches = nxt_http_route_addr_pattern_match(p, sa); + + if (p->base.negative) { + if (matches) { + continue; + } + + return 0; + } + + if (matches) { return 1; } - } - return 0; + } while (n > 0); + + return p->base.negative; } diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index 44132859..46ae57a7 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -49,6 +49,10 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, if (nxt_slow_path(!nxt_str_eq(r->method, "GET", 3))) { if (!nxt_str_eq(r->method, "HEAD", 4)) { + if (action->u.fallback != NULL) { + return action->u.fallback; + } + nxt_http_request_error(task, r, NXT_HTTP_METHOD_NOT_ALLOWED); return NULL; } @@ -123,6 +127,10 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, break; } + if (level == NXT_LOG_ERR && action->u.fallback != NULL) { + return action->u.fallback; + } + if (status != NXT_HTTP_NOT_FOUND) { nxt_log(task, level, "open(\"%FN\") failed %E", f->name, f->error); } @@ -222,8 +230,13 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_file_close(task, f); if (nxt_slow_path(!nxt_is_dir(&fi))) { + if (action->u.fallback != NULL) { + return action->u.fallback; + } + nxt_log(task, NXT_LOG_ERR, "\"%FN\" is not a regular file", f->name); + nxt_http_request_error(task, r, NXT_HTTP_NOT_FOUND); return NULL; } diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c index 9edbc346..ecc3251e 100644 --- a/src/nxt_kqueue_engine.c +++ b/src/nxt_kqueue_engine.c @@ -747,7 +747,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) err = kev->fflags; eof = (kev->flags & EV_EOF) != 0; ev->kq_errno = err; - ev->kq_eof = eof; + ev->kq_eof |= eof; if (ev->read <= NXT_EVENT_BLOCKED) { nxt_debug(ev->task, "blocked read event fd:%d", ev->fd); @@ -778,7 +778,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) err = kev->fflags; eof = (kev->flags & EV_EOF) != 0; ev->kq_errno = err; - ev->kq_eof = eof; + ev->kq_eof |= eof; if (ev->write <= NXT_EVENT_BLOCKED) { nxt_debug(ev->task, "blocked write event fd:%d", ev->fd); diff --git a/src/nxt_main.h b/src/nxt_main.h index d9e337d2..b310c4fa 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -149,15 +149,7 @@ typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c); #include <nxt_cache.h> -#include <nxt_source.h> -typedef struct nxt_upstream_source_s nxt_upstream_source_t; - #include <nxt_http_parse.h> -#include <nxt_stream_source.h> -#include <nxt_upstream.h> -#include <nxt_upstream_source.h> -#include <nxt_http_source.h> -#include <nxt_fastcgi_source.h> #include <nxt_runtime.h> #include <nxt_port_hash.h> diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 26bf915f..f5053652 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -15,23 +15,31 @@ #include <nxt_unit_request.h> -#if PHP_MAJOR_VERSION >= 7 -# define NXT_PHP7 1 -# if PHP_MINOR_VERSION >= 1 -# define NXT_HAVE_PHP_LOG_MESSAGE_WITH_SYSLOG_TYPE 1 -# else -# define NXT_HAVE_PHP_INTERRUPTS 1 -# endif -# define NXT_HAVE_PHP_IGNORE_CWD 1 +#if PHP_VERSION_ID >= 50400 +#define NXT_HAVE_PHP_IGNORE_CWD 1 +#endif + +#if PHP_VERSION_ID >= 70100 +#define NXT_HAVE_PHP_LOG_MESSAGE_WITH_SYSLOG_TYPE 1 #else -# define NXT_HAVE_PHP_INTERRUPTS 1 -# if PHP_MINOR_VERSION >= 4 -# define NXT_HAVE_PHP_IGNORE_CWD 1 -# endif +#define NXT_HAVE_PHP_INTERRUPTS 1 +#endif + +#if PHP_VERSION_ID >= 70000 +#define NXT_PHP7 1 #endif +typedef struct { + char *cookie; + nxt_str_t path_info; + nxt_str_t script_name; + nxt_str_t script_filename; + nxt_str_t script_dirname; + nxt_unit_request_info_t *req; + + uint8_t chdir; /* 1 bit */ +} nxt_php_run_ctx_t; -typedef struct nxt_php_run_ctx_s nxt_php_run_ctx_t; #ifdef NXT_PHP7 typedef int (*nxt_php_disable_t)(char *p, size_t size); @@ -39,14 +47,24 @@ typedef int (*nxt_php_disable_t)(char *p, size_t size); typedef int (*nxt_php_disable_t)(char *p, uint TSRMLS_DC); #endif +#if PHP_VERSION_ID < 70200 +typedef void (*zif_handler)(INTERNAL_FUNCTION_PARAMETERS); +#endif + static nxt_int_t nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static void nxt_php_str_trim_trail(nxt_str_t *str, u_char t); static void nxt_php_str_trim_lead(nxt_str_t *str, u_char t); +static nxt_int_t nxt_php_dirname(const nxt_str_t *file, nxt_str_t *dir); nxt_inline u_char *nxt_realpath(const void *c); +nxt_inline void nxt_php_vcwd_chdir(nxt_unit_request_info_t *req, + const nxt_str_t *dirname); -static void nxt_php_request_handler(nxt_unit_request_info_t *req); +static void nxt_php_script_request_handler(nxt_unit_request_info_t *req); +static void nxt_php_path_request_handler(nxt_unit_request_info_t *req); +static nxt_int_t nxt_php_request_init(nxt_php_run_ctx_t *ctx, + nxt_unit_request_t *r); static int nxt_php_startup(sapi_module_struct *sapi_module); static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, @@ -56,6 +74,8 @@ static nxt_int_t nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, static void nxt_php_disable(nxt_task_t *task, const char *type, nxt_str_t *value, char **ptr, nxt_php_disable_t disable); static int nxt_php_send_headers(sapi_headers_struct *sapi_headers TSRMLS_DC); +static void *nxt_php_hash_str_find_ptr(const HashTable *ht, + const nxt_str_t *str); static char *nxt_php_read_cookies(TSRMLS_D); static void nxt_php_set_sptr(nxt_unit_request_info_t *req, const char *name, nxt_unit_sptr_t *v, uint32_t len, zval *track_vars_array TSRMLS_DC); @@ -80,6 +100,55 @@ static int nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC); #endif +PHP_MINIT_FUNCTION(nxt_php_ext); +ZEND_NAMED_FUNCTION(nxt_php_chdir); + +zif_handler nxt_php_chdir_handler; + + +static zend_module_entry nxt_php_unit_module = { + STANDARD_MODULE_HEADER, + "unit", + NULL, /* function table */ + PHP_MINIT(nxt_php_ext), /* initialization */ + NULL, /* shutdown */ + NULL, /* request initialization */ + NULL, /* request shutdown */ + NULL, /* information */ + NXT_VERSION, + STANDARD_MODULE_PROPERTIES +}; + + +PHP_MINIT_FUNCTION(nxt_php_ext) +{ + zend_function *func; + + static const nxt_str_t chdir = nxt_string("chdir"); + + func = nxt_php_hash_str_find_ptr(CG(function_table), &chdir); + if (nxt_slow_path(func == NULL)) { + return FAILURE; + } + + nxt_php_chdir_handler = func->internal_function.handler; + func->internal_function.handler = nxt_php_chdir; + + return SUCCESS; +} + + +ZEND_NAMED_FUNCTION(nxt_php_chdir) +{ + nxt_php_run_ctx_t *ctx; + + ctx = SG(server_context); + ctx->chdir = 1; + + nxt_php_chdir_handler(INTERNAL_FUNCTION_PARAM_PASSTHRU); +} + + static sapi_module_struct nxt_php_sapi_module = { (char *) "cli-server", @@ -141,17 +210,9 @@ static sapi_module_struct nxt_php_sapi_module = }; -struct nxt_php_run_ctx_s { - char *cookie; - nxt_str_t path_info; - nxt_str_t script_name; - nxt_str_t script_filename; - nxt_unit_request_info_t *req; -}; - - static nxt_str_t nxt_php_root; static nxt_str_t nxt_php_script_name; +static nxt_str_t nxt_php_script_dirname; static nxt_str_t nxt_php_script_filename; static nxt_str_t nxt_php_index = nxt_string("index.php"); @@ -172,7 +233,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { static nxt_task_t *nxt_php_task; -#ifdef ZTS +#if defined(ZTS) && PHP_VERSION_ID < 70400 static void ***tsrm_ls; #endif @@ -182,7 +243,9 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { u_char *p, *tmp; nxt_str_t ini_path; - nxt_str_t *root, *script_filename, *script_name, *index; + nxt_str_t *root, *script_filename, *script_dirname, *script_name; + nxt_str_t *index; + nxt_int_t ret; nxt_port_t *my_port, *main_port; nxt_runtime_t *rt; nxt_unit_ctx_t *unit_ctx; @@ -205,6 +268,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) root = &nxt_php_root; script_filename = &nxt_php_script_filename; + script_dirname = &nxt_php_script_dirname; script_name = &nxt_php_script_name; index = &nxt_php_index; @@ -249,6 +313,11 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) return NXT_ERROR; } + ret = nxt_php_dirname(script_filename, script_dirname); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + script_name->length = c->script.length + 1; script_name->start = nxt_malloc(script_name->length); if (nxt_slow_path(script_name->start == NULL)) { @@ -277,11 +346,33 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_memcpy(index->start, c->index.start, c->index.length); } + nxt_memzero(&php_init, sizeof(nxt_unit_init_t)); + + if (nxt_php_script_filename.start != NULL) { + if (nxt_slow_path(chdir((char *) script_dirname->start) != 0)) { + nxt_alert(task, "failed to chdir(%V) %E", script_dirname, + nxt_errno); + + return NXT_ERROR; + } + + php_init.callbacks.request_handler = nxt_php_script_request_handler; + + } else { + php_init.callbacks.request_handler = nxt_php_path_request_handler; + } + #ifdef ZTS + +#if PHP_VERSION_ID >= 70400 + php_tsrm_startup(); +#else tsrm_startup(1, 1, 0, NULL); tsrm_ls = ts_resource(0); #endif +#endif + #if defined(NXT_PHP7) && defined(ZEND_SIGNALS) #if (NXT_ZEND_SIGNAL_STARTUP) @@ -312,7 +403,10 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) } } - nxt_php_startup(&nxt_php_sapi_module); + if (nxt_slow_path(nxt_php_startup(&nxt_php_sapi_module) == FAILURE)) { + nxt_alert(task, "failed to initialize SAPI module and extension"); + return NXT_ERROR; + } if (c->options != NULL) { value = nxt_conf_get_object_member(c->options, &admin_str, NULL); @@ -322,21 +416,20 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_php_set_options(task, value, ZEND_INI_USER); } - nxt_memzero(&php_init, sizeof(nxt_unit_init_t)); - rt = task->thread->runtime; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; if (nxt_slow_path(main_port == NULL)) { + nxt_alert(task, "main process not found"); return NXT_ERROR; } my_port = nxt_runtime_port_find(rt, nxt_pid, 0); if (nxt_slow_path(my_port == NULL)) { + nxt_alert(task, "my_port not found"); return NXT_ERROR; } - php_init.callbacks.request_handler = nxt_php_request_handler; php_init.ready_port.id.pid = main_port->pid; php_init.ready_port.id.id = main_port->id; php_init.ready_port.out_fd = main_port->pair[1]; @@ -411,7 +504,7 @@ nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type) } -#if (NXT_PHP7) +#ifdef NXT_PHP7 static nxt_int_t nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type) @@ -419,10 +512,8 @@ nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type) zend_string *zs; zend_ini_entry *ini_entry; - ini_entry = zend_hash_str_find_ptr(EG(ini_directives), - (char *) name->start, name->length); - - if (ini_entry == NULL) { + ini_entry = nxt_php_hash_str_find_ptr(EG(ini_directives), name); + if (nxt_slow_path(ini_entry == NULL)) { return NXT_ERROR; } @@ -452,19 +543,9 @@ nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type) { char *cstr; zend_ini_entry *ini_entry; - char buf[256]; - if (nxt_slow_path(name->length >= sizeof(buf))) { - return NXT_ERROR; - } - - nxt_memcpy(buf, name->start, name->length); - buf[name->length] = '\0'; - - if (zend_hash_find(EG(ini_directives), buf, name->length + 1, - (void **) &ini_entry) - == FAILURE) - { + ini_entry = nxt_php_hash_str_find_ptr(EG(ini_directives), name); + if (nxt_slow_path(ini_entry == NULL)) { return NXT_ERROR; } @@ -549,6 +630,33 @@ nxt_php_disable(nxt_task_t *task, const char *type, nxt_str_t *value, } +static nxt_int_t +nxt_php_dirname(const nxt_str_t *file, nxt_str_t *dir) +{ + size_t length; + + nxt_assert(file->length > 0 && file->start[0] == '/'); + + length = file->length; + + while (file->start[length - 1] != '/') { + length--; + } + + dir->length = length; + dir->start = nxt_malloc(length + 1); + if (nxt_slow_path(dir->start == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(dir->start, file->start, length); + + dir->start[length] = '\0'; + + return NXT_OK; +} + + static void nxt_php_str_trim_trail(nxt_str_t *str, u_char t) { @@ -578,12 +686,46 @@ nxt_realpath(const void *c) static void -nxt_php_request_handler(nxt_unit_request_info_t *req) +nxt_php_script_request_handler(nxt_unit_request_info_t *req) +{ + zend_file_handle file_handle; + nxt_php_run_ctx_t ctx; + + nxt_memzero(&ctx, sizeof(ctx)); + + ctx.req = req; + ctx.script_filename = nxt_php_script_filename; + ctx.script_dirname = nxt_php_script_dirname; + ctx.script_name = nxt_php_script_name; + + nxt_memzero(&file_handle, sizeof(file_handle)); + + file_handle.type = ZEND_HANDLE_FILENAME; + file_handle.filename = (char *) ctx.script_filename.start; + + if (nxt_slow_path(nxt_php_request_init(&ctx, req->request) != NXT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + return; + } + + php_execute_script(&file_handle TSRMLS_CC); + + if (ctx.chdir) { + nxt_php_vcwd_chdir(ctx.req, &nxt_php_script_dirname); + } + + php_request_shutdown(NULL); + + nxt_unit_request_done(req, NXT_UNIT_OK); +} + + +static void +nxt_php_path_request_handler(nxt_unit_request_info_t *req) { - int rc; u_char *p; nxt_str_t path, script_name; - nxt_unit_field_t *f; + nxt_int_t ret; zend_file_handle file_handle; nxt_php_run_ctx_t run_ctx, *ctx; nxt_unit_request_t *r; @@ -598,59 +740,101 @@ nxt_php_request_handler(nxt_unit_request_info_t *req) path.length = r->path_length; path.start = nxt_unit_sptr_get(&r->path); - if (nxt_php_script_filename.start == NULL) { - nxt_str_null(&script_name); - - ctx->path_info.start = (u_char *) strstr((char *) path.start, ".php/"); - if (ctx->path_info.start != NULL) { - ctx->path_info.start += 4; - path.length = ctx->path_info.start - path.start; + nxt_str_null(&script_name); - ctx->path_info.length = r->path_length - path.length; + ctx->path_info.start = (u_char *) strstr((char *) path.start, ".php/"); + if (ctx->path_info.start != NULL) { + ctx->path_info.start += 4; + path.length = ctx->path_info.start - path.start; - } else if (path.start[path.length - 1] == '/') { - script_name = nxt_php_index; + ctx->path_info.length = r->path_length - path.length; - } else { - if (nxt_slow_path(path.length < 4 - || nxt_memcmp(path.start + (path.length - 4), - ".php", 4))) - { - nxt_unit_request_done(req, NXT_UNIT_ERROR); - - return; - } - } + } else if (path.start[path.length - 1] == '/') { + script_name = nxt_php_index; - ctx->script_filename.length = nxt_php_root.length + path.length - + script_name.length; - p = nxt_malloc(ctx->script_filename.length + 1); - if (nxt_slow_path(p == NULL)) { + } else { + if (nxt_slow_path(path.length < 4 + || nxt_memcmp(path.start + (path.length - 4), + ".php", 4))) + { nxt_unit_request_done(req, NXT_UNIT_ERROR); return; } + } - ctx->script_filename.start = p; + ctx->script_filename.length = nxt_php_root.length + + path.length + + script_name.length; - ctx->script_name.length = path.length + script_name.length; - ctx->script_name.start = p + nxt_php_root.length; + p = nxt_malloc(ctx->script_filename.length + 1); + if (nxt_slow_path(p == NULL)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); - p = nxt_cpymem(p, nxt_php_root.start, nxt_php_root.length); - p = nxt_cpymem(p, path.start, path.length); + return; + } - if (script_name.length > 0) { - p = nxt_cpymem(p, script_name.start, script_name.length); - } + ctx->script_filename.start = p; - *p = '\0'; + ctx->script_name.length = path.length + script_name.length; + ctx->script_name.start = p + nxt_php_root.length; - } else { - ctx->script_filename = nxt_php_script_filename; - ctx->script_name = nxt_php_script_name; + p = nxt_cpymem(p, nxt_php_root.start, nxt_php_root.length); + p = nxt_cpymem(p, path.start, path.length); + + if (script_name.length > 0) { + p = nxt_cpymem(p, script_name.start, script_name.length); + } + + *p = '\0'; + + nxt_memzero(&file_handle, sizeof(file_handle)); + + file_handle.type = ZEND_HANDLE_FILENAME; + file_handle.filename = (char *) ctx->script_filename.start; + + ret = nxt_php_dirname(&ctx->script_filename, &ctx->script_dirname); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + nxt_free(ctx->script_filename.start); + + return; } + if (nxt_slow_path(nxt_php_request_init(ctx, req->request) != NXT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + goto cleanup; + } + + nxt_php_vcwd_chdir(ctx->req, &ctx->script_dirname); + + php_execute_script(&file_handle TSRMLS_CC); + + php_request_shutdown(NULL); + + nxt_unit_request_done(req, NXT_UNIT_OK); + +cleanup: + + nxt_free(ctx->script_filename.start); + nxt_free(ctx->script_dirname.start); +} + + +static int +nxt_php_startup(sapi_module_struct *sapi_module) +{ + return php_module_startup(sapi_module, &nxt_php_unit_module, 1); +} + + +static nxt_int_t +nxt_php_request_init(nxt_php_run_ctx_t *ctx, nxt_unit_request_t *r) +{ + nxt_unit_field_t *f; + SG(server_context) = ctx; + SG(options) |= SAPI_OPTION_NO_CHDIR; SG(request_info).request_uri = nxt_unit_sptr_get(&r->target); SG(request_info).request_method = nxt_unit_sptr_get(&r->method); @@ -676,55 +860,41 @@ nxt_php_request_handler(nxt_unit_request_info_t *req) SG(request_info).path_translated = NULL; - nxt_memzero(&file_handle, sizeof(file_handle)); - - file_handle.type = ZEND_HANDLE_FILENAME; - file_handle.filename = (char *) ctx->script_filename.start; - - nxt_unit_req_debug(req, "handle.filename = '%s'", + nxt_unit_req_debug(ctx->req, "handle.filename = '%s'", ctx->script_filename.start); if (nxt_php_script_filename.start != NULL) { - nxt_unit_req_debug(req, "run script %.*s in absolute mode", + nxt_unit_req_debug(ctx->req, "run script %.*s in absolute mode", (int) nxt_php_script_filename.length, (char *) nxt_php_script_filename.start); } else { - nxt_unit_req_debug(req, "run script %.*s", + nxt_unit_req_debug(ctx->req, "run script %.*s", (int) ctx->script_filename.length, (char *) ctx->script_filename.start); } -#if (NXT_PHP7) +#ifdef NXT_PHP7 if (nxt_slow_path(php_request_startup() == FAILURE)) { #else if (nxt_slow_path(php_request_startup(TSRMLS_C) == FAILURE)) { #endif - nxt_unit_req_debug(req, "php_request_startup() failed"); - rc = NXT_UNIT_ERROR; + nxt_unit_req_debug(ctx->req, "php_request_startup() failed"); - goto fail; + return NXT_ERROR; } - rc = NXT_UNIT_OK; - - php_execute_script(&file_handle TSRMLS_CC); - php_request_shutdown(NULL); - -fail: - - nxt_unit_request_done(req, rc); - - if (ctx->script_filename.start != nxt_php_script_filename.start) { - nxt_free(ctx->script_filename.start); - } + return NXT_OK; } -static int -nxt_php_startup(sapi_module_struct *sapi_module) +nxt_inline void +nxt_php_vcwd_chdir(nxt_unit_request_info_t *req, const nxt_str_t *dir) { - return php_module_startup(sapi_module, NULL, 0); + if (nxt_slow_path(VCWD_CHDIR((char *) dir->start) != 0)) { + nxt_unit_req_alert(req, "VCWD_CHDIR(%s) failed (%d: %s)", + dir->start, errno, strerror(errno)); + } } @@ -1001,6 +1171,41 @@ nxt_php_set_str(nxt_unit_request_info_t *req, const char *name, } +#ifdef NXT_PHP7 + +static void * +nxt_php_hash_str_find_ptr(const HashTable *ht, const nxt_str_t *str) +{ + return zend_hash_str_find_ptr(ht, (const char *) str->start, str->length); +} + +#else + +static void * +nxt_php_hash_str_find_ptr(const HashTable *ht, const nxt_str_t *str) +{ + int ret; + void *entry; + char buf[256]; + + if (nxt_slow_path(str->length >= (sizeof(buf) - 1))) { + return NULL; + } + + nxt_memcpy(buf, str->start, str->length); + buf[str->length] = '\0'; + + ret = zend_hash_find(ht, buf, str->length + 1, &entry); + if (nxt_fast_path(ret == SUCCESS)) { + return entry; + } + + return NULL; +} + +#endif + + static void nxt_php_set_cstr(nxt_unit_request_info_t *req, const char *name, const char *cstr, uint32_t len, zval *track_vars_array TSRMLS_DC) diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index ea8b6903..14211f3f 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -89,8 +89,12 @@ static PyObject *nxt_py_write(PyObject *self, PyObject *args); static void nxt_py_input_dealloc(nxt_py_input_t *self); static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); +static PyObject *nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size); static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); +static PyObject *nxt_py_input_iter(PyObject *self); +static PyObject *nxt_py_input_next(PyObject *self); + static void nxt_python_print_exception(void); static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes); @@ -142,6 +146,8 @@ static PyTypeObject nxt_py_input_type = { .tp_dealloc = (destructor) nxt_py_input_dealloc, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_doc = "unit input object.", + .tp_iter = nxt_py_input_iter, + .tp_iternext = nxt_py_input_next, .tp_methods = nxt_py_input_methods, }; @@ -1229,14 +1235,151 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args) static PyObject * nxt_py_input_readline(nxt_py_input_t *self, PyObject *args) { - return PyBytes_FromStringAndSize("", 0); + ssize_t ssize; + PyObject *obj; + Py_ssize_t n; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.readline() is called " + "outside of WSGI request processing"); + } + + n = PyTuple_GET_SIZE(args); + + if (n > 0) { + if (n != 1) { + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + obj = PyTuple_GET_ITEM(args, 0); + + ssize = PyNumber_AsSsize_t(obj, PyExc_OverflowError); + + if (nxt_fast_path(ssize > 0)) { + return nxt_py_input_getline(ctx, ssize); + } + + if (ssize == 0) { + return PyBytes_FromStringAndSize("", 0); + } + + if (ssize != -1) { + return PyErr_Format(PyExc_ValueError, + "the read line size cannot be zero or less"); + } + + if (PyErr_Occurred()) { + return NULL; + } + } + + return nxt_py_input_getline(ctx, SSIZE_MAX); +} + + +static PyObject * +nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size) +{ + void *buf; + ssize_t res; + PyObject *content; + + res = nxt_unit_request_readline_size(ctx->req, size); + if (nxt_slow_path(res < 0)) { + return NULL; + } + + if (res == 0) { + return PyBytes_FromStringAndSize("", 0); + } + + content = PyBytes_FromStringAndSize(NULL, res); + if (nxt_slow_path(content == NULL)) { + return NULL; + } + + buf = PyBytes_AS_STRING(content); + + res = nxt_unit_request_read(ctx->req, buf, res); + + return content; } static PyObject * nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args) { - return PyList_New(0); + PyObject *res; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.readlines() is called " + "outside of WSGI request processing"); + } + + res = PyList_New(0); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + for ( ;; ) { + PyObject *line = nxt_py_input_getline(ctx, SSIZE_MAX); + if (nxt_slow_path(line == NULL)) { + Py_DECREF(res); + return NULL; + } + + if (PyBytes_GET_SIZE(line) == 0) { + Py_DECREF(line); + return res; + } + + PyList_Append(res, line); + Py_DECREF(line); + } + + return res; +} + + +static PyObject * +nxt_py_input_iter(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +static PyObject * +nxt_py_input_next(PyObject *self) +{ + PyObject *line; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.next() is called " + "outside of WSGI request processing"); + } + + line = nxt_py_input_getline(ctx, SSIZE_MAX); + if (nxt_slow_path(line == NULL)) { + return NULL; + } + + if (PyBytes_GET_SIZE(line) == 0) { + Py_DECREF(line); + PyErr_SetNone(PyExc_StopIteration); + return NULL; + } + + return line; } diff --git a/src/nxt_router.c b/src/nxt_router.c index 3ff048c5..a913284c 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1361,6 +1361,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, send_timeout), }, + + { + nxt_string("body_temp_path"), + NXT_CONF_MAP_STR, + offsetof(nxt_socket_conf_t, body_temp_path), + }, }; @@ -1397,6 +1403,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_int_t ret; nxt_str_t name, path; nxt_app_t *app, *prev; + nxt_str_t *t; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1634,6 +1641,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->routes = routes; } + ret = nxt_upstreams_create(task, tmcf, conf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + http = nxt_conf_get_path(conf, &http_path); #if 0 if (http == NULL) { @@ -1693,6 +1705,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->websocket_conf.read_timeout = 60 * 1000; skcf->websocket_conf.keepalive_interval = 30 * 1000; + nxt_str_null(&skcf->body_temp_path); + if (http != NULL) { ret = nxt_conf_map_object(mp, http, nxt_router_http_conf, nxt_nitems(nxt_router_http_conf), @@ -1714,6 +1728,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } + t = &skcf->body_temp_path; + + if (t->length == 0) { + t->start = (u_char *) task->thread->runtime->tmp; + t->length = nxt_strlen(t->start); + } + #if (NXT_TLS) value = nxt_conf_get_path(listener, &certificate_path); @@ -1904,8 +1925,9 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) } -nxt_app_t * -nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) +void +nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, + nxt_http_action_t *action) { nxt_app_t *app; @@ -1915,7 +1937,8 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) app = nxt_router_app_find(&tmcf->previous, name); } - return app; + action->u.application = app; + action->handler = nxt_http_application_handler; } @@ -2524,6 +2547,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_work_handler_t handler) { + nxt_int_t ret; nxt_joint_job_t *job; nxt_queue_link_t *qlk; nxt_socket_conf_t *skcf; @@ -2557,6 +2581,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, job->work.data = joint; + ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + joint->count = 1; skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -4152,6 +4181,13 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, state.req_app_link = re_ra; state.app = app; + /* + * Need to increment use count "in advance" because + * nxt_router_port_select() will remove re_ra from lists + * and decrement use count. + */ + nxt_request_app_link_inc_use(re_ra); + nxt_router_port_select(task, &state); goto re_ra_cancelled; @@ -4217,16 +4253,18 @@ re_ra_cancelled: if (re_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { /* - * There should be call nxt_request_app_link_inc_use(re_ra), - * because of one more link in the queue. - * Corresponding decrement is in nxt_router_app_process_request(). + * Reference counter already incremented above, this will + * keep re_ra while nxt_router_app_process_request() + * task is in queue. Reference counter decreased in + * nxt_router_app_process_request() after processing. */ - nxt_request_app_link_inc_use(re_ra); - nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, re_ra); + + } else { + nxt_request_app_link_use(task, re_ra, -1); } } @@ -4234,15 +4272,14 @@ re_ra_cancelled: /* * There should be call nxt_request_app_link_inc_use(req_app_link), * because of one more link in the queue. But one link was - * recently removed from app->requests link. + * recently removed from app->requests linked list. + * Corresponding decrement is in nxt_router_app_process_request(). */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, req_app_link); - /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */ - goto adjust_use; } @@ -4684,6 +4721,21 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, return; } + /* + * At this point we have request req_rpc_data allocated and registered + * in port handlers. Need to fixup request memory pool. Counterpart + * release will be called via following call chain: + * nxt_request_rpc_data_unlink() -> + * nxt_router_http_request_done() -> + * nxt_router_http_request_release() + */ + nxt_mp_retain(r->mem_pool); + + r->timer.task = &engine->task; + r->timer.work_queue = &engine->fast_work_queue; + r->timer.log = engine->task.log; + r->timer.bias = NXT_TIMER_DEFAULT_BIAS; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; @@ -4722,7 +4774,8 @@ static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_request_app_link_t *req_app_link) { - nxt_buf_t *buf; + nxt_fd_t fd; + nxt_buf_t *buf, *body; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; nxt_apr_action_t apr_action; @@ -4781,8 +4834,14 @@ nxt_router_app_prepare_request(nxt_task_t *task, goto release_port; } - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, - -1, req_app_link->stream, reply_port->id, buf, + body = req_app_link->request->body; + fd = (body != NULL && nxt_buf_is_file(body)) ? body->file->fd : -1; + + res = nxt_port_socket_twrite(task, port, + NXT_PORT_MSG_REQ_HEADERS + | NXT_PORT_MSG_CLOSE_FD, + fd, + req_app_link->stream, reply_port->id, buf, &req_app_link->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { @@ -4791,6 +4850,10 @@ nxt_router_app_prepare_request(nxt_task_t *task, goto release_port; } + if (fd != -1) { + body->file->fd = -1; + } + release_port: nxt_router_app_port_release(task, port, apr_action); @@ -5115,6 +5178,10 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, } } + if (r->body != NULL && nxt_buf_is_file(r->body)) { + lseek(r->body->file->fd, 0, SEEK_SET); + } + return out; } @@ -5185,6 +5252,13 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) state.req_app_link = pending_ra; state.app = app; + /* + * Need to increment use count "in advance" because + * nxt_router_port_select() will remove pending_ra from lists + * and decrement use count. + */ + nxt_request_app_link_inc_use(pending_ra); + nxt_router_port_select(task, &state); } else { @@ -5196,7 +5270,19 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) if (pending_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { - nxt_router_app_prepare_request(task, pending_ra); + /* + * Reference counter already incremented above, this will + * keep pending_ra while nxt_router_app_process_request() + * task is in queue. Reference counter decreased in + * nxt_router_app_process_request() after processing. + */ + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, pending_ra); + + } else { + nxt_request_app_link_use(task, pending_ra, -1); } } diff --git a/src/nxt_router.h b/src/nxt_router.h index 1517c14b..08142ce3 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -18,6 +18,8 @@ typedef struct nxt_http_request_s nxt_http_request_t; typedef struct nxt_http_action_s nxt_http_action_t; typedef struct nxt_http_routes_s nxt_http_routes_t; +typedef struct nxt_upstream_s nxt_upstream_t; +typedef struct nxt_upstreams_s nxt_upstreams_t; typedef struct nxt_router_access_log_s nxt_router_access_log_t; @@ -43,6 +45,7 @@ typedef struct { nxt_router_t *router; nxt_http_routes_t *routes; + nxt_upstreams_t *upstreams; nxt_lvlhsh_t mtypes_hash; @@ -184,6 +187,8 @@ typedef struct { nxt_websocket_conf_t websocket_conf; + nxt_str_t body_temp_path; + #if (NXT_TLS) nxt_tls_conf_t *tls; #endif @@ -196,6 +201,8 @@ typedef struct { nxt_event_engine_t *engine; nxt_socket_conf_t *socket_conf; + nxt_upstream_t **upstreams; + /* Modules configuraitons. */ } nxt_socket_conf_joint_t; @@ -218,8 +225,8 @@ void nxt_router_access_log_reopen_handler(nxt_task_t *task, void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_app_t *app); void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port); -nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, - nxt_str_t *name); +void nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, + nxt_str_t *name, nxt_http_action_t *action); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); void nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, nxt_socket_conf_joint_t *joint); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 80b25c1b..f6d80ccb 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -693,6 +693,7 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) rt->modules = NXT_MODULES; rt->state = NXT_STATE; rt->control = NXT_CONTROL_SOCK; + rt->tmp = NXT_TMP; nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t)); @@ -835,6 +836,7 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) static const char no_modules[] = "option \"--modules\" requires directory\n"; static const char no_state[] = "option \"--state\" requires directory\n"; + static const char no_tmp[] = "option \"--tmp\" requires directory\n"; static const char help[] = "\n" @@ -859,6 +861,9 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) " --state DIRECTORY set state directory name\n" " default: \"" NXT_STATE "\"\n" "\n" + " --tmp DIRECTORY set tmp directory name\n" + " default: \"" NXT_TMP "\"\n" + "\n" " --user USER set non-privileged processes to run" " as specified user\n" " default: \"" NXT_USER "\"\n" @@ -966,6 +971,19 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) continue; } + if (nxt_strcmp(p, "--tmp") == 0) { + if (*argv == NULL) { + write(STDERR_FILENO, no_tmp, nxt_length(no_tmp)); + return NXT_ERROR; + } + + p = *argv++; + + rt->tmp = p; + + continue; + } + if (nxt_strcmp(p, "--no-daemon") == 0) { rt->daemon = 0; continue; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index f8d19ec6..a364c38c 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -68,6 +68,7 @@ struct nxt_runtime_s { const char *conf; const char *conf_tmp; const char *control; + const char *tmp; nxt_str_t certs; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7c3d945c..7a4124fb 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -76,6 +76,8 @@ static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static nxt_unit_mmap_buf_t *nxt_unit_request_preread( + nxt_unit_request_info_t *req, size_t size); static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, @@ -961,6 +963,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req_impl->incoming_buf->prev = &req_impl->incoming_buf; recv_msg->incoming_buf = NULL; + req->content_fd = recv_msg->fd; + recv_msg->fd = -1; + req->response_max_fields = 0; req_impl->state = NXT_UNIT_RS_START; req_impl->websocket = 0; @@ -1178,6 +1183,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_unit_mmap_buf_free(req_impl->incoming_buf); } + if (req->content_fd != -1) { + close(req->content_fd); + + req->content_fd = -1; + } + /* * Process release should go after buffers release to guarantee mmap * existence. @@ -2423,8 +2434,144 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) { - return nxt_unit_buf_read(&req->content_buf, &req->content_length, - dst, size); + ssize_t buf_res, res; + + buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, + dst, size); + + if (buf_res < (ssize_t) size && req->content_fd != -1) { + res = read(req->content_fd, dst, size); + if (res < 0) { + nxt_unit_req_alert(req, "failed to read content: %s (%d)", + strerror(errno), errno); + + return res; + } + + if (res < (ssize_t) size) { + close(req->content_fd); + + req->content_fd = -1; + } + + req->content_length -= res; + size -= res; + + dst = nxt_pointer_to(dst, res); + + } else { + res = 0; + } + + return buf_res + res; +} + + +ssize_t +nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) +{ + char *p; + size_t l_size, b_size; + nxt_unit_buf_t *b; + nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; + + if (req->content_length == 0) { + return 0; + } + + l_size = 0; + + b = req->content_buf; + + while (b != NULL) { + b_size = b->end - b->free; + p = memchr(b->free, '\n', b_size); + + if (p != NULL) { + p++; + l_size += p - b->free; + break; + } + + l_size += b_size; + + if (max_size <= l_size) { + break; + } + + mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); + if (mmap_buf->next == NULL + && req->content_fd != -1 + && l_size < req->content_length) + { + preread_buf = nxt_unit_request_preread(req, 16384); + if (nxt_slow_path(preread_buf == NULL)) { + return -1; + } + + nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); + } + + b = nxt_unit_buf_next(b); + } + + return nxt_min(max_size, l_size); +} + + +static nxt_unit_mmap_buf_t * +nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) +{ + ssize_t res; + nxt_unit_mmap_buf_t *mmap_buf; + + if (req->content_fd == -1) { + nxt_unit_req_alert(req, "preread: content_fd == -1"); + return NULL; + } + + mmap_buf = nxt_unit_mmap_buf_get(req->ctx); + if (nxt_slow_path(mmap_buf == NULL)) { + nxt_unit_req_alert(req, "preread: failed to allocate buf"); + return NULL; + } + + mmap_buf->free_ptr = malloc(size); + if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { + nxt_unit_req_alert(req, "preread: failed to allocate buf memory"); + nxt_unit_mmap_buf_release(mmap_buf); + return NULL; + } + + mmap_buf->plain_ptr = mmap_buf->free_ptr; + + mmap_buf->hdr = NULL; + mmap_buf->buf.start = mmap_buf->free_ptr; + mmap_buf->buf.free = mmap_buf->buf.start; + mmap_buf->buf.end = mmap_buf->buf.start + size; + mmap_buf->process = NULL; + + res = read(req->content_fd, mmap_buf->free_ptr, size); + if (res < 0) { + nxt_unit_req_alert(req, "failed to read content: %s (%d)", + strerror(errno), errno); + + nxt_unit_mmap_buf_free(mmap_buf); + + return NULL; + } + + if (res < (ssize_t) size) { + close(req->content_fd); + + req->content_fd = -1; + } + + nxt_unit_req_debug(req, "preread: read %d", (int) res); + + mmap_buf->buf.end = mmap_buf->buf.free + res; + + return mmap_buf; } @@ -2433,14 +2580,17 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) { u_char *p; size_t rest, copy, read; - nxt_unit_buf_t *buf; + nxt_unit_buf_t *buf, *last_buf; p = dst; rest = size; buf = *b; + last_buf = buf; while (buf != NULL) { + last_buf = buf; + copy = buf->end - buf->free; copy = nxt_min(rest, copy); @@ -2460,7 +2610,7 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) buf = nxt_unit_buf_next(buf); } - *b = buf; + *b = last_buf; read = size - rest; diff --git a/src/nxt_unit.h b/src/nxt_unit.h index c8aaa124..900f3ac2 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -103,6 +103,7 @@ struct nxt_unit_request_info_s { nxt_unit_buf_t *content_buf; uint64_t content_length; + int content_fd; void *data; }; @@ -335,6 +336,9 @@ int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size); +ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req, + size_t max_size); + void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc); diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c index e1615120..66b6619a 100644 --- a/src/nxt_upstream.c +++ b/src/nxt_upstream.c @@ -4,40 +4,137 @@ * Copyright (C) NGINX, Inc. */ -#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_upstream.h> -typedef struct { - void (*peer_get)(nxt_upstream_peer_t *up); - void (*peer_free)(nxt_upstream_peer_t *up); -} nxt_upstream_name_t; +static nxt_http_action_t *nxt_upstream_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); -static const nxt_upstream_name_t nxt_upstream_names[] = { +nxt_int_t +nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *conf) +{ + size_t size; + uint32_t i, n, next; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_str_t name, *string; + nxt_upstreams_t *upstreams; + nxt_conf_value_t *upstreams_conf, *upcf; + + static nxt_str_t upstreams_name = nxt_string("upstreams"); + + upstreams_conf = nxt_conf_get_object_member(conf, &upstreams_name, NULL); + + if (upstreams_conf == NULL) { + return NXT_OK; + } + + n = nxt_conf_object_members_count(upstreams_conf); + + if (n == 0) { + return NXT_OK; + } + + mp = tmcf->router_conf->mem_pool; + size = sizeof(nxt_upstreams_t) + n * sizeof(nxt_upstream_t); + + upstreams = nxt_mp_zalloc(mp, size); + if (nxt_slow_path(upstreams == NULL)) { + return NXT_ERROR; + } + + upstreams->items = n; + next = 0; + + for (i = 0; i < n; i++) { + upcf = nxt_conf_next_object_member(upstreams_conf, &name, &next); + + string = nxt_str_dup(mp, &upstreams->upstream[i].name, &name); + if (nxt_slow_path(string == NULL)) { + return NXT_ERROR; + } + + ret = nxt_upstream_round_robin_create(task, tmcf, upcf, + &upstreams->upstream[i]); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + tmcf->router_conf->upstreams = upstreams; - { "round_robin", &nxt_upstream_round_robin }, -}; + return NXT_OK; +} void -nxt_upstream_create(nxt_upstream_peer_t *up) +nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name, + nxt_http_action_t *action) { - /* TODO: dynamic balancer add & lvlhsh */ - nxt_upstream_names[0].create(up); + uint32_t i, n; + nxt_upstream_t *upstream; + + upstream = &upstreams->upstream[0]; + n = upstreams->items; + + for (i = 0; i < n; i++) { + if (nxt_strstr_eq(&upstream[i].name, name)) { + action->u.upstream_number = i; + action->handler = nxt_upstream_handler; + + return; + } + } } -void -nxt_upstream_peer(nxt_upstream_peer_t *up) +nxt_int_t +nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t ***upstream_joint) { - nxt_upstream_t *u; + uint32_t i, n; + nxt_upstream_t *u, **up; + nxt_upstreams_t *upstreams; + nxt_router_conf_t *router_conf; + + router_conf = tmcf->router_conf; + upstreams = router_conf->upstreams; + + if (upstreams == NULL) { + *upstream_joint = NULL; + return NXT_OK; + } - u = up->upstream; + n = upstreams->items; - if (u != NULL) { - u->peer_get(up); - return; + up = nxt_mp_zalloc(router_conf->mem_pool, n * sizeof(nxt_upstream_t *)); + if (nxt_slow_path(up == NULL)) { + return NXT_ERROR; } - nxt_upstream_create(up); + u = &upstreams->upstream[0]; + + for (i = 0; i < n; i++) { + up[i] = u[i].proto->joint_create(tmcf, &u[i]); + if (nxt_slow_path(up[i] == NULL)) { + return NXT_ERROR; + } + } + + *upstream_joint = up; + + return NXT_OK; +} + + +static nxt_http_action_t * +nxt_upstream_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) +{ + return nxt_upstream_proxy_handler(task, r, + r->conf->upstreams[action->u.upstream_number]); } diff --git a/src/nxt_upstream.h b/src/nxt_upstream.h index d1fca2a5..afc53774 100644 --- a/src/nxt_upstream.h +++ b/src/nxt_upstream.h @@ -8,40 +8,74 @@ #define _NXT_UPSTREAM_H_INCLUDED_ -typedef struct nxt_upstream_peer_s nxt_upstream_peer_t; +typedef struct nxt_upstream_proxy_s nxt_upstream_proxy_t; +typedef struct nxt_upstream_round_robin_s nxt_upstream_round_robin_t; +typedef struct nxt_upstream_round_robin_server_s + nxt_upstream_round_robin_server_t; -struct nxt_upstream_peer_s { - /* STUB */ - void *upstream; - void *data; - /**/ +typedef void (*nxt_upstream_peer_ready_t)(nxt_task_t *task, + nxt_upstream_server_t *us); +typedef void (*nxt_upstream_peer_error_t)(nxt_task_t *task, + nxt_upstream_server_t *us); - nxt_sockaddr_t *sockaddr; - nxt_nsec_t delay; - uint32_t tries; - in_port_t port; +typedef struct { + nxt_upstream_peer_ready_t ready; + nxt_upstream_peer_error_t error; +} nxt_upstream_peer_state_t; - nxt_str_t addr; - nxt_mp_t *mem_pool; - void (*ready_handler)(nxt_task_t *task, nxt_upstream_peer_t *up); - void (*protocol_handler)(nxt_upstream_source_t *us); -}; +typedef nxt_upstream_t *(*nxt_upstream_joint_create_t)( + nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); +typedef void (*nxt_upstream_server_get_t)(nxt_task_t *task, + nxt_upstream_server_t *us); typedef struct { - void (*ready_handler)(void *data); - nxt_work_handler_t completion_handler; - nxt_work_handler_t error_handler; -} nxt_upstream_state_t; + nxt_upstream_joint_create_t joint_create; + nxt_upstream_server_get_t get; +} nxt_upstream_server_proto_t; + + +struct nxt_upstream_s { + const nxt_upstream_server_proto_t *proto; + + union { + nxt_upstream_proxy_t *proxy; + nxt_upstream_round_robin_t *round_robin; + } type; + + nxt_str_t name; +}; + + +struct nxt_upstreams_s { + uint32_t items; + nxt_upstream_t upstream[0]; +}; + + +struct nxt_upstream_server_s { + nxt_sockaddr_t *sockaddr; + const nxt_upstream_peer_state_t *state; + nxt_upstream_t *upstream; + + uint8_t protocol; + + union { + nxt_upstream_round_robin_server_t *round_robin; + } server; + + union { + nxt_http_peer_t *http; + } peer; +}; -/* STUB */ -NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_task_t *task, - nxt_upstream_peer_t *up); -/**/ +nxt_int_t nxt_upstream_round_robin_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *upstream_conf, + nxt_upstream_t *upstream); #endif /* _NXT_UPSTREAM_H_INCLUDED_ */ diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c index 09a3bce3..fd76ecb5 100644 --- a/src/nxt_upstream_round_robin.c +++ b/src/nxt_upstream_round_robin.c @@ -4,197 +4,188 @@ * Copyright (C) NGINX, Inc. */ -#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_upstream.h> -typedef struct { - int32_t weight; - int32_t effective_weight; - int32_t current_weight; - uint32_t down; /* 1 bit */ - nxt_msec_t last_accessed; - nxt_sockaddr_t *sockaddr; -} nxt_upstream_round_robin_peer_t; +struct nxt_upstream_round_robin_server_s { + nxt_sockaddr_t *sockaddr; + int32_t current_weight; + int32_t effective_weight; + int32_t weight; -typedef struct { - nxt_uint_t npeers; - nxt_upstream_round_robin_peer_t *peers; - nxt_thread_spinlock_t lock; -} nxt_upstream_round_robin_t; + uint8_t protocol; +}; -static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, - void *data); -static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_upstream_round_robin_get_peer(nxt_task_t *task, - nxt_upstream_peer_t *up); +struct nxt_upstream_round_robin_s { + uint32_t items; + nxt_upstream_round_robin_server_t server[0]; +}; -void -nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up) -{ - nxt_job_sockaddr_parse_t *jbs; +static nxt_upstream_t *nxt_upstream_round_robin_joint_create( + nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); +static void nxt_upstream_round_robin_server_get(nxt_task_t *task, + nxt_upstream_server_t *us); - if (up->upstream != NULL) { - nxt_upstream_round_robin_get_peer(task, up); - } - jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t)); - if (nxt_slow_path(jbs == NULL)) { - up->ready_handler(task, up); - return; - } +static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = { + .joint_create = nxt_upstream_round_robin_joint_create, + .get = nxt_upstream_round_robin_server_get, +}; - jbs->resolve.job.task = task; - jbs->resolve.job.data = up; - jbs->resolve.port = up->port; - jbs->resolve.log_level = NXT_LOG_ERR; - jbs->resolve.ready_handler = nxt_upstream_round_robin_create; - jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error; - jbs->addr = up->addr; - nxt_job_sockaddr_parse(jbs); -} +static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = { + { + nxt_string("weight"), + NXT_CONF_MAP_INT32, + offsetof(nxt_upstream_round_robin_server_t, weight), + }, +}; -static void -nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) +nxt_int_t +nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream) { - nxt_uint_t i; - nxt_sockaddr_t *sa; - nxt_upstream_peer_t *up; - nxt_job_sockaddr_parse_t *jbs; - nxt_upstream_round_robin_t *urr; - nxt_upstream_round_robin_peer_t *peer; + size_t size; + uint32_t i, n, next; + nxt_mp_t *mp; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_conf_value_t *servers_conf, *srvcf; + nxt_upstream_round_robin_t *urr; - jbs = obj; - up = jbs->resolve.job.data; + static nxt_str_t servers = nxt_string("servers"); - urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t)); - if (nxt_slow_path(urr == NULL)) { - goto fail; - } + mp = tmcf->router_conf->mem_pool; - urr->npeers = jbs->resolve.count; + servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL); + n = nxt_conf_object_members_count(servers_conf); - peer = nxt_mp_zget(up->mem_pool, - urr->npeers * sizeof(nxt_upstream_round_robin_peer_t)); - if (nxt_slow_path(peer == NULL)) { - goto fail; + size = sizeof(nxt_upstream_round_robin_t) + + n * sizeof(nxt_upstream_round_robin_server_t); + + urr = nxt_mp_zalloc(mp, size); + if (nxt_slow_path(urr == NULL)) { + return NXT_ERROR; } - urr->peers = peer; + urr->items = n; + next = 0; - for (i = 0; i < urr->npeers; i++) { - peer[i].weight = 1; - peer[i].effective_weight = 1; + for (i = 0; i < n; i++) { + srvcf = nxt_conf_next_object_member(servers_conf, &name, &next); - sa = jbs->resolve.sockaddrs[i]; + sa = nxt_sockaddr_parse(mp, &name); + if (nxt_slow_path(sa == NULL)) { + return NXT_ERROR; + } - /* STUB */ sa->type = SOCK_STREAM; - nxt_sockaddr_text(sa); + urr->server[i].sockaddr = sa; + urr->server[i].weight = 1; + urr->server[i].protocol = NXT_HTTP_PROTO_H1; - nxt_debug(task, "upstream peer: %*s", - (size_t) sa->length, nxt_sockaddr_start(sa)); + nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf, + nxt_nitems(nxt_upstream_round_robin_server_conf), + &urr->server[i]); - /* TODO: memcpy to shared memory pool. */ - peer[i].sockaddr = sa; + urr->server[i].effective_weight = urr->server[i].weight; } - up->upstream = urr; + upstream->proto = &nxt_upstream_round_robin_proto; + upstream->type.round_robin = urr; - /* STUB */ - up->sockaddr = peer[0].sockaddr; + return NXT_OK; +} - nxt_job_destroy(task, jbs); - up->ready_handler(task, up); - //nxt_upstream_round_robin_get_peer(up); - return; +static nxt_upstream_t * +nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t *upstream) +{ + size_t size; + uint32_t i, n; + nxt_mp_t *mp; + nxt_upstream_t *u; + nxt_upstream_round_robin_t *urr, *urrcf; -fail: + mp = tmcf->router_conf->mem_pool; - nxt_job_destroy(task, jbs); + u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); + if (nxt_slow_path(u == NULL)) { + return NULL; + } - up->ready_handler(task, up); -} + *u = *upstream; + urrcf = upstream->type.round_robin; -static void -nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_upstream_peer_t *up; - nxt_job_sockaddr_parse_t *jbs; + size = sizeof(nxt_upstream_round_robin_t) + + urrcf->items * sizeof(nxt_upstream_round_robin_server_t); - jbs = obj; - up = jbs->resolve.job.data; + urr = nxt_mp_alloc(mp, size); + if (nxt_slow_path(urr == NULL)) { + return NULL; + } - up->ready_handler(task, up); -} + u->type.round_robin = urr; + n = urrcf->items; + urr->items = n; -static void -nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up) -{ - int32_t effective_weights; - nxt_uint_t i; - nxt_msec_t now; - nxt_upstream_round_robin_t *urr; - nxt_upstream_round_robin_peer_t *peer, *best; + for (i = 0; i < n; i++) { + urr->server[i] = urrcf->server[i]; + } - urr = up->upstream; + return u; +} - now = task->thread->engine->timers.now; - nxt_thread_spin_lock(&urr->lock); +static void +nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us) +{ + int32_t total; + uint32_t i, n; + nxt_upstream_round_robin_t *round_robin; + nxt_upstream_round_robin_server_t *s, *best; best = NULL; - effective_weights = 0; - peer = urr->peers; + total = 0; - for (i = 0; i < urr->npeers; i++) { - - if (peer[i].down) { - continue; - } + round_robin = us->upstream->type.round_robin; -#if 0 - if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) { - good = peer[i].last_accessed + peer[i].fail_timeout; + s = round_robin->server; + n = round_robin->items; - if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) { - continue; - } - } -#endif + for (i = 0; i < n; i++) { - peer[i].current_weight += peer[i].effective_weight; - effective_weights += peer[i].effective_weight; + s[i].current_weight += s[i].effective_weight; + total += s[i].effective_weight; - if (peer[i].effective_weight < peer[i].weight) { - peer[i].effective_weight++; + if (s[i].effective_weight < s[i].weight) { + s[i].effective_weight++; } - if (best == NULL || peer[i].current_weight > best->current_weight) { - best = &peer[i]; + if (best == NULL || s[i].current_weight > best->current_weight) { + best = &s[i]; } } - if (best != NULL) { - best->current_weight -= effective_weights; - best->last_accessed = now; - - up->sockaddr = best->sockaddr; - - } else { - up->sockaddr = NULL; + if (best == NULL) { + us->state->error(task, us); + return; } - nxt_thread_spin_unlock(&urr->lock); + best->current_weight -= total; + us->sockaddr = best->sockaddr; + us->protocol = best->protocol; + us->server.round_robin = best; - up->ready_handler(task, up); + us->state->ready(task, us); } diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c index 7e8b3ce1..cc110035 100644 --- a/src/ruby/nxt_ruby_stream_io.c +++ b/src/ruby/nxt_ruby_stream_io.c @@ -88,9 +88,7 @@ static VALUE nxt_ruby_stream_io_gets(VALUE obj) { VALUE buf; - char *p; - size_t size, b_size; - nxt_unit_buf_t *b; + ssize_t res; nxt_ruby_run_ctx_t *run_ctx; nxt_unit_request_info_t *req; @@ -102,30 +100,20 @@ nxt_ruby_stream_io_gets(VALUE obj) return Qnil; } - size = 0; - - for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) { - b_size = b->end - b->free; - p = memchr(b->free, '\n', b_size); - - if (p != NULL) { - p++; - size += p - b->free; - break; - } - - size += b_size; + res = nxt_unit_request_readline_size(req, SSIZE_MAX); + if (nxt_slow_path(res < 0)) { + return Qnil; } - buf = rb_str_buf_new(size); + buf = rb_str_buf_new(res); - if (buf == Qnil) { + if (nxt_slow_path(buf == Qnil)) { return Qnil; } - size = nxt_unit_request_read(req, RSTRING_PTR(buf), size); + res = nxt_unit_request_read(req, RSTRING_PTR(buf), res); - rb_str_set_len(buf, size); + rb_str_set_len(buf, res); return buf; } |