diff options
Diffstat (limited to '')
86 files changed, 3555 insertions, 720 deletions
@@ -31,3 +31,4 @@ b651ff72ffe080835f884a1ace8fa24eb33e3569 1.10.0-2 c27c08b0deeee58676f3880f0f315388ce5d9322 1.11.0-2 b391df5f0102aa6afe660cfc863729c1b1111c9e 1.12.0 c1625c52dd6444ed613348719fbb54c7abcc6619 1.12.0-1 +3313bf222e6e0a91213946dfcbd70bb5079f4cef 1.13.0 @@ -1,4 +1,24 @@ +Changes with Unit 1.13.0 14 Nov 2019 + + *) Feature: basic support for HTTP reverse proxying. + + *) Feature: compatibility with Python 3.8. + + *) Bugfix: memory leak in Python application processes when the close + handler was used. + + *) Bugfix: threads in Python applications might not work correctly. + + *) Bugfix: Ruby on Rails applications might not work on Ruby 2.6. + + *) Bugfix: backtraces for uncaught exceptions in Python 3 might be + logged with significant delays. + + *) Bugfix: explicit setting a namespaces isolation option to false might + have enabled it. + + Changes with Unit 1.12.0 03 Oct 2019 *) Feature: compatibility with PHP 7.4. @@ -14,6 +14,7 @@ mkdir -p $NXT_BUILD_DIR/src \ cat << END > $NXT_MAKEFILE CC = $CC +AR = $AR CFLAGS = $NXT_CFLAGS $NXT_CC_OPT $CFLAGS diff --git a/auto/modules/python b/auto/modules/python index abd145c9..6c8198f5 100644 --- a/auto/modules/python +++ b/auto/modules/python @@ -64,6 +64,10 @@ nxt_found=no if /bin/sh -c "$NXT_PYTHON_CONFIG --prefix" >> $NXT_AUTOCONF_ERR 2>&1; then + if ${NXT_PYTHON_CONFIG} --embed >/dev/null 2>&1; then + NXT_PYTHON_CONFIG="${NXT_PYTHON_CONFIG} --embed" + fi + NXT_PYTHON_INCLUDE=`${NXT_PYTHON_CONFIG} --includes` NXT_PYTHON_LIBS=`${NXT_PYTHON_CONFIG} --ldflags` diff --git a/auto/os/conf b/auto/os/conf index 1e298ecd..02c4afaf 100644 --- a/auto/os/conf +++ b/auto/os/conf @@ -21,7 +21,7 @@ case "$NXT_SYSTEM" in Linux) nxt_have=NXT_LINUX . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared -Wl,-soname,libnxt.so" NXT_SHARED_LOCAL_LINK="\$(CC) -shared \ -Wl,-soname,\\\$\$ORIGIN/libnxt.so" @@ -44,7 +44,7 @@ case "$NXT_SYSTEM" in FreeBSD) nxt_have=NXT_FREEBSD . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared -Wl,-soname,libnxt.so" NXT_SHARED_LOCAL_LINK="\$(CC) -shared \ -Wl,-soname,\\\$\$ORIGIN/libnxt.so" @@ -71,14 +71,14 @@ case "$NXT_SYSTEM" in case "$NXT_CC_NAME" in SunC): - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -G -h libnxt.so" NXT_SHARED_LOCAL_LINK="\$(CC) -G -h \\\$\$ORIGIN/libnxt.so" NXT_MODULE_LINK="\$(CC) -G" ;; *) - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared -Wl,-soname,libnxt.so" NXT_SHARED_LOCAL_LINK="\$(CC) -shared \ -Wl,-soname,\\\$\$ORIGIN/libnxt.so" @@ -106,7 +106,7 @@ case "$NXT_SYSTEM" in # HFS+ volumes are caseless by default. nxt_have=NXT_HAVE_CASELESS_FILESYSTEM . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -dynamiclib" NXT_SHARED_LOCAL_LINK="\$(CC) -dynamiclib \ -install_name @executable_path/libnxt.dylib" @@ -130,7 +130,7 @@ case "$NXT_SYSTEM" in NetBSD) nxt_have=NXT_NETBSD . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared" NXT_SHARED_LOCAL_LINK="\$(CC) -shared" NXT_MODULE_LINK="\$(CC) -shared" @@ -152,7 +152,7 @@ case "$NXT_SYSTEM" in OpenBSD) nxt_have=NXT_OPENBSD . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared" NXT_SHARED_LOCAL_LINK="\$(CC) -shared" NXT_MODULE_LINK="\$(CC) -shared" @@ -174,7 +174,7 @@ case "$NXT_SYSTEM" in DragonFly) nxt_have=NXT_DRAGONFLY . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared" NXT_SHARED_LOCAL_LINK="\$(CC) -shared" NXT_MODULE_LINK="\$(CC) -shared" @@ -196,7 +196,7 @@ case "$NXT_SYSTEM" in AIX) nxt_have=NXT_AIX . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -G" NXT_SHARED_LOCAL_LINK="\$(CC) -G" NXT_MODULE_LINK="\$(CC) -G" @@ -220,7 +220,7 @@ case "$NXT_SYSTEM" in NXT_EXEC_LINK="\$(CC)" NXT_SHARED_LOCAL_EXEC_LINK= - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared" NXT_SHARED_LOCAL_LINK="\$(CC) -shared" NXT_MODULE_LINK="\$(CC) -shared" @@ -238,7 +238,7 @@ case "$NXT_SYSTEM" in QNX) nxt_have=NXT_QNX . auto/have - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared" NXT_SHARED_LOCAL_LINK="\$(CC) -shared" NXT_MODULE_LINK="\$(CC) -shared" @@ -257,7 +257,7 @@ case "$NXT_SYSTEM" in ;; *) - NXT_STATIC_LINK="ar -r -c" + NXT_STATIC_LINK="\$(AR) -r -c" NXT_SHARED_LINK="\$(CC) -shared" NXT_SHARED_LOCAL_LINK="\$(CC) -shared" NXT_MODULE_LINK="\$(CC) -shared" diff --git a/auto/os/test b/auto/os/test index 3188d3db..c37700a6 100644 --- a/auto/os/test +++ b/auto/os/test @@ -14,6 +14,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null` echo=echo CC=${CC:-cc} + AR=${AR:-ar} ;; FreeBSD | NetBSD | OpenBSD | DragonFly) @@ -21,6 +22,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null` echo=echo CC=${CC:-cc} + AR=${AR:-ar} ;; SunOS) @@ -28,6 +30,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null` echo=echo CC=${CC:-gcc} + AR=${AR:-ar} NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE" NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE_EXTENDED=1" @@ -40,6 +43,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null` echo=echo CC=${CC:-cc} + AR=${AR:-ar} ;; AIX) @@ -47,6 +51,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null` echo=echo CC=${CC:-gcc} + AR=${AR:-ar} ;; HP-UX) @@ -54,6 +59,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null` echo=echo CC=${CC:-gcc} + AR=${AR:-ar} NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE" NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE_EXTENDED" @@ -65,6 +71,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null` echo=echo CC=${CC:-gcc} + AR=${AR:-ar} ;; MINGW*) @@ -76,6 +83,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null` echo=auto/echo/echo.exe CC=${CC:-cl} + AR=${AR:-ar} NXT_WINDOWS=YES ;; @@ -84,6 +92,7 @@ case "$NXT_SYSTEM" in NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null` echo=echo CC=${CC:-gcc} + AR=${AR:-ar} ;; esac diff --git a/auto/sources b/auto/sources index c4b3808b..155e388b 100644 --- a/auto/sources +++ b/auto/sources @@ -85,6 +85,7 @@ NXT_LIB_SRCS=" \ src/nxt_http_error.c \ src/nxt_http_route.c \ src/nxt_http_static.c \ + src/nxt_http_proxy.c \ src/nxt_application.c \ src/nxt_external.c \ src/nxt_port_hash.c \ diff --git a/docs/changes.xml b/docs/changes.xml index 93b56293..a0dd7009 100644 --- a/docs/changes.xml +++ b/docs/changes.xml @@ -12,6 +12,76 @@ unit-perl unit-ruby unit-jsc-common unit-jsc8 unit-jsc10 unit-jsc11" + ver="1.13.0" rev="1" + date="2019-11-14" time="18:00:00 +0300" + packager="Andrei Belov <defan@nginx.com>"> + +<change> +<para> +NGINX Unit updated to 1.13.0. +</para> +</change> + +</changes> + + +<changes apply="unit" ver="1.13.0" rev="1" + date="2019-11-14" time="18:00:00 +0300" + packager="Andrei Belov <defan@nginx.com>"> + +<change type="feature"> +<para> +basic support for HTTP reverse proxying. +</para> +</change> + +<change type="feature"> +<para> +compatibility with Python 3.8. +</para> +</change> + +<change type="bugfix"> +<para> +memory leak in Python application processes when the close handler was used. +</para> +</change> + +<change type="bugfix"> +<para> +threads in Python applications might not work correctly. +</para> +</change> + +<change type="bugfix"> +<para> +Ruby on Rails applications might not work on Ruby 2.6. +</para> +</change> + +<change type="bugfix"> +<para> +backtraces for uncaught exceptions in Python 3 might be logged with significant +delays. +</para> +</change> + +<change type="bugfix"> +<para> +explicit setting a namespaces isolation option to false might have enabled it. +</para> +</change> + +</changes> + + +<changes apply="unit-php + unit-python unit-python2.7 + unit-python3.4 unit-python3.5 unit-python3.6 unit-python3.7 + unit-go unit-go1.7 unit-go1.8 unit-go1.9 unit-go1.10 unit-go1.11 + unit-perl + unit-ruby + unit-jsc-common unit-jsc8 unit-jsc10 unit-jsc11" ver="1.12.0" rev="1" date="2019-10-03" time="18:00:00 +0300" packager="Andrei Belov <defan@nginx.com>"> diff --git a/pkg/docker/Dockerfile.full b/pkg/docker/Dockerfile.full index c6646fda..cf3b1150 100644 --- a/pkg/docker/Dockerfile.full +++ b/pkg/docker/Dockerfile.full @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.go1.7-dev b/pkg/docker/Dockerfile.go1.7-dev index 9b57d3ec..5622271e 100644 --- a/pkg/docker/Dockerfile.go1.7-dev +++ b/pkg/docker/Dockerfile.go1.7-dev @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.go1.8-dev b/pkg/docker/Dockerfile.go1.8-dev index 5f650aae..d38b669b 100644 --- a/pkg/docker/Dockerfile.go1.8-dev +++ b/pkg/docker/Dockerfile.go1.8-dev @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.minimal b/pkg/docker/Dockerfile.minimal index 427a3ada..4567b2da 100644 --- a/pkg/docker/Dockerfile.minimal +++ b/pkg/docker/Dockerfile.minimal @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.perl5.24 b/pkg/docker/Dockerfile.perl5.24 index 7e51adcf..64d68a4e 100644 --- a/pkg/docker/Dockerfile.perl5.24 +++ b/pkg/docker/Dockerfile.perl5.24 @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.php7.0 b/pkg/docker/Dockerfile.php7.0 index 9615e041..649de902 100644 --- a/pkg/docker/Dockerfile.php7.0 +++ b/pkg/docker/Dockerfile.php7.0 @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.python2.7 b/pkg/docker/Dockerfile.python2.7 index eb599f95..e9866349 100644 --- a/pkg/docker/Dockerfile.python2.7 +++ b/pkg/docker/Dockerfile.python2.7 @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.python3.5 b/pkg/docker/Dockerfile.python3.5 index 362fb3ed..831bc54a 100644 --- a/pkg/docker/Dockerfile.python3.5 +++ b/pkg/docker/Dockerfile.python3.5 @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/pkg/docker/Dockerfile.ruby2.3 b/pkg/docker/Dockerfile.ruby2.3 index cc2e8637..181b2525 100644 --- a/pkg/docker/Dockerfile.ruby2.3 +++ b/pkg/docker/Dockerfile.ruby2.3 @@ -2,7 +2,7 @@ FROM debian:stretch-slim LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>" -ENV UNIT_VERSION 1.12.0-1~stretch +ENV UNIT_VERSION 1.13.0-1~stretch RUN set -x \ && apt-get update \ diff --git a/src/go/unit/response.go b/src/go/unit/response.go index bb326ea5..767d66b7 100644 --- a/src/go/unit/response.go +++ b/src/go/unit/response.go @@ -63,7 +63,7 @@ func (r *response) WriteHeader(code int) { for k, vv := range r.header { for _, v := range vv { fields++ - fields_size += len(k) + len(v) + 2 + fields_size += len(k) + len(v) } } diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index ac10024c..10875703 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -629,9 +629,6 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) keys_count = napi.get_value_uint32(argv[2]); header_len = napi.get_value_uint32(argv[3]); - /* Need to reserve extra byte for C-string 0-termination. */ - header_len++; - headers = argv[1]; ret = nxt_unit_response_init(req, status_code, keys_count, header_len); @@ -640,6 +637,12 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) return nullptr; } + /* + * Each name and value are 0-terminated by libunit. + * Need to add extra 2 bytes for each header. + */ + header_len += keys_count * 2; + keys = napi.get_property_names(headers); keys_len = napi.get_array_length(keys); @@ -656,8 +659,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) name_len = napi.get_value_string_latin1(name, ptr, header_len); name_ptr = ptr; - ptr += name_len; - header_len -= name_len; + ptr += name_len + 1; + header_len -= name_len + 1; hash = nxt_unit_field_hash(name_ptr, name_len); @@ -689,8 +692,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) nxt_unit_sptr_set(&f->value, ptr); f->value_length = (uint32_t) value_len; - ptr += value_len; - header_len -= value_len; + ptr += value_len + 1; + header_len -= value_len + 1; req->response->fields_count++; } @@ -715,8 +718,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) nxt_unit_sptr_set(&f->value, ptr); f->value_length = (uint32_t) value_len; - ptr += value_len; - header_len -= value_len; + ptr += value_len + 1; + header_len -= value_len + 1; req->response->fields_count++; } diff --git a/src/nxt_application.c b/src/nxt_application.c index 468bc627..bebe3907 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -327,6 +327,9 @@ nxt_app_start(nxt_task_t *task, void *data) lang->version, lang->file); nxt_app = nxt_app_module_load(task, lang->file); + if (nxt_slow_path(nxt_app == NULL)) { + return NXT_ERROR; + } } if (nxt_app->pre_init != NULL) { diff --git a/src/nxt_buf.c b/src/nxt_buf.c index 91846e4d..83be0fac 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -183,7 +183,10 @@ nxt_buf_chain_length(nxt_buf_t *b) length = 0; while (b != NULL) { - length += b->mem.free - b->mem.pos; + if (!nxt_buf_is_sync(b)) { + length += b->mem.free - b->mem.pos; + } + b = b->next; } @@ -195,7 +198,7 @@ static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_mp_t *mp; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -204,9 +207,23 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_assert(data == b->parent); - mp = b->data; - nxt_mp_free(mp, b); + do { + next = b->next; + parent = b->parent; + mp = b->data; + + nxt_mp_free(mp, b); + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); +} + + +void +nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent) +{ if (parent != NULL) { nxt_debug(task, "parent retain:%uD", parent->retain); @@ -255,7 +272,7 @@ static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) { nxt_mp_t *mp; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -268,21 +285,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) nxt_assert(data == b->parent); - mp = b->data; - nxt_mp_free(mp, b); - nxt_mp_release(mp); + do { + next = b->next; + parent = b->parent; + mp = b->data; - if (parent != NULL) { - nxt_debug(task, "parent retain:%uD", parent->retain); + nxt_mp_free(mp, b); + nxt_mp_release(mp); - parent->retain--; + nxt_buf_parent_completion(task, parent); - if (parent->retain == 0) { - parent->mem.pos = parent->mem.free; - - parent->completion_handler(task, parent, parent->parent); - } - } + b = next; + } while (b != NULL); } diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 9c22d650..25e8499a 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -77,17 +77,17 @@ struct nxt_buf_s { uint32_t retain; - uint8_t is_file; /* 1 bit */ - - uint16_t is_mmap:1; - uint16_t is_port_mmap:1; - - uint16_t is_sync:1; - uint16_t is_nobuf:1; - uint16_t is_flush:1; - uint16_t is_last:1; - uint16_t is_port_mmap_sent:1; - uint16_t is_ts:1; + uint8_t cache_hint; + + uint8_t is_file:1; + uint8_t is_mmap:1; + uint8_t is_port_mmap:1; + uint8_t is_sync:1; + uint8_t is_nobuf:1; + uint8_t is_flush:1; + uint8_t is_last:1; + uint8_t is_port_mmap_sent:1; + uint8_t is_ts:1; nxt_buf_mem_t mem; @@ -250,6 +250,7 @@ NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags); NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent); NXT_EXPORT nxt_buf_t *nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size); diff --git a/src/nxt_conf.c b/src/nxt_conf.c index 59eddd77..43820d2a 100644 --- a/src/nxt_conf.c +++ b/src/nxt_conf.c @@ -228,6 +228,13 @@ nxt_conf_get_integer(nxt_conf_value_t *value) } +uint8_t +nxt_conf_get_boolean(nxt_conf_value_t *value) +{ + return value->u.boolean; +} + + nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value) { diff --git a/src/nxt_conf.h b/src/nxt_conf.h index 725a6c95..66201fee 100644 --- a/src/nxt_conf.h +++ b/src/nxt_conf.h @@ -115,6 +115,7 @@ NXT_EXPORT void nxt_conf_set_string(nxt_conf_value_t *value, nxt_str_t *str); NXT_EXPORT nxt_int_t nxt_conf_set_string_dup(nxt_conf_value_t *value, nxt_mp_t *mp, nxt_str_t *str); NXT_EXPORT int64_t nxt_conf_get_integer(nxt_conf_value_t *value); +NXT_EXPORT uint8_t nxt_conf_get_boolean(nxt_conf_value_t *value); // FIXME reimplement and reorder functions below nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value); diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index c934b10b..105af675 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -58,8 +58,12 @@ static nxt_int_t nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, static nxt_int_t nxt_conf_vldt_certificate(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); #endif +static nxt_int_t nxt_conf_vldt_action(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes_member(nxt_conf_validation_t *vldt, @@ -101,11 +105,9 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); -static nxt_int_t -nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, - void *data); -static nxt_int_t -nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, +static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); #if (NXT_HAVE_CLONE_NEWUSER) @@ -316,6 +318,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = { NULL, NULL }, + { nxt_string("proxy"), + NXT_CONF_VLDT_STRING, + &nxt_conf_vldt_proxy, + NULL }, + NXT_CONF_VLDT_END }; @@ -328,8 +335,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_route_members[] = { { nxt_string("action"), NXT_CONF_VLDT_OBJECT, - &nxt_conf_vldt_object, - (void *) &nxt_conf_vldt_action_members }, + &nxt_conf_vldt_action, + NULL }, NXT_CONF_VLDT_END }; @@ -618,7 +625,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { { nxt_string("classpath"), NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_array_iterator, - (void *) &nxt_conf_vldt_java_classpath}, + (void *) &nxt_conf_vldt_java_classpath }, { nxt_string("webapp"), NXT_CONF_VLDT_STRING, @@ -628,7 +635,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { { nxt_string("options"), NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_array_iterator, - (void *) &nxt_conf_vldt_java_option}, + (void *) &nxt_conf_vldt_java_option }, { nxt_string("unit_jars"), NXT_CONF_VLDT_STRING, @@ -881,6 +888,37 @@ nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_int_t +nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + nxt_int_t ret; + nxt_conf_value_t *pass_value, *share_value, *proxy_value; + + static nxt_str_t pass_str = nxt_string("pass"); + static nxt_str_t share_str = nxt_string("share"); + static nxt_str_t proxy_str = nxt_string("proxy"); + + ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members); + + if (ret != NXT_OK) { + return ret; + } + + pass_value = nxt_conf_get_object_member(value, &pass_str, NULL); + share_value = nxt_conf_get_object_member(value, &share_str, NULL); + proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL); + + if (pass_value == NULL && share_value == NULL && proxy_value == NULL) { + return nxt_conf_vldt_error(vldt, "The \"action\" object must have " + "either \"pass\" or \"share\" or " + "\"proxy\" option set."); + } + + return NXT_OK; +} + + +static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { @@ -964,6 +1002,30 @@ error: static nxt_int_t +nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + nxt_str_t name; + nxt_sockaddr_t *sa; + + nxt_conf_get_string(value, &name); + + if (nxt_str_start(&name, "http://", 7)) { + name.length -= 7; + name.start += 7; + + sa = nxt_sockaddr_parse(vldt->pool, &name); + if (sa != NULL) { + return NXT_OK; + } + } + + return nxt_conf_vldt_error(vldt, "The \"proxy\" address is invalid \"%V\"", + &name); +} + + +static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { @@ -1525,8 +1587,8 @@ nxt_conf_vldt_environment(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_int_t -nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, - void *data) +nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) { return nxt_conf_vldt_object(vldt, value, data); } @@ -1691,7 +1753,8 @@ nxt_conf_vldt_php_option(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_int_t -nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, nxt_conf_value_t *value) +nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) { nxt_str_t str; diff --git a/src/nxt_conn.h b/src/nxt_conn.h index 7284808b..2c1d49a0 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -8,7 +8,7 @@ #define _NXT_CONN_H_INCLUDED_ -typedef ssize_t (*nxt_conn_io_read_t)(nxt_conn_t *c); +typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c); typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data); diff --git a/src/nxt_conn_connect.c b/src/nxt_conn_connect.c index 12b6c80c..d045853f 100644 --- a/src/nxt_conn_connect.c +++ b/src/nxt_conn_connect.c @@ -7,6 +7,9 @@ #include <nxt_main.h> +static nxt_err_t nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c); + + void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data) { @@ -49,7 +52,7 @@ nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data) case NXT_AGAIN: c->socket.write_handler = nxt_conn_connect_test; - c->socket.error_handler = state->error_handler; + c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; @@ -118,8 +121,7 @@ nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c) void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data) { - int ret, err; - socklen_t len; + nxt_err_t err; nxt_conn_t *c; c = obj; @@ -132,48 +134,35 @@ nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data) nxt_timer_disable(task->thread->engine, &c->write_timer); } - err = 0; - len = sizeof(int); - - /* - * Linux and BSDs return 0 and store a pending error in the err argument; - * Solaris returns -1 and sets the errno. - */ - - ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len); - - if (nxt_slow_path(ret == -1)) { - err = nxt_errno; - } + err = nxt_conn_connect_test_error(task, c); if (err == 0) { nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, task, c, data); - return; + } else { + nxt_conn_connect_error(task, c, data); } - - c->socket.error = err; - - nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", - c->socket.fd, (size_t) c->remote->length, - nxt_sockaddr_start(c->remote), err); - - nxt_conn_connect_error(task, c, data); } void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data) { + nxt_err_t err; nxt_conn_t *c; nxt_work_handler_t handler; const nxt_conn_state_t *state; c = obj; + err = c->socket.error; + + if (err == 0) { + err = nxt_conn_connect_test_error(task, c); + } state = c->write_state; - switch (c->socket.error) { + switch (err) { case NXT_ECONNREFUSED: #if (NXT_LINUX) @@ -193,3 +182,22 @@ nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data) nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } + + +static nxt_err_t +nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c) +{ + nxt_err_t err; + + err = nxt_socket_error(c->socket.fd); + + if (err != 0) { + c->socket.error = err; + + nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", + c->socket.fd, (size_t) c->remote->length, + nxt_sockaddr_start(c->remote), err); + } + + return err; +} diff --git a/src/nxt_conn_read.c b/src/nxt_conn_read.c index 83969b31..3285abcd 100644 --- a/src/nxt_conn_read.c +++ b/src/nxt_conn_read.c @@ -69,7 +69,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) n = c->io->recvbuf(c, c->read); } else { - n = state->io_read_handler(c); + n = state->io_read_handler(task, c); /* The state can be changed by io_read_handler. */ state = c->read_state; } diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 9cdaab9b..a944834e 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -944,12 +944,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) nxt_work_queue_add(ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); + error = 0; + } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ nxt_epoll_disable_read(engine, ev); } - - error = 0; } if ((events & EPOLLOUT) != 0) { @@ -964,12 +964,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) nxt_work_queue_add(ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); + error = 0; + } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ nxt_epoll_disable_write(engine, ev); } - - error = 0; } if (!error) { diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index 31a35f6d..6f051067 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -556,19 +556,19 @@ nxt_event_engine_start(nxt_event_engine_t *engine) void * -nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, +nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint, size_t size) { - uint8_t n; + uint32_t n; nxt_uint_t items; nxt_array_t *mem_cache; nxt_mem_cache_t *cache; nxt_mem_cache_block_t *block; mem_cache = engine->mem_cache; - n = *slot; + n = *hint; - if (n == (uint8_t) -1) { + if (n == NXT_EVENT_ENGINE_NO_MEM_HINT) { if (mem_cache == NULL) { /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */ @@ -607,7 +607,9 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, found: - *slot = n; + if (n < NXT_EVENT_ENGINE_NO_MEM_HINT) { + *hint = (uint8_t) n; + } } cache = mem_cache->elts; @@ -626,15 +628,39 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, void -nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p) +nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, void *p, + size_t size) { + uint32_t n; + nxt_array_t *mem_cache; nxt_mem_cache_t *cache; nxt_mem_cache_block_t *block; block = p; + mem_cache = engine->mem_cache; + cache = mem_cache->elts; + + n = hint; + + if (nxt_slow_path(n == NXT_EVENT_ENGINE_NO_MEM_HINT)) { - cache = engine->mem_cache->elts; - cache = cache + *slot; + if (size != 0) { + for (n = 0; n < mem_cache->nelts; n++) { + if (cache[n].size == size) { + goto found; + } + } + + nxt_alert(&engine->task, + "event engine mem free(%p, %z) not found", p, size); + } + + goto done; + } + +found: + + cache = cache + n; if (cache->count < 16) { cache->count++; @@ -644,10 +670,79 @@ nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p) return; } +done: + nxt_mp_free(engine->mem_pool, p); } +void * +nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size) +{ + nxt_buf_t *b; + uint8_t hint; + + hint = NXT_EVENT_ENGINE_NO_MEM_HINT; + + b = nxt_event_engine_mem_alloc(engine, &hint, NXT_BUF_MEM_SIZE + size); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + nxt_memzero(b, NXT_BUF_MEM_SIZE); + + b->cache_hint = hint; + b->data = engine; + b->completion_handler = nxt_event_engine_buf_mem_completion; + + if (size != 0) { + b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + b->mem.end = b->mem.start + size; + } + + return b; +} + + +void +nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b) +{ + size_t size; + + size = NXT_BUF_MEM_SIZE + nxt_buf_mem_size(&b->mem); + + nxt_event_engine_mem_free(engine, b->cache_hint, b, size); +} + + +void +nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_buf_t *b, *next, *parent; + + b = obj; + parent = data; + + nxt_debug(task, "buf completion: %p %p", b, b->mem.start); + + engine = b->data; + + do { + next = b->next; + parent = b->parent; + + nxt_event_engine_buf_mem_free(engine, b); + + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); +} + + #if (NXT_DEBUG) void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 365d9e89..6b05d510 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -514,10 +514,16 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo); -void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, +#define NXT_EVENT_ENGINE_NO_MEM_HINT 255 + +void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint, size_t size); -void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, - void *p); +void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, + void *p, size_t size); +void *nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size); +void nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b); +void nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, + void *data); nxt_inline nxt_event_engine_t * diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 541fcb44..b07eaf84 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -18,10 +18,10 @@ */ #if (NXT_TLS) -static ssize_t nxt_http_idle_io_read_handler(nxt_conn_t *c); +static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c); static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data); #endif -static ssize_t nxt_h1p_idle_io_read_handler(nxt_conn_t *c); +static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, @@ -45,7 +45,7 @@ static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_header_send(nxt_task_t *task, - nxt_http_request_t *r, nxt_work_handler_t body_handler); + nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data); static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, @@ -78,11 +78,32 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c); -static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer); +static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, + void *data); +static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, + nxt_buf_mem_t *bm); +static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data); +static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data); + #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; static const nxt_conn_state_t nxt_h1p_shutdown_state; @@ -94,6 +115,13 @@ static const nxt_conn_state_t nxt_h1p_request_send_state; static const nxt_conn_state_t nxt_h1p_timeout_response_state; static const nxt_conn_state_t nxt_h1p_keepalive_state; static const nxt_conn_state_t nxt_h1p_close_state; +static const nxt_conn_state_t nxt_h1p_peer_connect_state; +static const nxt_conn_state_t nxt_h1p_peer_header_send_state; +static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state; +static const nxt_conn_state_t nxt_h1p_peer_header_read_state; +static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state; +static const nxt_conn_state_t nxt_h1p_peer_read_state; +static const nxt_conn_state_t nxt_h1p_peer_close_state; const nxt_http_proto_table_t nxt_http_proto[3] = { @@ -106,6 +134,13 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { .body_bytes_sent = nxt_h1p_request_body_bytes_sent, .discard = nxt_h1p_request_discard, .close = nxt_h1p_request_close, + + .peer_connect = nxt_h1p_peer_connect, + .peer_header_send = nxt_h1p_peer_header_send, + .peer_header_read = nxt_h1p_peer_header_read, + .peer_read = nxt_h1p_peer_read, + .peer_close = nxt_h1p_peer_close, + .ws_frame_start = nxt_h1p_websocket_frame_start, }, /* NXT_HTTP_PROTO_H2 */ @@ -113,9 +148,9 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { }; -static nxt_lvlhsh_t nxt_h1p_fields_hash; +static nxt_lvlhsh_t nxt_h1p_fields_hash; -static nxt_http_field_proc_t nxt_h1p_fields[] = { +static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Connection"), &nxt_h1p_connection, 0 }, { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 }, { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 }, @@ -136,11 +171,32 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = { }; +static nxt_lvlhsh_t nxt_h1p_peer_fields_hash; + +static nxt_http_field_proc_t nxt_h1p_peer_fields[] = { + { nxt_string("Connection"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Server"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Date"), &nxt_http_proxy_date, 0 }, + { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 }, +}; + + nxt_int_t nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt) { - return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, - nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); + nxt_int_t ret; + + ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, + nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); + + if (nxt_fast_path(ret == NXT_OK)) { + ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash, + rt->mem_pool, nxt_h1p_peer_fields, + nxt_nitems(nxt_h1p_peer_fields)); + } + + return ret; } @@ -196,7 +252,7 @@ static const nxt_conn_state_t nxt_http_idle_state static ssize_t -nxt_http_idle_io_read_handler(nxt_conn_t *c) +nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c) { size_t size; ssize_t n; @@ -216,7 +272,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c) size = joint->socket_conf->header_buffer_size; - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); if (nxt_slow_path(b == NULL)) { c->socket.error = NXT_ENOMEM; return NXT_ERROR; @@ -234,7 +290,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c) } else { c->read = NULL; - nxt_mp_free(c->mem_pool, b); + nxt_event_engine_buf_mem_free(task->thread->engine, b); } return n; @@ -248,12 +304,14 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) nxt_buf_t *b; nxt_conn_t *c; nxt_tls_conf_t *tls; + nxt_event_engine_t *engine; nxt_socket_conf_joint_t *joint; c = obj; nxt_debug(task, "h1p conn https test"); + engine = task->thread->engine; b = c->read; p = b->mem.pos; @@ -262,7 +320,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) if (p[0] != 0x16) { b->mem.free = b->mem.pos; - nxt_conn_read(task->thread->engine, c); + nxt_conn_read(engine, c); return; } @@ -292,7 +350,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) #endif c->read = NULL; - nxt_mp_free(c->mem_pool, b); + nxt_event_engine_buf_mem_free(engine, b); joint = c->listen->socket.data; @@ -301,7 +359,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) * Listening socket had been closed while * connection was in keep-alive state. */ - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); return; } @@ -330,7 +388,7 @@ static const nxt_conn_state_t nxt_h1p_idle_state static ssize_t -nxt_h1p_idle_io_read_handler(nxt_conn_t *c) +nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c) { size_t size; ssize_t n; @@ -353,7 +411,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c) if (b == NULL) { size = joint->socket_conf->header_buffer_size; - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); if (nxt_slow_path(b == NULL)) { c->socket.error = NXT_ENOMEM; return NXT_ERROR; @@ -367,7 +425,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c) } else { c->read = NULL; - nxt_mp_free(c->mem_pool, b); + nxt_event_engine_buf_mem_free(task->thread->engine, b); } return n; @@ -386,7 +444,7 @@ nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data) h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t)); if (nxt_slow_path(h1p == NULL)) { - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); return; } @@ -424,6 +482,12 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) r->tls = c->u.tls; #endif + r->task = c->task; + task = &r->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); if (nxt_fast_path(ret == NXT_OK)) { @@ -444,7 +508,7 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); } - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); } @@ -599,13 +663,15 @@ nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, } if (nxt_slow_path(h1p->websocket_key == NULL)) { - nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key"); + nxt_log(task, NXT_LOG_INFO, + "h1p upgrade: bad or absent websocket key"); return NXT_HTTP_BAD_REQUEST; } if (nxt_slow_path(h1p->websocket_version_ok == 0)) { - nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version"); + nxt_log(task, NXT_LOG_INFO, + "h1p upgrade: bad or absent websocket version"); return NXT_HTTP_UPGRADE_REQUIRED; } @@ -655,16 +721,16 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; - static const u_char *upgrade = (const u_char *) "upgrade"; + nxt_http_request_t *r; r = ctx; + field->hopbyhop = 1; if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { r->proto.h1->keepalive = 0; } else if (field->value_length == 7 - && nxt_memcasecmp(field->value, upgrade, 7) == 0) + && nxt_memcasecmp(field->value, "upgrade", 7) == 0) { r->proto.h1->connection_upgrade = 1; } @@ -676,13 +742,12 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; - static const u_char *websocket = (const u_char *) "websocket"; + nxt_http_request_t *r; r = ctx; if (field->value_length == 9 - && nxt_memcasecmp(field->value, websocket, 9) == 0) + && nxt_memcasecmp(field->value, "websocket", 9) == 0) { r->proto.h1->upgrade_websocket = 1; } @@ -730,6 +795,8 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data) nxt_http_request_t *r; r = ctx; + field->skip = 1; + field->hopbyhop = 1; if (field->value_length == 7 && nxt_memcmp(field->value, "chunked", 7) == 0) @@ -995,7 +1062,7 @@ static const nxt_str_t nxt_http_server_error[] = { static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler) + nxt_work_handler_t body_handler, void *data) { u_char *p; size_t size; @@ -1172,7 +1239,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, * in engine->write_work_queue. */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, - body_handler, task, r, NULL); + body_handler, task, r, data); } else { header->next = nxt_http_buf_last(r); @@ -1211,6 +1278,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) while (b != NULL) { next = b->next; + b->next = NULL; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); @@ -1226,7 +1294,8 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) size = nxt_buf_mem_used_size(&in->mem); if (size == 0) { - nxt_mp_free(c->mem_pool, in); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + in->completion_handler, task, in, in->parent); c->read = NULL; } @@ -1480,11 +1549,16 @@ nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, nxt_debug(task, "h1p request close"); h1p = proto.h1; + h1p->keepalive &= !h1p->request->inconsistent; h1p->request = NULL; nxt_router_conf_release(task, joint); c = h1p->conn; + task = &c->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; if (h1p->keepalive) { nxt_h1p_keepalive(task, h1p, c); @@ -1770,14 +1844,31 @@ nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c) } } else { - nxt_h1p_shutdown_(task, c); + nxt_h1p_closing(task, c); } } static void -nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c) +nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) { + nxt_timer_t *timer; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws shutdown"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + + nxt_h1p_closing(task, ws_timer->h1p->conn); +} + + +static void +nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c) +{ + nxt_debug(task, "h1p closing"); + c->socket.data = NULL; #if (NXT_TLS) @@ -1809,21 +1900,6 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state static void -nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *timer; - nxt_h1p_websocket_timer_t *ws_timer; - - nxt_debug(task, "h1p conn ws shutdown"); - - timer = obj; - ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); - - nxt_h1p_shutdown_(task, ws_timer->h1p->conn); -} - - -static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; @@ -1868,3 +1944,673 @@ nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data) nxt_router_listen_event_release(&engine->task, lev, NULL); } + + +static void +nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_conn_t *c, *client; + nxt_h1proto_t *h1p; + nxt_fd_event_t *socket; + nxt_work_queue_t *wq; + nxt_http_request_t *r; + + nxt_debug(task, "h1p peer connect"); + + peer->status = NXT_HTTP_UNSET; + r = peer->request; + + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + goto fail; + } + + h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t)); + if (nxt_slow_path(h1p == NULL)) { + goto fail; + } + + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + c = nxt_conn_create(mp, task); + if (nxt_slow_path(c == NULL)) { + goto fail; + } + + c->mem_pool = mp; + h1p->conn = c; + + peer->proto.h1 = h1p; + h1p->request = r; + + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + c->socket.data = peer; + c->remote = peer->sockaddr; + + c->socket.write_ready = 1; + c->write_state = &nxt_h1p_peer_connect_state; + + /* + * TODO: queues should be implemented via client proto interface. + */ + client = r->proto.h1->conn; + + socket = &client->socket; + wq = socket->read_work_queue; + c->read_work_queue = wq; + c->socket.read_work_queue = wq; + c->read_timer.work_queue = wq; + + wq = socket->write_work_queue; + c->write_work_queue = wq; + c->socket.write_work_queue = wq; + c->write_timer.work_queue = wq; + /* TODO END */ + + nxt_conn_connect(task->thread->engine, c); + + return; + +fail: + + peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + + r->state->error_handler(task, r, peer); +} + + +static const nxt_conn_state_t nxt_h1p_peer_connect_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_connected, + .close_handler = nxt_h1p_peer_refused, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static void +nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer connected"); + + r = peer->request; + r->state->ready_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer refused"); + + //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE; + peer->status = NXT_HTTP_BAD_GATEWAY; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) +{ + u_char *p; + size_t size; + nxt_buf_t *header, *body; + nxt_conn_t *c; + nxt_http_field_t *field; + nxt_http_request_t *r; + + nxt_debug(task, "h1p peer header send"); + + r = peer->request; + + size = r->method->length + sizeof(" ") + r->target.length + + sizeof(" HTTP/1.0\r\n") + + sizeof("\r\n"); + + nxt_list_each(field, r->fields) { + + if (!field->hopbyhop) { + size += field->name_length + field->value_length; + size += nxt_length(": \r\n"); + } + + } nxt_list_loop; + + header = nxt_http_buf_mem(task, r, size); + if (nxt_slow_path(header == NULL)) { + r->state->error_handler(task, r, peer); + return; + } + + p = header->mem.free; + + p = nxt_cpymem(p, r->method->start, r->method->length); + *p++ = ' '; + p = nxt_cpymem(p, r->target.start, r->target.length); + p = nxt_cpymem(p, " HTTP/1.0\r\n", 11); + + nxt_list_each(field, r->fields) { + + if (!field->hopbyhop) { + p = nxt_cpymem(p, field->name, field->name_length); + *p++ = ':'; *p++ = ' '; + p = nxt_cpymem(p, field->value, field->value_length); + *p++ = '\r'; *p++ = '\n'; + } + + } nxt_list_loop; + + *p++ = '\r'; *p++ = '\n'; + header->mem.free = p; + size = p - header->mem.pos; + + c = peer->proto.h1->conn; + c->write = header; + c->write_state = &nxt_h1p_peer_header_send_state; + + if (r->body != NULL) { + body = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + if (nxt_slow_path(body == NULL)) { + r->state->error_handler(task, r, peer); + return; + } + + header->next = body; + + body->mem = r->body->mem; + size += nxt_buf_mem_used_size(&body->mem); + +// nxt_mp_retain(r->mem_pool); + } + + if (size > 16384) { + /* Use proxy_send_timeout instead of proxy_timeout. */ + c->write_state = &nxt_h1p_peer_header_body_send_state; + } + + nxt_conn_write(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_header_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_sent, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_sent, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer header sent"); + + engine = task->thread->engine; + + c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write); + + if (c->write == NULL) { + r = peer->request; + r->state->ready_handler(task, r, peer); + return; + } + + nxt_conn_write(engine, c); +} + + +static void +nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer header read"); + + c = peer->proto.h1->conn; + + if (c->write_timer.enabled) { + c->read_state = &nxt_h1p_peer_header_read_state; + + } else { + c->read_state = &nxt_h1p_peer_header_read_timer_state; + } + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_header_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, +}; + + +static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, + + .timer_handler = nxt_h1p_peer_read_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static ssize_t +nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c) +{ + size_t size; + ssize_t n; + nxt_buf_t *b; + nxt_http_peer_t *peer; + nxt_socket_conf_t *skcf; + nxt_http_request_t *r; + + peer = c->socket.data; + r = peer->request; + b = c->read; + + if (b == NULL) { + skcf = r->conf->socket_conf; + + size = (peer->header_received) ? skcf->proxy_buffer_size + : skcf->proxy_header_buffer_size; + + nxt_debug(task, "h1p peer io read: %z", size); + + b = nxt_http_proxy_buf_mem_alloc(task, r, size); + if (nxt_slow_path(b == NULL)) { + c->socket.error = NXT_ENOMEM; + return NXT_ERROR; + } + } + + n = c->io->recvbuf(c, b); + + if (n > 0) { + c->read = b; + + } else { + c->read = NULL; + nxt_http_proxy_buf_mem_free(task, r, b); + } + + return n; +} + + +static void +nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t ret; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer header read done"); + + b = c->read; + + ret = nxt_h1p_peer_header_parse(peer, &b->mem); + + r = peer->request; + + ret = nxt_expect(NXT_DONE, ret); + + if (ret != NXT_AGAIN) { + engine = task->thread->engine; + nxt_timer_disable(engine, &c->write_timer); + nxt_timer_disable(engine, &c->read_timer); + } + + switch (ret) { + + case NXT_DONE: + peer->fields = peer->proto.h1->parser.fields; + + ret = nxt_http_fields_process(peer->fields, + &nxt_h1p_peer_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + break; + } + + c->read = NULL; + + if (nxt_buf_mem_used_size(&b->mem) != 0) { + peer->body = b; + } + + peer->header_received = 1; + + r->state->ready_handler(task, r, peer); + return; + + case NXT_AGAIN: + if (nxt_buf_mem_free_size(&b->mem) != 0) { + nxt_conn_read(task->thread->engine, c); + return; + } + + /* Fall through. */ + + default: + case NXT_ERROR: + case NXT_HTTP_PARSE_INVALID: + case NXT_HTTP_PARSE_UNSUPPORTED_VERSION: + case NXT_HTTP_PARSE_TOO_LARGE_FIELD: + peer->status = NXT_HTTP_BAD_GATEWAY; + break; + } + + nxt_http_proxy_buf_mem_free(task, r, b); + + r->state->error_handler(task, r, peer); +} + + +static nxt_int_t +nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm) +{ + u_char *p; + size_t length; + nxt_int_t status; + + if (peer->status < 0) { + length = nxt_buf_mem_used_size(bm); + + if (nxt_slow_path(length < 12)) { + return NXT_AGAIN; + } + + p = bm->pos; + + if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0 + || (p[7] != '0' && p[7] != '1'))) + { + return NXT_ERROR; + } + + status = nxt_int_parse(&p[9], 3); + + if (nxt_slow_path(status < 0)) { + return NXT_ERROR; + } + + p += 12; + length -= 12; + + p = nxt_memchr(p, '\n', length); + + if (nxt_slow_path(p == NULL)) { + return NXT_AGAIN; + } + + bm->pos = p + 1; + peer->status = status; + } + + return nxt_http_parse_fields(&peer->proto.h1->parser, bm); +} + + +static void +nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer read"); + + c = peer->proto.h1->conn; + c->read_state = &nxt_h1p_peer_read_state; + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, + + .timer_handler = nxt_h1p_peer_read_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer read done"); + + peer->body = c->read; + c->read = NULL; + + r = peer->request; + r->state->ready_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer closed"); + + r = peer->request; + + if (peer->header_received) { + peer->body = nxt_http_buf_last(r); + + peer->closed = 1; + + r->state->ready_handler(task, r, peer); + + } else { + peer->status = NXT_HTTP_BAD_GATEWAY; + + r->state->error_handler(task, r, peer); + } +} + + +static void +nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer error"); + + peer->status = NXT_HTTP_BAD_GATEWAY; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p peer send timeout"); + + c = nxt_write_timer_conn(timer); + c->block_write = 1; + c->block_read = 1; + + peer = c->socket.data; + peer->status = NXT_HTTP_GATEWAY_TIMEOUT; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p peer read timeout"); + + c = nxt_read_timer_conn(timer); + c->block_write = 1; + c->block_read = 1; + + peer = c->socket.data; + peer->status = NXT_HTTP_GATEWAY_TIMEOUT; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static nxt_msec_t +nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_http_peer_t *peer; + + peer = c->socket.data; + + return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data); +} + + +static void +nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer close"); + + peer->closed = 1; + + c = peer->proto.h1->conn; + task = &c->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + + if (c->socket.fd != -1) { + c->write_state = &nxt_h1p_peer_close_state; + + nxt_conn_close(task->thread->engine, c); + + } else { + nxt_h1p_peer_free(task, c, NULL); + } +} + + +static const nxt_conn_state_t nxt_h1p_peer_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_free, +}; + + +static void +nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + + c = obj; + + nxt_debug(task, "h1p peer free"); + + nxt_conn_free(task, c); +} diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h index c6d3bd53..61da6770 100644 --- a/src/nxt_h1proto.h +++ b/src/nxt_h1proto.h @@ -20,6 +20,8 @@ struct nxt_h1proto_s { nxt_http_request_parse_t parser; uint8_t nbuffers; + uint8_t header_buffer_slot; + uint8_t large_buffer_slot; uint8_t keepalive; /* 1 bit */ uint8_t chunked; /* 1 bit */ uint8_t websocket; /* 1 bit */ diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c index 13754be0..c9ff899c 100644 --- a/src/nxt_h1proto_websocket.c +++ b/src/nxt_h1proto_websocket.c @@ -26,7 +26,7 @@ static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh); static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c); +static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data); @@ -473,7 +473,7 @@ nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data) static ssize_t -nxt_h1p_ws_io_read_handler(nxt_conn_t *c) +nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c) { size_t size; ssize_t n; @@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) for (i = 0; i < payload_len; i++) { while (nxt_buf_mem_used_size(&b->mem) == 0) { next = b->next; + b->next = NULL; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); diff --git a/src/nxt_http.h b/src/nxt_http.h index 560b7310..030d77a7 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -9,6 +9,7 @@ typedef enum { + NXT_HTTP_UNSET = -1, NXT_HTTP_INVALID = 0, NXT_HTTP_CONTINUE = 100, @@ -105,6 +106,21 @@ typedef struct { } nxt_http_response_t; +typedef struct { + nxt_http_proto_t proto; + nxt_http_request_t *request; + nxt_sockaddr_t *sockaddr; + nxt_list_t *fields; + nxt_buf_t *body; + nxt_off_t remainder; + + nxt_http_status_t status:16; + nxt_http_protocol_t protocol:8; /* 2 bits */ + uint8_t header_received; /* 1 bit */ + uint8_t closed; /* 1 bit */ +} nxt_http_peer_t; + + struct nxt_http_request_s { nxt_http_proto_t proto; nxt_socket_conf_joint_t *conf; @@ -137,12 +153,14 @@ struct nxt_http_request_s { nxt_sockaddr_t *remote; nxt_sockaddr_t *local; void *tls; + nxt_task_t task; nxt_timer_t timer; void *timer_data; void *req_rpc_data; + nxt_http_peer_t *peer; nxt_buf_t *last; nxt_http_response_t resp; @@ -153,20 +171,23 @@ struct nxt_http_request_s { nxt_http_protocol_t protocol:8; /* 2 bits */ uint8_t logged; /* 1 bit */ uint8_t header_sent; /* 1 bit */ + uint8_t inconsistent; /* 1 bit */ uint8_t error; /* 1 bit */ uint8_t websocket_handshake; /* 1 bit */ }; typedef struct nxt_http_route_s nxt_http_route_t; +typedef struct nxt_http_upstream_s nxt_http_upstream_t; -struct nxt_http_pass_s { - nxt_http_pass_t *(*handler)(nxt_task_t *task, +struct nxt_http_action_s { + nxt_http_action_t *(*handler)(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *pass); + nxt_http_action_t *action); union { nxt_http_route_t *route; + nxt_http_upstream_t *upstream; nxt_app_t *application; } u; @@ -178,12 +199,19 @@ typedef struct { void (*body_read)(nxt_task_t *task, nxt_http_request_t *r); void (*local_addr)(nxt_task_t *task, nxt_http_request_t *r); void (*header_send)(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler); + nxt_work_handler_t body_handler, void *data); void (*send)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); nxt_off_t (*body_bytes_sent)(nxt_task_t *task, nxt_http_proto_t proto); void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last); void (*close)(nxt_task_t *task, nxt_http_proto_t proto, nxt_socket_conf_joint_t *joint); + + void (*peer_connect)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_header_send)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_header_read)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_read)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_close)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame); } nxt_http_proto_table_t; @@ -218,7 +246,7 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, nxt_http_status_t status); void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r); void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler); + nxt_work_handler_t body_handler, void *data); void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame); void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, @@ -238,24 +266,36 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, nxt_http_routes_t *nxt_http_routes_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *routes_conf); -nxt_http_pass_t *nxt_http_pass_create(nxt_task_t *task, +nxt_http_action_t *nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name); void nxt_http_routes_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf); -nxt_http_pass_t *nxt_http_pass_application(nxt_task_t *task, +nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name); void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes); -void nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass); +void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action); -nxt_http_pass_t *nxt_http_static_handler(nxt_task_t *task, - nxt_http_request_t *r, nxt_http_pass_t *pass); +nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash); nxt_int_t nxt_http_static_mtypes_hash_add(nxt_mp_t *mp, nxt_lvlhsh_t *hash, nxt_str_t *extension, nxt_str_t *type); nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash, nxt_str_t *extension); -nxt_http_pass_t *nxt_http_request_application(nxt_task_t *task, - nxt_http_request_t *r, nxt_http_pass_t *pass); +nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); + +nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action); +nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_buf_t *nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, + size_t size); +void nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *b); extern nxt_time_string_t nxt_http_date_cache; diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c index 8e8b80f1..370b12db 100644 --- a/src/nxt_http_error.c +++ b/src/nxt_http_error.c @@ -57,8 +57,8 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, r->state = &nxt_http_request_send_error_body_state; - nxt_http_request_header_send(task, r, nxt_http_request_send_error_body); - + nxt_http_request_header_send(task, r, + nxt_http_request_send_error_body, NULL); return; fail: diff --git a/src/nxt_http_parse.c b/src/nxt_http_parse.c index e6e91454..4c5d4936 100644 --- a/src/nxt_http_parse.c +++ b/src/nxt_http_parse.c @@ -787,6 +787,7 @@ nxt_http_parse_field_end(nxt_http_request_parse_t *rp, u_char **pos, field->hash = nxt_http_field_hash_end(rp->field_hash); field->skip = 0; + field->hopbyhop = 0; field->name_length = rp->field_name.length; field->value_length = rp->field_value.length; diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h index d7ce5e4f..d319c71d 100644 --- a/src/nxt_http_parse.h +++ b/src/nxt_http_parse.h @@ -81,7 +81,8 @@ typedef struct { struct nxt_http_field_s { uint16_t hash; - uint8_t skip; /* 1 bit */ + uint8_t skip:1; + uint8_t hopbyhop:1; uint8_t name_length; uint32_t value_length; u_char *name; diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c new file mode 100644 index 00000000..7f4eeff2 --- /dev/null +++ b/src/nxt_http_proxy.c @@ -0,0 +1,403 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, + nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); + + +struct nxt_http_upstream_s { + uint32_t current; + uint32_t n; + uint8_t protocol; + nxt_http_upstream_connect_t connect; + nxt_sockaddr_t *sockaddr[1]; +}; + + +static void nxt_http_upstream_connect(nxt_task_t *task, + nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); +static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); +static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_request_send(nxt_task_t *task, + nxt_http_request_t *r, nxt_buf_t *out); +static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, + void *data); +static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); + + +static const nxt_http_request_state_t nxt_http_proxy_header_send_state; +static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; +static const nxt_http_request_state_t nxt_http_proxy_header_read_state; +static const nxt_http_request_state_t nxt_http_proxy_read_state; + + +nxt_int_t +nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) +{ + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_http_upstream_t *upstream; + + sa = NULL; + name = action->name; + + if (nxt_str_start(&name, "http://", 7)) { + name.length -= 7; + name.start += 7; + + sa = nxt_sockaddr_parse(mp, &name); + if (nxt_slow_path(sa == NULL)) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + } + + if (sa != NULL) { + upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); + if (nxt_slow_path(upstream == NULL)) { + return NXT_ERROR; + } + + upstream->current = 0; + upstream->n = 1; + upstream->protocol = NXT_HTTP_PROTO_H1; + upstream->connect = nxt_http_upstream_connect; + upstream->sockaddr[0] = sa; + + action->u.upstream = upstream; + action->handler = nxt_http_proxy_handler; + } + + return NXT_OK; +} + + +static nxt_http_action_t * +nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) +{ + nxt_http_peer_t *peer; + + peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); + if (nxt_slow_path(peer == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } + + peer->request = r; + r->peer = peer; + + nxt_mp_retain(r->mem_pool); + + action->u.upstream->connect(task, action->u.upstream, peer); + + return NULL; +} + + +static void +nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, + nxt_http_peer_t *peer) +{ + peer->protocol = upstream->protocol; + peer->sockaddr = upstream->sockaddr[0]; + + peer->request->state = &nxt_http_proxy_header_send_state; + + nxt_http_proto[peer->protocol].peer_connect(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_send, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + r->state = &nxt_http_proxy_header_sent_state; + + nxt_http_proto[peer->protocol].peer_header_send(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_sent_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_sent, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + r->state = &nxt_http_proxy_header_read_state; + + nxt_http_proto[peer->protocol].peer_header_read(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_read, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_field_t *f, *field; + nxt_http_request_t *r; + + r = obj; + peer = data; + + r->status = peer->status; + + nxt_debug(task, "http proxy status: %d", peer->status); + + if (r->resp.content_length_n > 0) { + peer->remainder = r->resp.content_length_n; + } + + nxt_list_each(field, peer->fields) { + + nxt_debug(task, "http proxy header: \"%*s: %*s\"", + (size_t) field->name_length, field->name, + (size_t) field->value_length, field->value); + + if (!field->skip) { + f = nxt_list_add(r->resp.fields); + if (nxt_slow_path(f == NULL)) { + nxt_http_proxy_error(task, r, peer); + return; + } + + *f = *field; + } + + } nxt_list_loop; + + nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); +} + + +static void +nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + out = peer->body; + + if (out != NULL) { + peer->body = NULL; + nxt_http_proxy_request_send(task, r, out); + } + + r->state = &nxt_http_proxy_read_state; + + nxt_http_proto[peer->protocol].peer_read(task, peer); +} + + +static void +nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *out) +{ + size_t length; + + if (r->peer->remainder > 0) { + length = nxt_buf_chain_length(out); + r->peer->remainder -= length; + } + + nxt_http_request_send(task, r, out); +} + + +static const nxt_http_request_state_t nxt_http_proxy_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_read, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_bool_t last; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + out = peer->body; + peer->body = NULL; + last = nxt_buf_is_last(out); + + nxt_http_proxy_request_send(task, r, out); + + if (!last) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + + } else { + r->inconsistent = (peer->remainder != 0); + + nxt_http_proto[peer->protocol].peer_close(task, peer); + + nxt_mp_release(r->mem_pool); + } +} + + +nxt_buf_t * +nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, + size_t size) +{ + nxt_buf_t *b; + + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); + if (nxt_fast_path(b != NULL)) { + b->completion_handler = nxt_http_proxy_buf_mem_completion; + b->parent = r; + nxt_mp_retain(r->mem_pool); + + } else { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + } + + return b; +} + + +static void +nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b, *next; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + b = obj; + r = data; + + peer = r->peer; + + do { + next = b->next; + + nxt_http_proxy_buf_mem_free(task, r, b); + + b = next; + } while (b != NULL); + + if (!peer->closed) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + } +} + + +void +nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *b) +{ + nxt_event_engine_buf_mem_free(task->thread->engine, b); + + nxt_mp_release(r->mem_pool); +} + + +static void +nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = r->peer; + + nxt_http_proto[peer->protocol].peer_close(task, peer); + + nxt_mp_release(r->mem_pool); + + nxt_http_request_error(task, r, peer->status); +} + + +nxt_int_t +nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + r->resp.date = field; + + return NXT_OK; +} + + +nxt_int_t +nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data) +{ + nxt_off_t n; + nxt_http_request_t *r; + + r = ctx; + + r->resp.content_length = field; + + n = nxt_off_t_parse(field->value, field->value_length); + + if (nxt_fast_path(n >= 0)) { + r->resp.content_length_n = n; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + field->skip = 1; + + return NXT_OK; +} diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index a18a02e7..14c75dab 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -10,7 +10,7 @@ static nxt_int_t nxt_http_validate_host(nxt_str_t *host, nxt_mp_t *mp); static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data); -static void nxt_http_request_pass(nxt_task_t *task, void *obj, void *data); +static void nxt_http_request_action(nxt_task_t *task, void *obj, void *data); static void nxt_http_request_proto_info(nxt_task_t *task, nxt_http_request_t *r); static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, @@ -278,33 +278,33 @@ nxt_http_request_start(nxt_task_t *task, void *obj, void *data) static const nxt_http_request_state_t nxt_http_request_body_state nxt_aligned(64) = { - .ready_handler = nxt_http_request_pass, + .ready_handler = nxt_http_request_action, .error_handler = nxt_http_request_close_handler, }; static void -nxt_http_request_pass(nxt_task_t *task, void *obj, void *data) +nxt_http_request_action(nxt_task_t *task, void *obj, void *data) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; nxt_http_request_t *r; r = obj; - pass = r->conf->socket_conf->pass; + action = r->conf->socket_conf->action; - if (nxt_fast_path(pass != NULL)) { + if (nxt_fast_path(action != NULL)) { do { - nxt_debug(task, "http request route: %V", &pass->name); + nxt_debug(task, "http request route: %V", &action->name); - pass = pass->handler(task, r, pass); + action = action->handler(task, r, action); - if (pass == NULL) { + if (action == NULL) { return; } - if (pass == NXT_HTTP_PASS_ERROR) { + if (action == NXT_HTTP_ACTION_ERROR) { break; } @@ -315,13 +315,13 @@ nxt_http_request_pass(nxt_task_t *task, void *obj, void *data) } -nxt_http_pass_t * -nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *pass) +nxt_http_action_t * +nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) { nxt_event_engine_t *engine; - nxt_debug(task, "http request application"); + nxt_debug(task, "http application handler"); nxt_mp_retain(r->mem_pool); @@ -344,7 +344,7 @@ nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r, nxt_str_set(&r->server_name, "localhost"); } - nxt_router_process_http_request(task, r, pass->u.application); + nxt_router_process_http_request(task, r, action->u.application); return NULL; } @@ -370,7 +370,7 @@ nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r) void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler) + nxt_work_handler_t body_handler, void *data) { u_char *p, *end; nxt_http_field_t *server, *date, *content_length; @@ -431,7 +431,7 @@ nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, } if (nxt_fast_path(r->proto.any != NULL)) { - nxt_http_proto[r->protocol].header_send(task, r, body_handler); + nxt_http_proto[r->protocol].header_send(task, r, body_handler, data); } return; @@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size) static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_http_request_t *r; b = obj; r = data; - nxt_mp_free(r->mem_pool, b); + do { + next = b->next; - nxt_mp_release(r->mem_pool); + nxt_mp_free(r->mem_pool, b); + nxt_mp_release(r->mem_pool); + + b = next; + } while (b != NULL); } @@ -570,9 +575,9 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) if (nxt_fast_path(proto.any != NULL)) { protocol = r->protocol; - nxt_mp_release(r->mem_pool); - nxt_http_proto[protocol].close(task, proto, conf); + + nxt_mp_release(r->mem_pool); } } diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index c3c11faa..18b352ea 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -36,6 +36,13 @@ typedef enum { typedef struct { + nxt_conf_value_t *pass; + nxt_conf_value_t *share; + nxt_conf_value_t *proxy; +} nxt_http_route_action_conf_t; + + +typedef struct { nxt_conf_value_t *host; nxt_conf_value_t *uri; nxt_conf_value_t *method; @@ -119,7 +126,7 @@ typedef union { typedef struct { uint32_t items; - nxt_http_pass_t pass; + nxt_http_action_t action; nxt_http_route_test_t test[0]; } nxt_http_route_match_t; @@ -152,6 +159,8 @@ static nxt_http_route_t *nxt_http_route_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv); static nxt_http_route_match_t *nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv); +static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *cv, nxt_http_route_match_t *match); static nxt_http_route_table_t *nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object, nxt_bool_t case_sensitive); @@ -173,15 +182,15 @@ static u_char *nxt_http_route_pattern_copy(nxt_mp_t *mp, nxt_str_t *test, static void nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route); -static void nxt_http_pass_resolve(nxt_task_t *task, - nxt_router_temp_conf_t *tmcf, nxt_http_pass_t *pass); +static void nxt_http_action_resolve(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action); static nxt_http_route_t *nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name); static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes); -static nxt_http_pass_t *nxt_http_route_pass(nxt_task_t *task, - nxt_http_request_t *r, nxt_http_pass_t *start); -static nxt_http_pass_t *nxt_http_route_match(nxt_http_request_t *r, +static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *start); +static nxt_http_action_t *nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match); static nxt_int_t nxt_http_route_table(nxt_http_request_t *r, nxt_http_route_table_t *table); @@ -367,16 +376,13 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, uint32_t n; nxt_mp_t *mp; nxt_int_t ret; - nxt_str_t pass, *string; - nxt_conf_value_t *match_conf, *pass_conf; + nxt_conf_value_t *match_conf; nxt_http_route_test_t *test; nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; nxt_http_route_match_t *match; nxt_http_route_match_conf_t mtcf; - static nxt_str_t pass_path = nxt_string("/action/pass"); - static nxt_str_t share_path = nxt_string("/action/share"); static nxt_str_t match_path = nxt_string("/match"); match_conf = nxt_conf_get_path(cv, &match_path); @@ -391,25 +397,12 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return NULL; } - match->pass.u.route = NULL; - match->pass.handler = NULL; + match->action.u.route = NULL; + match->action.handler = NULL; match->items = n; - pass_conf = nxt_conf_get_path(cv, &pass_path); - - if (pass_conf == NULL) { - pass_conf = nxt_conf_get_path(cv, &share_path); - if (nxt_slow_path(pass_conf == NULL)) { - return NULL; - } - - match->pass.handler = nxt_http_static_handler; - } - - nxt_conf_get_string(pass_conf, &pass); - - string = nxt_str_dup(mp, &match->pass.name, &pass); - if (nxt_slow_path(string == NULL)) { + ret = nxt_http_route_action_create(tmcf, cv, match); + if (nxt_slow_path(ret != NXT_OK)) { return NULL; } @@ -516,6 +509,78 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } +static nxt_conf_map_t nxt_http_route_action_conf[] = { + { + nxt_string("pass"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, pass) + }, + { + nxt_string("share"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, share) + }, + { + nxt_string("proxy"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, proxy) + }, +}; + + +static nxt_int_t +nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, + nxt_http_route_match_t *match) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_str_t name, *string; + nxt_conf_value_t *conf, *action_conf; + nxt_http_route_action_conf_t accf; + + static nxt_str_t action_path = nxt_string("/action"); + + action_conf = nxt_conf_get_path(cv, &action_path); + if (action_conf == NULL) { + return NXT_ERROR; + } + + nxt_memzero(&accf, sizeof(accf)); + + ret = nxt_conf_map_object(tmcf->mem_pool, + action_conf, nxt_http_route_action_conf, + nxt_nitems(nxt_http_route_action_conf), &accf); + if (ret != NXT_OK) { + return ret; + } + + conf = accf.pass; + + if (accf.share != NULL) { + conf = accf.share; + match->action.handler = nxt_http_static_handler; + + } else if (accf.proxy != NULL) { + conf = accf.proxy; + } + + nxt_conf_get_string(conf, &name); + + mp = tmcf->router_conf->mem_pool; + + string = nxt_str_dup(mp, &match->action.name, &name); + if (nxt_slow_path(string == NULL)) { + return NXT_ERROR; + } + + if (accf.proxy != NULL) { + return nxt_http_proxy_create(mp, &match->action); + } + + return NXT_OK; +} + + static nxt_http_route_table_t * nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object, @@ -877,17 +942,17 @@ static void nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; nxt_http_route_match_t **match, **end; match = &route->match[0]; end = match + route->items; while (match < end) { - pass = &(*match)->pass; + action = &(*match)->action; - if (pass->handler == NULL) { - nxt_http_pass_resolve(task, tmcf, &(*match)->pass); + if (action->handler == NULL) { + nxt_http_action_resolve(task, tmcf, &(*match)->action); } match++; @@ -896,21 +961,21 @@ nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, static void -nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, - nxt_http_pass_t *pass) +nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_http_action_t *action) { nxt_str_t name; - name = pass->name; + name = action->name; if (nxt_str_start(&name, "applications/", 13)) { name.length -= 13; name.start += 13; - pass->u.application = nxt_router_listener_application(tmcf, &name); - nxt_router_app_use(task, pass->u.application, 1); + action->u.application = nxt_router_listener_application(tmcf, &name); + nxt_router_app_use(task, action->u.application, 1); - pass->handler = nxt_http_request_application; + action->handler = nxt_http_application_handler; } else if (nxt_str_start(&name, "routes", 6)) { @@ -923,9 +988,9 @@ nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, name.start += 7; } - pass->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name); + action->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name); - pass->handler = nxt_http_route_pass; + action->handler = nxt_http_route_handler; } } @@ -950,46 +1015,49 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name) } -nxt_http_pass_t * -nxt_http_pass_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, +nxt_http_action_t * +nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; - pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t)); - if (nxt_slow_path(pass == NULL)) { + action = nxt_mp_alloc(tmcf->router_conf->mem_pool, + sizeof(nxt_http_action_t)); + if (nxt_slow_path(action == NULL)) { return NULL; } - pass->name = *name; + action->name = *name; + action->handler = NULL; - nxt_http_pass_resolve(task, tmcf, pass); + nxt_http_action_resolve(task, tmcf, action); - return pass; + return action; } /* COMPATIBILITY: listener application. */ -nxt_http_pass_t * +nxt_http_action_t * nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; - pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t)); - if (nxt_slow_path(pass == NULL)) { + action = nxt_mp_alloc(tmcf->router_conf->mem_pool, + sizeof(nxt_http_action_t)); + if (nxt_slow_path(action == NULL)) { return NULL; } - pass->name = *name; + action->name = *name; - pass->u.application = nxt_router_listener_application(tmcf, name); - nxt_router_app_use(task, pass->u.application, 1); + action->u.application = nxt_router_listener_application(tmcf, name); + nxt_router_app_use(task, action->u.application, 1); - pass->handler = nxt_http_request_application; + action->handler = nxt_http_application_handler; - return pass; + return action; } @@ -1020,7 +1088,7 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route) end = match + route->items; while (match < end) { - nxt_http_pass_cleanup(task, &(*match)->pass); + nxt_http_action_cleanup(task, &(*match)->action); match++; } @@ -1028,20 +1096,20 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route) void -nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass) +nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action) { - if (pass->handler == nxt_http_request_application) { - nxt_router_app_use(task, pass->u.application, -1); + if (action->handler == nxt_http_application_handler) { + nxt_router_app_use(task, action->u.application, -1); } } -static nxt_http_pass_t * -nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *start) +static nxt_http_action_t * +nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *start) { - nxt_http_pass_t *pass; nxt_http_route_t *route; + nxt_http_action_t *action; nxt_http_route_match_t **match, **end; route = start->u.route; @@ -1049,9 +1117,9 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r, end = match + route->items; while (match < end) { - pass = nxt_http_route_match(r, *match); - if (pass != NULL) { - return pass; + action = nxt_http_route_match(r, *match); + if (action != NULL) { + return action; } match++; @@ -1063,7 +1131,7 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r, } -static nxt_http_pass_t * +static nxt_http_action_t * nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) { nxt_int_t ret; @@ -1081,14 +1149,14 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) } if (ret <= 0) { - /* 0 => NULL, -1 => NXT_HTTP_PASS_ERROR. */ - return (nxt_http_pass_t *) (intptr_t) ret; + /* 0 => NULL, -1 => NXT_HTTP_ACTION_ERROR. */ + return (nxt_http_action_t *) (intptr_t) ret; } test++; } - return &match->pass; + return &match->action; } diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index 48a989cf..44132859 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -27,9 +27,9 @@ static void nxt_http_static_mtypes_hash_free(void *data, void *p); static const nxt_http_request_state_t nxt_http_static_send_state; -nxt_http_pass_t * +nxt_http_action_t * nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *pass) + nxt_http_action_t *action) { size_t alloc, encode; u_char *p; @@ -76,7 +76,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_str_null(&extension); } - alloc = pass->name.length + r->path->length + index.length + 1; + alloc = action->name.length + r->path->length + index.length + 1; f->name = nxt_mp_nget(r->mem_pool, alloc); if (nxt_slow_path(f->name == NULL)) { @@ -84,7 +84,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, } p = f->name; - p = nxt_cpymem(p, pass->name.start, pass->name.length); + p = nxt_cpymem(p, action->name.start, action->name.length); p = nxt_cpymem(p, r->path->start, r->path->length); p = nxt_cpymem(p, index.start, index.length); *p = '\0'; @@ -272,7 +272,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, body_handler = NULL; } - nxt_http_request_header_send(task, r, body_handler); + nxt_http_request_header_send(task, r, body_handler, NULL); r->state = &nxt_http_static_send_state; return NULL; diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index d58d615c..fb888f5d 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) frame_size -= copy_size; next = b->next; + b->next = NULL; if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 85685fbc..cfe0341f 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -70,8 +70,8 @@ static void nxt_main_port_access_log_handler(nxt_task_t *task, static nxt_int_t nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *isolation); -static nxt_int_t nxt_init_set_ns(nxt_task_t *task, - nxt_process_init_t *init, nxt_conf_value_t *ns); +static nxt_int_t nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, + nxt_conf_value_t *ns); const nxt_sig_event_t nxt_main_process_signals[] = { nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), @@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) nxt_port_t *port; nxt_process_t *process; - process = nxt_runtime_process_get(rt, nxt_pid); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN); + port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, + NXT_PROCESS_MAIN); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return NXT_ERROR; } - nxt_process_port_add(task, process, port); - - nxt_process_use(task, process, -1); + process = port->process; ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_port_use(task, port, -1); return ret; } - nxt_runtime_port_add(task, port); - - nxt_port_use(task, port, -1); - /* * A main process port. A write port is not closed * since it should be inherited by worker processes. @@ -465,13 +453,11 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_malloc(sizeof(nxt_process_init_t)); + init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - init->start = nxt_controller_start; init->name = "controller"; init->user_cred = &rt->user_cred; @@ -561,13 +547,11 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_malloc(sizeof(nxt_process_init_t)); + init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - init->start = nxt_discovery_start; init->name = "discovery"; init->user_cred = &rt->user_cred; @@ -587,13 +571,11 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_malloc(sizeof(nxt_process_init_t)); + init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - init->start = nxt_router_start; init->name = "router"; init->user_cred = &rt->user_cred; @@ -627,13 +609,11 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, + app_conf->group.length + 1; } - init = nxt_malloc(size); + init = nxt_mp_zalloc(rt->mem_pool, size); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - if (rt->capabilities.setid) { init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t)); user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t)); @@ -705,7 +685,7 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, fail: - nxt_free(init); + nxt_mp_free(rt->mem_pool, init); return NXT_ERROR; } @@ -726,6 +706,8 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process = nxt_runtime_process_new(rt); if (nxt_slow_path(process == NULL)) { + nxt_mp_free(rt->mem_pool, init); + return NXT_ERROR; } @@ -785,8 +767,6 @@ nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) nxt_runtime_process_each(rt, process) { if (nxt_pid != process->pid) { - process->init = NULL; - nxt_process_port_each(process, port) { (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, @@ -1005,10 +985,11 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (process) { init = process->init; + process->init = NULL; ptype = nxt_process_type(process); - if (process->ready && init != NULL) { + if (process->ready) { init->stream = 0; } @@ -1057,7 +1038,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) init->restart(task, rt, init); } else { - nxt_free(init); + nxt_mp_free(rt->mem_pool, init); } } } @@ -1540,7 +1521,8 @@ nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, static nxt_int_t -nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *namespaces) +nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, + nxt_conf_value_t *namespaces) { uint32_t index; nxt_str_t name; @@ -1549,7 +1531,13 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na index = 0; - while ((value = nxt_conf_next_object_member(namespaces, &name, &index)) != NULL) { + for ( ;; ) { + value = nxt_conf_next_object_member(namespaces, &name, &index); + + if (value == NULL) { + break; + } + flag = 0; #if (NXT_HAVE_CLONE_NEWUSER) @@ -1593,11 +1581,9 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na return NXT_ERROR; } - if (nxt_conf_get_integer(value) == 0) { - continue; /* process shares everything by default */ + if (nxt_conf_get_boolean(value)) { + init->isolation.clone.flags |= flag; } - - init->isolation.clone.flags |= flag; } return NXT_OK; diff --git a/src/nxt_port.c b/src/nxt_port.c index 9029353a..8d14a5e7 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -238,7 +238,6 @@ void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; - nxt_process_t *process; nxt_runtime_t *rt; nxt_port_msg_new_port_t *new_port_msg; @@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process = nxt_runtime_process_get(rt, new_port_msg->pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, - new_port_msg->type); + port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, + new_port_msg->id, + new_port_msg->type); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return; } - nxt_process_port_add(task, process, port); - - nxt_process_use(task, process, -1); - nxt_fd_nonblocking(task, msg->fd); port->pair[0] = -1; @@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->socket.task = task; - nxt_runtime_port_add(task, port); - - nxt_port_use(task, port, -1); - nxt_port_write_enable(task, port); msg->u.new_port = port; diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 4edc423a..9c7da970 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -478,7 +478,8 @@ static nxt_buf_t * nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode) { - size_t size; + size_t size; + nxt_buf_t *next; while (b != NULL) { @@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - b = b->next; + next = b->next; + b->next = NULL; + b = next; } return b; @@ -796,7 +799,7 @@ static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { - nxt_buf_t *b, *orig_b; + nxt_buf_t *b, *orig_b, *next; nxt_port_recv_msg_t *fmsg; if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { @@ -915,11 +918,15 @@ fmsg_failed: */ if (msg->buf == b) { /* complete mmap buffers */ - for (; b != NULL; b = b->next) { + while (b != NULL) { nxt_debug(task, "complete buffer %p", b); nxt_work_queue_add(port->socket.read_work_queue, b->completion_handler, task, b, b->parent); + + next = b->next; + b->next = NULL; + b = next; } } @@ -964,7 +971,7 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { int use_delta; - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_port_t *port; nxt_work_queue_t *wq; nxt_port_send_msg_t *msg; @@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - for (b = msg->buf; b != NULL; b = b->next) { + for (b = msg->buf; b != NULL; b = next) { + next = b->next; + b->next = NULL; + if (nxt_buf_is_sync(b)) { continue; } diff --git a/src/nxt_process.c b/src/nxt_process.c index 0cc9ccc4..b246a58c 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -207,7 +207,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) goto fail; } -#if (NXT_HAVE_CLONE_NEWUSER) +#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) { ret = nxt_clone_proc_map(task, pid, &init->isolation.clone); if (nxt_slow_path(ret != NXT_OK)) { @@ -723,16 +723,35 @@ free: nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) { - nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL", - uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid); + nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d", + uc->user, uc->uid, uc->base_gid); if (setgid(uc->base_gid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " + "application namespace.", uc->base_gid); + return NXT_ERROR; + } +#endif + nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); return NXT_ERROR; } if (uc->gids != NULL) { if (setgroups(uc->ngroups, uc->gids) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " + "supplementary group ids not valid in the application " + "namespace.", uc->user, uc->uid); + return NXT_ERROR; + } +#endif + nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); return NXT_ERROR; } @@ -747,6 +766,15 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) } if (setuid(uc->uid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " + "valid in the application namespace.", uc->uid, uc->user); + return NXT_ERROR; + } +#endif + nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); return NXT_ERROR; } @@ -756,6 +784,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) void +nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +{ + process->use_count += i; + + if (process->use_count == 0) { + nxt_runtime_process_release(task->thread->runtime, process); + } +} + + +void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { nxt_assert(port->process == NULL); diff --git a/src/nxt_process.h b/src/nxt_process.h index df9ca038..d67573f1 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_process_type_t nxt_process_type(nxt_process_t *process); +void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); + void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index a6d5f217..5bb2fb2c 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -86,6 +86,7 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); +static void nxt_python_print_exception(void); static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes); struct nxt_python_run_ctx_s { @@ -130,58 +131,17 @@ static PyMethodDef nxt_py_input_methods[] = { static PyTypeObject nxt_py_input_type = { PyVarObject_HEAD_INIT(NULL, 0) - "unit._input", /* tp_name */ - (int) sizeof(nxt_py_input_t), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor) nxt_py_input_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - "unit input object.", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - nxt_py_input_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0, /* tp_alloc */ - 0, /* tp_new */ - 0, /* tp_free */ - 0, /* tp_is_gc */ - 0, /* tp_bases */ - 0, /* tp_mro - method resolution order */ - 0, /* tp_cache */ - 0, /* tp_subclasses */ - 0, /* tp_weaklist */ - 0, /* tp_del */ - 0, /* tp_version_tag */ -#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION > 3 - 0, /* tp_finalize */ -#endif + + .tp_name = "unit._input", + .tp_basicsize = sizeof(nxt_py_input_t), + .tp_dealloc = (destructor) nxt_py_input_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit input object.", + .tp_methods = nxt_py_input_methods, }; +static PyObject *nxt_py_stderr_flush; static PyObject *nxt_py_application; static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_write_obj; @@ -193,6 +153,7 @@ static wchar_t *nxt_py_home; static char *nxt_py_home; #endif +static PyThreadState *nxt_python_thread_state; static nxt_python_run_ctx_t *nxt_python_run_ctx; @@ -279,6 +240,21 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) module = NULL; + obj = PySys_GetObject((char *) "stderr"); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to get \"sys.stderr\" object"); + goto fail; + } + + nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); + if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { + nxt_alert(task, "Python failed to get \"flush\" attribute of " + "\"sys.stderr\" object"); + goto fail; + } + + Py_DECREF(obj); + if (c->path.length > 0) { obj = PyString_FromStringAndSize((char *) c->path.start, c->path.length); @@ -349,7 +325,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) module = PyImport_ImportModule(nxt_py_module); if (nxt_slow_path(module == NULL)) { nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module); - PyErr_Print(); + nxt_python_print_exception(); goto fail; } @@ -363,7 +339,6 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) if (nxt_slow_path(PyCallable_Check(obj) == 0)) { nxt_alert(task, "\"application\" in module \"%s\" " "is not a callable object", nxt_py_module); - PyErr_Print(); goto fail; } @@ -382,10 +357,14 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) goto fail; } + nxt_python_thread_state = PyEval_SaveThread(); + rc = nxt_unit_run(unit_ctx); nxt_unit_done(unit_ctx); + PyEval_RestoreThread(nxt_python_thread_state); + nxt_python_atexit(); exit(rc); @@ -407,22 +386,26 @@ static void nxt_python_request_handler(nxt_unit_request_info_t *req) { int rc; - PyObject *result, *iterator, *item, *args, *environ; + PyObject *environ, *args, *response, *iterator, *item; + PyObject *close, *result; nxt_python_run_ctx_t run_ctx = {-1, 0, NULL, req}; + PyEval_RestoreThread(nxt_python_thread_state); + environ = nxt_python_get_environ(&run_ctx); if (nxt_slow_path(environ == NULL)) { - nxt_unit_request_done(req, NXT_UNIT_ERROR); - - return; + rc = NXT_UNIT_ERROR; + goto done; } args = PyTuple_New(2); if (nxt_slow_path(args == NULL)) { + Py_DECREF(environ); + nxt_unit_req_error(req, "Python failed to create arguments tuple"); - nxt_unit_request_done(req, NXT_UNIT_ERROR); - return; + rc = NXT_UNIT_ERROR; + goto done; } PyTuple_SET_ITEM(args, 0, environ); @@ -432,103 +415,104 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) nxt_python_run_ctx = &run_ctx; - result = PyObject_CallObject(nxt_py_application, args); + response = PyObject_CallObject(nxt_py_application, args); Py_DECREF(args); - if (nxt_slow_path(result == NULL)) { + if (nxt_slow_path(response == NULL)) { nxt_unit_req_error(req, "Python failed to call the application"); - PyErr_Print(); - - nxt_unit_request_done(req, NXT_UNIT_ERROR); - nxt_python_run_ctx = NULL; + nxt_python_print_exception(); - return; + rc = NXT_UNIT_ERROR; + goto done; } - item = NULL; - iterator = NULL; + /* Shortcut: avoid iterate over response string symbols. */ + if (PyBytes_Check(response)) { + rc = nxt_python_write(&run_ctx, response); - /* Shortcut: avoid iterate over result string symbols. */ - if (PyBytes_Check(result)) { + } else { + iterator = PyObject_GetIter(response); - rc = nxt_python_write(&run_ctx, result); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - goto fail; - } + if (nxt_fast_path(iterator != NULL)) { + rc = NXT_UNIT_OK; - } else { - iterator = PyObject_GetIter(result); + while (run_ctx.bytes_sent < run_ctx.content_length) { + item = PyIter_Next(iterator); - if (nxt_slow_path(iterator == NULL)) { - nxt_unit_req_error(req, "the application returned " - "not an iterable object"); + if (item == NULL) { + if (nxt_slow_path(PyErr_Occurred() != NULL)) { + nxt_unit_req_error(req, "Python failed to iterate over " + "the application response object"); + nxt_python_print_exception(); - goto fail; - } + rc = NXT_UNIT_ERROR; + } - while (run_ctx.bytes_sent < run_ctx.content_length - && (item = PyIter_Next(iterator))) - { - if (nxt_slow_path(!PyBytes_Check(item))) { - nxt_unit_req_error(req, "the application returned " - "not a bytestring object"); + break; + } - goto fail; - } + if (nxt_fast_path(PyBytes_Check(item))) { + rc = nxt_python_write(&run_ctx, item); - rc = nxt_python_write(&run_ctx, item); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - goto fail; - } + } else { + nxt_unit_req_error(req, "the application returned " + "not a bytestring object"); + rc = NXT_UNIT_ERROR; + } - Py_DECREF(item); - } + Py_DECREF(item); - Py_DECREF(iterator); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + break; + } + } - if (PyObject_HasAttrString(result, "close")) { - PyObject_CallMethod(result, (char *) "close", NULL); - } - } + Py_DECREF(iterator); - if (nxt_slow_path(PyErr_Occurred() != NULL)) { - nxt_unit_req_error(req, "an application error occurred"); - PyErr_Print(); - } + } else { + nxt_unit_req_error(req, + "the application returned not an iterable object"); + nxt_python_print_exception(); - nxt_unit_request_done(req, NXT_UNIT_OK); + rc = NXT_UNIT_ERROR; + } - Py_DECREF(result); + close = PyObject_GetAttrString(response, "close"); - nxt_python_run_ctx = NULL; + if (close != NULL) { + result = PyObject_CallFunction(close, NULL); + if (nxt_slow_path(result == NULL)) { + nxt_unit_req_error(req, "Python failed to call the close() " + "method of the application response"); + nxt_python_print_exception(); - return; + } else { + Py_DECREF(result); + } -fail: + Py_DECREF(close); - if (item != NULL) { - Py_DECREF(item); + } else { + PyErr_Clear(); + } } - if (iterator != NULL) { - Py_DECREF(iterator); - } + Py_DECREF(response); - if (PyObject_HasAttrString(result, "close")) { - PyObject_CallMethod(result, (char *) "close", NULL); - } +done: - Py_DECREF(result); - nxt_python_run_ctx = NULL; + nxt_python_thread_state = PyEval_SaveThread(); - nxt_unit_request_done(req, NXT_UNIT_ERROR); + nxt_python_run_ctx = NULL; + nxt_unit_request_done(req, rc); } static void nxt_python_atexit(void) { + Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); Py_XDECREF(nxt_py_start_resp_obj); Py_XDECREF(nxt_py_write_obj); @@ -767,7 +751,7 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", (int) size, src); - PyErr_Print(); + nxt_python_print_exception(); return NXT_UNIT_ERROR; } @@ -802,7 +786,7 @@ nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name, nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", (int) size, str); - PyErr_Print(); + nxt_python_print_exception(); return NXT_UNIT_ERROR; } @@ -1137,6 +1121,28 @@ nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args) } +static void +nxt_python_print_exception(void) +{ + PyErr_Print(); + +#if PY_MAJOR_VERSION == 3 + /* The backtrace may be buffered in sys.stderr file object. */ + { + PyObject *result; + + result = PyObject_CallFunction(nxt_py_stderr_flush, NULL); + if (nxt_slow_path(result == NULL)) { + PyErr_Clear(); + return; + } + + Py_DECREF(result); + } +#endif +} + + static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes) { diff --git a/src/nxt_router.c b/src/nxt_router.c index 28781600..b9f5d921 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, for (b = msg_info->buf; b != NULL; b = next) { next = b->next; + b->next = NULL; b->completion_handler = msg_info->completion_handler; @@ -1172,8 +1173,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { - if (skcf->pass != NULL) { - nxt_http_pass_cleanup(task, skcf->pass); + if (skcf->action != NULL) { + nxt_http_action_cleanup(task, skcf->action); } } nxt_queue_loop; @@ -1458,7 +1459,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, next = 0; for ( ;; ) { - application = nxt_conf_next_object_member(applications, &name, &next); + application = nxt_conf_next_object_member(applications, + &name, &next); if (application == NULL) { break; } @@ -1676,10 +1678,16 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->large_header_buffers = 4; skcf->body_buffer_size = 16 * 1024; skcf->max_body_size = 8 * 1024 * 1024; + skcf->proxy_header_buffer_size = 64 * 1024; + skcf->proxy_buffer_size = 4096; + skcf->proxy_buffers = 256; skcf->idle_timeout = 180 * 1000; skcf->header_read_timeout = 30 * 1000; skcf->body_read_timeout = 30 * 1000; skcf->send_timeout = 30 * 1000; + skcf->proxy_timeout = 60 * 1000; + skcf->proxy_send_timeout = 30 * 1000; + skcf->proxy_read_timeout = 30 * 1000; skcf->websocket_conf.max_frame_size = 1024 * 1024; skcf->websocket_conf.read_timeout = 60 * 1000; @@ -1729,12 +1737,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->router_conf->count++; if (lscf.pass.length != 0) { - skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); + skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); /* COMPATIBILITY: listener application. */ } else if (lscf.application.length > 0) { - skcf->pass = nxt_http_pass_application(task, tmcf, - &lscf.application); + skcf->action = nxt_http_pass_application(task, tmcf, + &lscf.application); } } } @@ -3070,8 +3078,8 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_unlock(lock); if (skcf != NULL) { - if (skcf->pass != NULL) { - nxt_http_pass_cleanup(task, skcf->pass); + if (skcf->action != NULL) { + nxt_http_action_cleanup(task, skcf->action); } #if (NXT_TLS) @@ -3497,7 +3505,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; @@ -3580,6 +3588,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, field->hash = f->hash; field->skip = 0; + field->hopbyhop = 0; field->name_length = f->name_length; field->value_length = f->value_length; @@ -3612,17 +3621,20 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } if (nxt_buf_mem_used_size(&b->mem) == 0) { + next = b->next; + b->next = NULL; + nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); - b = b->next; + b = next; } if (b != NULL) { nxt_buf_chain_add(&r->out, b); } - nxt_http_request_header_send(task, r, nxt_http_request_send_body); + nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) @@ -5056,6 +5068,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; + out->next = NULL; out->completion_handler(task, out, out->parent); out = buf; } diff --git a/src/nxt_router.h b/src/nxt_router.h index ec18ff48..1517c14b 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -16,12 +16,12 @@ typedef struct nxt_http_request_s nxt_http_request_t; #include <nxt_application.h> -typedef struct nxt_http_pass_s nxt_http_pass_t; +typedef struct nxt_http_action_s nxt_http_action_t; typedef struct nxt_http_routes_s nxt_http_routes_t; typedef struct nxt_router_access_log_s nxt_router_access_log_t; -#define NXT_HTTP_PASS_ERROR ((nxt_http_pass_t *) -1) +#define NXT_HTTP_ACTION_ERROR ((nxt_http_action_t *) -1) typedef struct { @@ -154,7 +154,7 @@ typedef struct { nxt_queue_link_t link; nxt_router_conf_t *router_conf; - nxt_http_pass_t *pass; + nxt_http_action_t *action; /* * A listen socket time can be shorter than socket configuration life @@ -170,10 +170,17 @@ typedef struct { size_t large_header_buffers; size_t body_buffer_size; size_t max_body_size; + size_t proxy_header_buffer_size; + size_t proxy_buffer_size; + size_t proxy_buffers; + nxt_msec_t idle_timeout; nxt_msec_t header_read_timeout; nxt_msec_t body_read_timeout; nxt_msec_t send_timeout; + nxt_msec_t proxy_timeout; + nxt_msec_t proxy_send_timeout; + nxt_msec_t proxy_read_timeout; nxt_websocket_conf_t websocket_conf; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index de41ba4d..096aabc4 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_runtime_thread_pool_init(void); static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data); -static void nxt_runtime_process_destroy(nxt_runtime_t *rt, +static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); +static void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); -static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, - nxt_pid_t pid); +static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); nxt_int_t @@ -459,7 +459,7 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) idle = &engine->idle_connections; - for (link = nxt_queue_head(idle); + for (link = nxt_queue_first(idle); link != nxt_queue_tail(idle); link = next) { @@ -658,6 +658,8 @@ nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) if (tp == thread_pools[i]) { nxt_array_remove(rt->thread_pools, &thread_pools[i]); + nxt_free(tp); + if (n == 1) { /* The last thread pool. */ rt->continuation(task, 0); @@ -1296,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt) } -static void -nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) +void +nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { nxt_port_t *port; + if (process->registered == 1) { + nxt_runtime_process_remove(rt, process); + } + nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); @@ -1316,6 +1322,10 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); + if (process->init != NULL) { + nxt_mp_free(rt->mem_pool, process->init); + } + nxt_mp_free(rt->mem_pool, process); } @@ -1379,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) } -nxt_process_t * +static nxt_process_t * nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) { nxt_process_t *process; @@ -1489,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) } -static nxt_process_t * -nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) +static void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { - nxt_process_t *process; + nxt_pid_t pid; nxt_lvlhsh_query_t lhq; - process = NULL; + pid = process->pid; nxt_runtime_process_lhq_pid(&lhq, &pid); @@ -1521,40 +1531,49 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) } nxt_thread_mutex_unlock(&rt->processes_mutex); - - return process; } -void -nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +nxt_process_t * +nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) { - nxt_runtime_t *rt; + nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); + + return nxt_runtime_process_next(rt, lhe); +} - process->use_count += i; - if (process->use_count == 0) { - rt = task->thread->runtime; +nxt_port_t * +nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) +{ + nxt_port_t *port; + nxt_process_t *process; - if (process->registered == 1) { - nxt_runtime_process_remove_pid(rt, process->pid); - } + process = nxt_runtime_process_get(rt, pid); + if (nxt_slow_path(process == NULL)) { + return NULL; + } - nxt_runtime_process_destroy(rt, process); + port = nxt_port_new(task, id, pid, type); + if (nxt_slow_path(port == NULL)) { + nxt_process_use(task, process, -1); + return NULL; } -} + nxt_process_port_add(task, process, port); -nxt_process_t * -nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) -{ - nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); + nxt_process_use(task, process, -1); - return nxt_runtime_process_next(rt, lhe); + nxt_runtime_port_add(task, port); + + nxt_port_use(task, port, -1); + + return port; } -void +static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) { nxt_int_t res; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 0791f8e7..d5b340b6 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); -nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); - void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); -void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); - nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe); +void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process); + #define nxt_runtime_process_next(rt, lhe) \ nxt_lvlhsh_each(&rt->processes, lhe) - -void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); +nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type); void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port); diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 16fe4724..94f8e9eb 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -9,6 +9,8 @@ static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied); +static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task, + nxt_work_queue_t *wq, nxt_buf_t *start); nxt_uint_t @@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { - nxt_prefetch(b->next); - if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { break; } - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - - b = b->next; + b = nxt_sendbuf_coalesce_completion(task, wq, b); } return b; @@ -399,10 +397,49 @@ void nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { - nxt_prefetch(b->next); + b = nxt_sendbuf_coalesce_completion(task, wq, b); + } +} - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - b = b->next; +static nxt_buf_t * +nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq, + nxt_buf_t *start) +{ + nxt_buf_t *b, *next, **last, *rest, **last_rest; + nxt_work_handler_t handler; + + rest = NULL; + last_rest = &rest; + last = &start->next; + b = start; + handler = b->completion_handler; + + for ( ;; ) { + next = b->next; + if (next == NULL) { + break; + } + + b->next = NULL; + b = next; + + if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { + *last_rest = b; + break; + } + + if (handler == b->completion_handler) { + *last = b; + last = &b->next; + + } else { + *last_rest = b; + last_rest = &b->next; + } } + + nxt_work_queue_add(wq, handler, task, start, start->parent); + + return rest; } diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index 99cf54b4..57dfbfa6 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -23,11 +23,11 @@ static nxt_int_t nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs); nxt_sockaddr_t * nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls) { - uint8_t hint; size_t size; + uint8_t hint; nxt_sockaddr_t *sa; - hint = (uint8_t) -1; + hint = NXT_EVENT_ENGINE_NO_MEM_HINT; size = offsetof(nxt_sockaddr_t, u) + ls->socklen + ls->address_length; sa = nxt_event_engine_mem_alloc(engine, &hint, size); @@ -56,7 +56,11 @@ nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls) void nxt_sockaddr_cache_free(nxt_event_engine_t *engine, nxt_conn_t *c) { - nxt_event_engine_mem_free(engine, &c->remote->cache_hint, c->remote); + nxt_sockaddr_t *sa; + + sa = c->remote; + + nxt_event_engine_mem_free(engine, sa->cache_hint, sa, 0); } diff --git a/src/nxt_socket.c b/src/nxt_socket.c index 95a298d8..2a809184 100644 --- a/src/nxt_socket.c +++ b/src/nxt_socket.c @@ -300,6 +300,28 @@ nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how) } +nxt_err_t +nxt_socket_error(nxt_socket_t s) +{ + int ret, err; + socklen_t len; + + err = 0; + len = sizeof(int); + /* + * Linux and BSDs return 0 and store a pending error in the err argument; + * Solaris returns -1 and sets the errno. + */ + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (void *) &err, &len); + + if (nxt_slow_path(ret == -1)) { + err = nxt_errno; + } + + return err; +} + + nxt_uint_t nxt_socket_error_level(nxt_err_t err) { @@ -315,6 +337,9 @@ nxt_socket_error_level(nxt_err_t err) case NXT_EHOSTUNREACH: return NXT_LOG_INFO; + case NXT_ECONNREFUSED: + return NXT_LOG_ERR; + default: return NXT_LOG_ALERT; } diff --git a/src/nxt_socket.h b/src/nxt_socket.h index 3f00648d..6a450f83 100644 --- a/src/nxt_socket.h +++ b/src/nxt_socket.h @@ -106,6 +106,7 @@ NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_task_t *task, nxt_socket_t s, nxt_sockaddr_t *sa); NXT_EXPORT void nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how); +nxt_err_t nxt_socket_error(nxt_socket_t s); nxt_uint_t nxt_socket_error_level(nxt_err_t err); NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task, diff --git a/src/nxt_string.c b/src/nxt_string.c index b89e9555..d567883f 100644 --- a/src/nxt_string.c +++ b/src/nxt_string.c @@ -188,10 +188,14 @@ nxt_strncasecmp(const u_char *s1, const u_char *s2, size_t length) nxt_int_t -nxt_memcasecmp(const u_char *s1, const u_char *s2, size_t length) +nxt_memcasecmp(const void *p1, const void *p2, size_t length) { - u_char c1, c2; - nxt_int_t n; + u_char c1, c2; + nxt_int_t n; + const u_char *s1, *s2; + + s1 = p1; + s2 = p2; while (length-- != 0) { c1 = *s1++; diff --git a/src/nxt_string.h b/src/nxt_string.h index 8d7b3b73..de498048 100644 --- a/src/nxt_string.h +++ b/src/nxt_string.h @@ -87,7 +87,7 @@ NXT_EXPORT u_char *nxt_cpystrn(u_char *dst, const u_char *src, size_t length); NXT_EXPORT nxt_int_t nxt_strcasecmp(const u_char *s1, const u_char *s2); NXT_EXPORT nxt_int_t nxt_strncasecmp(const u_char *s1, const u_char *s2, size_t length); -NXT_EXPORT nxt_int_t nxt_memcasecmp(const u_char *s1, const u_char *s2, +NXT_EXPORT nxt_int_t nxt_memcasecmp(const void *p1, const void *p2, size_t length); NXT_EXPORT u_char *nxt_memstrn(const u_char *s, const u_char *end, diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 9ccd1fd9..0cf32916 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -1316,8 +1316,12 @@ nxt_unit_response_init(nxt_unit_request_info_t *req, nxt_unit_req_debug(req, "duplicate response init"); } + /* + * Each field name and value 0-terminated by libunit, + * this is the reason of '+ 2' below. + */ buf_size = sizeof(nxt_unit_response_t) - + max_fields_count * sizeof(nxt_unit_field_t) + + max_fields_count * (sizeof(nxt_unit_field_t) + 2) + max_fields_size; if (nxt_slow_path(req->response_buf != NULL)) { @@ -1391,8 +1395,12 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req, return NXT_UNIT_ERROR; } + /* + * Each field name and value 0-terminated by libunit, + * this is the reason of '+ 2' below. + */ buf_size = sizeof(nxt_unit_response_t) - + max_fields_count * sizeof(nxt_unit_field_t) + + max_fields_count * (sizeof(nxt_unit_field_t) + 2) + max_fields_size; nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); @@ -1458,7 +1466,8 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req, goto fail; } - resp->piggyback_content_length = req->response->piggyback_content_length; + resp->piggyback_content_length = + req->response->piggyback_content_length; nxt_unit_sptr_set(&resp->piggyback_content, p); p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), @@ -1953,7 +1962,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, if (hdr != NULL) { m.mmap_msg.mmap_id = hdr->id; - m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); + m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, + (u_char *) buf->start); } nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", diff --git a/src/nxt_unit_field.h b/src/nxt_unit_field.h index d19db0f0..b07d3046 100644 --- a/src/nxt_unit_field.h +++ b/src/nxt_unit_field.h @@ -21,7 +21,8 @@ enum { /* Name and Value field aka HTTP header. */ struct nxt_unit_field_s { uint16_t hash; - uint8_t skip; + uint8_t skip:1; + uint8_t hopbyhop:1; uint8_t name_length; uint32_t value_length; diff --git a/src/nxt_websocket.c b/src/nxt_websocket.c index 9a099760..91002237 100644 --- a/src/nxt_websocket.c +++ b/src/nxt_websocket.c @@ -19,7 +19,7 @@ nxt_inline void nxt_hton16(uint8_t *b, uint16_t v) { b[0] = (v >> 8); - b[1] = (v & 0xFFu); + b[1] = (v & 0xFFu); } diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index ab9f7020..45d7a7aa 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -85,14 +85,16 @@ static nxt_int_t nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { int state, rc; - VALUE dummy, res; + VALUE res; nxt_unit_ctx_t *unit_ctx; nxt_unit_init_t ruby_unit_init; nxt_ruby_rack_init_t rack_init; + static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" }; + + RUBY_INIT_STACK ruby_init(); - Init_stack(&dummy); - ruby_init_loadpath(); + ruby_options(2, argv); ruby_script("NGINX_Unit"); rack_init.task = task; @@ -707,7 +709,8 @@ nxt_ruby_rack_result_body(VALUE result) } } else if (rb_respond_to(body, rb_intern("each"))) { - rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0); + rb_block_call(body, rb_intern("each"), 0, 0, + nxt_ruby_rack_result_body_each, 0); } else { nxt_unit_req_error(nxt_ruby_run_ctx.req, diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c index 3f6cac89..fcfcf5dd 100644 --- a/src/ruby/nxt_ruby_stream_io.c +++ b/src/ruby/nxt_ruby_stream_io.c @@ -31,7 +31,8 @@ nxt_ruby_stream_io_input_init(void) rb_gc_register_address(&stream_io); rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); - rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); + rb_define_method(stream_io, "initialize", + nxt_ruby_stream_io_initialize, -1); rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0); rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0); rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2); @@ -51,7 +52,8 @@ nxt_ruby_stream_io_error_init(void) rb_gc_register_address(&stream_io); rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); - rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); + rb_define_method(stream_io, "initialize", + nxt_ruby_stream_io_initialize, -1); rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2); rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2); rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0); diff --git a/src/test/nxt_unit_websocket_chat.c b/src/test/nxt_unit_websocket_chat.c index ecc9a243..6e274722 100644 --- a/src/test/nxt_unit_websocket_chat.c +++ b/src/test/nxt_unit_websocket_chat.c @@ -104,10 +104,10 @@ ws_chat_root(nxt_unit_request_info_t *req) rc = nxt_unit_response_init(req, 200 /* Status code. */, 2 /* Number of response headers. */, - nxt_length(CONTENT_TYPE) + 1 - + nxt_length(TEXT_HTML) + 1 - + nxt_length(CONTENT_LENGTH) + 1 - + ws_chat_index_content_length_size + 1 + nxt_length(CONTENT_TYPE) + + nxt_length(TEXT_HTML) + + nxt_length(CONTENT_LENGTH) + + ws_chat_index_content_length_size + ws_chat_index_html_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; diff --git a/test/python/delayed/wsgi.py b/test/python/delayed/wsgi.py new file mode 100644 index 00000000..d25e2765 --- /dev/null +++ b/test/python/delayed/wsgi.py @@ -0,0 +1,25 @@ +import time + + +def application(environ, start_response): + parts = int(environ.get('HTTP_X_PARTS', 1)) + delay = int(environ.get('HTTP_X_DELAY', 0)) + + content_length = int(environ.get('CONTENT_LENGTH', 0)) + body = bytes(environ['wsgi.input'].read(content_length)) + + write = start_response('200', [('Content-Length', str(len(body)))]) + + if not body: + return [] + + step = int(len(body) / parts) + for i in range(0, len(body), step): + try: + write(body[i : i + step]) + except: + break + + time.sleep(delay) + + return [] diff --git a/test/python/errors_write/wsgi.py b/test/python/errors_write/wsgi.py index b1a9d2ee..148bce9e 100644 --- a/test/python/errors_write/wsgi.py +++ b/test/python/errors_write/wsgi.py @@ -1,5 +1,6 @@ def application(environ, start_response): environ['wsgi.errors'].write('Error in application.') + environ['wsgi.errors'].flush() start_response('200', [('Content-Length', '0')]) return [] diff --git a/test/python/iter_exception/wsgi.py b/test/python/iter_exception/wsgi.py new file mode 100644 index 00000000..66a09af7 --- /dev/null +++ b/test/python/iter_exception/wsgi.py @@ -0,0 +1,45 @@ +class application: + def __init__(self, environ, start_response): + self.environ = environ + self.start = start_response + + self.next = self.__next__ + + def __iter__(self): + self.__i = 0 + self._skip_level = int(self.environ.get('HTTP_X_SKIP', 0)) + self._not_skip_close = int(self.environ.get('HTTP_X_NOT_SKIP_CLOSE', 0)) + self._is_chunked = self.environ.get('HTTP_X_CHUNKED') + + headers = [(('Content-Length', '10'))] + if self._is_chunked is not None: + headers = [] + + if self._skip_level < 1: + raise Exception('first exception') + + write = self.start('200', headers) + + if self._skip_level < 2: + raise Exception('second exception') + + write(b'XXXXX') + + if self._skip_level < 3: + raise Exception('third exception') + + return self + + def __next__(self): + if self._skip_level < 4: + raise Exception('next exception') + + self.__i += 1 + if self.__i > 2: + raise StopIteration + + return b'X' + + def close(self): + if self._not_skip_close == 1: + raise Exception('close exception') diff --git a/test/python/log_body/wsgi.py b/test/python/log_body/wsgi.py new file mode 100644 index 00000000..9dcb1b0c --- /dev/null +++ b/test/python/log_body/wsgi.py @@ -0,0 +1,9 @@ +def application(environ, start_response): + content_length = int(environ.get('CONTENT_LENGTH', 0)) + body = bytes(environ['wsgi.input'].read(content_length)) + + environ['wsgi.errors'].write(body) + environ['wsgi.errors'].flush() + + start_response('200', [('Content-Length', '0')]) + return [] diff --git a/test/python/threading/wsgi.py b/test/python/threading/wsgi.py new file mode 100644 index 00000000..adaa2a37 --- /dev/null +++ b/test/python/threading/wsgi.py @@ -0,0 +1,33 @@ +import sys +import time +import threading + + +class Foo(threading.Thread): + num = 10 + + def __init__(self, x): + self.__x = x + threading.Thread.__init__(self) + + def log_index(self, index): + sys.stderr.write( + "(" + str(index) + ") Thread: " + str(self.__x) + "\n" + ) + sys.stderr.flush() + + def run(self): + i = 0 + for _ in range(3): + self.log_index(i) + i += 1 + time.sleep(1) + self.log_index(i) + i += 1 + + +def application(environ, start_response): + Foo(Foo.num).start() + Foo.num += 10 + start_response('200 OK', [('Content-Length', '0')]) + return [] diff --git a/test/ruby/constants/config.ru b/test/ruby/constants/config.ru new file mode 100644 index 00000000..e0951bf4 --- /dev/null +++ b/test/ruby/constants/config.ru @@ -0,0 +1,15 @@ +app = Proc.new do |env| + ['200', { + 'X-Copyright' => RUBY_COPYRIGHT, + 'X-Description' => RUBY_DESCRIPTION, + 'X-Engine' => RUBY_ENGINE, + 'X-Engine-Version' => RUBY_ENGINE_VERSION, + 'X-Patchlevel' => RUBY_PATCHLEVEL.to_s, + 'X-Platform' => RUBY_PLATFORM, + 'X-Release-Date' => RUBY_RELEASE_DATE, + 'X-Revision' => RUBY_REVISION.to_s, + 'X-Version' => RUBY_VERSION, + }, []] +end + +run app diff --git a/test/test_access_log.py b/test/test_access_log.py index 8dc87524..94f6e7bf 100644 --- a/test/test_access_log.py +++ b/test/test_access_log.py @@ -12,7 +12,11 @@ class TestAccessLog(TestApplicationPython): def load(self, script): super().load(script) - self.conf('"' + self.testdir + '/access.log"', 'access_log') + self.assertIn( + 'success', + self.conf('"' + self.testdir + '/access.log"', 'access_log'), + 'access_log configure', + ) def wait_for_record(self, pattern, name='access.log'): return super().wait_for_record(pattern, name) @@ -111,7 +115,9 @@ Connection: close addr = self.testdir + '/sock' - self.conf({"unix:" + addr: {"pass": "applications/empty"}}, 'listeners') + self.conf( + {"unix:" + addr: {"pass": "applications/empty"}}, 'listeners' + ) self.get(sock_type='unix', addr=addr) @@ -292,42 +298,5 @@ Connection: close 'change', ) - def test_access_log_reopen(self): - self.load('empty') - - log_path = self.testdir + '/access.log' - - self.assertTrue(self.waitforfiles(log_path), 'open') - - log_path_new = self.testdir + '/new.log' - - os.rename(log_path, log_path_new) - - self.get() - - self.assertIsNotNone( - self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', 'new.log'), - 'rename new', - ) - self.assertFalse(os.path.isfile(log_path), 'rename old') - - with open(self.testdir + '/unit.pid', 'r') as f: - pid = f.read().rstrip() - - call(['kill', '-s', 'USR1', pid]) - - self.assertTrue(self.waitforfiles(log_path), 'reopen') - - self.get(url='/usr1') - - self.assertIsNotNone( - self.wait_for_record(r'"GET /usr1 HTTP/1.1" 200 0 "-" "-"'), - 'reopen 2', - ) - self.assertIsNone( - self.search_in_log(r'/usr1', 'new.log'), 'rename new 2' - ) - - if __name__ == '__main__': TestAccessLog.main() diff --git a/test/test_configuration.py b/test/test_configuration.py index 69647858..186e037d 100644 --- a/test/test_configuration.py +++ b/test/test_configuration.py @@ -321,7 +321,7 @@ class TestConfiguration(TestControl): } for a in range(999) }, - "listeners": {"*:7001": {"pass": "applications/app-1"}}, + "listeners": {"*:7080": {"pass": "applications/app-1"}}, } self.assertIn('success', self.conf(conf)) diff --git a/test/test_go_isolation.py b/test/test_go_isolation.py index 780c2b03..ee5ddf47 100644 --- a/test/test_go_isolation.py +++ b/test/test_go_isolation.py @@ -130,6 +130,38 @@ class TestGoIsolation(TestApplicationGo): self.assertEqual(obj['PID'], 1, 'pid of container is 1') + def test_isolation_namespace_false(self): + self.load('ns_inspect') + allns = list(self.available['features']['isolation'].keys()) + + remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup'] + allns = [ns for ns in allns if ns not in remove_list] + + namespaces = {} + for ns in allns: + if ns == 'user': + namespaces['credential'] = False + elif ns == 'mnt': + namespaces['mount'] = False + elif ns == 'net': + namespaces['network'] = False + elif ns == 'uts': + namespaces['uname'] = False + else: + namespaces[ns] = False + + self.conf_isolation({"namespaces": namespaces}) + + obj = self.isolation.parsejson(self.get()['body']) + + for ns in allns: + if ns.upper() in obj['NS']: + self.assertEqual( + obj['NS'][ns.upper()], + self.available['features']['isolation'][ns], + '%s match' % ns, + ) + if __name__ == '__main__': TestGoIsolation.main() diff --git a/test/test_java_websockets.py b/test/test_java_websockets.py index 3f2c0a8a..d75ee3a6 100644 --- a/test/test_java_websockets.py +++ b/test/test_java_websockets.py @@ -8,7 +8,7 @@ from unit.applications.websockets import TestApplicationWebsocket class TestJavaWebsockets(TestApplicationJava): prerequisites = {'modules': ['java']} - ws = TestApplicationWebsocket(True) + ws = TestApplicationWebsocket() def setUp(self): super().setUp() @@ -179,18 +179,14 @@ class TestJavaWebsockets(TestApplicationJava): ): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1 self.load('websockets_mirror') - self.get() - - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'upgrade absent') @@ -198,20 +194,17 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_case_insensitive(self): self.load('websockets_mirror') - self.get() - - key = self.ws.key() - resp = self.get( + resp, sock, _ = self.ws.upgrade( headers={ 'Host': 'localhost', 'Upgrade': 'WEBSOCKET', 'Connection': 'UPGRADE', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, - }, - read_timeout=1, + } ) + sock.close() self.assertEqual(resp['status'], 101, 'status') @@ -219,18 +212,14 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_connection_absent(self): # FAIL self.load('websockets_mirror') - self.get() - - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -238,18 +227,14 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_version_absent(self): self.load('websockets_mirror') - self.get() - - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', }, - read_timeout=1, ) self.assertEqual(resp['status'], 426, 'status') @@ -258,8 +243,6 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_key_invalid(self): self.load('websockets_mirror') - self.get() - resp = self.get( headers={ 'Host': 'localhost', @@ -269,7 +252,6 @@ class TestJavaWebsockets(TestApplicationJava): 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'key length') @@ -284,7 +266,6 @@ class TestJavaWebsockets(TestApplicationJava): 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual( @@ -294,19 +275,15 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_method_invalid(self): self.load('websockets_mirror') - self.get() - - key = self.ws.key() resp = self.post( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -314,20 +291,16 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_http_10(self): self.load('websockets_mirror') - self.get() - - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, http_10=True, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -335,20 +308,16 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_handshake_uri_invalid(self): self.load('websockets_mirror') - self.get() - - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, url='!', - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -356,19 +325,17 @@ class TestJavaWebsockets(TestApplicationJava): def test_java_websockets_protocol_absent(self): self.load('websockets_mirror') - self.get() - key = self.ws.key() - resp = self.get( + resp, sock, _ = self.ws.upgrade( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', 'Sec-WebSocket-Key': key, 'Sec-WebSocket-Version': 13, - }, - read_timeout=1, + } ) + sock.close() self.assertEqual(resp['status'], 101, 'status') self.assertEqual(resp['headers']['Upgrade'], 'websocket', 'upgrade') @@ -1165,7 +1132,7 @@ class TestJavaWebsockets(TestApplicationJava): sock.close() - # 7_3_1 # FAIL + # 7_3_1 _, sock, _ = self.ws.upgrade() diff --git a/test/test_node_websockets.py b/test/test_node_websockets.py index b24bee75..bb189552 100644 --- a/test/test_node_websockets.py +++ b/test/test_node_websockets.py @@ -198,16 +198,14 @@ class TestNodeWebsockets(TestApplicationNode): ): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1 self.load('websockets/mirror') - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'upgrade absent') @@ -215,18 +213,17 @@ class TestNodeWebsockets(TestApplicationNode): def test_node_websockets_handshake_case_insensitive(self): self.load('websockets/mirror') - key = self.ws.key() - resp = self.get( + resp, sock, _ = self.ws.upgrade( headers={ 'Host': 'localhost', 'Upgrade': 'WEBSOCKET', 'Connection': 'UPGRADE', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, - }, - read_timeout=1, + } ) + sock.close() self.assertEqual(resp['status'], 101, 'status') @@ -234,16 +231,14 @@ class TestNodeWebsockets(TestApplicationNode): def test_node_websockets_handshake_connection_absent(self): # FAIL self.load('websockets/mirror') - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -251,16 +246,14 @@ class TestNodeWebsockets(TestApplicationNode): def test_node_websockets_handshake_version_absent(self): self.load('websockets/mirror') - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', }, - read_timeout=1, ) self.assertEqual(resp['status'], 426, 'status') @@ -278,7 +271,6 @@ class TestNodeWebsockets(TestApplicationNode): 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'key length') @@ -293,7 +285,6 @@ class TestNodeWebsockets(TestApplicationNode): 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual( @@ -303,17 +294,15 @@ class TestNodeWebsockets(TestApplicationNode): def test_node_websockets_handshake_method_invalid(self): self.load('websockets/mirror') - key = self.ws.key() resp = self.post( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -321,18 +310,16 @@ class TestNodeWebsockets(TestApplicationNode): def test_node_websockets_handshake_http_10(self): self.load('websockets/mirror') - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, http_10=True, - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -340,18 +327,16 @@ class TestNodeWebsockets(TestApplicationNode): def test_node_websockets_handshake_uri_invalid(self): self.load('websockets/mirror') - key = self.ws.key() resp = self.get( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Key': self.ws.key(), 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, }, url='!', - read_timeout=1, ) self.assertEqual(resp['status'], 400, 'status') @@ -360,16 +345,16 @@ class TestNodeWebsockets(TestApplicationNode): self.load('websockets/mirror') key = self.ws.key() - resp = self.get( + resp, sock, _ = self.ws.upgrade( headers={ 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', 'Sec-WebSocket-Key': key, 'Sec-WebSocket-Version': 13, - }, - read_timeout=1, + } ) + sock.close() self.assertEqual(resp['status'], 101, 'status') self.assertEqual(resp['headers']['Upgrade'], 'websocket', 'upgrade') @@ -1166,7 +1151,7 @@ class TestNodeWebsockets(TestApplicationNode): sock.close() - # 7_3_1 # FAIL + # 7_3_1 _, sock, _ = self.ws.upgrade() diff --git a/test/test_proxy.py b/test/test_proxy.py new file mode 100644 index 00000000..4697b88f --- /dev/null +++ b/test/test_proxy.py @@ -0,0 +1,622 @@ +import re +import time +import socket +import unittest +from unit.applications.lang.python import TestApplicationPython + + +class TestProxy(TestApplicationPython): + prerequisites = {'modules': ['python']} + + SERVER_PORT = 7999 + + def run_server(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + server_address = ('', self.SERVER_PORT) + sock.bind(server_address) + sock.listen(5) + + def recvall(sock): + buff_size = 4096 + data = b'' + while True: + part = sock.recv(buff_size) + data += part + if len(part) < buff_size: + break + return data + + req = b"""HTTP/1.1 200 OK +Content-Length: 10 + +""" + + while True: + connection, client_address = sock.accept() + + data = recvall(connection).decode() + + to_send = req + + m = re.search('X-Len: (\d+)', data) + if m: + to_send += b'X' * int(m.group(1)) + + connection.sendall(to_send) + + connection.close() + + def get_http10(self, *args, **kwargs): + return self.get(*args, http_10=True, **kwargs) + + def post_http10(self, *args, **kwargs): + return self.post(*args, http_10=True, **kwargs) + + def setUp(self): + super().setUp() + + self.run_process(self.run_server) + self.waitforsocket(self.SERVER_PORT) + + self.assertIn( + 'success', + self.conf( + { + "listeners": { + "*:7080": {"pass": "routes"}, + "*:7081": {"pass": "applications/mirror"}, + }, + "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}], + "applications": { + "mirror": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + "/python/mirror", + "working_directory": self.current_dir + + "/python/mirror", + "module": "wsgi", + }, + "custom_header": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + "/python/custom_header", + "working_directory": self.current_dir + + "/python/custom_header", + "module": "wsgi", + }, + "delayed": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + "/python/delayed", + "working_directory": self.current_dir + + "/python/delayed", + "module": "wsgi", + }, + }, + } + ), + 'proxy initial configuration', + ) + + def test_proxy_http10(self): + for _ in range(10): + self.assertEqual(self.get_http10()['status'], 200, 'status') + + def test_proxy_chain(self): + self.assertIn( + 'success', + self.conf( + { + "listeners": { + "*:7080": {"pass": "routes/first"}, + "*:7081": {"pass": "routes/second"}, + "*:7082": {"pass": "routes/third"}, + "*:7083": {"pass": "routes/fourth"}, + "*:7084": {"pass": "routes/fifth"}, + "*:7085": {"pass": "applications/mirror"}, + }, + "routes": { + "first": [ + {"action": {"proxy": "http://127.0.0.1:7081"}} + ], + "second": [ + {"action": {"proxy": "http://127.0.0.1:7082"}} + ], + "third": [ + {"action": {"proxy": "http://127.0.0.1:7083"}} + ], + "fourth": [ + {"action": {"proxy": "http://127.0.0.1:7084"}} + ], + "fifth": [ + {"action": {"proxy": "http://127.0.0.1:7085"}} + ], + }, + "applications": { + "mirror": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + "/python/mirror", + "working_directory": self.current_dir + + "/python/mirror", + "module": "wsgi", + } + }, + } + ), + 'proxy chain configuration', + ) + + self.assertEqual(self.get_http10()['status'], 200, 'status') + + def test_proxy_body(self): + payload = '0123456789' + for _ in range(10): + resp = self.post_http10(body=payload) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], payload, 'body') + + payload = 'X' * 4096 + for _ in range(10): + resp = self.post_http10(body=payload) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], payload, 'body') + + payload = 'X' * 4097 + for _ in range(10): + resp = self.post_http10(body=payload) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], payload, 'body') + + payload = 'X' * 4096 * 256 + for _ in range(10): + resp = self.post_http10(body=payload, read_buffer_size=4096 * 128) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], payload, 'body') + + payload = 'X' * 4096 * 257 + for _ in range(10): + resp = self.post_http10(body=payload, read_buffer_size=4096 * 128) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], payload, 'body') + + def test_proxy_parallel(self): + payload = 'X' * 4096 * 257 + buff_size = 4096 * 258 + + socks = [] + for i in range(10): + _, sock = self.post_http10( + body=payload + str(i), + start=True, + no_recv=True, + read_buffer_size=buff_size, + ) + socks.append(sock) + + for i in range(10): + resp = self.recvall(socks[i], buff_size=buff_size).decode() + socks[i].close() + + resp = self._resp_to_dict(resp) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], payload + str(i), 'body') + + def test_proxy_header(self): + self.assertIn( + 'success', + self.conf( + {"pass": "applications/custom_header"}, 'listeners/*:7081' + ), + 'custom_header configure', + ) + + header_value = 'blah' + self.assertEqual( + self.get_http10( + headers={'Host': 'localhost', 'Custom-Header': header_value} + )['headers']['Custom-Header'], + header_value, + 'custom header', + ) + + header_value = '(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~' + self.assertEqual( + self.get_http10( + headers={'Host': 'localhost', 'Custom-Header': header_value} + )['headers']['Custom-Header'], + header_value, + 'custom header 2', + ) + + header_value = 'X' * 4096 + self.assertEqual( + self.get_http10( + headers={'Host': 'localhost', 'Custom-Header': header_value} + )['headers']['Custom-Header'], + header_value, + 'custom header 3', + ) + + header_value = 'X' * 8191 + self.assertEqual( + self.get_http10( + headers={'Host': 'localhost', 'Custom-Header': header_value} + )['headers']['Custom-Header'], + header_value, + 'custom header 4', + ) + + header_value = 'X' * 8192 + self.assertEqual( + self.get_http10( + headers={'Host': 'localhost', 'Custom-Header': header_value} + )['status'], + 431, + 'custom header 5', + ) + + def test_proxy_fragmented(self): + _, sock = self.http( + b"""GET / HTT""", raw=True, start=True, no_recv=True + ) + + time.sleep(1) + + sock.sendall("P/1.0\r\nHost: localhos".encode()) + + time.sleep(1) + + sock.sendall("t\r\n\r\n".encode()) + + self.assertRegex( + self.recvall(sock).decode(), '200 OK', 'fragmented send' + ) + sock.close() + + def test_proxy_fragmented_close(self): + _, sock = self.http( + b"""GET / HTT""", raw=True, start=True, no_recv=True + ) + + time.sleep(1) + + sock.sendall("P/1.0\r\nHo".encode()) + + sock.close() + + def test_proxy_fragmented_body(self): + _, sock = self.http( + b"""GET / HTT""", raw=True, start=True, no_recv=True + ) + + time.sleep(1) + + sock.sendall("P/1.0\r\nHost: localhost\r\n".encode()) + sock.sendall("Content-Length: 30000\r\n".encode()) + + time.sleep(1) + + sock.sendall("\r\n".encode()) + sock.sendall(("X" * 10000).encode()) + + time.sleep(1) + + sock.sendall(("X" * 10000).encode()) + + time.sleep(1) + + sock.sendall(("X" * 10000).encode()) + + resp = self._resp_to_dict(self.recvall(sock).decode()) + sock.close() + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], "X" * 30000, 'body') + + def test_proxy_fragmented_body_close(self): + _, sock = self.http( + b"""GET / HTT""", raw=True, start=True, no_recv=True + ) + + time.sleep(1) + + sock.sendall("P/1.0\r\nHost: localhost\r\n".encode()) + sock.sendall("Content-Length: 30000\r\n".encode()) + + time.sleep(1) + + sock.sendall("\r\n".encode()) + sock.sendall(("X" * 10000).encode()) + + sock.close() + + def test_proxy_nowhere(self): + self.assertIn( + 'success', + self.conf( + [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes' + ), + 'proxy path changed', + ) + + self.assertEqual(self.get_http10()['status'], 502, 'status') + + def test_proxy_ipv6(self): + self.assertIn( + 'success', + self.conf( + { + "*:7080": {"pass": "routes"}, + "[::1]:7081": {'application': 'mirror'}, + }, + 'listeners', + ), + 'add ipv6 listener configure', + ) + + self.assertIn( + 'success', + self.conf([{"action": {"proxy": "http://[::1]:7081"}}], 'routes'), + 'proxy ipv6 configure', + ) + + self.assertEqual(self.get_http10()['status'], 200, 'status') + + def test_proxy_unix(self): + addr = self.testdir + '/sock' + + self.assertIn( + 'success', + self.conf( + { + "*:7080": {"pass": "routes"}, + "unix:" + addr: {'application': 'mirror'}, + }, + 'listeners', + ), + 'add unix listener configure', + ) + + self.assertIn( + 'success', + self.conf( + [{"action": {"proxy": 'http://unix:' + addr}}], 'routes' + ), + 'proxy unix configure', + ) + + self.assertEqual(self.get_http10()['status'], 200, 'status') + + def test_proxy_delayed(self): + self.assertIn( + 'success', + self.conf( + {"pass": "applications/delayed"}, 'listeners/*:7081' + ), + 'delayed configure', + ) + + body = '0123456789' * 1000 + resp = self.post_http10( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Content-Length': str(len(body)), + 'X-Parts': '2', + 'X-Delay': '1', + }, + body=body, + ) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], body, 'body') + + resp = self.post_http10( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Content-Length': str(len(body)), + 'X-Parts': '2', + 'X-Delay': '1', + }, + body=body, + ) + + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'], body, 'body') + + def test_proxy_delayed_close(self): + self.assertIn( + 'success', + self.conf( + {"pass": "applications/delayed"}, 'listeners/*:7081' + ), + 'delayed configure', + ) + + _, sock = self.post_http10( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Content-Length': '10000', + 'X-Parts': '3', + 'X-Delay': '1', + }, + body='0123456789' * 1000, + start=True, + no_recv=True, + ) + + self.assertRegex( + sock.recv(100).decode(), '200 OK', 'first' + ) + sock.close() + + _, sock = self.post_http10( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Content-Length': '10000', + 'X-Parts': '3', + 'X-Delay': '1', + }, + body='0123456789' * 1000, + start=True, + no_recv=True, + ) + + self.assertRegex( + sock.recv(100).decode(), '200 OK', 'second' + ) + sock.close() + + @unittest.skip('not yet') + def test_proxy_content_length(self): + self.assertIn( + 'success', + self.conf( + [ + { + "action": { + "proxy": "http://127.0.0.1:" + + str(self.SERVER_PORT) + } + } + ], + 'routes', + ), + 'proxy backend configure', + ) + + resp = self.get_http10() + self.assertEqual(len(resp['body']), 0, 'body lt Content-Length 0') + + resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'}) + self.assertEqual(len(resp['body']), 5, 'body lt Content-Length 5') + + resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'}) + self.assertEqual(len(resp['body']), 9, 'body lt Content-Length 9') + + resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'}) + self.assertEqual(len(resp['body']), 10, 'body gt Content-Length 11') + + resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'}) + self.assertEqual(len(resp['body']), 10, 'body gt Content-Length 15') + + def test_proxy_invalid(self): + self.assertIn( + 'error', + self.conf([{"action": {"proxy": 'blah'}}], 'routes'), + 'proxy invalid', + ) + self.assertIn( + 'error', + self.conf([{"action": {"proxy": '/blah'}}], 'routes'), + 'proxy invalid 2', + ) + self.assertIn( + 'error', + self.conf([{"action": {"proxy": 'unix:/blah'}}], 'routes'), + 'proxy unix invalid 2', + ) + self.assertIn( + 'error', + self.conf([{"action": {"proxy": 'http://blah'}}], 'routes'), + 'proxy unix invalid 3', + ) + self.assertIn( + 'error', + self.conf([{"action": {"proxy": 'http://127.0.0.1'}}], 'routes'), + 'proxy ipv4 invalid', + ) + self.assertIn( + 'error', + self.conf([{"action": {"proxy": 'http://127.0.0.1:'}}], 'routes'), + 'proxy ipv4 invalid 2', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://127.0.0.1:blah'}}], 'routes' + ), + 'proxy ipv4 invalid 3', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://127.0.0.1:-1'}}], 'routes' + ), + 'proxy ipv4 invalid 4', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://127.0.0.1:7080b'}}], 'routes' + ), + 'proxy ipv4 invalid 5', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://[]'}}], 'routes' + ), + 'proxy ipv6 invalid', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://[]:7080'}}], 'routes' + ), + 'proxy ipv6 invalid 2', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://[:]:7080'}}], 'routes' + ), + 'proxy ipv6 invalid 3', + ) + self.assertIn( + 'error', + self.conf( + [{"action": {"proxy": 'http://[::7080'}}], 'routes' + ), + 'proxy ipv6 invalid 4', + ) + + @unittest.skip('not yet') + def test_proxy_loop(self): + self.conf( + { + "listeners": { + "*:7080": {"pass": "routes"}, + "*:7081": {"pass": "applications/mirror"}, + "*:7082": {"pass": "routes"}, + }, + "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}], + "applications": { + "mirror": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + "/python/mirror", + "working_directory": self.current_dir + + "/python/mirror", + "module": "wsgi", + }, + }, + } + ) + + self.get_http10(no_recv=True) + +if __name__ == '__main__': + TestProxy.main() diff --git a/test/test_python_application.py b/test/test_python_application.py index 5b6e2089..ae8f01ca 100644 --- a/test/test_python_application.py +++ b/test/test_python_application.py @@ -1,3 +1,4 @@ +import re import time import unittest from unit.applications.lang.python import TestApplicationPython @@ -6,6 +7,10 @@ from unit.applications.lang.python import TestApplicationPython class TestPythonApplication(TestApplicationPython): prerequisites = {'modules': ['python']} + def findall(self, pattern): + with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: + return re.findall(pattern, f.read()) + def test_python_application_variables(self): self.load('variables') @@ -130,6 +135,18 @@ class TestPythonApplication(TestApplicationPython): self.get()['headers']['Server-Port'], '7080', 'Server-Port header' ) + @unittest.skip('not yet') + def test_python_application_working_directory_invalid(self): + self.load('empty') + + self.assertIn( + 'success', + self.conf('"/blah"', 'applications/empty/working_directory'), + 'configure invalid working_directory', + ) + + self.assertEqual(self.get()['status'], 500, 'status') + def test_python_application_204_transfer_encoding(self): self.load('204_no_content') @@ -495,6 +512,171 @@ Connection: close self.assertEqual(self.get()['body'], '0123456789', 'write') + def test_python_application_threading(self): + """wait_for_record() timeouts after 5s while every thread works at + least 3s. So without releasing GIL test should fail. + """ + + self.load('threading') + + for _ in range(10): + self.get(no_recv=True) + + self.assertIsNotNone( + self.wait_for_record(r'\(5\) Thread: 100'), 'last thread finished' + ) + + def test_python_application_iter_exception(self): + self.load('iter_exception') + + # Default request doesn't lead to the exception. + + resp = self.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '9', + 'X-Chunked': '1', + 'Connection': 'close', + } + ) + self.assertEqual(resp['status'], 200, 'status') + self.assertEqual(resp['body'][-5:], '0\r\n\r\n', 'body') + + # Exception before start_response(). + + self.assertEqual(self.get()['status'], 503, 'error') + + self.assertIsNotNone(self.wait_for_record(r'Traceback'), 'traceback') + self.assertIsNotNone( + self.wait_for_record(r'raise Exception\(\'first exception\'\)'), + 'first exception raise', + ) + self.assertEqual( + len(self.findall(r'Traceback')), 1, 'traceback count 1' + ) + + # Exception after start_response(), before first write(). + + self.assertEqual( + self.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '1', + 'Connection': 'close', + } + )['status'], + 503, + 'error 2', + ) + + self.assertIsNotNone( + self.wait_for_record(r'raise Exception\(\'second exception\'\)'), + 'exception raise second', + ) + self.assertEqual( + len(self.findall(r'Traceback')), 2, 'traceback count 2' + ) + + # Exception after first write(), before first __next__(). + + _, sock = self.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '2', + 'Connection': 'keep-alive', + }, + start=True, + ) + + self.assertIsNotNone( + self.wait_for_record(r'raise Exception\(\'third exception\'\)'), + 'exception raise third', + ) + self.assertEqual( + len(self.findall(r'Traceback')), 3, 'traceback count 3' + ) + + self.assertDictEqual(self.get(sock=sock), {}, 'closed connection') + + # Exception after first write(), before first __next__(), + # chunked (incomplete body). + + resp = self.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '2', + 'X-Chunked': '1', + 'Connection': 'close', + } + ) + if 'body' in resp: + self.assertNotEqual( + resp['body'][-5:], '0\r\n\r\n', 'incomplete body' + ) + self.assertEqual( + len(self.findall(r'Traceback')), 4, 'traceback count 4' + ) + + # Exception in __next__(). + + _, sock = self.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '3', + 'Connection': 'keep-alive', + }, + start=True, + ) + + self.assertIsNotNone( + self.wait_for_record(r'raise Exception\(\'next exception\'\)'), + 'exception raise next', + ) + self.assertEqual( + len(self.findall(r'Traceback')), 5, 'traceback count 5' + ) + + self.assertDictEqual(self.get(sock=sock), {}, 'closed connection 2') + + # Exception in __next__(), chunked (incomplete body). + + resp = self.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '3', + 'X-Chunked': '1', + 'Connection': 'close', + } + ) + if 'body' in resp: + self.assertNotEqual( + resp['body'][-5:], '0\r\n\r\n', 'incomplete body 2' + ) + self.assertEqual( + len(self.findall(r'Traceback')), 6, 'traceback count 6' + ) + + # Exception before start_response() and in close(). + + self.assertEqual( + self.get( + headers={ + 'Host': 'localhost', + 'X-Not-Skip-Close': '1', + 'Connection': 'close', + } + )['status'], + 503, + 'error', + ) + + self.assertIsNotNone( + self.wait_for_record(r'raise Exception\(\'close exception\'\)'), + 'exception raise close', + ) + self.assertEqual( + len(self.findall(r'Traceback')), 8, 'traceback count 8' + ) if __name__ == '__main__': TestPythonApplication.main() diff --git a/test/test_routing.py b/test/test_routing.py index 20e3a1c4..2960f978 100644 --- a/test/test_routing.py +++ b/test/test_routing.py @@ -8,34 +8,38 @@ class TestRouting(TestApplicationProto): def setUp(self): super().setUp() - self.conf( - { - "listeners": {"*:7080": {"pass": "routes"}}, - "routes": [ - { - "match": {"method": "GET"}, - "action": {"pass": "applications/empty"}, - } - ], - "applications": { - "empty": { - "type": "python", - "processes": {"spare": 0}, - "path": self.current_dir + '/python/empty', - "working_directory": self.current_dir - + '/python/empty', - "module": "wsgi", - }, - "mirror": { - "type": "python", - "processes": {"spare": 0}, - "path": self.current_dir + '/python/mirror', - "working_directory": self.current_dir - + '/python/mirror', - "module": "wsgi", + self.assertIn( + 'success', + self.conf( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [ + { + "match": {"method": "GET"}, + "action": {"pass": "applications/empty"}, + } + ], + "applications": { + "empty": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + '/python/empty', + "working_directory": self.current_dir + + '/python/empty', + "module": "wsgi", + }, + "mirror": { + "type": "python", + "processes": {"spare": 0}, + "path": self.current_dir + '/python/mirror', + "working_directory": self.current_dir + + '/python/mirror', + "module": "wsgi", + }, }, - }, - } + } + ), + 'routing configure', ) def route(self, route): @@ -897,31 +901,75 @@ class TestRouting(TestApplicationProto): 'success', self.route( { - "match": {"uri": "/"}, + "match": {"uri": ["/blah", "/slash/"]}, "action": {"pass": "applications/empty"}, } ), 'match uri positive configure', ) - self.assertEqual(self.get()['status'], 200, 'match uri positive') + self.assertEqual(self.get()['status'], 404, 'match uri positive') + self.assertEqual( + self.get(url='/blah')['status'], 200, 'match uri positive blah' + ) + self.assertEqual( + self.get(url='/blah#foo')['status'], + 200, + 'match uri positive #foo', + ) + self.assertEqual( + self.get(url='/blah?var')['status'], 200, 'match uri args' + ) + self.assertEqual( + self.get(url='//blah')['status'], 200, 'match uri adjacent slashes' + ) self.assertEqual( - self.get(url='/blah')['status'], 404, 'match uri positive blah' + self.get(url='/slash/foo/../')['status'], + 200, + 'match uri relative path', + ) + self.assertEqual( + self.get(url='/slash/./')['status'], + 200, + 'match uri relative path 2', + ) + self.assertEqual( + self.get(url='/slash//.//')['status'], + 200, + 'match uri adjacent slashes 2', + ) + self.assertEqual( + self.get(url='/%')['status'], 400, 'match uri percent' ) self.assertEqual( - self.get(url='/#blah')['status'], 200, 'match uri positive #blah' + self.get(url='/%1')['status'], 400, 'match uri percent digit' ) self.assertEqual( - self.get(url='/?var')['status'], 200, 'match uri params' + self.get(url='/%A')['status'], 400, 'match uri percent letter' ) self.assertEqual( - self.get(url='//')['status'], 200, 'match uri adjacent slashes' + self.get(url='/slash/.?args')['status'], 200, 'match uri dot args' ) self.assertEqual( - self.get(url='/blah/../')['status'], 200, 'match uri relative path' + self.get(url='/slash/.#frag')['status'], 200, 'match uri dot frag' ) self.assertEqual( - self.get(url='/./')['status'], 200, 'match uri relative path' + self.get(url='/slash/foo/..?args')['status'], + 200, + 'match uri dot dot args', + ) + self.assertEqual( + self.get(url='/slash/foo/..#frag')['status'], + 200, + 'match uri dot dot frag', + ) + self.assertEqual( + self.get(url='/slash/.')['status'], 200, 'match uri trailing dot' + ) + self.assertEqual( + self.get(url='/slash/foo/..')['status'], + 200, + 'match uri trailing dot dot', ) def test_routes_match_uri_case_sensitive(self): diff --git a/test/test_ruby_application.py b/test/test_ruby_application.py index 6f82ae81..bbb252d7 100644 --- a/test/test_ruby_application.py +++ b/test/test_ruby_application.py @@ -347,6 +347,29 @@ class TestRubyApplication(TestApplicationRuby): self.assertEqual(resp['body'], '0123456789', 'keep-alive 2') + def test_ruby_application_constants(self): + self.load('constants') + + resp = self.get() + + self.assertEqual(resp['status'], 200, 'status') + + headers = resp['headers'] + self.assertGreater(len(headers['X-Copyright']), 0, 'RUBY_COPYRIGHT') + self.assertGreater( + len(headers['X-Description']), 0, 'RUBY_DESCRIPTION' + ) + self.assertGreater(len(headers['X-Engine']), 0, 'RUBY_ENGINE') + self.assertGreater( + len(headers['X-Engine-Version']), 0, 'RUBY_ENGINE_VERSION' + ) + self.assertGreater(len(headers['X-Patchlevel']), 0, 'RUBY_PATCHLEVEL') + self.assertGreater(len(headers['X-Platform']), 0, 'RUBY_PLATFORM') + self.assertGreater( + len(headers['X-Release-Date']), 0, 'RUBY_RELEASE_DATE' + ) + self.assertGreater(len(headers['X-Revision']), 0, 'RUBY_REVISION') + self.assertGreater(len(headers['X-Version']), 0, 'RUBY_VERSION') if __name__ == '__main__': TestRubyApplication.main() diff --git a/test/test_static.py b/test/test_static.py index 4bdd83ed..f9dcb7dd 100644 --- a/test/test_static.py +++ b/test/test_static.py @@ -40,6 +40,12 @@ class TestStatic(TestApplicationProto): ) self.assertEqual(self.get(url='/')['body'], '0123456789', 'index 2') self.assertEqual( + self.get(url='/?blah')['body'], '0123456789', 'index vars' + ) + self.assertEqual( + self.get(url='/#blah')['body'], '0123456789', 'index anchor' + ) + self.assertEqual( self.get(url='/dir/')['status'], 404, 'index not found' ) diff --git a/test/test_usr1.py b/test/test_usr1.py new file mode 100644 index 00000000..dd9292c7 --- /dev/null +++ b/test/test_usr1.py @@ -0,0 +1,92 @@ +import os +import unittest +from subprocess import call +from unit.applications.lang.python import TestApplicationPython + + +class TestUSR1(TestApplicationPython): + prerequisites = {'modules': ['python']} + + def test_usr1_access_log(self): + self.load('empty') + + log_path = self.testdir + '/access.log' + + self.assertIn( + 'success', + self.conf('"' + log_path + '"', 'access_log'), + 'access log configure', + ) + + self.assertTrue(self.waitforfiles(log_path), 'open') + + log_path_new = self.testdir + '/new.log' + + os.rename(log_path, log_path_new) + + self.get() + + self.assertIsNotNone( + self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', 'new.log'), + 'rename new', + ) + self.assertFalse(os.path.isfile(log_path), 'rename old') + + with open(self.testdir + '/unit.pid', 'r') as f: + pid = f.read().rstrip() + + call(['kill', '-s', 'USR1', pid]) + + self.assertTrue(self.waitforfiles(log_path), 'reopen') + + self.get(url='/usr1') + + self.assertIsNotNone( + self.wait_for_record( + r'"GET /usr1 HTTP/1.1" 200 0 "-" "-"', 'access.log' + ), + 'reopen 2', + ) + self.assertIsNone( + self.search_in_log(r'/usr1', 'new.log'), 'rename new 2' + ) + + @unittest.skip('not yet') + def test_usr1_unit_log(self): + self.load('log_body') + + log_path = self.testdir + '/unit.log' + log_path_new = self.testdir + '/new.log' + + os.rename(log_path, log_path_new) + + body = 'body_for_a_log_new' + self.post(body=body) + + self.assertIsNotNone( + self.wait_for_record(body, 'new.log'), 'rename new' + ) + self.assertFalse(os.path.isfile(log_path), 'rename old') + + with open(self.testdir + '/unit.pid', 'r') as f: + pid = f.read().rstrip() + + call(['kill', '-s', 'USR1', pid]) + + self.assertTrue(self.waitforfiles(log_path), 'reopen') + + body = 'body_for_a_log_unit' + self.post(body=body) + + self.assertIsNotNone(self.wait_for_record(body), 'rename new') + self.assertIsNone(self.search_in_log(body, 'new.log'), 'rename new 2') + + # merge two log files into unit.log to check alerts + + with open(log_path, 'w') as unit_log, \ + open(log_path_new, 'r') as new_log: + unit_log.write(new_log.read()) + + +if __name__ == '__main__': + TestUSR1.main() diff --git a/test/unit/applications/websockets.py b/test/unit/applications/websockets.py index 50ff2797..ef16f433 100644 --- a/test/unit/applications/websockets.py +++ b/test/unit/applications/websockets.py @@ -1,3 +1,4 @@ +import re import random import base64 import struct @@ -30,25 +31,37 @@ class TestApplicationWebsocket(TestApplicationProto): sha1 = hashlib.sha1((key + GUID).encode()).digest() return base64.b64encode(sha1).decode() - def upgrade(self): - key = self.key() + def upgrade(self, headers=None): + key = None - if self.preinit: - self.get() - - resp, sock = self.get( - headers={ + if headers is None: + key = self.key() + headers = { 'Host': 'localhost', 'Upgrade': 'websocket', 'Connection': 'Upgrade', 'Sec-WebSocket-Key': key, 'Sec-WebSocket-Protocol': 'chat', 'Sec-WebSocket-Version': 13, - }, - read_timeout=1, + } + + _, sock = self.get( + headers=headers, + no_recv=True, start=True, ) + resp = '' + while select.select([sock], [], [], 30)[0]: + resp += sock.recv(4096).decode() + + if ( + re.search('101 Switching Protocols', resp) + and resp[-4:] == '\r\n\r\n' + ): + resp = self._resp_to_dict(resp) + break + return (resp, sock, key) def apply_mask(self, data, mask): diff --git a/test/unit/http.py b/test/unit/http.py index 82a6bd6a..c7e3e36d 100644 --- a/test/unit/http.py +++ b/test/unit/http.py @@ -1,4 +1,5 @@ import re +import time import socket import select from unit.main import TestUnit @@ -63,7 +64,7 @@ class TestHTTP(TestUnit): if 'raw' not in kwargs: req = ' '.join([start_str, url, http]) + crlf - if body is not b'': + if body != b'': if isinstance(body, str): body = body.encode() @@ -178,3 +179,20 @@ class TestHTTP(TestUnit): headers[m.group(1)] = [headers[m.group(1)], m.group(2)] return {'status': int(status), 'headers': headers, 'body': body} + + def waitforsocket(self, port): + ret = False + + for i in range(50): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', port)) + ret = True + break + except: + sock.close() + time.sleep(0.1) + + sock.close() + + self.assertTrue(ret, 'socket connected') diff --git a/test/unit/main.py b/test/unit/main.py index 873f1815..094fdb0e 100644 --- a/test/unit/main.py +++ b/test/unit/main.py @@ -185,6 +185,8 @@ class TestUnit(unittest.TestCase): if self._started: self._stop() + self.stop_processes() + def _run(self): self.unitd = self.pardir + '/build/unitd' @@ -240,24 +242,24 @@ class TestUnit(unittest.TestCase): break time.sleep(0.1) - if os.path.exists(self.testdir + '/unit.pid'): - exit("Could not terminate unit") + self._p.join(timeout=5) - self._started = False + if self._p.is_alive(): + self._p.terminate() + self._p.join(timeout=5) - self._p.join(timeout=1) - self._terminate_process(self._p) + if self._p.is_alive(): + self.fail("Could not terminate process " + str(self._p.pid)) - def _terminate_process(self, process): - if process.is_alive(): - process.terminate() - process.join(timeout=5) + if os.path.exists(self.testdir + '/unit.pid'): + self.fail("Could not terminate unit") - if process.is_alive(): - exit("Could not terminate process " + process.pid) + self._started = False - if process.exitcode: - exit("Child process terminated with code " + str(process.exitcode)) + if self._p.exitcode: + self.fail( + "Child process terminated with code " + str(self._p.exitcode) + ) def _check_alerts(self, log): found = False @@ -287,6 +289,26 @@ class TestUnit(unittest.TestCase): if found: print('skipped.') + def run_process(self, target): + if not hasattr(self, '_processes'): + self._processes = [] + + process = Process(target=target) + process.start() + + self._processes.append(process) + + def stop_processes(self): + if not hasattr(self, '_processes'): + return + + for process in self._processes: + process.terminate() + process.join(timeout=5) + + if process.is_alive(): + self.fail('Fail to stop process') + def waitforfiles(self, *files): for i in range(50): wait = False @@ -1,5 +1,5 @@ # Copyright (C) NGINX, Inc. -NXT_VERSION=1.12.0 -NXT_VERNUM=11200 +NXT_VERSION=1.13.0 +NXT_VERNUM=11300 |