summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/java/nginx/unit/Context.java2
-rw-r--r--src/java/nxt_jni_InputStream.c28
-rw-r--r--src/nodejs/unit-http/unit.cpp3
-rw-r--r--src/nxt_conf.c72
-rw-r--r--src/nxt_conf_validation.c202
-rw-r--r--src/nxt_conn_write.c96
-rw-r--r--src/nxt_controller.c7
-rw-r--r--src/nxt_h1proto.c182
-rw-r--r--src/nxt_http.h19
-rw-r--r--src/nxt_http_proxy.c124
-rw-r--r--src/nxt_http_request.c27
-rw-r--r--src/nxt_http_route.c150
-rw-r--r--src/nxt_http_static.c13
-rw-r--r--src/nxt_kqueue_engine.c4
-rw-r--r--src/nxt_main.h8
-rw-r--r--src/nxt_php_sapi.c429
-rw-r--r--src/nxt_python_wsgi.c147
-rw-r--r--src/nxt_router.c116
-rw-r--r--src/nxt_router.h11
-rw-r--r--src/nxt_runtime.c18
-rw-r--r--src/nxt_runtime.h1
-rw-r--r--src/nxt_unit.c158
-rw-r--r--src/nxt_unit.h4
-rw-r--r--src/nxt_upstream.c135
-rw-r--r--src/nxt_upstream.h80
-rw-r--r--src/nxt_upstream_round_robin.c257
-rw-r--r--src/ruby/nxt_ruby_stream_io.c28
27 files changed, 1815 insertions, 506 deletions
diff --git a/src/java/nginx/unit/Context.java b/src/java/nginx/unit/Context.java
index 6fcd6018..5f7ec22f 100644
--- a/src/java/nginx/unit/Context.java
+++ b/src/java/nginx/unit/Context.java
@@ -1517,7 +1517,7 @@ public class Context implements ServletContext, InitParams
|| ci.isAnnotation()
|| ci.isAbstract())
{
- return;
+ continue;
}
trace("loadInitializer: handles class: " + ci.getName());
diff --git a/src/java/nxt_jni_InputStream.c b/src/java/nxt_jni_InputStream.c
index b96ff742..3b74b0c1 100644
--- a/src/java/nxt_jni_InputStream.c
+++ b/src/java/nxt_jni_InputStream.c
@@ -90,40 +90,20 @@ static jint JNICALL
nxt_java_InputStream_readLine(JNIEnv *env, jclass cls,
jlong req_info_ptr, jarray out, jint off, jint len)
{
- char *p;
- jint size, b_size;
uint8_t *data;
ssize_t res;
- nxt_unit_buf_t *b;
nxt_unit_request_info_t *req;
req = nxt_jlong2ptr(req_info_ptr);
- size = 0;
-
- for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) {
- b_size = b->end - b->free;
- p = memchr(b->free, '\n', b_size);
-
- if (p != NULL) {
- p++;
- size += p - b->free;
- break;
- }
+ data = (*env)->GetPrimitiveArrayCritical(env, out, NULL);
- size += b_size;
+ res = nxt_unit_request_readline_size(req, len);
- if (size >= len) {
- break;
- }
+ if (res > 0) {
+ res = nxt_unit_request_read(req, data + off, res);
}
- len = len < size ? len : size;
-
- data = (*env)->GetPrimitiveArrayCritical(env, out, NULL);
-
- res = nxt_unit_request_read(req, data + off, len);
-
nxt_unit_req_debug(req, "readLine '%.*s'", res, (char *) data + off);
(*env)->ReleasePrimitiveArrayCritical(env, out, data, 0);
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index 64e076c1..975174d4 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -812,8 +812,7 @@ Unit::response_write(napi_env env, napi_callback_info info)
/* TODO: will work only for utf8 content-type */
if (req->response_buf != NULL
- && (req->response_buf->end - req->response_buf->free)
- >= buf_len)
+ && req->response_buf->end >= req->response_buf->free + buf_len)
{
buf = req->response_buf;
diff --git a/src/nxt_conf.c b/src/nxt_conf.c
index 43820d2a..2a952c35 100644
--- a/src/nxt_conf.c
+++ b/src/nxt_conf.c
@@ -1244,15 +1244,74 @@ nxt_conf_json_parse(nxt_mp_t *mp, u_char *start, u_char *end,
static u_char *
nxt_conf_json_skip_space(u_char *start, u_char *end)
{
- u_char *p;
+ u_char *p, ch;
+
+ enum {
+ sw_normal = 0,
+ sw_after_slash,
+ sw_single_comment,
+ sw_multi_comment,
+ sw_after_asterisk,
+ } state;
+
+ state = sw_normal;
for (p = start; nxt_fast_path(p != end); p++) {
+ ch = *p;
+
+ switch (state) {
+
+ case sw_normal:
+ switch (ch) {
+ case ' ':
+ case '\t':
+ case '\n':
+ case '\r':
+ continue;
+ case '/':
+ state = sw_after_slash;
+ continue;
+ }
+
+ break;
+
+ case sw_after_slash:
+ switch (ch) {
+ case '/':
+ state = sw_single_comment;
+ continue;
+ case '*':
+ state = sw_multi_comment;
+ continue;
+ }
+
+ p--;
+ break;
+
+ case sw_single_comment:
+ if (ch == '\n') {
+ state = sw_normal;
+ }
- switch (*p) {
- case ' ':
- case '\t':
- case '\r':
- case '\n':
+ continue;
+
+ case sw_multi_comment:
+ if (ch == '*') {
+ state = sw_after_asterisk;
+ }
+
+ continue;
+
+ case sw_after_asterisk:
+ switch (ch) {
+ case '/':
+ state = sw_normal;
+ continue;
+ case '*':
+ continue;
+ }
+
+ state = sw_multi_comment;
continue;
}
@@ -1346,6 +1405,7 @@ nxt_conf_json_parse_value(nxt_mp_t *mp, nxt_conf_value_t *value, u_char *start,
case '{':
case '[':
case '"':
+ case '/':
return p;
}
}
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index 5a1f7839..3a3654bd 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -110,6 +110,12 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
+static nxt_int_t nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt,
+ nxt_str_t *name, nxt_conf_value_t *value);
+static nxt_int_t nxt_conf_vldt_server(nxt_conf_validation_t *vldt,
+ nxt_str_t *name, nxt_conf_value_t *value);
+static nxt_int_t nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
@@ -176,11 +182,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[] = {
NULL,
NULL },
+ { nxt_string("body_buffer_size"),
+ NXT_CONF_VLDT_INTEGER,
+ NULL,
+ NULL },
+
{ nxt_string("max_body_size"),
NXT_CONF_VLDT_INTEGER,
NULL,
NULL },
+ { nxt_string("body_temp_path"),
+ NXT_CONF_VLDT_STRING,
+ NULL,
+ NULL },
+
{ nxt_string("websocket"),
NXT_CONF_VLDT_OBJECT,
&nxt_conf_vldt_object,
@@ -226,6 +242,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_root_members[] = {
&nxt_conf_vldt_object_iterator,
(void *) &nxt_conf_vldt_app },
+ { nxt_string("upstreams"),
+ NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_object_iterator,
+ (void *) &nxt_conf_vldt_upstream },
+
{ nxt_string("access_log"),
NXT_CONF_VLDT_STRING,
NULL,
@@ -323,17 +344,32 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = {
};
-static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = {
+static nxt_conf_vldt_object_t nxt_conf_vldt_pass_action_members[] = {
{ nxt_string("pass"),
NXT_CONF_VLDT_STRING,
&nxt_conf_vldt_pass,
NULL },
+ NXT_CONF_VLDT_END
+};
+
+
+static nxt_conf_vldt_object_t nxt_conf_vldt_share_action_members[] = {
{ nxt_string("share"),
NXT_CONF_VLDT_STRING,
NULL,
NULL },
+ { nxt_string("fallback"),
+ NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_action,
+ NULL },
+
+ NXT_CONF_VLDT_END
+};
+
+
+static nxt_conf_vldt_object_t nxt_conf_vldt_proxy_action_members[] = {
{ nxt_string("proxy"),
NXT_CONF_VLDT_STRING,
&nxt_conf_vldt_proxy,
@@ -667,6 +703,26 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
};
+static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_members[] = {
+ { nxt_string("servers"),
+ NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_object_iterator,
+ (void *) &nxt_conf_vldt_server },
+
+ NXT_CONF_VLDT_END
+};
+
+
+static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_server_members[] = {
+ { nxt_string("weight"),
+ NXT_CONF_VLDT_INTEGER,
+ &nxt_conf_vldt_server_weight,
+ NULL },
+
+ NXT_CONF_VLDT_END
+};
+
+
nxt_int_t
nxt_conf_validate(nxt_conf_validation_t *vldt)
{
@@ -912,30 +968,45 @@ static nxt_int_t
nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
void *data)
{
- nxt_int_t ret;
- nxt_conf_value_t *pass_value, *share_value, *proxy_value;
+ nxt_uint_t i;
+ nxt_conf_value_t *action;
+ nxt_conf_vldt_object_t *members;
+
+ static struct {
+ nxt_str_t name;
+ nxt_conf_vldt_object_t *members;
+
+ } actions[] = {
+ { nxt_string("pass"), nxt_conf_vldt_pass_action_members },
+ { nxt_string("share"), nxt_conf_vldt_share_action_members },
+ { nxt_string("proxy"), nxt_conf_vldt_proxy_action_members },
+ };
- static nxt_str_t pass_str = nxt_string("pass");
- static nxt_str_t share_str = nxt_string("share");
- static nxt_str_t proxy_str = nxt_string("proxy");
+ members = NULL;
- ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members);
+ for (i = 0; i < nxt_nitems(actions); i++) {
+ action = nxt_conf_get_object_member(value, &actions[i].name, NULL);
- if (ret != NXT_OK) {
- return ret;
- }
+ if (action == NULL) {
+ continue;
+ }
- pass_value = nxt_conf_get_object_member(value, &pass_str, NULL);
- share_value = nxt_conf_get_object_member(value, &share_str, NULL);
- proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL);
+ if (members != NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"action\" object must have "
+ "just one of \"pass\", \"share\" or "
+ "\"proxy\" options set.");
+ }
- if (pass_value == NULL && share_value == NULL && proxy_value == NULL) {
+ members = actions[i].members;
+ }
+
+ if (members == NULL) {
return nxt_conf_vldt_error(vldt, "The \"action\" object must have "
- "either \"pass\" or \"share\" or "
+ "either \"pass\", \"share\", or "
"\"proxy\" option set.");
}
- return NXT_OK;
+ return nxt_conf_vldt_object(vldt, value, members);
}
@@ -987,6 +1058,27 @@ nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
return NXT_OK;
}
+ if (nxt_str_eq(&first, "upstreams", 9)) {
+
+ if (second.length == 0) {
+ goto error;
+ }
+
+ value = nxt_conf_get_object_member(vldt->conf, &first, NULL);
+
+ if (nxt_slow_path(value == NULL)) {
+ goto error;
+ }
+
+ value = nxt_conf_get_object_member(value, &second, NULL);
+
+ if (nxt_slow_path(value == NULL)) {
+ goto error;
+ }
+
+ return NXT_OK;
+ }
+
if (nxt_str_eq(&first, "routes", 6)) {
value = nxt_conf_get_object_member(vldt->conf, &first, NULL);
@@ -1871,3 +1963,81 @@ nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value)
return NXT_OK;
}
+
+
+static nxt_int_t
+nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, nxt_str_t *name,
+ nxt_conf_value_t *value)
+{
+ nxt_int_t ret;
+ nxt_conf_value_t *conf;
+
+ static nxt_str_t servers = nxt_string("servers");
+
+ ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_upstream_members);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ conf = nxt_conf_get_object_member(value, &servers, NULL);
+ if (conf == NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"%V\" upstream must contain "
+ "\"servers\" object value.", name);
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_conf_vldt_server(nxt_conf_validation_t *vldt, nxt_str_t *name,
+ nxt_conf_value_t *value)
+{
+ nxt_int_t ret;
+ nxt_sockaddr_t *sa;
+
+ ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ sa = nxt_sockaddr_parse(vldt->pool, name);
+
+ if (sa == NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"%V\" is not valid "
+ "server address.", name);
+ }
+
+ return nxt_conf_vldt_object(vldt, value,
+ nxt_conf_vldt_upstream_server_members);
+}
+
+
+static nxt_int_t
+nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data)
+{
+ int64_t int_value;
+
+ int_value = nxt_conf_get_integer(value);
+
+ if (int_value <= 0) {
+ return nxt_conf_vldt_error(vldt, "The \"weight\" number must be "
+ "greater than 0.");
+ }
+
+ if (int_value > NXT_INT32_T_MAX) {
+ return nxt_conf_vldt_error(vldt, "The \"weight\" number must "
+ "not exceed %d.", NXT_INT32_T_MAX);
+ }
+
+ return NXT_OK;
+}
diff --git a/src/nxt_conn_write.c b/src/nxt_conn_write.c
index 298d8f75..d7a6a8da 100644
--- a/src/nxt_conn_write.c
+++ b/src/nxt_conn_write.c
@@ -9,6 +9,8 @@
static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
void *data);
+static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb);
+static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size);
void
@@ -170,10 +172,104 @@ nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
return 0;
}
+ if (niov == 0 && nxt_buf_is_file(sb->buf)) {
+ return nxt_conn_io_sendfile(task, sb);
+ }
+
return nxt_conn_io_writev(task, sb, iov, niov);
}
+static ssize_t
+nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb)
+{
+ size_t size;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_err_t err;
+
+ b = sb->buf;
+
+ for ( ;; ) {
+ size = b->file_end - b->file_pos;
+
+ n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size);
+
+ err = (n == -1) ? nxt_errno : 0;
+
+ nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z",
+ b->file->fd, sb->socket, b->file_pos, size, n);
+
+ if (n > 0) {
+ if (n < (ssize_t) size) {
+ sb->ready = 0;
+ }
+
+ return n;
+ }
+
+ if (nxt_slow_path(n == 0)) {
+ nxt_alert(task, "sendfile() reported that file was truncated at %O",
+ b->file_pos);
+
+ return NXT_ERROR;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "sendfile() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "sendfile() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "sendfile(%FD, %d, @%O, %uz) failed %E",
+ b->file->fd, sb->socket, b->file_pos, size, err);
+
+ return NXT_ERROR;
+ }
+ }
+}
+
+
+static ssize_t
+nxt_sendfile(int fd, int s, off_t pos, size_t size)
+{
+ ssize_t res;
+
+#ifdef NXT_HAVE_MACOSX_SENDFILE
+ off_t sent = size;
+
+ int rc = sendfile(fd, s, pos, &sent, NULL, 0);
+
+ res = (rc == 0 || sent > 0) ? sent : -1;
+#endif
+
+#ifdef NXT_HAVE_FREEBSD_SENDFILE
+ off_t sent = 0;
+
+ int rc = sendfile(fd, s, pos, size, NULL, &sent, 0);
+
+ res = (rc == 0 || sent > 0) ? sent : -1;
+#endif
+
+#ifdef NXT_HAVE_LINUX_SENDFILE
+ res = sendfile(s, fd, &pos, size);
+#endif
+
+ return res;
+}
+
+
ssize_t
nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
nxt_uint_t niov)
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 86ba1246..cc1ed534 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -989,6 +989,13 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
+ /* Skip UTF-8 BOM. */
+ if (nxt_buf_mem_used_size(mbuf) >= 3
+ && nxt_memcmp(mbuf->pos, "\xEF\xBB\xBF", 3) == 0)
+ {
+ mbuf->pos += 3;
+ }
+
value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
if (value == NULL) {
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index 8ce57893..35918bd8 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -6,6 +6,7 @@
#include <nxt_router.h>
#include <nxt_http.h>
+#include <nxt_upstream.h>
#include <nxt_h1proto.h>
#include <nxt_websocket.h>
#include <nxt_websocket_header.h>
@@ -816,12 +817,16 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
static void
nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
{
- size_t size, body_length;
+ size_t size, body_length, body_buffer_size, body_rest;
+ ssize_t res;
+ nxt_str_t *tmp_path, tmp_name;
nxt_buf_t *in, *b;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_status_t status;
+ static const nxt_str_t tmp_name_pattern = nxt_string("/req-XXXXXXXX");
+
h1p = r->proto.h1;
nxt_debug(task, "h1p request body read %O te:%d",
@@ -846,43 +851,97 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
goto ready;
}
- if (r->content_length_n > (nxt_off_t) r->conf->socket_conf->max_body_size) {
- status = NXT_HTTP_PAYLOAD_TOO_LARGE;
+ body_length = (size_t) r->content_length_n;
+
+ body_buffer_size = nxt_min(r->conf->socket_conf->body_buffer_size,
+ body_length);
+
+ if (body_length > body_buffer_size) {
+ tmp_path = &r->conf->socket_conf->body_temp_path;
+
+ tmp_name.length = tmp_path->length + tmp_name_pattern.length;
+
+ b = nxt_buf_file_alloc(r->mem_pool,
+ body_buffer_size + sizeof(nxt_file_t)
+ + tmp_name.length + 1, 0);
+
+ } else {
+ /* This initialization required for CentOS 6, gcc 4.4.7. */
+ tmp_path = NULL;
+ tmp_name.length = 0;
+
+ b = nxt_buf_mem_alloc(r->mem_pool, body_buffer_size, 0);
+ }
+
+ if (nxt_slow_path(b == NULL)) {
+ status = NXT_HTTP_INTERNAL_SERVER_ERROR;
goto error;
}
- body_length = (size_t) r->content_length_n;
+ r->body = b;
- b = r->body;
+ if (body_length > body_buffer_size) {
+ tmp_name.start = nxt_pointer_to(b->mem.start, sizeof(nxt_file_t));
+
+ memcpy(tmp_name.start, tmp_path->start, tmp_path->length);
+ memcpy(tmp_name.start + tmp_path->length, tmp_name_pattern.start,
+ tmp_name_pattern.length);
+ tmp_name.start[tmp_name.length] = '\0';
+
+ b->file = (nxt_file_t *) b->mem.start;
+ nxt_memzero(b->file, sizeof(nxt_file_t));
+ b->file->fd = -1;
+ b->file->size = body_length;
+
+ b->mem.start += sizeof(nxt_file_t) + tmp_name.length + 1;
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+
+ b->file->fd = mkstemp((char *) tmp_name.start);
+ if (nxt_slow_path(b->file->fd == -1)) {
+ nxt_log(task, NXT_LOG_ERR, "mkstemp() failed %E", nxt_errno);
- if (b == NULL) {
- b = nxt_buf_mem_alloc(r->mem_pool, body_length, 0);
- if (nxt_slow_path(b == NULL)) {
status = NXT_HTTP_INTERNAL_SERVER_ERROR;
goto error;
}
- r->body = b;
+ nxt_debug(task, "create body tmp file \"%V\", %d",
+ &tmp_name, b->file->fd);
+
+ unlink((char *) tmp_name.start);
}
+ body_rest = body_length;
+
in = h1p->conn->read;
size = nxt_buf_mem_used_size(&in->mem);
if (size != 0) {
- if (size > body_length) {
- size = body_length;
+ size = nxt_min(size, body_length);
+
+ if (nxt_buf_is_file(b)) {
+ res = nxt_fd_write(b->file->fd, in->mem.pos, size);
+ if (nxt_slow_path(res < (ssize_t) size)) {
+ status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+ goto error;
+ }
+
+ b->file_end += size;
+
+ } else {
+ size = nxt_min(body_buffer_size, size);
+ b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
+ body_buffer_size -= size;
}
- b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
in->mem.pos += size;
+ body_rest -= size;
}
- size = nxt_buf_mem_free_size(&b->mem);
+ nxt_debug(task, "h1p body rest: %uz", body_rest);
- nxt_debug(task, "h1p body rest: %uz", size);
-
- if (size != 0) {
+ if (body_rest != 0) {
in->next = h1p->buffers;
h1p->buffers = in;
h1p->nbuffers++;
@@ -895,6 +954,13 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
return;
}
+ if (nxt_buf_is_file(b)) {
+ b->mem.start = NULL;
+ b->mem.end = NULL;
+ b->mem.pos = NULL;
+ b->mem.free = NULL;
+ }
+
ready:
r->state->ready_handler(task, r, NULL);
@@ -926,7 +992,9 @@ static const nxt_conn_state_t nxt_h1p_read_body_state
static void
nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
{
- size_t size;
+ size_t size, body_rest;
+ ssize_t res;
+ nxt_buf_t *b;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
@@ -937,18 +1005,59 @@ nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "h1p conn request body read");
- size = nxt_buf_mem_free_size(&c->read->mem);
-
- nxt_debug(task, "h1p body rest: %uz", size);
+ r = h1p->request;
engine = task->thread->engine;
- if (size != 0) {
+ b = c->read;
+
+ if (nxt_buf_is_file(b)) {
+ body_rest = b->file->size - b->file_end;
+
+ size = nxt_buf_mem_used_size(&b->mem);
+ size = nxt_min(size, body_rest);
+
+ res = nxt_fd_write(b->file->fd, b->mem.pos, size);
+ if (nxt_slow_path(res < (ssize_t) size)) {
+ nxt_h1p_request_error(task, h1p, r);
+ return;
+ }
+
+ b->file_end += size;
+ body_rest -= res;
+
+ b->mem.pos += size;
+
+ if (b->mem.pos == b->mem.free) {
+ if (body_rest >= (size_t) nxt_buf_mem_size(&b->mem)) {
+ b->mem.free = b->mem.start;
+
+ } else {
+ /* This required to avoid reading next request. */
+ b->mem.free = b->mem.end - body_rest;
+ }
+
+ b->mem.pos = b->mem.free;
+ }
+
+ } else {
+ body_rest = nxt_buf_mem_free_size(&c->read->mem);
+ }
+
+ nxt_debug(task, "h1p body rest: %uz", body_rest);
+
+ if (body_rest != 0) {
nxt_conn_read(engine, c);
} else {
+ if (nxt_buf_is_file(b)) {
+ b->mem.start = NULL;
+ b->mem.end = NULL;
+ b->mem.pos = NULL;
+ b->mem.free = NULL;
+ }
+
c->read = NULL;
- r = h1p->request;
r->state->ready_handler(task, r, NULL);
}
@@ -2004,7 +2113,7 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
c->read_timer.task = task;
c->write_timer.task = task;
c->socket.data = peer;
- c->remote = peer->sockaddr;
+ c->remote = peer->server->sockaddr;
c->socket.write_ready = 1;
c->write_state = &nxt_h1p_peer_connect_state;
@@ -2144,7 +2253,13 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
c->write_state = &nxt_h1p_peer_header_send_state;
if (r->body != NULL) {
- body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
+ if (nxt_buf_is_file(r->body)) {
+ body = nxt_buf_file_alloc(r->mem_pool, 0, 0);
+
+ } else {
+ body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
+ }
+
if (nxt_slow_path(body == NULL)) {
r->state->error_handler(task, r, peer);
return;
@@ -2152,8 +2267,15 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
header->next = body;
- body->mem = r->body->mem;
- size += nxt_buf_mem_used_size(&body->mem);
+ if (nxt_buf_is_file(r->body)) {
+ body->file = r->body->file;
+ body->file_end = r->body->file_end;
+
+ } else {
+ body->mem = r->body->mem;
+ }
+
+ size += nxt_buf_used_size(body);
// nxt_mp_retain(r->mem_pool);
}
@@ -2209,13 +2331,13 @@ nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
- if (c->write == NULL) {
- r = peer->request;
- r->state->ready_handler(task, r, peer);
+ if (c->write != NULL) {
+ nxt_conn_write(engine, c);
return;
}
- nxt_conn_write(engine, c);
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
}
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 030d77a7..0e0694e5 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -106,10 +106,12 @@ typedef struct {
} nxt_http_response_t;
+typedef struct nxt_upstream_server_s nxt_upstream_server_t;
+
typedef struct {
nxt_http_proto_t proto;
nxt_http_request_t *request;
- nxt_sockaddr_t *sockaddr;
+ nxt_upstream_server_t *server;
nxt_list_t *fields;
nxt_buf_t *body;
nxt_off_t remainder;
@@ -178,7 +180,6 @@ struct nxt_http_request_s {
typedef struct nxt_http_route_s nxt_http_route_t;
-typedef struct nxt_http_upstream_s nxt_http_upstream_t;
struct nxt_http_action_s {
@@ -187,8 +188,10 @@ struct nxt_http_action_s {
nxt_http_action_t *action);
union {
nxt_http_route_t *route;
- nxt_http_upstream_t *upstream;
nxt_app_t *application;
+ nxt_http_action_t *fallback;
+ nxt_upstream_t *upstream;
+ uint32_t upstream_number;
} u;
nxt_str_t name;
@@ -274,6 +277,11 @@ nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task,
void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes);
void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action);
+nxt_int_t nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *conf);
+nxt_int_t nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t ***upstream_joint);
+
nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash);
@@ -284,6 +292,11 @@ nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash,
nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
+void nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
+ nxt_http_action_t *action);
+nxt_http_action_t *nxt_upstream_proxy_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_upstream_t *upstream);
+
nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action);
nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field,
diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c
index 7f4eeff2..893e9303 100644
--- a/src/nxt_http_proxy.c
+++ b/src/nxt_http_proxy.c
@@ -6,23 +6,21 @@
#include <nxt_router.h>
#include <nxt_http.h>
+#include <nxt_upstream.h>
-typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
- nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
-
-
-struct nxt_http_upstream_s {
- uint32_t current;
- uint32_t n;
- uint8_t protocol;
- nxt_http_upstream_connect_t connect;
- nxt_sockaddr_t *sockaddr[1];
+struct nxt_upstream_proxy_s {
+ nxt_sockaddr_t *sockaddr;
+ uint8_t protocol;
};
-static void nxt_http_upstream_connect(nxt_task_t *task,
- nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
+static void nxt_http_proxy_server_get(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+static void nxt_http_proxy_upstream_error(nxt_task_t *task,
+ nxt_upstream_server_t *us);
static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
@@ -43,12 +41,24 @@ static const nxt_http_request_state_t nxt_http_proxy_header_read_state;
static const nxt_http_request_state_t nxt_http_proxy_read_state;
+static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = {
+ .get = nxt_http_proxy_server_get,
+};
+
+
+static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = {
+ .ready = nxt_http_proxy_upstream_ready,
+ .error = nxt_http_proxy_upstream_error,
+};
+
+
nxt_int_t
nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
{
- nxt_str_t name;
- nxt_sockaddr_t *sa;
- nxt_http_upstream_t *upstream;
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_upstream_t *up;
+ nxt_upstream_proxy_t *proxy;
sa = NULL;
name = action->name;
@@ -66,18 +76,25 @@ nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
}
if (sa != NULL) {
- upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
- if (nxt_slow_path(upstream == NULL)) {
+ up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
+ if (nxt_slow_path(up == NULL)) {
+ return NXT_ERROR;
+ }
+
+ up->name.length = sa->length;
+ up->name.start = nxt_sockaddr_start(sa);
+ up->proto = &nxt_upstream_simple_proto;
+
+ proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
+ if (nxt_slow_path(proxy == NULL)) {
return NXT_ERROR;
}
- upstream->current = 0;
- upstream->n = 1;
- upstream->protocol = NXT_HTTP_PROTO_H1;
- upstream->connect = nxt_http_upstream_connect;
- upstream->sockaddr[0] = sa;
+ proxy->sockaddr = sa;
+ proxy->protocol = NXT_HTTP_PROTO_H1;
+ up->type.proxy = proxy;
- action->u.upstream = upstream;
+ action->u.upstream = up;
action->handler = nxt_http_proxy_handler;
}
@@ -89,7 +106,22 @@ static nxt_http_action_t *
nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_action_t *action)
{
- nxt_http_peer_t *peer;
+ return nxt_upstream_proxy_handler(task, r, action->u.upstream);
+}
+
+
+nxt_http_action_t *
+nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_upstream_t *upstream)
+{
+ nxt_http_peer_t *peer;
+ nxt_upstream_server_t *us;
+
+ us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
+ if (nxt_slow_path(us == NULL)) {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ return NULL;
+ }
peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
if (nxt_slow_path(peer == NULL)) {
@@ -102,18 +134,39 @@ nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_mp_retain(r->mem_pool);
- action->u.upstream->connect(task, action->u.upstream, peer);
+ us->state = &nxt_upstream_proxy_state;
+ us->peer.http = peer;
+ peer->server = us;
+
+ us->upstream = upstream;
+ upstream->proto->get(task, us);
return NULL;
}
static void
-nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
- nxt_http_peer_t *peer)
+nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
{
- peer->protocol = upstream->protocol;
- peer->sockaddr = upstream->sockaddr[0];
+ nxt_upstream_proxy_t *proxy;
+
+ proxy = us->upstream->type.proxy;
+
+ us->sockaddr = proxy->sockaddr;
+ us->protocol = proxy->protocol;
+
+ us->state->ready(task, us);
+}
+
+
+static void
+nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ nxt_http_peer_t *peer;
+
+ peer = us->peer.http;
+
+ peer->protocol = us->protocol;
peer->request->state = &nxt_http_proxy_header_send_state;
@@ -121,6 +174,19 @@ nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
}
+static void
+nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ nxt_http_request_t *r;
+
+ r = us->peer.http->request;
+
+ nxt_mp_release(r->mem_pool);
+
+ nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
+}
+
+
static const nxt_http_request_state_t nxt_http_proxy_header_send_state
nxt_aligned(64) =
{
diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c
index 14c75dab..72aaa290 100644
--- a/src/nxt_http_request.c
+++ b/src/nxt_http_request.c
@@ -186,7 +186,7 @@ nxt_int_t
nxt_http_request_content_length(void *ctx, nxt_http_field_t *field,
uintptr_t data)
{
- nxt_off_t n;
+ nxt_off_t n, max_body_size;
nxt_http_request_t *r;
r = ctx;
@@ -198,6 +198,13 @@ nxt_http_request_content_length(void *ctx, nxt_http_field_t *field,
if (nxt_fast_path(n >= 0)) {
r->content_length_n = n;
+
+ max_body_size = r->conf->socket_conf->max_body_size;
+
+ if (nxt_slow_path(n > max_body_size)) {
+ return NXT_HTTP_PAYLOAD_TOO_LARGE;
+ }
+
return NXT_OK;
}
}
@@ -319,18 +326,8 @@ nxt_http_action_t *
nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_action_t *action)
{
- nxt_event_engine_t *engine;
-
nxt_debug(task, "http application handler");
- nxt_mp_retain(r->mem_pool);
-
- engine = task->thread->engine;
- r->timer.task = &engine->task;
- r->timer.work_queue = &engine->fast_work_queue;
- r->timer.log = engine->task.log;
- r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
-
/*
* TODO: need an application flag to get local address
* required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
@@ -572,6 +569,14 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data)
r->proto.any = NULL;
+ if (r->body != NULL && nxt_buf_is_file(r->body)
+ && r->body->file->fd != -1)
+ {
+ nxt_fd_close(r->body->file->fd);
+
+ r->body->file->fd = -1;
+ }
+
if (nxt_fast_path(proto.any != NULL)) {
protocol = r->protocol;
diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c
index ef9593b7..d7f20bcb 100644
--- a/src/nxt_http_route.c
+++ b/src/nxt_http_route.c
@@ -43,6 +43,7 @@ typedef struct {
nxt_conf_value_t *pass;
nxt_conf_value_t *share;
nxt_conf_value_t *proxy;
+ nxt_conf_value_t *fallback;
} nxt_http_route_action_conf_t;
@@ -175,7 +176,7 @@ static nxt_http_route_t *nxt_http_route_create(nxt_task_t *task,
static nxt_http_route_match_t *nxt_http_route_match_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv);
static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf,
- nxt_conf_value_t *cv, nxt_http_route_match_t *match);
+ nxt_conf_value_t *cv, nxt_http_action_t *action);
static nxt_http_route_table_t *nxt_http_route_table_create(nxt_task_t *task,
nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object,
nxt_bool_t case_sensitive);
@@ -191,6 +192,7 @@ static nxt_http_route_rule_t *nxt_http_route_rule_create(nxt_task_t *task,
nxt_mp_t *mp, nxt_conf_value_t *cv, nxt_bool_t case_sensitive,
nxt_http_route_pattern_case_t pattern_case);
static int nxt_http_pattern_compare(const void *one, const void *two);
+static int nxt_http_addr_pattern_compare(const void *one, const void *two);
static nxt_int_t nxt_http_route_pattern_create(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *cv, nxt_http_route_pattern_t *pattern,
nxt_http_route_pattern_case_t pattern_case);
@@ -201,8 +203,8 @@ static void nxt_http_route_resolve(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route);
static void nxt_http_action_resolve(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action);
-static nxt_http_route_t *nxt_http_route_find(nxt_http_routes_t *routes,
- nxt_str_t *name);
+static void nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name,
+ nxt_http_action_t *action);
static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes);
static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task,
@@ -407,7 +409,7 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
uint32_t n;
nxt_mp_t *mp;
nxt_int_t ret;
- nxt_conf_value_t *match_conf;
+ nxt_conf_value_t *match_conf, *action_conf;
nxt_http_route_test_t *test;
nxt_http_route_rule_t *rule;
nxt_http_route_table_t *table;
@@ -416,6 +418,7 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_http_route_match_conf_t mtcf;
static nxt_str_t match_path = nxt_string("/match");
+ static nxt_str_t action_path = nxt_string("/action");
match_conf = nxt_conf_get_path(cv, &match_path);
@@ -433,7 +436,12 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
match->action.handler = NULL;
match->items = n;
- ret = nxt_http_route_action_create(tmcf, cv, match);
+ action_conf = nxt_conf_get_path(cv, &action_path);
+ if (nxt_slow_path(action_conf == NULL)) {
+ return NULL;
+ }
+
+ ret = nxt_http_route_action_create(tmcf, action_conf, &match->action);
if (nxt_slow_path(ret != NXT_OK)) {
return NULL;
}
@@ -579,30 +587,27 @@ static nxt_conf_map_t nxt_http_route_action_conf[] = {
NXT_CONF_MAP_PTR,
offsetof(nxt_http_route_action_conf_t, proxy)
},
+ {
+ nxt_string("fallback"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, fallback)
+ },
};
static nxt_int_t
nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv,
- nxt_http_route_match_t *match)
+ nxt_http_action_t *action)
{
nxt_mp_t *mp;
nxt_int_t ret;
nxt_str_t name, *string;
- nxt_conf_value_t *conf, *action_conf;
+ nxt_conf_value_t *conf;
nxt_http_route_action_conf_t accf;
- static nxt_str_t action_path = nxt_string("/action");
-
- action_conf = nxt_conf_get_path(cv, &action_path);
- if (action_conf == NULL) {
- return NXT_ERROR;
- }
-
nxt_memzero(&accf, sizeof(accf));
- ret = nxt_conf_map_object(tmcf->mem_pool,
- action_conf, nxt_http_route_action_conf,
+ ret = nxt_conf_map_object(tmcf->mem_pool, cv, nxt_http_route_action_conf,
nxt_nitems(nxt_http_route_action_conf), &accf);
if (ret != NXT_OK) {
return ret;
@@ -612,7 +617,7 @@ nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv,
if (accf.share != NULL) {
conf = accf.share;
- match->action.handler = nxt_http_static_handler;
+ action->handler = nxt_http_static_handler;
} else if (accf.proxy != NULL) {
conf = accf.proxy;
@@ -622,13 +627,23 @@ nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv,
mp = tmcf->router_conf->mem_pool;
- string = nxt_str_dup(mp, &match->action.name, &name);
+ string = nxt_str_dup(mp, &action->name, &name);
if (nxt_slow_path(string == NULL)) {
return NXT_ERROR;
}
+ if (accf.fallback != NULL) {
+ action->u.fallback = nxt_mp_zalloc(mp, sizeof(nxt_http_action_t));
+ if (nxt_slow_path(action->u.fallback == NULL)) {
+ return NXT_ERROR;
+ }
+
+ return nxt_http_route_action_create(tmcf, accf.fallback,
+ action->u.fallback);
+ }
+
if (accf.proxy != NULL) {
- return nxt_http_proxy_create(mp, &match->action);
+ return nxt_http_proxy_create(mp, action);
}
return NXT_OK;
@@ -867,6 +882,12 @@ nxt_http_route_addr_rule_create(nxt_task_t *task, nxt_mp_t *mp,
}
}
+ if (n > 1) {
+ nxt_qsort(addr_rule->addr_pattern, addr_rule->items,
+ sizeof(nxt_http_route_addr_pattern_t),
+ nxt_http_addr_pattern_compare);
+ }
+
return addr_rule;
}
@@ -890,6 +911,18 @@ nxt_http_pattern_compare(const void *one, const void *two)
}
+static int
+nxt_http_addr_pattern_compare(const void *one, const void *two)
+{
+ const nxt_http_route_addr_pattern_t *p1, *p2;
+
+ p1 = one;
+ p2 = two;
+
+ return (p2->base.negative - p1->base.negative);
+}
+
+
static nxt_int_t
nxt_http_route_pattern_create(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *cv, nxt_http_route_pattern_t *pattern,
@@ -1043,18 +1076,13 @@ static void
nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_http_route_t *route)
{
- nxt_http_action_t *action;
nxt_http_route_match_t **match, **end;
match = &route->match[0];
end = match + route->items;
while (match < end) {
- action = &(*match)->action;
-
- if (action->handler == NULL) {
- nxt_http_action_resolve(task, tmcf, &(*match)->action);
- }
+ nxt_http_action_resolve(task, tmcf, &(*match)->action);
match++;
}
@@ -1067,16 +1095,30 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
{
nxt_str_t name;
+ if (action->handler != NULL) {
+ if (action->handler == nxt_http_static_handler
+ && action->u.fallback != NULL)
+ {
+ nxt_http_action_resolve(task, tmcf, action->u.fallback);
+ }
+
+ return;
+ }
+
name = action->name;
if (nxt_str_start(&name, "applications/", 13)) {
name.length -= 13;
name.start += 13;
- action->u.application = nxt_router_listener_application(tmcf, &name);
+ nxt_router_listener_application(tmcf, &name, action);
nxt_router_app_use(task, action->u.application, 1);
- action->handler = nxt_http_application_handler;
+ } else if (nxt_str_start(&name, "upstreams/", 10)) {
+ name.length -= 10;
+ name.start += 10;
+
+ nxt_upstream_find(tmcf->router_conf->upstreams, &name, action);
} else if (nxt_str_start(&name, "routes", 6)) {
@@ -1089,15 +1131,14 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
name.start += 7;
}
- action->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name);
-
- action->handler = nxt_http_route_handler;
+ nxt_http_route_find(tmcf->router_conf->routes, &name, action);
}
}
-static nxt_http_route_t *
-nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name)
+static void
+nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name,
+ nxt_http_action_t *action)
{
nxt_http_route_t **route, **end;
@@ -1106,13 +1147,14 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name)
while (route < end) {
if (nxt_strstr_eq(&(*route)->name, name)) {
- return *route;
+ action->u.route = *route;
+ action->handler = nxt_http_route_handler;
+
+ return;
}
route++;
}
-
- return NULL;
}
@@ -1153,11 +1195,9 @@ nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
action->name = *name;
- action->u.application = nxt_router_listener_application(tmcf, name);
+ nxt_router_listener_application(tmcf, name, action);
nxt_router_app_use(task, action->u.application, 1);
- action->handler = nxt_http_application_handler;
-
return action;
}
@@ -1201,6 +1241,13 @@ nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action)
{
if (action->handler == nxt_http_application_handler) {
nxt_router_app_use(task, action->u.application, -1);
+ return;
+ }
+
+ if (action->handler == nxt_http_static_handler
+ && action->u.fallback != NULL)
+ {
+ nxt_http_action_cleanup(task, action->u.fallback);
}
}
@@ -1501,19 +1548,34 @@ static nxt_int_t
nxt_http_route_addr_rule(nxt_http_request_t *r,
nxt_http_route_addr_rule_t *addr_rule, nxt_sockaddr_t *sa)
{
- uint32_t i, n;
+ uint32_t n;
+ nxt_bool_t matches;
nxt_http_route_addr_pattern_t *p;
n = addr_rule->items;
+ p = &addr_rule->addr_pattern[0] - 1;
- for (i = 0; i < n; i++) {
- p = &addr_rule->addr_pattern[i];
- if (nxt_http_route_addr_pattern_match(p, sa)) {
+ do {
+ p++;
+ n--;
+
+ matches = nxt_http_route_addr_pattern_match(p, sa);
+
+ if (p->base.negative) {
+ if (matches) {
+ continue;
+ }
+
+ return 0;
+ }
+
+ if (matches) {
return 1;
}
- }
- return 0;
+ } while (n > 0);
+
+ return p->base.negative;
}
diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c
index 44132859..46ae57a7 100644
--- a/src/nxt_http_static.c
+++ b/src/nxt_http_static.c
@@ -49,6 +49,10 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
if (nxt_slow_path(!nxt_str_eq(r->method, "GET", 3))) {
if (!nxt_str_eq(r->method, "HEAD", 4)) {
+ if (action->u.fallback != NULL) {
+ return action->u.fallback;
+ }
+
nxt_http_request_error(task, r, NXT_HTTP_METHOD_NOT_ALLOWED);
return NULL;
}
@@ -123,6 +127,10 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
break;
}
+ if (level == NXT_LOG_ERR && action->u.fallback != NULL) {
+ return action->u.fallback;
+ }
+
if (status != NXT_HTTP_NOT_FOUND) {
nxt_log(task, level, "open(\"%FN\") failed %E", f->name, f->error);
}
@@ -222,8 +230,13 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_file_close(task, f);
if (nxt_slow_path(!nxt_is_dir(&fi))) {
+ if (action->u.fallback != NULL) {
+ return action->u.fallback;
+ }
+
nxt_log(task, NXT_LOG_ERR, "\"%FN\" is not a regular file",
f->name);
+
nxt_http_request_error(task, r, NXT_HTTP_NOT_FOUND);
return NULL;
}
diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c
index 9edbc346..ecc3251e 100644
--- a/src/nxt_kqueue_engine.c
+++ b/src/nxt_kqueue_engine.c
@@ -747,7 +747,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
err = kev->fflags;
eof = (kev->flags & EV_EOF) != 0;
ev->kq_errno = err;
- ev->kq_eof = eof;
+ ev->kq_eof |= eof;
if (ev->read <= NXT_EVENT_BLOCKED) {
nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
@@ -778,7 +778,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
err = kev->fflags;
eof = (kev->flags & EV_EOF) != 0;
ev->kq_errno = err;
- ev->kq_eof = eof;
+ ev->kq_eof |= eof;
if (ev->write <= NXT_EVENT_BLOCKED) {
nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
diff --git a/src/nxt_main.h b/src/nxt_main.h
index d9e337d2..b310c4fa 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -149,15 +149,7 @@ typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c);
#include <nxt_cache.h>
-#include <nxt_source.h>
-typedef struct nxt_upstream_source_s nxt_upstream_source_t;
-
#include <nxt_http_parse.h>
-#include <nxt_stream_source.h>
-#include <nxt_upstream.h>
-#include <nxt_upstream_source.h>
-#include <nxt_http_source.h>
-#include <nxt_fastcgi_source.h>
#include <nxt_runtime.h>
#include <nxt_port_hash.h>
diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c
index 26bf915f..f5053652 100644
--- a/src/nxt_php_sapi.c
+++ b/src/nxt_php_sapi.c
@@ -15,23 +15,31 @@
#include <nxt_unit_request.h>
-#if PHP_MAJOR_VERSION >= 7
-# define NXT_PHP7 1
-# if PHP_MINOR_VERSION >= 1
-# define NXT_HAVE_PHP_LOG_MESSAGE_WITH_SYSLOG_TYPE 1
-# else
-# define NXT_HAVE_PHP_INTERRUPTS 1
-# endif
-# define NXT_HAVE_PHP_IGNORE_CWD 1
+#if PHP_VERSION_ID >= 50400
+#define NXT_HAVE_PHP_IGNORE_CWD 1
+#endif
+
+#if PHP_VERSION_ID >= 70100
+#define NXT_HAVE_PHP_LOG_MESSAGE_WITH_SYSLOG_TYPE 1
#else
-# define NXT_HAVE_PHP_INTERRUPTS 1
-# if PHP_MINOR_VERSION >= 4
-# define NXT_HAVE_PHP_IGNORE_CWD 1
-# endif
+#define NXT_HAVE_PHP_INTERRUPTS 1
+#endif
+
+#if PHP_VERSION_ID >= 70000
+#define NXT_PHP7 1
#endif
+typedef struct {
+ char *cookie;
+ nxt_str_t path_info;
+ nxt_str_t script_name;
+ nxt_str_t script_filename;
+ nxt_str_t script_dirname;
+ nxt_unit_request_info_t *req;
+
+ uint8_t chdir; /* 1 bit */
+} nxt_php_run_ctx_t;
-typedef struct nxt_php_run_ctx_s nxt_php_run_ctx_t;
#ifdef NXT_PHP7
typedef int (*nxt_php_disable_t)(char *p, size_t size);
@@ -39,14 +47,24 @@ typedef int (*nxt_php_disable_t)(char *p, size_t size);
typedef int (*nxt_php_disable_t)(char *p, uint TSRMLS_DC);
#endif
+#if PHP_VERSION_ID < 70200
+typedef void (*zif_handler)(INTERNAL_FUNCTION_PARAMETERS);
+#endif
+
static nxt_int_t nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf);
static void nxt_php_str_trim_trail(nxt_str_t *str, u_char t);
static void nxt_php_str_trim_lead(nxt_str_t *str, u_char t);
+static nxt_int_t nxt_php_dirname(const nxt_str_t *file, nxt_str_t *dir);
nxt_inline u_char *nxt_realpath(const void *c);
+nxt_inline void nxt_php_vcwd_chdir(nxt_unit_request_info_t *req,
+ const nxt_str_t *dirname);
-static void nxt_php_request_handler(nxt_unit_request_info_t *req);
+static void nxt_php_script_request_handler(nxt_unit_request_info_t *req);
+static void nxt_php_path_request_handler(nxt_unit_request_info_t *req);
+static nxt_int_t nxt_php_request_init(nxt_php_run_ctx_t *ctx,
+ nxt_unit_request_t *r);
static int nxt_php_startup(sapi_module_struct *sapi_module);
static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options,
@@ -56,6 +74,8 @@ static nxt_int_t nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value,
static void nxt_php_disable(nxt_task_t *task, const char *type,
nxt_str_t *value, char **ptr, nxt_php_disable_t disable);
static int nxt_php_send_headers(sapi_headers_struct *sapi_headers TSRMLS_DC);
+static void *nxt_php_hash_str_find_ptr(const HashTable *ht,
+ const nxt_str_t *str);
static char *nxt_php_read_cookies(TSRMLS_D);
static void nxt_php_set_sptr(nxt_unit_request_info_t *req, const char *name,
nxt_unit_sptr_t *v, uint32_t len, zval *track_vars_array TSRMLS_DC);
@@ -80,6 +100,55 @@ static int nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC);
#endif
+PHP_MINIT_FUNCTION(nxt_php_ext);
+ZEND_NAMED_FUNCTION(nxt_php_chdir);
+
+zif_handler nxt_php_chdir_handler;
+
+
+static zend_module_entry nxt_php_unit_module = {
+ STANDARD_MODULE_HEADER,
+ "unit",
+ NULL, /* function table */
+ PHP_MINIT(nxt_php_ext), /* initialization */
+ NULL, /* shutdown */
+ NULL, /* request initialization */
+ NULL, /* request shutdown */
+ NULL, /* information */
+ NXT_VERSION,
+ STANDARD_MODULE_PROPERTIES
+};
+
+
+PHP_MINIT_FUNCTION(nxt_php_ext)
+{
+ zend_function *func;
+
+ static const nxt_str_t chdir = nxt_string("chdir");
+
+ func = nxt_php_hash_str_find_ptr(CG(function_table), &chdir);
+ if (nxt_slow_path(func == NULL)) {
+ return FAILURE;
+ }
+
+ nxt_php_chdir_handler = func->internal_function.handler;
+ func->internal_function.handler = nxt_php_chdir;
+
+ return SUCCESS;
+}
+
+
+ZEND_NAMED_FUNCTION(nxt_php_chdir)
+{
+ nxt_php_run_ctx_t *ctx;
+
+ ctx = SG(server_context);
+ ctx->chdir = 1;
+
+ nxt_php_chdir_handler(INTERNAL_FUNCTION_PARAM_PASSTHRU);
+}
+
+
static sapi_module_struct nxt_php_sapi_module =
{
(char *) "cli-server",
@@ -141,17 +210,9 @@ static sapi_module_struct nxt_php_sapi_module =
};
-struct nxt_php_run_ctx_s {
- char *cookie;
- nxt_str_t path_info;
- nxt_str_t script_name;
- nxt_str_t script_filename;
- nxt_unit_request_info_t *req;
-};
-
-
static nxt_str_t nxt_php_root;
static nxt_str_t nxt_php_script_name;
+static nxt_str_t nxt_php_script_dirname;
static nxt_str_t nxt_php_script_filename;
static nxt_str_t nxt_php_index = nxt_string("index.php");
@@ -172,7 +233,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
static nxt_task_t *nxt_php_task;
-#ifdef ZTS
+#if defined(ZTS) && PHP_VERSION_ID < 70400
static void ***tsrm_ls;
#endif
@@ -182,7 +243,9 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
{
u_char *p, *tmp;
nxt_str_t ini_path;
- nxt_str_t *root, *script_filename, *script_name, *index;
+ nxt_str_t *root, *script_filename, *script_dirname, *script_name;
+ nxt_str_t *index;
+ nxt_int_t ret;
nxt_port_t *my_port, *main_port;
nxt_runtime_t *rt;
nxt_unit_ctx_t *unit_ctx;
@@ -205,6 +268,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
root = &nxt_php_root;
script_filename = &nxt_php_script_filename;
+ script_dirname = &nxt_php_script_dirname;
script_name = &nxt_php_script_name;
index = &nxt_php_index;
@@ -249,6 +313,11 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
return NXT_ERROR;
}
+ ret = nxt_php_dirname(script_filename, script_dirname);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
script_name->length = c->script.length + 1;
script_name->start = nxt_malloc(script_name->length);
if (nxt_slow_path(script_name->start == NULL)) {
@@ -277,11 +346,33 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_memcpy(index->start, c->index.start, c->index.length);
}
+ nxt_memzero(&php_init, sizeof(nxt_unit_init_t));
+
+ if (nxt_php_script_filename.start != NULL) {
+ if (nxt_slow_path(chdir((char *) script_dirname->start) != 0)) {
+ nxt_alert(task, "failed to chdir(%V) %E", script_dirname,
+ nxt_errno);
+
+ return NXT_ERROR;
+ }
+
+ php_init.callbacks.request_handler = nxt_php_script_request_handler;
+
+ } else {
+ php_init.callbacks.request_handler = nxt_php_path_request_handler;
+ }
+
#ifdef ZTS
+
+#if PHP_VERSION_ID >= 70400
+ php_tsrm_startup();
+#else
tsrm_startup(1, 1, 0, NULL);
tsrm_ls = ts_resource(0);
#endif
+#endif
+
#if defined(NXT_PHP7) && defined(ZEND_SIGNALS)
#if (NXT_ZEND_SIGNAL_STARTUP)
@@ -312,7 +403,10 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
}
}
- nxt_php_startup(&nxt_php_sapi_module);
+ if (nxt_slow_path(nxt_php_startup(&nxt_php_sapi_module) == FAILURE)) {
+ nxt_alert(task, "failed to initialize SAPI module and extension");
+ return NXT_ERROR;
+ }
if (c->options != NULL) {
value = nxt_conf_get_object_member(c->options, &admin_str, NULL);
@@ -322,21 +416,20 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_php_set_options(task, value, ZEND_INI_USER);
}
- nxt_memzero(&php_init, sizeof(nxt_unit_init_t));
-
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
if (nxt_slow_path(main_port == NULL)) {
+ nxt_alert(task, "main process not found");
return NXT_ERROR;
}
my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
if (nxt_slow_path(my_port == NULL)) {
+ nxt_alert(task, "my_port not found");
return NXT_ERROR;
}
- php_init.callbacks.request_handler = nxt_php_request_handler;
php_init.ready_port.id.pid = main_port->pid;
php_init.ready_port.id.id = main_port->id;
php_init.ready_port.out_fd = main_port->pair[1];
@@ -411,7 +504,7 @@ nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type)
}
-#if (NXT_PHP7)
+#ifdef NXT_PHP7
static nxt_int_t
nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type)
@@ -419,10 +512,8 @@ nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type)
zend_string *zs;
zend_ini_entry *ini_entry;
- ini_entry = zend_hash_str_find_ptr(EG(ini_directives),
- (char *) name->start, name->length);
-
- if (ini_entry == NULL) {
+ ini_entry = nxt_php_hash_str_find_ptr(EG(ini_directives), name);
+ if (nxt_slow_path(ini_entry == NULL)) {
return NXT_ERROR;
}
@@ -452,19 +543,9 @@ nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type)
{
char *cstr;
zend_ini_entry *ini_entry;
- char buf[256];
- if (nxt_slow_path(name->length >= sizeof(buf))) {
- return NXT_ERROR;
- }
-
- nxt_memcpy(buf, name->start, name->length);
- buf[name->length] = '\0';
-
- if (zend_hash_find(EG(ini_directives), buf, name->length + 1,
- (void **) &ini_entry)
- == FAILURE)
- {
+ ini_entry = nxt_php_hash_str_find_ptr(EG(ini_directives), name);
+ if (nxt_slow_path(ini_entry == NULL)) {
return NXT_ERROR;
}
@@ -549,6 +630,33 @@ nxt_php_disable(nxt_task_t *task, const char *type, nxt_str_t *value,
}
+static nxt_int_t
+nxt_php_dirname(const nxt_str_t *file, nxt_str_t *dir)
+{
+ size_t length;
+
+ nxt_assert(file->length > 0 && file->start[0] == '/');
+
+ length = file->length;
+
+ while (file->start[length - 1] != '/') {
+ length--;
+ }
+
+ dir->length = length;
+ dir->start = nxt_malloc(length + 1);
+ if (nxt_slow_path(dir->start == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_memcpy(dir->start, file->start, length);
+
+ dir->start[length] = '\0';
+
+ return NXT_OK;
+}
+
+
static void
nxt_php_str_trim_trail(nxt_str_t *str, u_char t)
{
@@ -578,12 +686,46 @@ nxt_realpath(const void *c)
static void
-nxt_php_request_handler(nxt_unit_request_info_t *req)
+nxt_php_script_request_handler(nxt_unit_request_info_t *req)
+{
+ zend_file_handle file_handle;
+ nxt_php_run_ctx_t ctx;
+
+ nxt_memzero(&ctx, sizeof(ctx));
+
+ ctx.req = req;
+ ctx.script_filename = nxt_php_script_filename;
+ ctx.script_dirname = nxt_php_script_dirname;
+ ctx.script_name = nxt_php_script_name;
+
+ nxt_memzero(&file_handle, sizeof(file_handle));
+
+ file_handle.type = ZEND_HANDLE_FILENAME;
+ file_handle.filename = (char *) ctx.script_filename.start;
+
+ if (nxt_slow_path(nxt_php_request_init(&ctx, req->request) != NXT_OK)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ return;
+ }
+
+ php_execute_script(&file_handle TSRMLS_CC);
+
+ if (ctx.chdir) {
+ nxt_php_vcwd_chdir(ctx.req, &nxt_php_script_dirname);
+ }
+
+ php_request_shutdown(NULL);
+
+ nxt_unit_request_done(req, NXT_UNIT_OK);
+}
+
+
+static void
+nxt_php_path_request_handler(nxt_unit_request_info_t *req)
{
- int rc;
u_char *p;
nxt_str_t path, script_name;
- nxt_unit_field_t *f;
+ nxt_int_t ret;
zend_file_handle file_handle;
nxt_php_run_ctx_t run_ctx, *ctx;
nxt_unit_request_t *r;
@@ -598,59 +740,101 @@ nxt_php_request_handler(nxt_unit_request_info_t *req)
path.length = r->path_length;
path.start = nxt_unit_sptr_get(&r->path);
- if (nxt_php_script_filename.start == NULL) {
- nxt_str_null(&script_name);
-
- ctx->path_info.start = (u_char *) strstr((char *) path.start, ".php/");
- if (ctx->path_info.start != NULL) {
- ctx->path_info.start += 4;
- path.length = ctx->path_info.start - path.start;
+ nxt_str_null(&script_name);
- ctx->path_info.length = r->path_length - path.length;
+ ctx->path_info.start = (u_char *) strstr((char *) path.start, ".php/");
+ if (ctx->path_info.start != NULL) {
+ ctx->path_info.start += 4;
+ path.length = ctx->path_info.start - path.start;
- } else if (path.start[path.length - 1] == '/') {
- script_name = nxt_php_index;
+ ctx->path_info.length = r->path_length - path.length;
- } else {
- if (nxt_slow_path(path.length < 4
- || nxt_memcmp(path.start + (path.length - 4),
- ".php", 4)))
- {
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
-
- return;
- }
- }
+ } else if (path.start[path.length - 1] == '/') {
+ script_name = nxt_php_index;
- ctx->script_filename.length = nxt_php_root.length + path.length
- + script_name.length;
- p = nxt_malloc(ctx->script_filename.length + 1);
- if (nxt_slow_path(p == NULL)) {
+ } else {
+ if (nxt_slow_path(path.length < 4
+ || nxt_memcmp(path.start + (path.length - 4),
+ ".php", 4)))
+ {
nxt_unit_request_done(req, NXT_UNIT_ERROR);
return;
}
+ }
- ctx->script_filename.start = p;
+ ctx->script_filename.length = nxt_php_root.length
+ + path.length
+ + script_name.length;
- ctx->script_name.length = path.length + script_name.length;
- ctx->script_name.start = p + nxt_php_root.length;
+ p = nxt_malloc(ctx->script_filename.length + 1);
+ if (nxt_slow_path(p == NULL)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
- p = nxt_cpymem(p, nxt_php_root.start, nxt_php_root.length);
- p = nxt_cpymem(p, path.start, path.length);
+ return;
+ }
- if (script_name.length > 0) {
- p = nxt_cpymem(p, script_name.start, script_name.length);
- }
+ ctx->script_filename.start = p;
- *p = '\0';
+ ctx->script_name.length = path.length + script_name.length;
+ ctx->script_name.start = p + nxt_php_root.length;
- } else {
- ctx->script_filename = nxt_php_script_filename;
- ctx->script_name = nxt_php_script_name;
+ p = nxt_cpymem(p, nxt_php_root.start, nxt_php_root.length);
+ p = nxt_cpymem(p, path.start, path.length);
+
+ if (script_name.length > 0) {
+ p = nxt_cpymem(p, script_name.start, script_name.length);
+ }
+
+ *p = '\0';
+
+ nxt_memzero(&file_handle, sizeof(file_handle));
+
+ file_handle.type = ZEND_HANDLE_FILENAME;
+ file_handle.filename = (char *) ctx->script_filename.start;
+
+ ret = nxt_php_dirname(&ctx->script_filename, &ctx->script_dirname);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ nxt_free(ctx->script_filename.start);
+
+ return;
}
+ if (nxt_slow_path(nxt_php_request_init(ctx, req->request) != NXT_OK)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ goto cleanup;
+ }
+
+ nxt_php_vcwd_chdir(ctx->req, &ctx->script_dirname);
+
+ php_execute_script(&file_handle TSRMLS_CC);
+
+ php_request_shutdown(NULL);
+
+ nxt_unit_request_done(req, NXT_UNIT_OK);
+
+cleanup:
+
+ nxt_free(ctx->script_filename.start);
+ nxt_free(ctx->script_dirname.start);
+}
+
+
+static int
+nxt_php_startup(sapi_module_struct *sapi_module)
+{
+ return php_module_startup(sapi_module, &nxt_php_unit_module, 1);
+}
+
+
+static nxt_int_t
+nxt_php_request_init(nxt_php_run_ctx_t *ctx, nxt_unit_request_t *r)
+{
+ nxt_unit_field_t *f;
+
SG(server_context) = ctx;
+ SG(options) |= SAPI_OPTION_NO_CHDIR;
SG(request_info).request_uri = nxt_unit_sptr_get(&r->target);
SG(request_info).request_method = nxt_unit_sptr_get(&r->method);
@@ -676,55 +860,41 @@ nxt_php_request_handler(nxt_unit_request_info_t *req)
SG(request_info).path_translated = NULL;
- nxt_memzero(&file_handle, sizeof(file_handle));
-
- file_handle.type = ZEND_HANDLE_FILENAME;
- file_handle.filename = (char *) ctx->script_filename.start;
-
- nxt_unit_req_debug(req, "handle.filename = '%s'",
+ nxt_unit_req_debug(ctx->req, "handle.filename = '%s'",
ctx->script_filename.start);
if (nxt_php_script_filename.start != NULL) {
- nxt_unit_req_debug(req, "run script %.*s in absolute mode",
+ nxt_unit_req_debug(ctx->req, "run script %.*s in absolute mode",
(int) nxt_php_script_filename.length,
(char *) nxt_php_script_filename.start);
} else {
- nxt_unit_req_debug(req, "run script %.*s",
+ nxt_unit_req_debug(ctx->req, "run script %.*s",
(int) ctx->script_filename.length,
(char *) ctx->script_filename.start);
}
-#if (NXT_PHP7)
+#ifdef NXT_PHP7
if (nxt_slow_path(php_request_startup() == FAILURE)) {
#else
if (nxt_slow_path(php_request_startup(TSRMLS_C) == FAILURE)) {
#endif
- nxt_unit_req_debug(req, "php_request_startup() failed");
- rc = NXT_UNIT_ERROR;
+ nxt_unit_req_debug(ctx->req, "php_request_startup() failed");
- goto fail;
+ return NXT_ERROR;
}
- rc = NXT_UNIT_OK;
-
- php_execute_script(&file_handle TSRMLS_CC);
- php_request_shutdown(NULL);
-
-fail:
-
- nxt_unit_request_done(req, rc);
-
- if (ctx->script_filename.start != nxt_php_script_filename.start) {
- nxt_free(ctx->script_filename.start);
- }
+ return NXT_OK;
}
-static int
-nxt_php_startup(sapi_module_struct *sapi_module)
+nxt_inline void
+nxt_php_vcwd_chdir(nxt_unit_request_info_t *req, const nxt_str_t *dir)
{
- return php_module_startup(sapi_module, NULL, 0);
+ if (nxt_slow_path(VCWD_CHDIR((char *) dir->start) != 0)) {
+ nxt_unit_req_alert(req, "VCWD_CHDIR(%s) failed (%d: %s)",
+ dir->start, errno, strerror(errno));
+ }
}
@@ -1001,6 +1171,41 @@ nxt_php_set_str(nxt_unit_request_info_t *req, const char *name,
}
+#ifdef NXT_PHP7
+
+static void *
+nxt_php_hash_str_find_ptr(const HashTable *ht, const nxt_str_t *str)
+{
+ return zend_hash_str_find_ptr(ht, (const char *) str->start, str->length);
+}
+
+#else
+
+static void *
+nxt_php_hash_str_find_ptr(const HashTable *ht, const nxt_str_t *str)
+{
+ int ret;
+ void *entry;
+ char buf[256];
+
+ if (nxt_slow_path(str->length >= (sizeof(buf) - 1))) {
+ return NULL;
+ }
+
+ nxt_memcpy(buf, str->start, str->length);
+ buf[str->length] = '\0';
+
+ ret = zend_hash_find(ht, buf, str->length + 1, &entry);
+ if (nxt_fast_path(ret == SUCCESS)) {
+ return entry;
+ }
+
+ return NULL;
+}
+
+#endif
+
+
static void
nxt_php_set_cstr(nxt_unit_request_info_t *req, const char *name,
const char *cstr, uint32_t len, zval *track_vars_array TSRMLS_DC)
diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c
index ea8b6903..14211f3f 100644
--- a/src/nxt_python_wsgi.c
+++ b/src/nxt_python_wsgi.c
@@ -89,8 +89,12 @@ static PyObject *nxt_py_write(PyObject *self, PyObject *args);
static void nxt_py_input_dealloc(nxt_py_input_t *self);
static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args);
+static PyObject *nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size);
static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
+static PyObject *nxt_py_input_iter(PyObject *self);
+static PyObject *nxt_py_input_next(PyObject *self);
+
static void nxt_python_print_exception(void);
static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes);
@@ -142,6 +146,8 @@ static PyTypeObject nxt_py_input_type = {
.tp_dealloc = (destructor) nxt_py_input_dealloc,
.tp_flags = Py_TPFLAGS_DEFAULT,
.tp_doc = "unit input object.",
+ .tp_iter = nxt_py_input_iter,
+ .tp_iternext = nxt_py_input_next,
.tp_methods = nxt_py_input_methods,
};
@@ -1229,14 +1235,151 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
static PyObject *
nxt_py_input_readline(nxt_py_input_t *self, PyObject *args)
{
- return PyBytes_FromStringAndSize("", 0);
+ ssize_t ssize;
+ PyObject *obj;
+ Py_ssize_t n;
+ nxt_python_run_ctx_t *ctx;
+
+ ctx = nxt_python_run_ctx;
+ if (nxt_slow_path(ctx == NULL)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "wsgi.input.readline() is called "
+ "outside of WSGI request processing");
+ }
+
+ n = PyTuple_GET_SIZE(args);
+
+ if (n > 0) {
+ if (n != 1) {
+ return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
+ }
+
+ obj = PyTuple_GET_ITEM(args, 0);
+
+ ssize = PyNumber_AsSsize_t(obj, PyExc_OverflowError);
+
+ if (nxt_fast_path(ssize > 0)) {
+ return nxt_py_input_getline(ctx, ssize);
+ }
+
+ if (ssize == 0) {
+ return PyBytes_FromStringAndSize("", 0);
+ }
+
+ if (ssize != -1) {
+ return PyErr_Format(PyExc_ValueError,
+ "the read line size cannot be zero or less");
+ }
+
+ if (PyErr_Occurred()) {
+ return NULL;
+ }
+ }
+
+ return nxt_py_input_getline(ctx, SSIZE_MAX);
+}
+
+
+static PyObject *
+nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size)
+{
+ void *buf;
+ ssize_t res;
+ PyObject *content;
+
+ res = nxt_unit_request_readline_size(ctx->req, size);
+ if (nxt_slow_path(res < 0)) {
+ return NULL;
+ }
+
+ if (res == 0) {
+ return PyBytes_FromStringAndSize("", 0);
+ }
+
+ content = PyBytes_FromStringAndSize(NULL, res);
+ if (nxt_slow_path(content == NULL)) {
+ return NULL;
+ }
+
+ buf = PyBytes_AS_STRING(content);
+
+ res = nxt_unit_request_read(ctx->req, buf, res);
+
+ return content;
}
static PyObject *
nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args)
{
- return PyList_New(0);
+ PyObject *res;
+ nxt_python_run_ctx_t *ctx;
+
+ ctx = nxt_python_run_ctx;
+ if (nxt_slow_path(ctx == NULL)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "wsgi.input.readlines() is called "
+ "outside of WSGI request processing");
+ }
+
+ res = PyList_New(0);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ for ( ;; ) {
+ PyObject *line = nxt_py_input_getline(ctx, SSIZE_MAX);
+ if (nxt_slow_path(line == NULL)) {
+ Py_DECREF(res);
+ return NULL;
+ }
+
+ if (PyBytes_GET_SIZE(line) == 0) {
+ Py_DECREF(line);
+ return res;
+ }
+
+ PyList_Append(res, line);
+ Py_DECREF(line);
+ }
+
+ return res;
+}
+
+
+static PyObject *
+nxt_py_input_iter(PyObject *self)
+{
+ Py_INCREF(self);
+ return self;
+}
+
+
+static PyObject *
+nxt_py_input_next(PyObject *self)
+{
+ PyObject *line;
+ nxt_python_run_ctx_t *ctx;
+
+ ctx = nxt_python_run_ctx;
+ if (nxt_slow_path(ctx == NULL)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "wsgi.input.next() is called "
+ "outside of WSGI request processing");
+ }
+
+ line = nxt_py_input_getline(ctx, SSIZE_MAX);
+ if (nxt_slow_path(line == NULL)) {
+ return NULL;
+ }
+
+ if (PyBytes_GET_SIZE(line) == 0) {
+ Py_DECREF(line);
+ PyErr_SetNone(PyExc_StopIteration);
+ return NULL;
+ }
+
+ return line;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 3ff048c5..a913284c 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -1361,6 +1361,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, send_timeout),
},
+
+ {
+ nxt_string("body_temp_path"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_socket_conf_t, body_temp_path),
+ },
};
@@ -1397,6 +1403,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_int_t ret;
nxt_str_t name, path;
nxt_app_t *app, *prev;
+ nxt_str_t *t;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value, *websocket;
@@ -1634,6 +1641,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->routes = routes;
}
+ ret = nxt_upstreams_create(task, tmcf, conf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
http = nxt_conf_get_path(conf, &http_path);
#if 0
if (http == NULL) {
@@ -1693,6 +1705,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->websocket_conf.read_timeout = 60 * 1000;
skcf->websocket_conf.keepalive_interval = 30 * 1000;
+ nxt_str_null(&skcf->body_temp_path);
+
if (http != NULL) {
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
nxt_nitems(nxt_router_http_conf),
@@ -1714,6 +1728,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ t = &skcf->body_temp_path;
+
+ if (t->length == 0) {
+ t->start = (u_char *) task->thread->runtime->tmp;
+ t->length = nxt_strlen(t->start);
+ }
+
#if (NXT_TLS)
value = nxt_conf_get_path(listener, &certificate_path);
@@ -1904,8 +1925,9 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
}
-nxt_app_t *
-nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
+void
+nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
+ nxt_http_action_t *action)
{
nxt_app_t *app;
@@ -1915,7 +1937,8 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
app = nxt_router_app_find(&tmcf->previous, name);
}
- return app;
+ action->u.application = app;
+ action->handler = nxt_http_application_handler;
}
@@ -2524,6 +2547,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler)
{
+ nxt_int_t ret;
nxt_joint_job_t *job;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
@@ -2557,6 +2581,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
job->work.data = joint;
+ ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
joint->count = 1;
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
@@ -4152,6 +4181,13 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
state.req_app_link = re_ra;
state.app = app;
+ /*
+ * Need to increment use count "in advance" because
+ * nxt_router_port_select() will remove re_ra from lists
+ * and decrement use count.
+ */
+ nxt_request_app_link_inc_use(re_ra);
+
nxt_router_port_select(task, &state);
goto re_ra_cancelled;
@@ -4217,16 +4253,18 @@ re_ra_cancelled:
if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
/*
- * There should be call nxt_request_app_link_inc_use(re_ra),
- * because of one more link in the queue.
- * Corresponding decrement is in nxt_router_app_process_request().
+ * Reference counter already incremented above, this will
+ * keep re_ra while nxt_router_app_process_request()
+ * task is in queue. Reference counter decreased in
+ * nxt_router_app_process_request() after processing.
*/
- nxt_request_app_link_inc_use(re_ra);
-
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra);
+
+ } else {
+ nxt_request_app_link_use(task, re_ra, -1);
}
}
@@ -4234,15 +4272,14 @@ re_ra_cancelled:
/*
* There should be call nxt_request_app_link_inc_use(req_app_link),
* because of one more link in the queue. But one link was
- * recently removed from app->requests link.
+ * recently removed from app->requests linked list.
+ * Corresponding decrement is in nxt_router_app_process_request().
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, req_app_link);
- /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */
-
goto adjust_use;
}
@@ -4684,6 +4721,21 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
return;
}
+ /*
+ * At this point we have request req_rpc_data allocated and registered
+ * in port handlers. Need to fixup request memory pool. Counterpart
+ * release will be called via following call chain:
+ * nxt_request_rpc_data_unlink() ->
+ * nxt_router_http_request_done() ->
+ * nxt_router_http_request_release()
+ */
+ nxt_mp_retain(r->mem_pool);
+
+ r->timer.task = &engine->task;
+ r->timer.work_queue = &engine->fast_work_queue;
+ r->timer.log = engine->task.log;
+ r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
+
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
@@ -4722,7 +4774,8 @@ static void
nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_app_link_t *req_app_link)
{
- nxt_buf_t *buf;
+ nxt_fd_t fd;
+ nxt_buf_t *buf, *body;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
nxt_apr_action_t apr_action;
@@ -4781,8 +4834,14 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
- -1, req_app_link->stream, reply_port->id, buf,
+ body = req_app_link->request->body;
+ fd = (body != NULL && nxt_buf_is_file(body)) ? body->file->fd : -1;
+
+ res = nxt_port_socket_twrite(task, port,
+ NXT_PORT_MSG_REQ_HEADERS
+ | NXT_PORT_MSG_CLOSE_FD,
+ fd,
+ req_app_link->stream, reply_port->id, buf,
&req_app_link->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
@@ -4791,6 +4850,10 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
+ if (fd != -1) {
+ body->file->fd = -1;
+ }
+
release_port:
nxt_router_app_port_release(task, port, apr_action);
@@ -5115,6 +5178,10 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
}
}
+ if (r->body != NULL && nxt_buf_is_file(r->body)) {
+ lseek(r->body->file->fd, 0, SEEK_SET);
+ }
+
return out;
}
@@ -5185,6 +5252,13 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
state.req_app_link = pending_ra;
state.app = app;
+ /*
+ * Need to increment use count "in advance" because
+ * nxt_router_port_select() will remove pending_ra from lists
+ * and decrement use count.
+ */
+ nxt_request_app_link_inc_use(pending_ra);
+
nxt_router_port_select(task, &state);
} else {
@@ -5196,7 +5270,19 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
if (pending_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- nxt_router_app_prepare_request(task, pending_ra);
+ /*
+ * Reference counter already incremented above, this will
+ * keep pending_ra while nxt_router_app_process_request()
+ * task is in queue. Reference counter decreased in
+ * nxt_router_app_process_request() after processing.
+ */
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, pending_ra);
+
+ } else {
+ nxt_request_app_link_use(task, pending_ra, -1);
}
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 1517c14b..08142ce3 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -18,6 +18,8 @@ typedef struct nxt_http_request_s nxt_http_request_t;
typedef struct nxt_http_action_s nxt_http_action_t;
typedef struct nxt_http_routes_s nxt_http_routes_t;
+typedef struct nxt_upstream_s nxt_upstream_t;
+typedef struct nxt_upstreams_s nxt_upstreams_t;
typedef struct nxt_router_access_log_s nxt_router_access_log_t;
@@ -43,6 +45,7 @@ typedef struct {
nxt_router_t *router;
nxt_http_routes_t *routes;
+ nxt_upstreams_t *upstreams;
nxt_lvlhsh_t mtypes_hash;
@@ -184,6 +187,8 @@ typedef struct {
nxt_websocket_conf_t websocket_conf;
+ nxt_str_t body_temp_path;
+
#if (NXT_TLS)
nxt_tls_conf_t *tls;
#endif
@@ -196,6 +201,8 @@ typedef struct {
nxt_event_engine_t *engine;
nxt_socket_conf_t *socket_conf;
+ nxt_upstream_t **upstreams;
+
/* Modules configuraitons. */
} nxt_socket_conf_joint_t;
@@ -218,8 +225,8 @@ void nxt_router_access_log_reopen_handler(nxt_task_t *task,
void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_app_t *app);
void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port);
-nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
- nxt_str_t *name);
+void nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
+ nxt_str_t *name, nxt_http_action_t *action);
void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
void nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
nxt_socket_conf_joint_t *joint);
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 80b25c1b..f6d80ccb 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -693,6 +693,7 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
rt->modules = NXT_MODULES;
rt->state = NXT_STATE;
rt->control = NXT_CONTROL_SOCK;
+ rt->tmp = NXT_TMP;
nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
@@ -835,6 +836,7 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
static const char no_modules[] =
"option \"--modules\" requires directory\n";
static const char no_state[] = "option \"--state\" requires directory\n";
+ static const char no_tmp[] = "option \"--tmp\" requires directory\n";
static const char help[] =
"\n"
@@ -859,6 +861,9 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
" --state DIRECTORY set state directory name\n"
" default: \"" NXT_STATE "\"\n"
"\n"
+ " --tmp DIRECTORY set tmp directory name\n"
+ " default: \"" NXT_TMP "\"\n"
+ "\n"
" --user USER set non-privileged processes to run"
" as specified user\n"
" default: \"" NXT_USER "\"\n"
@@ -966,6 +971,19 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
continue;
}
+ if (nxt_strcmp(p, "--tmp") == 0) {
+ if (*argv == NULL) {
+ write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
+ return NXT_ERROR;
+ }
+
+ p = *argv++;
+
+ rt->tmp = p;
+
+ continue;
+ }
+
if (nxt_strcmp(p, "--no-daemon") == 0) {
rt->daemon = 0;
continue;
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index f8d19ec6..a364c38c 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -68,6 +68,7 @@ struct nxt_runtime_s {
const char *conf;
const char *conf_tmp;
const char *control;
+ const char *tmp;
nxt_str_t certs;
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 7c3d945c..7a4124fb 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -76,6 +76,8 @@ static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
+static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
+ nxt_unit_request_info_t *req, size_t size);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
@@ -961,6 +963,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req_impl->incoming_buf->prev = &req_impl->incoming_buf;
recv_msg->incoming_buf = NULL;
+ req->content_fd = recv_msg->fd;
+ recv_msg->fd = -1;
+
req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START;
req_impl->websocket = 0;
@@ -1178,6 +1183,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
nxt_unit_mmap_buf_free(req_impl->incoming_buf);
}
+ if (req->content_fd != -1) {
+ close(req->content_fd);
+
+ req->content_fd = -1;
+ }
+
/*
* Process release should go after buffers release to guarantee mmap
* existence.
@@ -2423,8 +2434,144 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
- return nxt_unit_buf_read(&req->content_buf, &req->content_length,
- dst, size);
+ ssize_t buf_res, res;
+
+ buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
+ dst, size);
+
+ if (buf_res < (ssize_t) size && req->content_fd != -1) {
+ res = read(req->content_fd, dst, size);
+ if (res < 0) {
+ nxt_unit_req_alert(req, "failed to read content: %s (%d)",
+ strerror(errno), errno);
+
+ return res;
+ }
+
+ if (res < (ssize_t) size) {
+ close(req->content_fd);
+
+ req->content_fd = -1;
+ }
+
+ req->content_length -= res;
+ size -= res;
+
+ dst = nxt_pointer_to(dst, res);
+
+ } else {
+ res = 0;
+ }
+
+ return buf_res + res;
+}
+
+
+ssize_t
+nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
+{
+ char *p;
+ size_t l_size, b_size;
+ nxt_unit_buf_t *b;
+ nxt_unit_mmap_buf_t *mmap_buf, *preread_buf;
+
+ if (req->content_length == 0) {
+ return 0;
+ }
+
+ l_size = 0;
+
+ b = req->content_buf;
+
+ while (b != NULL) {
+ b_size = b->end - b->free;
+ p = memchr(b->free, '\n', b_size);
+
+ if (p != NULL) {
+ p++;
+ l_size += p - b->free;
+ break;
+ }
+
+ l_size += b_size;
+
+ if (max_size <= l_size) {
+ break;
+ }
+
+ mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
+ if (mmap_buf->next == NULL
+ && req->content_fd != -1
+ && l_size < req->content_length)
+ {
+ preread_buf = nxt_unit_request_preread(req, 16384);
+ if (nxt_slow_path(preread_buf == NULL)) {
+ return -1;
+ }
+
+ nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
+ }
+
+ b = nxt_unit_buf_next(b);
+ }
+
+ return nxt_min(max_size, l_size);
+}
+
+
+static nxt_unit_mmap_buf_t *
+nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
+{
+ ssize_t res;
+ nxt_unit_mmap_buf_t *mmap_buf;
+
+ if (req->content_fd == -1) {
+ nxt_unit_req_alert(req, "preread: content_fd == -1");
+ return NULL;
+ }
+
+ mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
+ if (nxt_slow_path(mmap_buf == NULL)) {
+ nxt_unit_req_alert(req, "preread: failed to allocate buf");
+ return NULL;
+ }
+
+ mmap_buf->free_ptr = malloc(size);
+ if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
+ nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
+ nxt_unit_mmap_buf_release(mmap_buf);
+ return NULL;
+ }
+
+ mmap_buf->plain_ptr = mmap_buf->free_ptr;
+
+ mmap_buf->hdr = NULL;
+ mmap_buf->buf.start = mmap_buf->free_ptr;
+ mmap_buf->buf.free = mmap_buf->buf.start;
+ mmap_buf->buf.end = mmap_buf->buf.start + size;
+ mmap_buf->process = NULL;
+
+ res = read(req->content_fd, mmap_buf->free_ptr, size);
+ if (res < 0) {
+ nxt_unit_req_alert(req, "failed to read content: %s (%d)",
+ strerror(errno), errno);
+
+ nxt_unit_mmap_buf_free(mmap_buf);
+
+ return NULL;
+ }
+
+ if (res < (ssize_t) size) {
+ close(req->content_fd);
+
+ req->content_fd = -1;
+ }
+
+ nxt_unit_req_debug(req, "preread: read %d", (int) res);
+
+ mmap_buf->buf.end = mmap_buf->buf.free + res;
+
+ return mmap_buf;
}
@@ -2433,14 +2580,17 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
u_char *p;
size_t rest, copy, read;
- nxt_unit_buf_t *buf;
+ nxt_unit_buf_t *buf, *last_buf;
p = dst;
rest = size;
buf = *b;
+ last_buf = buf;
while (buf != NULL) {
+ last_buf = buf;
+
copy = buf->end - buf->free;
copy = nxt_min(rest, copy);
@@ -2460,7 +2610,7 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
buf = nxt_unit_buf_next(buf);
}
- *b = buf;
+ *b = last_buf;
read = size - rest;
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index c8aaa124..900f3ac2 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -103,6 +103,7 @@ struct nxt_unit_request_info_s {
nxt_unit_buf_t *content_buf;
uint64_t content_length;
+ int content_fd;
void *data;
};
@@ -335,6 +336,9 @@ int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
size_t size);
+ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req,
+ size_t max_size);
+
void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);
diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c
index e1615120..66b6619a 100644
--- a/src/nxt_upstream.c
+++ b/src/nxt_upstream.c
@@ -4,40 +4,137 @@
* Copyright (C) NGINX, Inc.
*/
-#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_upstream.h>
-typedef struct {
- void (*peer_get)(nxt_upstream_peer_t *up);
- void (*peer_free)(nxt_upstream_peer_t *up);
-} nxt_upstream_name_t;
+static nxt_http_action_t *nxt_upstream_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
-static const nxt_upstream_name_t nxt_upstream_names[] = {
+nxt_int_t
+nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *conf)
+{
+ size_t size;
+ uint32_t i, n, next;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_str_t name, *string;
+ nxt_upstreams_t *upstreams;
+ nxt_conf_value_t *upstreams_conf, *upcf;
+
+ static nxt_str_t upstreams_name = nxt_string("upstreams");
+
+ upstreams_conf = nxt_conf_get_object_member(conf, &upstreams_name, NULL);
+
+ if (upstreams_conf == NULL) {
+ return NXT_OK;
+ }
+
+ n = nxt_conf_object_members_count(upstreams_conf);
+
+ if (n == 0) {
+ return NXT_OK;
+ }
+
+ mp = tmcf->router_conf->mem_pool;
+ size = sizeof(nxt_upstreams_t) + n * sizeof(nxt_upstream_t);
+
+ upstreams = nxt_mp_zalloc(mp, size);
+ if (nxt_slow_path(upstreams == NULL)) {
+ return NXT_ERROR;
+ }
+
+ upstreams->items = n;
+ next = 0;
+
+ for (i = 0; i < n; i++) {
+ upcf = nxt_conf_next_object_member(upstreams_conf, &name, &next);
+
+ string = nxt_str_dup(mp, &upstreams->upstream[i].name, &name);
+ if (nxt_slow_path(string == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_upstream_round_robin_create(task, tmcf, upcf,
+ &upstreams->upstream[i]);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ tmcf->router_conf->upstreams = upstreams;
- { "round_robin", &nxt_upstream_round_robin },
-};
+ return NXT_OK;
+}
void
-nxt_upstream_create(nxt_upstream_peer_t *up)
+nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
+ nxt_http_action_t *action)
{
- /* TODO: dynamic balancer add & lvlhsh */
- nxt_upstream_names[0].create(up);
+ uint32_t i, n;
+ nxt_upstream_t *upstream;
+
+ upstream = &upstreams->upstream[0];
+ n = upstreams->items;
+
+ for (i = 0; i < n; i++) {
+ if (nxt_strstr_eq(&upstream[i].name, name)) {
+ action->u.upstream_number = i;
+ action->handler = nxt_upstream_handler;
+
+ return;
+ }
+ }
}
-void
-nxt_upstream_peer(nxt_upstream_peer_t *up)
+nxt_int_t
+nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t ***upstream_joint)
{
- nxt_upstream_t *u;
+ uint32_t i, n;
+ nxt_upstream_t *u, **up;
+ nxt_upstreams_t *upstreams;
+ nxt_router_conf_t *router_conf;
+
+ router_conf = tmcf->router_conf;
+ upstreams = router_conf->upstreams;
+
+ if (upstreams == NULL) {
+ *upstream_joint = NULL;
+ return NXT_OK;
+ }
- u = up->upstream;
+ n = upstreams->items;
- if (u != NULL) {
- u->peer_get(up);
- return;
+ up = nxt_mp_zalloc(router_conf->mem_pool, n * sizeof(nxt_upstream_t *));
+ if (nxt_slow_path(up == NULL)) {
+ return NXT_ERROR;
}
- nxt_upstream_create(up);
+ u = &upstreams->upstream[0];
+
+ for (i = 0; i < n; i++) {
+ up[i] = u[i].proto->joint_create(tmcf, &u[i]);
+ if (nxt_slow_path(up[i] == NULL)) {
+ return NXT_ERROR;
+ }
+ }
+
+ *upstream_joint = up;
+
+ return NXT_OK;
+}
+
+
+static nxt_http_action_t *
+nxt_upstream_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *action)
+{
+ return nxt_upstream_proxy_handler(task, r,
+ r->conf->upstreams[action->u.upstream_number]);
}
diff --git a/src/nxt_upstream.h b/src/nxt_upstream.h
index d1fca2a5..afc53774 100644
--- a/src/nxt_upstream.h
+++ b/src/nxt_upstream.h
@@ -8,40 +8,74 @@
#define _NXT_UPSTREAM_H_INCLUDED_
-typedef struct nxt_upstream_peer_s nxt_upstream_peer_t;
+typedef struct nxt_upstream_proxy_s nxt_upstream_proxy_t;
+typedef struct nxt_upstream_round_robin_s nxt_upstream_round_robin_t;
+typedef struct nxt_upstream_round_robin_server_s
+ nxt_upstream_round_robin_server_t;
-struct nxt_upstream_peer_s {
- /* STUB */
- void *upstream;
- void *data;
- /**/
+typedef void (*nxt_upstream_peer_ready_t)(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+typedef void (*nxt_upstream_peer_error_t)(nxt_task_t *task,
+ nxt_upstream_server_t *us);
- nxt_sockaddr_t *sockaddr;
- nxt_nsec_t delay;
- uint32_t tries;
- in_port_t port;
+typedef struct {
+ nxt_upstream_peer_ready_t ready;
+ nxt_upstream_peer_error_t error;
+} nxt_upstream_peer_state_t;
- nxt_str_t addr;
- nxt_mp_t *mem_pool;
- void (*ready_handler)(nxt_task_t *task, nxt_upstream_peer_t *up);
- void (*protocol_handler)(nxt_upstream_source_t *us);
-};
+typedef nxt_upstream_t *(*nxt_upstream_joint_create_t)(
+ nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
+typedef void (*nxt_upstream_server_get_t)(nxt_task_t *task,
+ nxt_upstream_server_t *us);
typedef struct {
- void (*ready_handler)(void *data);
- nxt_work_handler_t completion_handler;
- nxt_work_handler_t error_handler;
-} nxt_upstream_state_t;
+ nxt_upstream_joint_create_t joint_create;
+ nxt_upstream_server_get_t get;
+} nxt_upstream_server_proto_t;
+
+
+struct nxt_upstream_s {
+ const nxt_upstream_server_proto_t *proto;
+
+ union {
+ nxt_upstream_proxy_t *proxy;
+ nxt_upstream_round_robin_t *round_robin;
+ } type;
+
+ nxt_str_t name;
+};
+
+
+struct nxt_upstreams_s {
+ uint32_t items;
+ nxt_upstream_t upstream[0];
+};
+
+
+struct nxt_upstream_server_s {
+ nxt_sockaddr_t *sockaddr;
+ const nxt_upstream_peer_state_t *state;
+ nxt_upstream_t *upstream;
+
+ uint8_t protocol;
+
+ union {
+ nxt_upstream_round_robin_server_t *round_robin;
+ } server;
+
+ union {
+ nxt_http_peer_t *http;
+ } peer;
+};
-/* STUB */
-NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_task_t *task,
- nxt_upstream_peer_t *up);
-/**/
+nxt_int_t nxt_upstream_round_robin_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *upstream_conf,
+ nxt_upstream_t *upstream);
#endif /* _NXT_UPSTREAM_H_INCLUDED_ */
diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c
index 09a3bce3..fd76ecb5 100644
--- a/src/nxt_upstream_round_robin.c
+++ b/src/nxt_upstream_round_robin.c
@@ -4,197 +4,188 @@
* Copyright (C) NGINX, Inc.
*/
-#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_upstream.h>
-typedef struct {
- int32_t weight;
- int32_t effective_weight;
- int32_t current_weight;
- uint32_t down; /* 1 bit */
- nxt_msec_t last_accessed;
- nxt_sockaddr_t *sockaddr;
-} nxt_upstream_round_robin_peer_t;
+struct nxt_upstream_round_robin_server_s {
+ nxt_sockaddr_t *sockaddr;
+ int32_t current_weight;
+ int32_t effective_weight;
+ int32_t weight;
-typedef struct {
- nxt_uint_t npeers;
- nxt_upstream_round_robin_peer_t *peers;
- nxt_thread_spinlock_t lock;
-} nxt_upstream_round_robin_t;
+ uint8_t protocol;
+};
-static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
- nxt_upstream_peer_t *up);
+struct nxt_upstream_round_robin_s {
+ uint32_t items;
+ nxt_upstream_round_robin_server_t server[0];
+};
-void
-nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
-{
- nxt_job_sockaddr_parse_t *jbs;
+static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
+ nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
+static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
+ nxt_upstream_server_t *us);
- if (up->upstream != NULL) {
- nxt_upstream_round_robin_get_peer(task, up);
- }
- jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
- if (nxt_slow_path(jbs == NULL)) {
- up->ready_handler(task, up);
- return;
- }
+static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
+ .joint_create = nxt_upstream_round_robin_joint_create,
+ .get = nxt_upstream_round_robin_server_get,
+};
- jbs->resolve.job.task = task;
- jbs->resolve.job.data = up;
- jbs->resolve.port = up->port;
- jbs->resolve.log_level = NXT_LOG_ERR;
- jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
- jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
- jbs->addr = up->addr;
- nxt_job_sockaddr_parse(jbs);
-}
+static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = {
+ {
+ nxt_string("weight"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_upstream_round_robin_server_t, weight),
+ },
+};
-static void
-nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
+nxt_int_t
+nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
{
- nxt_uint_t i;
- nxt_sockaddr_t *sa;
- nxt_upstream_peer_t *up;
- nxt_job_sockaddr_parse_t *jbs;
- nxt_upstream_round_robin_t *urr;
- nxt_upstream_round_robin_peer_t *peer;
+ size_t size;
+ uint32_t i, n, next;
+ nxt_mp_t *mp;
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_conf_value_t *servers_conf, *srvcf;
+ nxt_upstream_round_robin_t *urr;
- jbs = obj;
- up = jbs->resolve.job.data;
+ static nxt_str_t servers = nxt_string("servers");
- urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
- if (nxt_slow_path(urr == NULL)) {
- goto fail;
- }
+ mp = tmcf->router_conf->mem_pool;
- urr->npeers = jbs->resolve.count;
+ servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
+ n = nxt_conf_object_members_count(servers_conf);
- peer = nxt_mp_zget(up->mem_pool,
- urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
- if (nxt_slow_path(peer == NULL)) {
- goto fail;
+ size = sizeof(nxt_upstream_round_robin_t)
+ + n * sizeof(nxt_upstream_round_robin_server_t);
+
+ urr = nxt_mp_zalloc(mp, size);
+ if (nxt_slow_path(urr == NULL)) {
+ return NXT_ERROR;
}
- urr->peers = peer;
+ urr->items = n;
+ next = 0;
- for (i = 0; i < urr->npeers; i++) {
- peer[i].weight = 1;
- peer[i].effective_weight = 1;
+ for (i = 0; i < n; i++) {
+ srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
- sa = jbs->resolve.sockaddrs[i];
+ sa = nxt_sockaddr_parse(mp, &name);
+ if (nxt_slow_path(sa == NULL)) {
+ return NXT_ERROR;
+ }
- /* STUB */
sa->type = SOCK_STREAM;
- nxt_sockaddr_text(sa);
+ urr->server[i].sockaddr = sa;
+ urr->server[i].weight = 1;
+ urr->server[i].protocol = NXT_HTTP_PROTO_H1;
- nxt_debug(task, "upstream peer: %*s",
- (size_t) sa->length, nxt_sockaddr_start(sa));
+ nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf,
+ nxt_nitems(nxt_upstream_round_robin_server_conf),
+ &urr->server[i]);
- /* TODO: memcpy to shared memory pool. */
- peer[i].sockaddr = sa;
+ urr->server[i].effective_weight = urr->server[i].weight;
}
- up->upstream = urr;
+ upstream->proto = &nxt_upstream_round_robin_proto;
+ upstream->type.round_robin = urr;
- /* STUB */
- up->sockaddr = peer[0].sockaddr;
+ return NXT_OK;
+}
- nxt_job_destroy(task, jbs);
- up->ready_handler(task, up);
- //nxt_upstream_round_robin_get_peer(up);
- return;
+static nxt_upstream_t *
+nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t *upstream)
+{
+ size_t size;
+ uint32_t i, n;
+ nxt_mp_t *mp;
+ nxt_upstream_t *u;
+ nxt_upstream_round_robin_t *urr, *urrcf;
-fail:
+ mp = tmcf->router_conf->mem_pool;
- nxt_job_destroy(task, jbs);
+ u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
+ if (nxt_slow_path(u == NULL)) {
+ return NULL;
+ }
- up->ready_handler(task, up);
-}
+ *u = *upstream;
+ urrcf = upstream->type.round_robin;
-static void
-nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
-{
- nxt_upstream_peer_t *up;
- nxt_job_sockaddr_parse_t *jbs;
+ size = sizeof(nxt_upstream_round_robin_t)
+ + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
- jbs = obj;
- up = jbs->resolve.job.data;
+ urr = nxt_mp_alloc(mp, size);
+ if (nxt_slow_path(urr == NULL)) {
+ return NULL;
+ }
- up->ready_handler(task, up);
-}
+ u->type.round_robin = urr;
+ n = urrcf->items;
+ urr->items = n;
-static void
-nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
-{
- int32_t effective_weights;
- nxt_uint_t i;
- nxt_msec_t now;
- nxt_upstream_round_robin_t *urr;
- nxt_upstream_round_robin_peer_t *peer, *best;
+ for (i = 0; i < n; i++) {
+ urr->server[i] = urrcf->server[i];
+ }
- urr = up->upstream;
+ return u;
+}
- now = task->thread->engine->timers.now;
- nxt_thread_spin_lock(&urr->lock);
+static void
+nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ int32_t total;
+ uint32_t i, n;
+ nxt_upstream_round_robin_t *round_robin;
+ nxt_upstream_round_robin_server_t *s, *best;
best = NULL;
- effective_weights = 0;
- peer = urr->peers;
+ total = 0;
- for (i = 0; i < urr->npeers; i++) {
-
- if (peer[i].down) {
- continue;
- }
+ round_robin = us->upstream->type.round_robin;
-#if 0
- if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
- good = peer[i].last_accessed + peer[i].fail_timeout;
+ s = round_robin->server;
+ n = round_robin->items;
- if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
- continue;
- }
- }
-#endif
+ for (i = 0; i < n; i++) {
- peer[i].current_weight += peer[i].effective_weight;
- effective_weights += peer[i].effective_weight;
+ s[i].current_weight += s[i].effective_weight;
+ total += s[i].effective_weight;
- if (peer[i].effective_weight < peer[i].weight) {
- peer[i].effective_weight++;
+ if (s[i].effective_weight < s[i].weight) {
+ s[i].effective_weight++;
}
- if (best == NULL || peer[i].current_weight > best->current_weight) {
- best = &peer[i];
+ if (best == NULL || s[i].current_weight > best->current_weight) {
+ best = &s[i];
}
}
- if (best != NULL) {
- best->current_weight -= effective_weights;
- best->last_accessed = now;
-
- up->sockaddr = best->sockaddr;
-
- } else {
- up->sockaddr = NULL;
+ if (best == NULL) {
+ us->state->error(task, us);
+ return;
}
- nxt_thread_spin_unlock(&urr->lock);
+ best->current_weight -= total;
+ us->sockaddr = best->sockaddr;
+ us->protocol = best->protocol;
+ us->server.round_robin = best;
- up->ready_handler(task, up);
+ us->state->ready(task, us);
}
diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c
index 7e8b3ce1..cc110035 100644
--- a/src/ruby/nxt_ruby_stream_io.c
+++ b/src/ruby/nxt_ruby_stream_io.c
@@ -88,9 +88,7 @@ static VALUE
nxt_ruby_stream_io_gets(VALUE obj)
{
VALUE buf;
- char *p;
- size_t size, b_size;
- nxt_unit_buf_t *b;
+ ssize_t res;
nxt_ruby_run_ctx_t *run_ctx;
nxt_unit_request_info_t *req;
@@ -102,30 +100,20 @@ nxt_ruby_stream_io_gets(VALUE obj)
return Qnil;
}
- size = 0;
-
- for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) {
- b_size = b->end - b->free;
- p = memchr(b->free, '\n', b_size);
-
- if (p != NULL) {
- p++;
- size += p - b->free;
- break;
- }
-
- size += b_size;
+ res = nxt_unit_request_readline_size(req, SSIZE_MAX);
+ if (nxt_slow_path(res < 0)) {
+ return Qnil;
}
- buf = rb_str_buf_new(size);
+ buf = rb_str_buf_new(res);
- if (buf == Qnil) {
+ if (nxt_slow_path(buf == Qnil)) {
return Qnil;
}
- size = nxt_unit_request_read(req, RSTRING_PTR(buf), size);
+ res = nxt_unit_request_read(req, RSTRING_PTR(buf), res);
- rb_str_set_len(buf, size);
+ rb_str_set_len(buf, res);
return buf;
}