summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.hgtags1
-rw-r--r--CHANGES20
-rw-r--r--auto/make1
-rw-r--r--auto/modules/python4
-rw-r--r--auto/os/conf24
-rw-r--r--auto/os/test9
-rw-r--r--auto/sources1
-rw-r--r--docs/changes.xml70
-rw-r--r--pkg/docker/Dockerfile.full2
-rw-r--r--pkg/docker/Dockerfile.go1.7-dev2
-rw-r--r--pkg/docker/Dockerfile.go1.8-dev2
-rw-r--r--pkg/docker/Dockerfile.minimal2
-rw-r--r--pkg/docker/Dockerfile.perl5.242
-rw-r--r--pkg/docker/Dockerfile.php7.02
-rw-r--r--pkg/docker/Dockerfile.python2.72
-rw-r--r--pkg/docker/Dockerfile.python3.52
-rw-r--r--pkg/docker/Dockerfile.ruby2.32
-rw-r--r--src/go/unit/response.go2
-rw-r--r--src/nodejs/unit-http/unit.cpp21
-rw-r--r--src/nxt_application.c3
-rw-r--r--src/nxt_buf.c48
-rw-r--r--src/nxt_buf.h23
-rw-r--r--src/nxt_conf.c7
-rw-r--r--src/nxt_conf.h1
-rw-r--r--src/nxt_conf_validation.c87
-rw-r--r--src/nxt_conn.h2
-rw-r--r--src/nxt_conn_connect.c60
-rw-r--r--src/nxt_conn_read.c2
-rw-r--r--src/nxt_epoll_engine.c8
-rw-r--r--src/nxt_event_engine.c111
-rw-r--r--src/nxt_event_engine.h12
-rw-r--r--src/nxt_h1proto.c840
-rw-r--r--src/nxt_h1proto.h2
-rw-r--r--src/nxt_h1proto_websocket.c5
-rw-r--r--src/nxt_http.h64
-rw-r--r--src/nxt_http_error.c4
-rw-r--r--src/nxt_http_parse.c1
-rw-r--r--src/nxt_http_parse.h3
-rw-r--r--src/nxt_http_proxy.c403
-rw-r--r--src/nxt_http_request.c49
-rw-r--r--src/nxt_http_route.c210
-rw-r--r--src/nxt_http_static.c10
-rw-r--r--src/nxt_http_websocket.c1
-rw-r--r--src/nxt_main_process.c66
-rw-r--r--src/nxt_port.c20
-rw-r--r--src/nxt_port_socket.c22
-rw-r--r--src/nxt_process.c45
-rw-r--r--src/nxt_process.h2
-rw-r--r--src/nxt_python_wsgi.c246
-rw-r--r--src/nxt_router.c35
-rw-r--r--src/nxt_router.h13
-rw-r--r--src/nxt_runtime.c79
-rw-r--r--src/nxt_runtime.h10
-rw-r--r--src/nxt_sendbuf.c53
-rw-r--r--src/nxt_sockaddr.c10
-rw-r--r--src/nxt_socket.c25
-rw-r--r--src/nxt_socket.h1
-rw-r--r--src/nxt_string.c10
-rw-r--r--src/nxt_string.h2
-rw-r--r--src/nxt_unit.c18
-rw-r--r--src/nxt_unit_field.h3
-rw-r--r--src/nxt_websocket.c2
-rw-r--r--src/ruby/nxt_ruby.c11
-rw-r--r--src/ruby/nxt_ruby_stream_io.c6
-rw-r--r--src/test/nxt_unit_websocket_chat.c8
-rw-r--r--test/python/delayed/wsgi.py25
-rw-r--r--test/python/errors_write/wsgi.py1
-rw-r--r--test/python/iter_exception/wsgi.py45
-rw-r--r--test/python/log_body/wsgi.py9
-rw-r--r--test/python/threading/wsgi.py33
-rw-r--r--test/ruby/constants/config.ru15
-rw-r--r--test/test_access_log.py47
-rw-r--r--test/test_configuration.py2
-rw-r--r--test/test_go_isolation.py32
-rw-r--r--test/test_java_websockets.py63
-rw-r--r--test/test_node_websockets.py43
-rw-r--r--test/test_proxy.py622
-rw-r--r--test/test_python_application.py182
-rw-r--r--test/test_routing.py118
-rw-r--r--test/test_ruby_application.py23
-rw-r--r--test/test_static.py6
-rw-r--r--test/test_usr1.py92
-rw-r--r--test/unit/applications/websockets.py31
-rw-r--r--test/unit/http.py20
-rw-r--r--test/unit/main.py48
-rw-r--r--version4
86 files changed, 3555 insertions, 720 deletions
diff --git a/.hgtags b/.hgtags
index 0a0c6e6c..ac9ac382 100644
--- a/.hgtags
+++ b/.hgtags
@@ -31,3 +31,4 @@ b651ff72ffe080835f884a1ace8fa24eb33e3569 1.10.0-2
c27c08b0deeee58676f3880f0f315388ce5d9322 1.11.0-2
b391df5f0102aa6afe660cfc863729c1b1111c9e 1.12.0
c1625c52dd6444ed613348719fbb54c7abcc6619 1.12.0-1
+3313bf222e6e0a91213946dfcbd70bb5079f4cef 1.13.0
diff --git a/CHANGES b/CHANGES
index 608f3ee7..58e91d4f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,24 @@
+Changes with Unit 1.13.0 14 Nov 2019
+
+ *) Feature: basic support for HTTP reverse proxying.
+
+ *) Feature: compatibility with Python 3.8.
+
+ *) Bugfix: memory leak in Python application processes when the close
+ handler was used.
+
+ *) Bugfix: threads in Python applications might not work correctly.
+
+ *) Bugfix: Ruby on Rails applications might not work on Ruby 2.6.
+
+ *) Bugfix: backtraces for uncaught exceptions in Python 3 might be
+ logged with significant delays.
+
+ *) Bugfix: explicit setting a namespaces isolation option to false might
+ have enabled it.
+
+
Changes with Unit 1.12.0 03 Oct 2019
*) Feature: compatibility with PHP 7.4.
diff --git a/auto/make b/auto/make
index 4b1e76fc..32ead76e 100644
--- a/auto/make
+++ b/auto/make
@@ -14,6 +14,7 @@ mkdir -p $NXT_BUILD_DIR/src \
cat << END > $NXT_MAKEFILE
CC = $CC
+AR = $AR
CFLAGS = $NXT_CFLAGS $NXT_CC_OPT $CFLAGS
diff --git a/auto/modules/python b/auto/modules/python
index abd145c9..6c8198f5 100644
--- a/auto/modules/python
+++ b/auto/modules/python
@@ -64,6 +64,10 @@ nxt_found=no
if /bin/sh -c "$NXT_PYTHON_CONFIG --prefix" >> $NXT_AUTOCONF_ERR 2>&1; then
+ if ${NXT_PYTHON_CONFIG} --embed >/dev/null 2>&1; then
+ NXT_PYTHON_CONFIG="${NXT_PYTHON_CONFIG} --embed"
+ fi
+
NXT_PYTHON_INCLUDE=`${NXT_PYTHON_CONFIG} --includes`
NXT_PYTHON_LIBS=`${NXT_PYTHON_CONFIG} --ldflags`
diff --git a/auto/os/conf b/auto/os/conf
index 1e298ecd..02c4afaf 100644
--- a/auto/os/conf
+++ b/auto/os/conf
@@ -21,7 +21,7 @@ case "$NXT_SYSTEM" in
Linux)
nxt_have=NXT_LINUX . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared -Wl,-soname,libnxt.so"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared \
-Wl,-soname,\\\$\$ORIGIN/libnxt.so"
@@ -44,7 +44,7 @@ case "$NXT_SYSTEM" in
FreeBSD)
nxt_have=NXT_FREEBSD . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared -Wl,-soname,libnxt.so"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared \
-Wl,-soname,\\\$\$ORIGIN/libnxt.so"
@@ -71,14 +71,14 @@ case "$NXT_SYSTEM" in
case "$NXT_CC_NAME" in
SunC):
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -G -h libnxt.so"
NXT_SHARED_LOCAL_LINK="\$(CC) -G -h \\\$\$ORIGIN/libnxt.so"
NXT_MODULE_LINK="\$(CC) -G"
;;
*)
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared -Wl,-soname,libnxt.so"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared \
-Wl,-soname,\\\$\$ORIGIN/libnxt.so"
@@ -106,7 +106,7 @@ case "$NXT_SYSTEM" in
# HFS+ volumes are caseless by default.
nxt_have=NXT_HAVE_CASELESS_FILESYSTEM . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -dynamiclib"
NXT_SHARED_LOCAL_LINK="\$(CC) -dynamiclib \
-install_name @executable_path/libnxt.dylib"
@@ -130,7 +130,7 @@ case "$NXT_SYSTEM" in
NetBSD)
nxt_have=NXT_NETBSD . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared"
NXT_MODULE_LINK="\$(CC) -shared"
@@ -152,7 +152,7 @@ case "$NXT_SYSTEM" in
OpenBSD)
nxt_have=NXT_OPENBSD . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared"
NXT_MODULE_LINK="\$(CC) -shared"
@@ -174,7 +174,7 @@ case "$NXT_SYSTEM" in
DragonFly)
nxt_have=NXT_DRAGONFLY . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared"
NXT_MODULE_LINK="\$(CC) -shared"
@@ -196,7 +196,7 @@ case "$NXT_SYSTEM" in
AIX)
nxt_have=NXT_AIX . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -G"
NXT_SHARED_LOCAL_LINK="\$(CC) -G"
NXT_MODULE_LINK="\$(CC) -G"
@@ -220,7 +220,7 @@ case "$NXT_SYSTEM" in
NXT_EXEC_LINK="\$(CC)"
NXT_SHARED_LOCAL_EXEC_LINK=
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared"
NXT_MODULE_LINK="\$(CC) -shared"
@@ -238,7 +238,7 @@ case "$NXT_SYSTEM" in
QNX)
nxt_have=NXT_QNX . auto/have
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared"
NXT_MODULE_LINK="\$(CC) -shared"
@@ -257,7 +257,7 @@ case "$NXT_SYSTEM" in
;;
*)
- NXT_STATIC_LINK="ar -r -c"
+ NXT_STATIC_LINK="\$(AR) -r -c"
NXT_SHARED_LINK="\$(CC) -shared"
NXT_SHARED_LOCAL_LINK="\$(CC) -shared"
NXT_MODULE_LINK="\$(CC) -shared"
diff --git a/auto/os/test b/auto/os/test
index 3188d3db..c37700a6 100644
--- a/auto/os/test
+++ b/auto/os/test
@@ -14,6 +14,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null`
echo=echo
CC=${CC:-cc}
+ AR=${AR:-ar}
;;
FreeBSD | NetBSD | OpenBSD | DragonFly)
@@ -21,6 +22,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null`
echo=echo
CC=${CC:-cc}
+ AR=${AR:-ar}
;;
SunOS)
@@ -28,6 +30,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null`
echo=echo
CC=${CC:-gcc}
+ AR=${AR:-ar}
NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE"
NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE_EXTENDED=1"
@@ -40,6 +43,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null`
echo=echo
CC=${CC:-cc}
+ AR=${AR:-ar}
;;
AIX)
@@ -47,6 +51,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null`
echo=echo
CC=${CC:-gcc}
+ AR=${AR:-ar}
;;
HP-UX)
@@ -54,6 +59,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null`
echo=echo
CC=${CC:-gcc}
+ AR=${AR:-ar}
NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE"
NXT_TEST_CFLAGS="$NXT_TEST_CFLAGS -D_XOPEN_SOURCE_EXTENDED"
@@ -65,6 +71,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null`
echo=echo
CC=${CC:-gcc}
+ AR=${AR:-ar}
;;
MINGW*)
@@ -76,6 +83,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -m 2>/dev/null`
echo=auto/echo/echo.exe
CC=${CC:-cl}
+ AR=${AR:-ar}
NXT_WINDOWS=YES
;;
@@ -84,6 +92,7 @@ case "$NXT_SYSTEM" in
NXT_SYSTEM_PLATFORM=`uname -p 2>/dev/null`
echo=echo
CC=${CC:-gcc}
+ AR=${AR:-ar}
;;
esac
diff --git a/auto/sources b/auto/sources
index c4b3808b..155e388b 100644
--- a/auto/sources
+++ b/auto/sources
@@ -85,6 +85,7 @@ NXT_LIB_SRCS=" \
src/nxt_http_error.c \
src/nxt_http_route.c \
src/nxt_http_static.c \
+ src/nxt_http_proxy.c \
src/nxt_application.c \
src/nxt_external.c \
src/nxt_port_hash.c \
diff --git a/docs/changes.xml b/docs/changes.xml
index 93b56293..a0dd7009 100644
--- a/docs/changes.xml
+++ b/docs/changes.xml
@@ -12,6 +12,76 @@
unit-perl
unit-ruby
unit-jsc-common unit-jsc8 unit-jsc10 unit-jsc11"
+ ver="1.13.0" rev="1"
+ date="2019-11-14" time="18:00:00 +0300"
+ packager="Andrei Belov &lt;defan@nginx.com&gt;">
+
+<change>
+<para>
+NGINX Unit updated to 1.13.0.
+</para>
+</change>
+
+</changes>
+
+
+<changes apply="unit" ver="1.13.0" rev="1"
+ date="2019-11-14" time="18:00:00 +0300"
+ packager="Andrei Belov &lt;defan@nginx.com&gt;">
+
+<change type="feature">
+<para>
+basic support for HTTP reverse proxying.
+</para>
+</change>
+
+<change type="feature">
+<para>
+compatibility with Python 3.8.
+</para>
+</change>
+
+<change type="bugfix">
+<para>
+memory leak in Python application processes when the close handler was used.
+</para>
+</change>
+
+<change type="bugfix">
+<para>
+threads in Python applications might not work correctly.
+</para>
+</change>
+
+<change type="bugfix">
+<para>
+Ruby on Rails applications might not work on Ruby 2.6.
+</para>
+</change>
+
+<change type="bugfix">
+<para>
+backtraces for uncaught exceptions in Python 3 might be logged with significant
+delays.
+</para>
+</change>
+
+<change type="bugfix">
+<para>
+explicit setting a namespaces isolation option to false might have enabled it.
+</para>
+</change>
+
+</changes>
+
+
+<changes apply="unit-php
+ unit-python unit-python2.7
+ unit-python3.4 unit-python3.5 unit-python3.6 unit-python3.7
+ unit-go unit-go1.7 unit-go1.8 unit-go1.9 unit-go1.10 unit-go1.11
+ unit-perl
+ unit-ruby
+ unit-jsc-common unit-jsc8 unit-jsc10 unit-jsc11"
ver="1.12.0" rev="1"
date="2019-10-03" time="18:00:00 +0300"
packager="Andrei Belov &lt;defan@nginx.com&gt;">
diff --git a/pkg/docker/Dockerfile.full b/pkg/docker/Dockerfile.full
index c6646fda..cf3b1150 100644
--- a/pkg/docker/Dockerfile.full
+++ b/pkg/docker/Dockerfile.full
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.go1.7-dev b/pkg/docker/Dockerfile.go1.7-dev
index 9b57d3ec..5622271e 100644
--- a/pkg/docker/Dockerfile.go1.7-dev
+++ b/pkg/docker/Dockerfile.go1.7-dev
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.go1.8-dev b/pkg/docker/Dockerfile.go1.8-dev
index 5f650aae..d38b669b 100644
--- a/pkg/docker/Dockerfile.go1.8-dev
+++ b/pkg/docker/Dockerfile.go1.8-dev
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.minimal b/pkg/docker/Dockerfile.minimal
index 427a3ada..4567b2da 100644
--- a/pkg/docker/Dockerfile.minimal
+++ b/pkg/docker/Dockerfile.minimal
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.perl5.24 b/pkg/docker/Dockerfile.perl5.24
index 7e51adcf..64d68a4e 100644
--- a/pkg/docker/Dockerfile.perl5.24
+++ b/pkg/docker/Dockerfile.perl5.24
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.php7.0 b/pkg/docker/Dockerfile.php7.0
index 9615e041..649de902 100644
--- a/pkg/docker/Dockerfile.php7.0
+++ b/pkg/docker/Dockerfile.php7.0
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.python2.7 b/pkg/docker/Dockerfile.python2.7
index eb599f95..e9866349 100644
--- a/pkg/docker/Dockerfile.python2.7
+++ b/pkg/docker/Dockerfile.python2.7
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.python3.5 b/pkg/docker/Dockerfile.python3.5
index 362fb3ed..831bc54a 100644
--- a/pkg/docker/Dockerfile.python3.5
+++ b/pkg/docker/Dockerfile.python3.5
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/pkg/docker/Dockerfile.ruby2.3 b/pkg/docker/Dockerfile.ruby2.3
index cc2e8637..181b2525 100644
--- a/pkg/docker/Dockerfile.ruby2.3
+++ b/pkg/docker/Dockerfile.ruby2.3
@@ -2,7 +2,7 @@ FROM debian:stretch-slim
LABEL maintainer="NGINX Docker Maintainers <docker-maint@nginx.com>"
-ENV UNIT_VERSION 1.12.0-1~stretch
+ENV UNIT_VERSION 1.13.0-1~stretch
RUN set -x \
&& apt-get update \
diff --git a/src/go/unit/response.go b/src/go/unit/response.go
index bb326ea5..767d66b7 100644
--- a/src/go/unit/response.go
+++ b/src/go/unit/response.go
@@ -63,7 +63,7 @@ func (r *response) WriteHeader(code int) {
for k, vv := range r.header {
for _, v := range vv {
fields++
- fields_size += len(k) + len(v) + 2
+ fields_size += len(k) + len(v)
}
}
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index ac10024c..10875703 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -629,9 +629,6 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
keys_count = napi.get_value_uint32(argv[2]);
header_len = napi.get_value_uint32(argv[3]);
- /* Need to reserve extra byte for C-string 0-termination. */
- header_len++;
-
headers = argv[1];
ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
@@ -640,6 +637,12 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
return nullptr;
}
+ /*
+ * Each name and value are 0-terminated by libunit.
+ * Need to add extra 2 bytes for each header.
+ */
+ header_len += keys_count * 2;
+
keys = napi.get_property_names(headers);
keys_len = napi.get_array_length(keys);
@@ -656,8 +659,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
name_len = napi.get_value_string_latin1(name, ptr, header_len);
name_ptr = ptr;
- ptr += name_len;
- header_len -= name_len;
+ ptr += name_len + 1;
+ header_len -= name_len + 1;
hash = nxt_unit_field_hash(name_ptr, name_len);
@@ -689,8 +692,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
nxt_unit_sptr_set(&f->value, ptr);
f->value_length = (uint32_t) value_len;
- ptr += value_len;
- header_len -= value_len;
+ ptr += value_len + 1;
+ header_len -= value_len + 1;
req->response->fields_count++;
}
@@ -715,8 +718,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
nxt_unit_sptr_set(&f->value, ptr);
f->value_length = (uint32_t) value_len;
- ptr += value_len;
- header_len -= value_len;
+ ptr += value_len + 1;
+ header_len -= value_len + 1;
req->response->fields_count++;
}
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 468bc627..bebe3907 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -327,6 +327,9 @@ nxt_app_start(nxt_task_t *task, void *data)
lang->version, lang->file);
nxt_app = nxt_app_module_load(task, lang->file);
+ if (nxt_slow_path(nxt_app == NULL)) {
+ return NXT_ERROR;
+ }
}
if (nxt_app->pre_init != NULL) {
diff --git a/src/nxt_buf.c b/src/nxt_buf.c
index 91846e4d..83be0fac 100644
--- a/src/nxt_buf.c
+++ b/src/nxt_buf.c
@@ -183,7 +183,10 @@ nxt_buf_chain_length(nxt_buf_t *b)
length = 0;
while (b != NULL) {
- length += b->mem.free - b->mem.pos;
+ if (!nxt_buf_is_sync(b)) {
+ length += b->mem.free - b->mem.pos;
+ }
+
b = b->next;
}
@@ -195,7 +198,7 @@ static void
nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -204,9 +207,23 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
- mp = b->data;
- nxt_mp_free(mp, b);
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
+
+ nxt_mp_free(mp, b);
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
+}
+
+
+void
+nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent)
+{
if (parent != NULL) {
nxt_debug(task, "parent retain:%uD", parent->retain);
@@ -255,7 +272,7 @@ static void
nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -268,21 +285,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
- mp = b->data;
- nxt_mp_free(mp, b);
- nxt_mp_release(mp);
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
- if (parent != NULL) {
- nxt_debug(task, "parent retain:%uD", parent->retain);
+ nxt_mp_free(mp, b);
+ nxt_mp_release(mp);
- parent->retain--;
+ nxt_buf_parent_completion(task, parent);
- if (parent->retain == 0) {
- parent->mem.pos = parent->mem.free;
-
- parent->completion_handler(task, parent, parent->parent);
- }
- }
+ b = next;
+ } while (b != NULL);
}
diff --git a/src/nxt_buf.h b/src/nxt_buf.h
index 9c22d650..25e8499a 100644
--- a/src/nxt_buf.h
+++ b/src/nxt_buf.h
@@ -77,17 +77,17 @@ struct nxt_buf_s {
uint32_t retain;
- uint8_t is_file; /* 1 bit */
-
- uint16_t is_mmap:1;
- uint16_t is_port_mmap:1;
-
- uint16_t is_sync:1;
- uint16_t is_nobuf:1;
- uint16_t is_flush:1;
- uint16_t is_last:1;
- uint16_t is_port_mmap_sent:1;
- uint16_t is_ts:1;
+ uint8_t cache_hint;
+
+ uint8_t is_file:1;
+ uint8_t is_mmap:1;
+ uint8_t is_port_mmap:1;
+ uint8_t is_sync:1;
+ uint8_t is_nobuf:1;
+ uint8_t is_flush:1;
+ uint8_t is_last:1;
+ uint8_t is_port_mmap_sent:1;
+ uint8_t is_ts:1;
nxt_buf_mem_t mem;
@@ -250,6 +250,7 @@ NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags);
NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT void nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent);
NXT_EXPORT nxt_buf_t *nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src,
size_t size);
diff --git a/src/nxt_conf.c b/src/nxt_conf.c
index 59eddd77..43820d2a 100644
--- a/src/nxt_conf.c
+++ b/src/nxt_conf.c
@@ -228,6 +228,13 @@ nxt_conf_get_integer(nxt_conf_value_t *value)
}
+uint8_t
+nxt_conf_get_boolean(nxt_conf_value_t *value)
+{
+ return value->u.boolean;
+}
+
+
nxt_uint_t
nxt_conf_object_members_count(nxt_conf_value_t *value)
{
diff --git a/src/nxt_conf.h b/src/nxt_conf.h
index 725a6c95..66201fee 100644
--- a/src/nxt_conf.h
+++ b/src/nxt_conf.h
@@ -115,6 +115,7 @@ NXT_EXPORT void nxt_conf_set_string(nxt_conf_value_t *value, nxt_str_t *str);
NXT_EXPORT nxt_int_t nxt_conf_set_string_dup(nxt_conf_value_t *value,
nxt_mp_t *mp, nxt_str_t *str);
NXT_EXPORT int64_t nxt_conf_get_integer(nxt_conf_value_t *value);
+NXT_EXPORT uint8_t nxt_conf_get_boolean(nxt_conf_value_t *value);
// FIXME reimplement and reorder functions below
nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value);
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index c934b10b..105af675 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -58,8 +58,12 @@ static nxt_int_t nxt_conf_vldt_listener(nxt_conf_validation_t *vldt,
static nxt_int_t nxt_conf_vldt_certificate(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
#endif
+static nxt_int_t nxt_conf_vldt_action(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_routes_member(nxt_conf_validation_t *vldt,
@@ -101,11 +105,9 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
-static nxt_int_t
-nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
- void *data);
-static nxt_int_t
-nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt,
+static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
#if (NXT_HAVE_CLONE_NEWUSER)
@@ -316,6 +318,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = {
NULL,
NULL },
+ { nxt_string("proxy"),
+ NXT_CONF_VLDT_STRING,
+ &nxt_conf_vldt_proxy,
+ NULL },
+
NXT_CONF_VLDT_END
};
@@ -328,8 +335,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_route_members[] = {
{ nxt_string("action"),
NXT_CONF_VLDT_OBJECT,
- &nxt_conf_vldt_object,
- (void *) &nxt_conf_vldt_action_members },
+ &nxt_conf_vldt_action,
+ NULL },
NXT_CONF_VLDT_END
};
@@ -618,7 +625,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
{ nxt_string("classpath"),
NXT_CONF_VLDT_ARRAY,
&nxt_conf_vldt_array_iterator,
- (void *) &nxt_conf_vldt_java_classpath},
+ (void *) &nxt_conf_vldt_java_classpath },
{ nxt_string("webapp"),
NXT_CONF_VLDT_STRING,
@@ -628,7 +635,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
{ nxt_string("options"),
NXT_CONF_VLDT_ARRAY,
&nxt_conf_vldt_array_iterator,
- (void *) &nxt_conf_vldt_java_option},
+ (void *) &nxt_conf_vldt_java_option },
{ nxt_string("unit_jars"),
NXT_CONF_VLDT_STRING,
@@ -881,6 +888,37 @@ nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, nxt_str_t *name,
static nxt_int_t
+nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
+ void *data)
+{
+ nxt_int_t ret;
+ nxt_conf_value_t *pass_value, *share_value, *proxy_value;
+
+ static nxt_str_t pass_str = nxt_string("pass");
+ static nxt_str_t share_str = nxt_string("share");
+ static nxt_str_t proxy_str = nxt_string("proxy");
+
+ ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ pass_value = nxt_conf_get_object_member(value, &pass_str, NULL);
+ share_value = nxt_conf_get_object_member(value, &share_str, NULL);
+ proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL);
+
+ if (pass_value == NULL && share_value == NULL && proxy_value == NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"action\" object must have "
+ "either \"pass\" or \"share\" or "
+ "\"proxy\" option set.");
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
void *data)
{
@@ -964,6 +1002,30 @@ error:
static nxt_int_t
+nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
+ void *data)
+{
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+
+ nxt_conf_get_string(value, &name);
+
+ if (nxt_str_start(&name, "http://", 7)) {
+ name.length -= 7;
+ name.start += 7;
+
+ sa = nxt_sockaddr_parse(vldt->pool, &name);
+ if (sa != NULL) {
+ return NXT_OK;
+ }
+ }
+
+ return nxt_conf_vldt_error(vldt, "The \"proxy\" address is invalid \"%V\"",
+ &name);
+}
+
+
+static nxt_int_t
nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
void *data)
{
@@ -1525,8 +1587,8 @@ nxt_conf_vldt_environment(nxt_conf_validation_t *vldt, nxt_str_t *name,
static nxt_int_t
-nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
- void *data)
+nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data)
{
return nxt_conf_vldt_object(vldt, value, data);
}
@@ -1691,7 +1753,8 @@ nxt_conf_vldt_php_option(nxt_conf_validation_t *vldt, nxt_str_t *name,
static nxt_int_t
-nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, nxt_conf_value_t *value)
+nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value)
{
nxt_str_t str;
diff --git a/src/nxt_conn.h b/src/nxt_conn.h
index 7284808b..2c1d49a0 100644
--- a/src/nxt_conn.h
+++ b/src/nxt_conn.h
@@ -8,7 +8,7 @@
#define _NXT_CONN_H_INCLUDED_
-typedef ssize_t (*nxt_conn_io_read_t)(nxt_conn_t *c);
+typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c);
typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
diff --git a/src/nxt_conn_connect.c b/src/nxt_conn_connect.c
index 12b6c80c..d045853f 100644
--- a/src/nxt_conn_connect.c
+++ b/src/nxt_conn_connect.c
@@ -7,6 +7,9 @@
#include <nxt_main.h>
+static nxt_err_t nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c);
+
+
void
nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
{
@@ -49,7 +52,7 @@ nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data)
case NXT_AGAIN:
c->socket.write_handler = nxt_conn_connect_test;
- c->socket.error_handler = state->error_handler;
+ c->socket.error_handler = nxt_conn_connect_error;
engine = task->thread->engine;
@@ -118,8 +121,7 @@ nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c)
void
nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data)
{
- int ret, err;
- socklen_t len;
+ nxt_err_t err;
nxt_conn_t *c;
c = obj;
@@ -132,48 +134,35 @@ nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data)
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
- err = 0;
- len = sizeof(int);
-
- /*
- * Linux and BSDs return 0 and store a pending error in the err argument;
- * Solaris returns -1 and sets the errno.
- */
-
- ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len);
-
- if (nxt_slow_path(ret == -1)) {
- err = nxt_errno;
- }
+ err = nxt_conn_connect_test_error(task, c);
if (err == 0) {
nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
task, c, data);
- return;
+ } else {
+ nxt_conn_connect_error(task, c, data);
}
-
- c->socket.error = err;
-
- nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
- c->socket.fd, (size_t) c->remote->length,
- nxt_sockaddr_start(c->remote), err);
-
- nxt_conn_connect_error(task, c, data);
}
void
nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data)
{
+ nxt_err_t err;
nxt_conn_t *c;
nxt_work_handler_t handler;
const nxt_conn_state_t *state;
c = obj;
+ err = c->socket.error;
+
+ if (err == 0) {
+ err = nxt_conn_connect_test_error(task, c);
+ }
state = c->write_state;
- switch (c->socket.error) {
+ switch (err) {
case NXT_ECONNREFUSED:
#if (NXT_LINUX)
@@ -193,3 +182,22 @@ nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data)
nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
+
+
+static nxt_err_t
+nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c)
+{
+ nxt_err_t err;
+
+ err = nxt_socket_error(c->socket.fd);
+
+ if (err != 0) {
+ c->socket.error = err;
+
+ nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
+ c->socket.fd, (size_t) c->remote->length,
+ nxt_sockaddr_start(c->remote), err);
+ }
+
+ return err;
+}
diff --git a/src/nxt_conn_read.c b/src/nxt_conn_read.c
index 83969b31..3285abcd 100644
--- a/src/nxt_conn_read.c
+++ b/src/nxt_conn_read.c
@@ -69,7 +69,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
n = c->io->recvbuf(c, c->read);
} else {
- n = state->io_read_handler(c);
+ n = state->io_read_handler(task, c);
/* The state can be changed by io_read_handler. */
state = c->read_state;
}
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index 9cdaab9b..a944834e 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -944,12 +944,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
ev->task, ev, ev->data);
+ error = 0;
+
} else if (engine->u.epoll.mode == 0) {
/* Level-triggered mode. */
nxt_epoll_disable_read(engine, ev);
}
-
- error = 0;
}
if ((events & EPOLLOUT) != 0) {
@@ -964,12 +964,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
ev->task, ev, ev->data);
+ error = 0;
+
} else if (engine->u.epoll.mode == 0) {
/* Level-triggered mode. */
nxt_epoll_disable_write(engine, ev);
}
-
- error = 0;
}
if (!error) {
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index 31a35f6d..6f051067 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -556,19 +556,19 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
void *
-nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
+nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint,
size_t size)
{
- uint8_t n;
+ uint32_t n;
nxt_uint_t items;
nxt_array_t *mem_cache;
nxt_mem_cache_t *cache;
nxt_mem_cache_block_t *block;
mem_cache = engine->mem_cache;
- n = *slot;
+ n = *hint;
- if (n == (uint8_t) -1) {
+ if (n == NXT_EVENT_ENGINE_NO_MEM_HINT) {
if (mem_cache == NULL) {
/* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
@@ -607,7 +607,9 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
found:
- *slot = n;
+ if (n < NXT_EVENT_ENGINE_NO_MEM_HINT) {
+ *hint = (uint8_t) n;
+ }
}
cache = mem_cache->elts;
@@ -626,15 +628,39 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
void
-nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
+nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, void *p,
+ size_t size)
{
+ uint32_t n;
+ nxt_array_t *mem_cache;
nxt_mem_cache_t *cache;
nxt_mem_cache_block_t *block;
block = p;
+ mem_cache = engine->mem_cache;
+ cache = mem_cache->elts;
+
+ n = hint;
+
+ if (nxt_slow_path(n == NXT_EVENT_ENGINE_NO_MEM_HINT)) {
- cache = engine->mem_cache->elts;
- cache = cache + *slot;
+ if (size != 0) {
+ for (n = 0; n < mem_cache->nelts; n++) {
+ if (cache[n].size == size) {
+ goto found;
+ }
+ }
+
+ nxt_alert(&engine->task,
+ "event engine mem free(%p, %z) not found", p, size);
+ }
+
+ goto done;
+ }
+
+found:
+
+ cache = cache + n;
if (cache->count < 16) {
cache->count++;
@@ -644,10 +670,79 @@ nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
return;
}
+done:
+
nxt_mp_free(engine->mem_pool, p);
}
+void *
+nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size)
+{
+ nxt_buf_t *b;
+ uint8_t hint;
+
+ hint = NXT_EVENT_ENGINE_NO_MEM_HINT;
+
+ b = nxt_event_engine_mem_alloc(engine, &hint, NXT_BUF_MEM_SIZE + size);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ nxt_memzero(b, NXT_BUF_MEM_SIZE);
+
+ b->cache_hint = hint;
+ b->data = engine;
+ b->completion_handler = nxt_event_engine_buf_mem_completion;
+
+ if (size != 0) {
+ b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+ b->mem.end = b->mem.start + size;
+ }
+
+ return b;
+}
+
+
+void
+nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b)
+{
+ size_t size;
+
+ size = NXT_BUF_MEM_SIZE + nxt_buf_mem_size(&b->mem);
+
+ nxt_event_engine_mem_free(engine, b->cache_hint, b, size);
+}
+
+
+void
+nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_engine_t *engine;
+ nxt_buf_t *b, *next, *parent;
+
+ b = obj;
+ parent = data;
+
+ nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
+
+ engine = b->data;
+
+ do {
+ next = b->next;
+ parent = b->parent;
+
+ nxt_event_engine_buf_mem_free(engine, b);
+
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
+}
+
+
#if (NXT_DEBUG)
void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index 365d9e89..6b05d510 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -514,10 +514,16 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
nxt_uint_t signo);
-void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
+#define NXT_EVENT_ENGINE_NO_MEM_HINT 255
+
+void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint,
size_t size);
-void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot,
- void *p);
+void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint,
+ void *p, size_t size);
+void *nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size);
+void nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b);
+void nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj,
+ void *data);
nxt_inline nxt_event_engine_t *
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index 541fcb44..b07eaf84 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -18,10 +18,10 @@
*/
#if (NXT_TLS)
-static ssize_t nxt_http_idle_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
#endif
-static ssize_t nxt_h1p_idle_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
@@ -45,7 +45,7 @@ static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_request_header_send(nxt_task_t *task,
- nxt_http_request_t *r, nxt_work_handler_t body_handler);
+ nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
@@ -78,11 +78,32 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
uintptr_t data);
static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
-static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
+static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
+ void *data);
+static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
+ nxt_buf_mem_t *bm);
+static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
+static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
+static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
+
#if (NXT_TLS)
static const nxt_conn_state_t nxt_http_idle_state;
static const nxt_conn_state_t nxt_h1p_shutdown_state;
@@ -94,6 +115,13 @@ static const nxt_conn_state_t nxt_h1p_request_send_state;
static const nxt_conn_state_t nxt_h1p_timeout_response_state;
static const nxt_conn_state_t nxt_h1p_keepalive_state;
static const nxt_conn_state_t nxt_h1p_close_state;
+static const nxt_conn_state_t nxt_h1p_peer_connect_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_send_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_read_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state;
+static const nxt_conn_state_t nxt_h1p_peer_read_state;
+static const nxt_conn_state_t nxt_h1p_peer_close_state;
const nxt_http_proto_table_t nxt_http_proto[3] = {
@@ -106,6 +134,13 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
.body_bytes_sent = nxt_h1p_request_body_bytes_sent,
.discard = nxt_h1p_request_discard,
.close = nxt_h1p_request_close,
+
+ .peer_connect = nxt_h1p_peer_connect,
+ .peer_header_send = nxt_h1p_peer_header_send,
+ .peer_header_read = nxt_h1p_peer_header_read,
+ .peer_read = nxt_h1p_peer_read,
+ .peer_close = nxt_h1p_peer_close,
+
.ws_frame_start = nxt_h1p_websocket_frame_start,
},
/* NXT_HTTP_PROTO_H2 */
@@ -113,9 +148,9 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
};
-static nxt_lvlhsh_t nxt_h1p_fields_hash;
+static nxt_lvlhsh_t nxt_h1p_fields_hash;
-static nxt_http_field_proc_t nxt_h1p_fields[] = {
+static nxt_http_field_proc_t nxt_h1p_fields[] = {
{ nxt_string("Connection"), &nxt_h1p_connection, 0 },
{ nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 },
{ nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
@@ -136,11 +171,32 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = {
};
+static nxt_lvlhsh_t nxt_h1p_peer_fields_hash;
+
+static nxt_http_field_proc_t nxt_h1p_peer_fields[] = {
+ { nxt_string("Connection"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Server"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Date"), &nxt_http_proxy_date, 0 },
+ { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 },
+};
+
+
nxt_int_t
nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
{
- return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
- nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
+ nxt_int_t ret;
+
+ ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
+ nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
+ rt->mem_pool, nxt_h1p_peer_fields,
+ nxt_nitems(nxt_h1p_peer_fields));
+ }
+
+ return ret;
}
@@ -196,7 +252,7 @@ static const nxt_conn_state_t nxt_http_idle_state
static ssize_t
-nxt_http_idle_io_read_handler(nxt_conn_t *c)
+nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -216,7 +272,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c)
size = joint->socket_conf->header_buffer_size;
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
@@ -234,7 +290,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c)
} else {
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
}
return n;
@@ -248,12 +304,14 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *b;
nxt_conn_t *c;
nxt_tls_conf_t *tls;
+ nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
c = obj;
nxt_debug(task, "h1p conn https test");
+ engine = task->thread->engine;
b = c->read;
p = b->mem.pos;
@@ -262,7 +320,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
if (p[0] != 0x16) {
b->mem.free = b->mem.pos;
- nxt_conn_read(task->thread->engine, c);
+ nxt_conn_read(engine, c);
return;
}
@@ -292,7 +350,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
#endif
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(engine, b);
joint = c->listen->socket.data;
@@ -301,7 +359,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
* Listening socket had been closed while
* connection was in keep-alive state.
*/
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
return;
}
@@ -330,7 +388,7 @@ static const nxt_conn_state_t nxt_h1p_idle_state
static ssize_t
-nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
+nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -353,7 +411,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
if (b == NULL) {
size = joint->socket_conf->header_buffer_size;
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
@@ -367,7 +425,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
} else {
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
}
return n;
@@ -386,7 +444,7 @@ nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
if (nxt_slow_path(h1p == NULL)) {
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
return;
}
@@ -424,6 +482,12 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
r->tls = c->u.tls;
#endif
+ r->task = c->task;
+ task = &r->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
if (nxt_fast_path(ret == NXT_OK)) {
@@ -444,7 +508,7 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
}
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
}
@@ -599,13 +663,15 @@ nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
}
if (nxt_slow_path(h1p->websocket_key == NULL)) {
- nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key");
+ nxt_log(task, NXT_LOG_INFO,
+ "h1p upgrade: bad or absent websocket key");
return NXT_HTTP_BAD_REQUEST;
}
if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
- nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version");
+ nxt_log(task, NXT_LOG_INFO,
+ "h1p upgrade: bad or absent websocket version");
return NXT_HTTP_UPGRADE_REQUIRED;
}
@@ -655,16 +721,16 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
static nxt_int_t
nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
- static const u_char *upgrade = (const u_char *) "upgrade";
+ nxt_http_request_t *r;
r = ctx;
+ field->hopbyhop = 1;
if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
r->proto.h1->keepalive = 0;
} else if (field->value_length == 7
- && nxt_memcasecmp(field->value, upgrade, 7) == 0)
+ && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
{
r->proto.h1->connection_upgrade = 1;
}
@@ -676,13 +742,12 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
static nxt_int_t
nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
- static const u_char *websocket = (const u_char *) "websocket";
+ nxt_http_request_t *r;
r = ctx;
if (field->value_length == 9
- && nxt_memcasecmp(field->value, websocket, 9) == 0)
+ && nxt_memcasecmp(field->value, "websocket", 9) == 0)
{
r->proto.h1->upgrade_websocket = 1;
}
@@ -730,6 +795,8 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
nxt_http_request_t *r;
r = ctx;
+ field->skip = 1;
+ field->hopbyhop = 1;
if (field->value_length == 7
&& nxt_memcmp(field->value, "chunked", 7) == 0)
@@ -995,7 +1062,7 @@ static const nxt_str_t nxt_http_server_error[] = {
static void
nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler)
+ nxt_work_handler_t body_handler, void *data)
{
u_char *p;
size_t size;
@@ -1172,7 +1239,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
* in engine->write_work_queue.
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- body_handler, task, r, NULL);
+ body_handler, task, r, data);
} else {
header->next = nxt_http_buf_last(r);
@@ -1211,6 +1278,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
while (b != NULL) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
@@ -1226,7 +1294,8 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
size = nxt_buf_mem_used_size(&in->mem);
if (size == 0) {
- nxt_mp_free(c->mem_pool, in);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ in->completion_handler, task, in, in->parent);
c->read = NULL;
}
@@ -1480,11 +1549,16 @@ nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
nxt_debug(task, "h1p request close");
h1p = proto.h1;
+ h1p->keepalive &= !h1p->request->inconsistent;
h1p->request = NULL;
nxt_router_conf_release(task, joint);
c = h1p->conn;
+ task = &c->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
if (h1p->keepalive) {
nxt_h1p_keepalive(task, h1p, c);
@@ -1770,14 +1844,31 @@ nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
}
} else {
- nxt_h1p_shutdown_(task, c);
+ nxt_h1p_closing(task, c);
}
}
static void
-nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c)
+nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
{
+ nxt_timer_t *timer;
+ nxt_h1p_websocket_timer_t *ws_timer;
+
+ nxt_debug(task, "h1p conn ws shutdown");
+
+ timer = obj;
+ ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
+
+ nxt_h1p_closing(task, ws_timer->h1p->conn);
+}
+
+
+static void
+nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
+{
+ nxt_debug(task, "h1p closing");
+
c->socket.data = NULL;
#if (NXT_TLS)
@@ -1809,21 +1900,6 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state
static void
-nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *timer;
- nxt_h1p_websocket_timer_t *ws_timer;
-
- nxt_debug(task, "h1p conn ws shutdown");
-
- timer = obj;
- ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
-
- nxt_h1p_shutdown_(task, ws_timer->h1p->conn);
-}
-
-
-static void
nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
@@ -1868,3 +1944,673 @@ nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_router_listen_event_release(&engine->task, lev, NULL);
}
+
+
+static void
+nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_conn_t *c, *client;
+ nxt_h1proto_t *h1p;
+ nxt_fd_event_t *socket;
+ nxt_work_queue_t *wq;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "h1p peer connect");
+
+ peer->status = NXT_HTTP_UNSET;
+ r = peer->request;
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+
+ if (nxt_slow_path(mp == NULL)) {
+ goto fail;
+ }
+
+ h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
+ if (nxt_slow_path(h1p == NULL)) {
+ goto fail;
+ }
+
+ ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+ c = nxt_conn_create(mp, task);
+ if (nxt_slow_path(c == NULL)) {
+ goto fail;
+ }
+
+ c->mem_pool = mp;
+ h1p->conn = c;
+
+ peer->proto.h1 = h1p;
+ h1p->request = r;
+
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+ c->socket.data = peer;
+ c->remote = peer->sockaddr;
+
+ c->socket.write_ready = 1;
+ c->write_state = &nxt_h1p_peer_connect_state;
+
+ /*
+ * TODO: queues should be implemented via client proto interface.
+ */
+ client = r->proto.h1->conn;
+
+ socket = &client->socket;
+ wq = socket->read_work_queue;
+ c->read_work_queue = wq;
+ c->socket.read_work_queue = wq;
+ c->read_timer.work_queue = wq;
+
+ wq = socket->write_work_queue;
+ c->write_work_queue = wq;
+ c->socket.write_work_queue = wq;
+ c->write_timer.work_queue = wq;
+ /* TODO END */
+
+ nxt_conn_connect(task->thread->engine, c);
+
+ return;
+
+fail:
+
+ peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+
+ r->state->error_handler(task, r, peer);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_connect_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_connected,
+ .close_handler = nxt_h1p_peer_refused,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static void
+nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer connected");
+
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer refused");
+
+ //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ u_char *p;
+ size_t size;
+ nxt_buf_t *header, *body;
+ nxt_conn_t *c;
+ nxt_http_field_t *field;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "h1p peer header send");
+
+ r = peer->request;
+
+ size = r->method->length + sizeof(" ") + r->target.length
+ + sizeof(" HTTP/1.0\r\n")
+ + sizeof("\r\n");
+
+ nxt_list_each(field, r->fields) {
+
+ if (!field->hopbyhop) {
+ size += field->name_length + field->value_length;
+ size += nxt_length(": \r\n");
+ }
+
+ } nxt_list_loop;
+
+ header = nxt_http_buf_mem(task, r, size);
+ if (nxt_slow_path(header == NULL)) {
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ p = header->mem.free;
+
+ p = nxt_cpymem(p, r->method->start, r->method->length);
+ *p++ = ' ';
+ p = nxt_cpymem(p, r->target.start, r->target.length);
+ p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
+
+ nxt_list_each(field, r->fields) {
+
+ if (!field->hopbyhop) {
+ p = nxt_cpymem(p, field->name, field->name_length);
+ *p++ = ':'; *p++ = ' ';
+ p = nxt_cpymem(p, field->value, field->value_length);
+ *p++ = '\r'; *p++ = '\n';
+ }
+
+ } nxt_list_loop;
+
+ *p++ = '\r'; *p++ = '\n';
+ header->mem.free = p;
+ size = p - header->mem.pos;
+
+ c = peer->proto.h1->conn;
+ c->write = header;
+ c->write_state = &nxt_h1p_peer_header_send_state;
+
+ if (r->body != NULL) {
+ body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
+ if (nxt_slow_path(body == NULL)) {
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ header->next = body;
+
+ body->mem = r->body->mem;
+ size += nxt_buf_mem_used_size(&body->mem);
+
+// nxt_mp_retain(r->mem_pool);
+ }
+
+ if (size > 16384) {
+ /* Use proxy_send_timeout instead of proxy_timeout. */
+ c->write_state = &nxt_h1p_peer_header_body_send_state;
+ }
+
+ nxt_conn_write(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_sent,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_sent,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer header sent");
+
+ engine = task->thread->engine;
+
+ c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
+
+ if (c->write == NULL) {
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+ return;
+ }
+
+ nxt_conn_write(engine, c);
+}
+
+
+static void
+nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer header read");
+
+ c = peer->proto.h1->conn;
+
+ if (c->write_timer.enabled) {
+ c->read_state = &nxt_h1p_peer_header_read_state;
+
+ } else {
+ c->read_state = &nxt_h1p_peer_header_read_timer_state;
+ }
+
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+};
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+
+ .timer_handler = nxt_h1p_peer_read_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static ssize_t
+nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
+{
+ size_t size;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_http_peer_t *peer;
+ nxt_socket_conf_t *skcf;
+ nxt_http_request_t *r;
+
+ peer = c->socket.data;
+ r = peer->request;
+ b = c->read;
+
+ if (b == NULL) {
+ skcf = r->conf->socket_conf;
+
+ size = (peer->header_received) ? skcf->proxy_buffer_size
+ : skcf->proxy_header_buffer_size;
+
+ nxt_debug(task, "h1p peer io read: %z", size);
+
+ b = nxt_http_proxy_buf_mem_alloc(task, r, size);
+ if (nxt_slow_path(b == NULL)) {
+ c->socket.error = NXT_ENOMEM;
+ return NXT_ERROR;
+ }
+ }
+
+ n = c->io->recvbuf(c, b);
+
+ if (n > 0) {
+ c->read = b;
+
+ } else {
+ c->read = NULL;
+ nxt_http_proxy_buf_mem_free(task, r, b);
+ }
+
+ return n;
+}
+
+
+static void
+nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer header read done");
+
+ b = c->read;
+
+ ret = nxt_h1p_peer_header_parse(peer, &b->mem);
+
+ r = peer->request;
+
+ ret = nxt_expect(NXT_DONE, ret);
+
+ if (ret != NXT_AGAIN) {
+ engine = task->thread->engine;
+ nxt_timer_disable(engine, &c->write_timer);
+ nxt_timer_disable(engine, &c->read_timer);
+ }
+
+ switch (ret) {
+
+ case NXT_DONE:
+ peer->fields = peer->proto.h1->parser.fields;
+
+ ret = nxt_http_fields_process(peer->fields,
+ &nxt_h1p_peer_fields_hash, r);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+ break;
+ }
+
+ c->read = NULL;
+
+ if (nxt_buf_mem_used_size(&b->mem) != 0) {
+ peer->body = b;
+ }
+
+ peer->header_received = 1;
+
+ r->state->ready_handler(task, r, peer);
+ return;
+
+ case NXT_AGAIN:
+ if (nxt_buf_mem_free_size(&b->mem) != 0) {
+ nxt_conn_read(task->thread->engine, c);
+ return;
+ }
+
+ /* Fall through. */
+
+ default:
+ case NXT_ERROR:
+ case NXT_HTTP_PARSE_INVALID:
+ case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
+ case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+ break;
+ }
+
+ nxt_http_proxy_buf_mem_free(task, r, b);
+
+ r->state->error_handler(task, r, peer);
+}
+
+
+static nxt_int_t
+nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
+{
+ u_char *p;
+ size_t length;
+ nxt_int_t status;
+
+ if (peer->status < 0) {
+ length = nxt_buf_mem_used_size(bm);
+
+ if (nxt_slow_path(length < 12)) {
+ return NXT_AGAIN;
+ }
+
+ p = bm->pos;
+
+ if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
+ || (p[7] != '0' && p[7] != '1')))
+ {
+ return NXT_ERROR;
+ }
+
+ status = nxt_int_parse(&p[9], 3);
+
+ if (nxt_slow_path(status < 0)) {
+ return NXT_ERROR;
+ }
+
+ p += 12;
+ length -= 12;
+
+ p = nxt_memchr(p, '\n', length);
+
+ if (nxt_slow_path(p == NULL)) {
+ return NXT_AGAIN;
+ }
+
+ bm->pos = p + 1;
+ peer->status = status;
+ }
+
+ return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
+}
+
+
+static void
+nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer read");
+
+ c = peer->proto.h1->conn;
+ c->read_state = &nxt_h1p_peer_read_state;
+
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+
+ .timer_handler = nxt_h1p_peer_read_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer read done");
+
+ peer->body = c->read;
+ c->read = NULL;
+
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer closed");
+
+ r = peer->request;
+
+ if (peer->header_received) {
+ peer->body = nxt_http_buf_last(r);
+
+ peer->closed = 1;
+
+ r->state->ready_handler(task, r, peer);
+
+ } else {
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r->state->error_handler(task, r, peer);
+ }
+}
+
+
+static void
+nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer error");
+
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p peer send timeout");
+
+ c = nxt_write_timer_conn(timer);
+ c->block_write = 1;
+ c->block_read = 1;
+
+ peer = c->socket.data;
+ peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p peer read timeout");
+
+ c = nxt_read_timer_conn(timer);
+ c->block_write = 1;
+ c->block_read = 1;
+
+ peer = c->socket.data;
+ peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static nxt_msec_t
+nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
+{
+ nxt_http_peer_t *peer;
+
+ peer = c->socket.data;
+
+ return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
+}
+
+
+static void
+nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer close");
+
+ peer->closed = 1;
+
+ c = peer->proto.h1->conn;
+ task = &c->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
+ if (c->socket.fd != -1) {
+ c->write_state = &nxt_h1p_peer_close_state;
+
+ nxt_conn_close(task->thread->engine, c);
+
+ } else {
+ nxt_h1p_peer_free(task, c, NULL);
+ }
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_close_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_free,
+};
+
+
+static void
+nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "h1p peer free");
+
+ nxt_conn_free(task, c);
+}
diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h
index c6d3bd53..61da6770 100644
--- a/src/nxt_h1proto.h
+++ b/src/nxt_h1proto.h
@@ -20,6 +20,8 @@ struct nxt_h1proto_s {
nxt_http_request_parse_t parser;
uint8_t nbuffers;
+ uint8_t header_buffer_slot;
+ uint8_t large_buffer_slot;
uint8_t keepalive; /* 1 bit */
uint8_t chunked; /* 1 bit */
uint8_t websocket; /* 1 bit */
diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c
index 13754be0..c9ff899c 100644
--- a/src/nxt_h1proto_websocket.c
+++ b/src/nxt_h1proto_websocket.c
@@ -26,7 +26,7 @@ static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
-static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
void *data);
@@ -473,7 +473,7 @@ nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
static ssize_t
-nxt_h1p_ws_io_read_handler(nxt_conn_t *c)
+nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
for (i = 0; i < payload_len; i++) {
while (nxt_buf_mem_used_size(&b->mem) == 0) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 560b7310..030d77a7 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -9,6 +9,7 @@
typedef enum {
+ NXT_HTTP_UNSET = -1,
NXT_HTTP_INVALID = 0,
NXT_HTTP_CONTINUE = 100,
@@ -105,6 +106,21 @@ typedef struct {
} nxt_http_response_t;
+typedef struct {
+ nxt_http_proto_t proto;
+ nxt_http_request_t *request;
+ nxt_sockaddr_t *sockaddr;
+ nxt_list_t *fields;
+ nxt_buf_t *body;
+ nxt_off_t remainder;
+
+ nxt_http_status_t status:16;
+ nxt_http_protocol_t protocol:8; /* 2 bits */
+ uint8_t header_received; /* 1 bit */
+ uint8_t closed; /* 1 bit */
+} nxt_http_peer_t;
+
+
struct nxt_http_request_s {
nxt_http_proto_t proto;
nxt_socket_conf_joint_t *conf;
@@ -137,12 +153,14 @@ struct nxt_http_request_s {
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
void *tls;
+ nxt_task_t task;
nxt_timer_t timer;
void *timer_data;
void *req_rpc_data;
+ nxt_http_peer_t *peer;
nxt_buf_t *last;
nxt_http_response_t resp;
@@ -153,20 +171,23 @@ struct nxt_http_request_s {
nxt_http_protocol_t protocol:8; /* 2 bits */
uint8_t logged; /* 1 bit */
uint8_t header_sent; /* 1 bit */
+ uint8_t inconsistent; /* 1 bit */
uint8_t error; /* 1 bit */
uint8_t websocket_handshake; /* 1 bit */
};
typedef struct nxt_http_route_s nxt_http_route_t;
+typedef struct nxt_http_upstream_s nxt_http_upstream_t;
-struct nxt_http_pass_s {
- nxt_http_pass_t *(*handler)(nxt_task_t *task,
+struct nxt_http_action_s {
+ nxt_http_action_t *(*handler)(nxt_task_t *task,
nxt_http_request_t *r,
- nxt_http_pass_t *pass);
+ nxt_http_action_t *action);
union {
nxt_http_route_t *route;
+ nxt_http_upstream_t *upstream;
nxt_app_t *application;
} u;
@@ -178,12 +199,19 @@ typedef struct {
void (*body_read)(nxt_task_t *task, nxt_http_request_t *r);
void (*local_addr)(nxt_task_t *task, nxt_http_request_t *r);
void (*header_send)(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler);
+ nxt_work_handler_t body_handler, void *data);
void (*send)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out);
nxt_off_t (*body_bytes_sent)(nxt_task_t *task, nxt_http_proto_t proto);
void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last);
void (*close)(nxt_task_t *task, nxt_http_proto_t proto,
nxt_socket_conf_joint_t *joint);
+
+ void (*peer_connect)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_header_send)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_header_read)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_read)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_close)(nxt_task_t *task, nxt_http_peer_t *peer);
+
void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *ws_frame);
} nxt_http_proto_table_t;
@@ -218,7 +246,7 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_status_t status);
void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler);
+ nxt_work_handler_t body_handler, void *data);
void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *ws_frame);
void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r,
@@ -238,24 +266,36 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field,
nxt_http_routes_t *nxt_http_routes_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *routes_conf);
-nxt_http_pass_t *nxt_http_pass_create(nxt_task_t *task,
+nxt_http_action_t *nxt_http_action_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
void nxt_http_routes_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf);
-nxt_http_pass_t *nxt_http_pass_application(nxt_task_t *task,
+nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes);
-void nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass);
+void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action);
-nxt_http_pass_t *nxt_http_static_handler(nxt_task_t *task,
- nxt_http_request_t *r, nxt_http_pass_t *pass);
+nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash);
nxt_int_t nxt_http_static_mtypes_hash_add(nxt_mp_t *mp, nxt_lvlhsh_t *hash,
nxt_str_t *extension, nxt_str_t *type);
nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash,
nxt_str_t *extension);
-nxt_http_pass_t *nxt_http_request_application(nxt_task_t *task,
- nxt_http_request_t *r, nxt_http_pass_t *pass);
+nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
+
+nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action);
+nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+nxt_int_t nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+nxt_int_t nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+nxt_buf_t *nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
+ size_t size);
+void nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *b);
extern nxt_time_string_t nxt_http_date_cache;
diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c
index 8e8b80f1..370b12db 100644
--- a/src/nxt_http_error.c
+++ b/src/nxt_http_error.c
@@ -57,8 +57,8 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
r->state = &nxt_http_request_send_error_body_state;
- nxt_http_request_header_send(task, r, nxt_http_request_send_error_body);
-
+ nxt_http_request_header_send(task, r,
+ nxt_http_request_send_error_body, NULL);
return;
fail:
diff --git a/src/nxt_http_parse.c b/src/nxt_http_parse.c
index e6e91454..4c5d4936 100644
--- a/src/nxt_http_parse.c
+++ b/src/nxt_http_parse.c
@@ -787,6 +787,7 @@ nxt_http_parse_field_end(nxt_http_request_parse_t *rp, u_char **pos,
field->hash = nxt_http_field_hash_end(rp->field_hash);
field->skip = 0;
+ field->hopbyhop = 0;
field->name_length = rp->field_name.length;
field->value_length = rp->field_value.length;
diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h
index d7ce5e4f..d319c71d 100644
--- a/src/nxt_http_parse.h
+++ b/src/nxt_http_parse.h
@@ -81,7 +81,8 @@ typedef struct {
struct nxt_http_field_s {
uint16_t hash;
- uint8_t skip; /* 1 bit */
+ uint8_t skip:1;
+ uint8_t hopbyhop:1;
uint8_t name_length;
uint32_t value_length;
u_char *name;
diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c
new file mode 100644
index 00000000..7f4eeff2
--- /dev/null
+++ b/src/nxt_http_proxy.c
@@ -0,0 +1,403 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_router.h>
+#include <nxt_http.h>
+
+
+typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
+ nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
+
+
+struct nxt_http_upstream_s {
+ uint32_t current;
+ uint32_t n;
+ uint8_t protocol;
+ nxt_http_upstream_connect_t connect;
+ nxt_sockaddr_t *sockaddr[1];
+};
+
+
+static void nxt_http_upstream_connect(nxt_task_t *task,
+ nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
+static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
+static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_request_send(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_buf_t *out);
+static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_send_state;
+static const nxt_http_request_state_t nxt_http_proxy_header_sent_state;
+static const nxt_http_request_state_t nxt_http_proxy_header_read_state;
+static const nxt_http_request_state_t nxt_http_proxy_read_state;
+
+
+nxt_int_t
+nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
+{
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_http_upstream_t *upstream;
+
+ sa = NULL;
+ name = action->name;
+
+ if (nxt_str_start(&name, "http://", 7)) {
+ name.length -= 7;
+ name.start += 7;
+
+ sa = nxt_sockaddr_parse(mp, &name);
+ if (nxt_slow_path(sa == NULL)) {
+ return NXT_ERROR;
+ }
+
+ sa->type = SOCK_STREAM;
+ }
+
+ if (sa != NULL) {
+ upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
+ if (nxt_slow_path(upstream == NULL)) {
+ return NXT_ERROR;
+ }
+
+ upstream->current = 0;
+ upstream->n = 1;
+ upstream->protocol = NXT_HTTP_PROTO_H1;
+ upstream->connect = nxt_http_upstream_connect;
+ upstream->sockaddr[0] = sa;
+
+ action->u.upstream = upstream;
+ action->handler = nxt_http_proxy_handler;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_http_action_t *
+nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *action)
+{
+ nxt_http_peer_t *peer;
+
+ peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
+ if (nxt_slow_path(peer == NULL)) {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ return NULL;
+ }
+
+ peer->request = r;
+ r->peer = peer;
+
+ nxt_mp_retain(r->mem_pool);
+
+ action->u.upstream->connect(task, action->u.upstream, peer);
+
+ return NULL;
+}
+
+
+static void
+nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
+ nxt_http_peer_t *peer)
+{
+ peer->protocol = upstream->protocol;
+ peer->sockaddr = upstream->sockaddr[0];
+
+ peer->request->state = &nxt_http_proxy_header_send_state;
+
+ nxt_http_proto[peer->protocol].peer_connect(task, peer);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_header_send,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ r->state = &nxt_http_proxy_header_sent_state;
+
+ nxt_http_proto[peer->protocol].peer_header_send(task, peer);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_sent_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_header_sent,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ r->state = &nxt_http_proxy_header_read_state;
+
+ nxt_http_proto[peer->protocol].peer_header_read(task, peer);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_header_read,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_field_t *f, *field;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+
+ r->status = peer->status;
+
+ nxt_debug(task, "http proxy status: %d", peer->status);
+
+ if (r->resp.content_length_n > 0) {
+ peer->remainder = r->resp.content_length_n;
+ }
+
+ nxt_list_each(field, peer->fields) {
+
+ nxt_debug(task, "http proxy header: \"%*s: %*s\"",
+ (size_t) field->name_length, field->name,
+ (size_t) field->value_length, field->value);
+
+ if (!field->skip) {
+ f = nxt_list_add(r->resp.fields);
+ if (nxt_slow_path(f == NULL)) {
+ nxt_http_proxy_error(task, r, peer);
+ return;
+ }
+
+ *f = *field;
+ }
+
+ } nxt_list_loop;
+
+ nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
+}
+
+
+static void
+nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *out;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ out = peer->body;
+
+ if (out != NULL) {
+ peer->body = NULL;
+ nxt_http_proxy_request_send(task, r, out);
+ }
+
+ r->state = &nxt_http_proxy_read_state;
+
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+}
+
+
+static void
+nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *out)
+{
+ size_t length;
+
+ if (r->peer->remainder > 0) {
+ length = nxt_buf_chain_length(out);
+ r->peer->remainder -= length;
+ }
+
+ nxt_http_request_send(task, r, out);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_read,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *out;
+ nxt_bool_t last;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ out = peer->body;
+ peer->body = NULL;
+ last = nxt_buf_is_last(out);
+
+ nxt_http_proxy_request_send(task, r, out);
+
+ if (!last) {
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+
+ } else {
+ r->inconsistent = (peer->remainder != 0);
+
+ nxt_http_proto[peer->protocol].peer_close(task, peer);
+
+ nxt_mp_release(r->mem_pool);
+ }
+}
+
+
+nxt_buf_t *
+nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
+ size_t size)
+{
+ nxt_buf_t *b;
+
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
+ if (nxt_fast_path(b != NULL)) {
+ b->completion_handler = nxt_http_proxy_buf_mem_completion;
+ b->parent = r;
+ nxt_mp_retain(r->mem_pool);
+
+ } else {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
+
+ return b;
+}
+
+
+static void
+nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b, *next;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ b = obj;
+ r = data;
+
+ peer = r->peer;
+
+ do {
+ next = b->next;
+
+ nxt_http_proxy_buf_mem_free(task, r, b);
+
+ b = next;
+ } while (b != NULL);
+
+ if (!peer->closed) {
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+ }
+}
+
+
+void
+nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *b)
+{
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
+
+ nxt_mp_release(r->mem_pool);
+}
+
+
+static void
+nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = r->peer;
+
+ nxt_http_proto[peer->protocol].peer_close(task, peer);
+
+ nxt_mp_release(r->mem_pool);
+
+ nxt_http_request_error(task, r, peer->status);
+}
+
+
+nxt_int_t
+nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ r->resp.date = field;
+
+ return NXT_OK;
+}
+
+
+nxt_int_t
+nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
+ uintptr_t data)
+{
+ nxt_off_t n;
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ r->resp.content_length = field;
+
+ n = nxt_off_t_parse(field->value, field->value_length);
+
+ if (nxt_fast_path(n >= 0)) {
+ r->resp.content_length_n = n;
+ }
+
+ return NXT_OK;
+}
+
+
+nxt_int_t
+nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ field->skip = 1;
+
+ return NXT_OK;
+}
diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c
index a18a02e7..14c75dab 100644
--- a/src/nxt_http_request.c
+++ b/src/nxt_http_request.c
@@ -10,7 +10,7 @@
static nxt_int_t nxt_http_validate_host(nxt_str_t *host, nxt_mp_t *mp);
static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data);
-static void nxt_http_request_pass(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_request_action(nxt_task_t *task, void *obj, void *data);
static void nxt_http_request_proto_info(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj,
@@ -278,33 +278,33 @@ nxt_http_request_start(nxt_task_t *task, void *obj, void *data)
static const nxt_http_request_state_t nxt_http_request_body_state
nxt_aligned(64) =
{
- .ready_handler = nxt_http_request_pass,
+ .ready_handler = nxt_http_request_action,
.error_handler = nxt_http_request_close_handler,
};
static void
-nxt_http_request_pass(nxt_task_t *task, void *obj, void *data)
+nxt_http_request_action(nxt_task_t *task, void *obj, void *data)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
nxt_http_request_t *r;
r = obj;
- pass = r->conf->socket_conf->pass;
+ action = r->conf->socket_conf->action;
- if (nxt_fast_path(pass != NULL)) {
+ if (nxt_fast_path(action != NULL)) {
do {
- nxt_debug(task, "http request route: %V", &pass->name);
+ nxt_debug(task, "http request route: %V", &action->name);
- pass = pass->handler(task, r, pass);
+ action = action->handler(task, r, action);
- if (pass == NULL) {
+ if (action == NULL) {
return;
}
- if (pass == NXT_HTTP_PASS_ERROR) {
+ if (action == NXT_HTTP_ACTION_ERROR) {
break;
}
@@ -315,13 +315,13 @@ nxt_http_request_pass(nxt_task_t *task, void *obj, void *data)
}
-nxt_http_pass_t *
-nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r,
- nxt_http_pass_t *pass)
+nxt_http_action_t *
+nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *action)
{
nxt_event_engine_t *engine;
- nxt_debug(task, "http request application");
+ nxt_debug(task, "http application handler");
nxt_mp_retain(r->mem_pool);
@@ -344,7 +344,7 @@ nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r,
nxt_str_set(&r->server_name, "localhost");
}
- nxt_router_process_http_request(task, r, pass->u.application);
+ nxt_router_process_http_request(task, r, action->u.application);
return NULL;
}
@@ -370,7 +370,7 @@ nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r)
void
nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler)
+ nxt_work_handler_t body_handler, void *data)
{
u_char *p, *end;
nxt_http_field_t *server, *date, *content_length;
@@ -431,7 +431,7 @@ nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
}
if (nxt_fast_path(r->proto.any != NULL)) {
- nxt_http_proto[r->protocol].header_send(task, r, body_handler);
+ nxt_http_proto[r->protocol].header_send(task, r, body_handler, data);
}
return;
@@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size)
static void
nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_http_request_t *r;
b = obj;
r = data;
- nxt_mp_free(r->mem_pool, b);
+ do {
+ next = b->next;
- nxt_mp_release(r->mem_pool);
+ nxt_mp_free(r->mem_pool, b);
+ nxt_mp_release(r->mem_pool);
+
+ b = next;
+ } while (b != NULL);
}
@@ -570,9 +575,9 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data)
if (nxt_fast_path(proto.any != NULL)) {
protocol = r->protocol;
- nxt_mp_release(r->mem_pool);
-
nxt_http_proto[protocol].close(task, proto, conf);
+
+ nxt_mp_release(r->mem_pool);
}
}
diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c
index c3c11faa..18b352ea 100644
--- a/src/nxt_http_route.c
+++ b/src/nxt_http_route.c
@@ -36,6 +36,13 @@ typedef enum {
typedef struct {
+ nxt_conf_value_t *pass;
+ nxt_conf_value_t *share;
+ nxt_conf_value_t *proxy;
+} nxt_http_route_action_conf_t;
+
+
+typedef struct {
nxt_conf_value_t *host;
nxt_conf_value_t *uri;
nxt_conf_value_t *method;
@@ -119,7 +126,7 @@ typedef union {
typedef struct {
uint32_t items;
- nxt_http_pass_t pass;
+ nxt_http_action_t action;
nxt_http_route_test_t test[0];
} nxt_http_route_match_t;
@@ -152,6 +159,8 @@ static nxt_http_route_t *nxt_http_route_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv);
static nxt_http_route_match_t *nxt_http_route_match_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv);
+static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *cv, nxt_http_route_match_t *match);
static nxt_http_route_table_t *nxt_http_route_table_create(nxt_task_t *task,
nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object,
nxt_bool_t case_sensitive);
@@ -173,15 +182,15 @@ static u_char *nxt_http_route_pattern_copy(nxt_mp_t *mp, nxt_str_t *test,
static void nxt_http_route_resolve(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route);
-static void nxt_http_pass_resolve(nxt_task_t *task,
- nxt_router_temp_conf_t *tmcf, nxt_http_pass_t *pass);
+static void nxt_http_action_resolve(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action);
static nxt_http_route_t *nxt_http_route_find(nxt_http_routes_t *routes,
nxt_str_t *name);
static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes);
-static nxt_http_pass_t *nxt_http_route_pass(nxt_task_t *task,
- nxt_http_request_t *r, nxt_http_pass_t *start);
-static nxt_http_pass_t *nxt_http_route_match(nxt_http_request_t *r,
+static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *start);
+static nxt_http_action_t *nxt_http_route_match(nxt_http_request_t *r,
nxt_http_route_match_t *match);
static nxt_int_t nxt_http_route_table(nxt_http_request_t *r,
nxt_http_route_table_t *table);
@@ -367,16 +376,13 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
uint32_t n;
nxt_mp_t *mp;
nxt_int_t ret;
- nxt_str_t pass, *string;
- nxt_conf_value_t *match_conf, *pass_conf;
+ nxt_conf_value_t *match_conf;
nxt_http_route_test_t *test;
nxt_http_route_rule_t *rule;
nxt_http_route_table_t *table;
nxt_http_route_match_t *match;
nxt_http_route_match_conf_t mtcf;
- static nxt_str_t pass_path = nxt_string("/action/pass");
- static nxt_str_t share_path = nxt_string("/action/share");
static nxt_str_t match_path = nxt_string("/match");
match_conf = nxt_conf_get_path(cv, &match_path);
@@ -391,25 +397,12 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NULL;
}
- match->pass.u.route = NULL;
- match->pass.handler = NULL;
+ match->action.u.route = NULL;
+ match->action.handler = NULL;
match->items = n;
- pass_conf = nxt_conf_get_path(cv, &pass_path);
-
- if (pass_conf == NULL) {
- pass_conf = nxt_conf_get_path(cv, &share_path);
- if (nxt_slow_path(pass_conf == NULL)) {
- return NULL;
- }
-
- match->pass.handler = nxt_http_static_handler;
- }
-
- nxt_conf_get_string(pass_conf, &pass);
-
- string = nxt_str_dup(mp, &match->pass.name, &pass);
- if (nxt_slow_path(string == NULL)) {
+ ret = nxt_http_route_action_create(tmcf, cv, match);
+ if (nxt_slow_path(ret != NXT_OK)) {
return NULL;
}
@@ -516,6 +509,78 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
+static nxt_conf_map_t nxt_http_route_action_conf[] = {
+ {
+ nxt_string("pass"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, pass)
+ },
+ {
+ nxt_string("share"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, share)
+ },
+ {
+ nxt_string("proxy"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, proxy)
+ },
+};
+
+
+static nxt_int_t
+nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv,
+ nxt_http_route_match_t *match)
+{
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_str_t name, *string;
+ nxt_conf_value_t *conf, *action_conf;
+ nxt_http_route_action_conf_t accf;
+
+ static nxt_str_t action_path = nxt_string("/action");
+
+ action_conf = nxt_conf_get_path(cv, &action_path);
+ if (action_conf == NULL) {
+ return NXT_ERROR;
+ }
+
+ nxt_memzero(&accf, sizeof(accf));
+
+ ret = nxt_conf_map_object(tmcf->mem_pool,
+ action_conf, nxt_http_route_action_conf,
+ nxt_nitems(nxt_http_route_action_conf), &accf);
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ conf = accf.pass;
+
+ if (accf.share != NULL) {
+ conf = accf.share;
+ match->action.handler = nxt_http_static_handler;
+
+ } else if (accf.proxy != NULL) {
+ conf = accf.proxy;
+ }
+
+ nxt_conf_get_string(conf, &name);
+
+ mp = tmcf->router_conf->mem_pool;
+
+ string = nxt_str_dup(mp, &match->action.name, &name);
+ if (nxt_slow_path(string == NULL)) {
+ return NXT_ERROR;
+ }
+
+ if (accf.proxy != NULL) {
+ return nxt_http_proxy_create(mp, &match->action);
+ }
+
+ return NXT_OK;
+}
+
+
static nxt_http_route_table_t *
nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *table_cv, nxt_http_route_object_t object,
@@ -877,17 +942,17 @@ static void
nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_http_route_t *route)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
nxt_http_route_match_t **match, **end;
match = &route->match[0];
end = match + route->items;
while (match < end) {
- pass = &(*match)->pass;
+ action = &(*match)->action;
- if (pass->handler == NULL) {
- nxt_http_pass_resolve(task, tmcf, &(*match)->pass);
+ if (action->handler == NULL) {
+ nxt_http_action_resolve(task, tmcf, &(*match)->action);
}
match++;
@@ -896,21 +961,21 @@ nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
static void
-nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
- nxt_http_pass_t *pass)
+nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_http_action_t *action)
{
nxt_str_t name;
- name = pass->name;
+ name = action->name;
if (nxt_str_start(&name, "applications/", 13)) {
name.length -= 13;
name.start += 13;
- pass->u.application = nxt_router_listener_application(tmcf, &name);
- nxt_router_app_use(task, pass->u.application, 1);
+ action->u.application = nxt_router_listener_application(tmcf, &name);
+ nxt_router_app_use(task, action->u.application, 1);
- pass->handler = nxt_http_request_application;
+ action->handler = nxt_http_application_handler;
} else if (nxt_str_start(&name, "routes", 6)) {
@@ -923,9 +988,9 @@ nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
name.start += 7;
}
- pass->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name);
+ action->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name);
- pass->handler = nxt_http_route_pass;
+ action->handler = nxt_http_route_handler;
}
}
@@ -950,46 +1015,49 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name)
}
-nxt_http_pass_t *
-nxt_http_pass_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+nxt_http_action_t *
+nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
- pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t));
- if (nxt_slow_path(pass == NULL)) {
+ action = nxt_mp_alloc(tmcf->router_conf->mem_pool,
+ sizeof(nxt_http_action_t));
+ if (nxt_slow_path(action == NULL)) {
return NULL;
}
- pass->name = *name;
+ action->name = *name;
+ action->handler = NULL;
- nxt_http_pass_resolve(task, tmcf, pass);
+ nxt_http_action_resolve(task, tmcf, action);
- return pass;
+ return action;
}
/* COMPATIBILITY: listener application. */
-nxt_http_pass_t *
+nxt_http_action_t *
nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
- pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t));
- if (nxt_slow_path(pass == NULL)) {
+ action = nxt_mp_alloc(tmcf->router_conf->mem_pool,
+ sizeof(nxt_http_action_t));
+ if (nxt_slow_path(action == NULL)) {
return NULL;
}
- pass->name = *name;
+ action->name = *name;
- pass->u.application = nxt_router_listener_application(tmcf, name);
- nxt_router_app_use(task, pass->u.application, 1);
+ action->u.application = nxt_router_listener_application(tmcf, name);
+ nxt_router_app_use(task, action->u.application, 1);
- pass->handler = nxt_http_request_application;
+ action->handler = nxt_http_application_handler;
- return pass;
+ return action;
}
@@ -1020,7 +1088,7 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route)
end = match + route->items;
while (match < end) {
- nxt_http_pass_cleanup(task, &(*match)->pass);
+ nxt_http_action_cleanup(task, &(*match)->action);
match++;
}
@@ -1028,20 +1096,20 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route)
void
-nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass)
+nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action)
{
- if (pass->handler == nxt_http_request_application) {
- nxt_router_app_use(task, pass->u.application, -1);
+ if (action->handler == nxt_http_application_handler) {
+ nxt_router_app_use(task, action->u.application, -1);
}
}
-static nxt_http_pass_t *
-nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r,
- nxt_http_pass_t *start)
+static nxt_http_action_t *
+nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *start)
{
- nxt_http_pass_t *pass;
nxt_http_route_t *route;
+ nxt_http_action_t *action;
nxt_http_route_match_t **match, **end;
route = start->u.route;
@@ -1049,9 +1117,9 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r,
end = match + route->items;
while (match < end) {
- pass = nxt_http_route_match(r, *match);
- if (pass != NULL) {
- return pass;
+ action = nxt_http_route_match(r, *match);
+ if (action != NULL) {
+ return action;
}
match++;
@@ -1063,7 +1131,7 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r,
}
-static nxt_http_pass_t *
+static nxt_http_action_t *
nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match)
{
nxt_int_t ret;
@@ -1081,14 +1149,14 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match)
}
if (ret <= 0) {
- /* 0 => NULL, -1 => NXT_HTTP_PASS_ERROR. */
- return (nxt_http_pass_t *) (intptr_t) ret;
+ /* 0 => NULL, -1 => NXT_HTTP_ACTION_ERROR. */
+ return (nxt_http_action_t *) (intptr_t) ret;
}
test++;
}
- return &match->pass;
+ return &match->action;
}
diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c
index 48a989cf..44132859 100644
--- a/src/nxt_http_static.c
+++ b/src/nxt_http_static.c
@@ -27,9 +27,9 @@ static void nxt_http_static_mtypes_hash_free(void *data, void *p);
static const nxt_http_request_state_t nxt_http_static_send_state;
-nxt_http_pass_t *
+nxt_http_action_t *
nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
- nxt_http_pass_t *pass)
+ nxt_http_action_t *action)
{
size_t alloc, encode;
u_char *p;
@@ -76,7 +76,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_str_null(&extension);
}
- alloc = pass->name.length + r->path->length + index.length + 1;
+ alloc = action->name.length + r->path->length + index.length + 1;
f->name = nxt_mp_nget(r->mem_pool, alloc);
if (nxt_slow_path(f->name == NULL)) {
@@ -84,7 +84,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
}
p = f->name;
- p = nxt_cpymem(p, pass->name.start, pass->name.length);
+ p = nxt_cpymem(p, action->name.start, action->name.length);
p = nxt_cpymem(p, r->path->start, r->path->length);
p = nxt_cpymem(p, index.start, index.length);
*p = '\0';
@@ -272,7 +272,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
body_handler = NULL;
}
- nxt_http_request_header_send(task, r, body_handler);
+ nxt_http_request_header_send(task, r, body_handler, NULL);
r->state = &nxt_http_static_send_state;
return NULL;
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
index d58d615c..fb888f5d 100644
--- a/src/nxt_http_websocket.c
+++ b/src/nxt_http_websocket.c
@@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
frame_size -= copy_size;
next = b->next;
+ b->next = NULL;
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 85685fbc..cfe0341f 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -70,8 +70,8 @@ static void nxt_main_port_access_log_handler(nxt_task_t *task,
static nxt_int_t nxt_init_set_isolation(nxt_task_t *task,
nxt_process_init_t *init, nxt_conf_value_t *isolation);
-static nxt_int_t nxt_init_set_ns(nxt_task_t *task,
- nxt_process_init_t *init, nxt_conf_value_t *ns);
+static nxt_int_t nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init,
+ nxt_conf_value_t *ns);
const nxt_sig_event_t nxt_main_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_main_process_signal_handler),
@@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
nxt_port_t *port;
nxt_process_t *process;
- process = nxt_runtime_process_get(rt, nxt_pid);
- if (nxt_slow_path(process == NULL)) {
- return NXT_ERROR;
- }
-
- port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
+ port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
+ NXT_PROCESS_MAIN);
if (nxt_slow_path(port == NULL)) {
- nxt_process_use(task, process, -1);
return NXT_ERROR;
}
- nxt_process_port_add(task, process, port);
-
- nxt_process_use(task, process, -1);
+ process = port->process;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_port_use(task, port, -1);
return ret;
}
- nxt_runtime_port_add(task, port);
-
- nxt_port_use(task, port, -1);
-
/*
* A main process port. A write port is not closed
* since it should be inherited by worker processes.
@@ -465,13 +453,11 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_malloc(sizeof(nxt_process_init_t));
+ init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
init->start = nxt_controller_start;
init->name = "controller";
init->user_cred = &rt->user_cred;
@@ -561,13 +547,11 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_malloc(sizeof(nxt_process_init_t));
+ init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
init->start = nxt_discovery_start;
init->name = "discovery";
init->user_cred = &rt->user_cred;
@@ -587,13 +571,11 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_malloc(sizeof(nxt_process_init_t));
+ init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
init->start = nxt_router_start;
init->name = "router";
init->user_cred = &rt->user_cred;
@@ -627,13 +609,11 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
+ app_conf->group.length + 1;
}
- init = nxt_malloc(size);
+ init = nxt_mp_zalloc(rt->mem_pool, size);
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
if (rt->capabilities.setid) {
init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t));
user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t));
@@ -705,7 +685,7 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
fail:
- nxt_free(init);
+ nxt_mp_free(rt->mem_pool, init);
return NXT_ERROR;
}
@@ -726,6 +706,8 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
process = nxt_runtime_process_new(rt);
if (nxt_slow_path(process == NULL)) {
+ nxt_mp_free(rt->mem_pool, init);
+
return NXT_ERROR;
}
@@ -785,8 +767,6 @@ nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
nxt_runtime_process_each(rt, process) {
if (nxt_pid != process->pid) {
- process->init = NULL;
-
nxt_process_port_each(process, port) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
@@ -1005,10 +985,11 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
if (process) {
init = process->init;
+ process->init = NULL;
ptype = nxt_process_type(process);
- if (process->ready && init != NULL) {
+ if (process->ready) {
init->stream = 0;
}
@@ -1057,7 +1038,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
init->restart(task, rt, init);
} else {
- nxt_free(init);
+ nxt_mp_free(rt->mem_pool, init);
}
}
}
@@ -1540,7 +1521,8 @@ nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init,
static nxt_int_t
-nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *namespaces)
+nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init,
+ nxt_conf_value_t *namespaces)
{
uint32_t index;
nxt_str_t name;
@@ -1549,7 +1531,13 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na
index = 0;
- while ((value = nxt_conf_next_object_member(namespaces, &name, &index)) != NULL) {
+ for ( ;; ) {
+ value = nxt_conf_next_object_member(namespaces, &name, &index);
+
+ if (value == NULL) {
+ break;
+ }
+
flag = 0;
#if (NXT_HAVE_CLONE_NEWUSER)
@@ -1593,11 +1581,9 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na
return NXT_ERROR;
}
- if (nxt_conf_get_integer(value) == 0) {
- continue; /* process shares everything by default */
+ if (nxt_conf_get_boolean(value)) {
+ init->isolation.clone.flags |= flag;
}
-
- init->isolation.clone.flags |= flag;
}
return NXT_OK;
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 9029353a..8d14a5e7 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -238,7 +238,6 @@ void
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
- nxt_process_t *process;
nxt_runtime_t *rt;
nxt_port_msg_new_port_t *new_port_msg;
@@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- process = nxt_runtime_process_get(rt, new_port_msg->pid);
- if (nxt_slow_path(process == NULL)) {
- return;
- }
-
- port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
- new_port_msg->type);
+ port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
+ new_port_msg->id,
+ new_port_msg->type);
if (nxt_slow_path(port == NULL)) {
- nxt_process_use(task, process, -1);
return;
}
- nxt_process_port_add(task, process, port);
-
- nxt_process_use(task, process, -1);
-
nxt_fd_nonblocking(task, msg->fd);
port->pair[0] = -1;
@@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->socket.task = task;
- nxt_runtime_port_add(task, port);
-
- nxt_port_use(task, port, -1);
-
nxt_port_write_enable(task, port);
msg->u.new_port = port;
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 4edc423a..9c7da970 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -478,7 +478,8 @@ static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
{
- size_t size;
+ size_t size;
+ nxt_buf_t *next;
while (b != NULL) {
@@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
- b = b->next;
+ next = b->next;
+ b->next = NULL;
+ b = next;
}
return b;
@@ -796,7 +799,7 @@ static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg)
{
- nxt_buf_t *b, *orig_b;
+ nxt_buf_t *b, *orig_b, *next;
nxt_port_recv_msg_t *fmsg;
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
@@ -915,11 +918,15 @@ fmsg_failed:
*/
if (msg->buf == b) {
/* complete mmap buffers */
- for (; b != NULL; b = b->next) {
+ while (b != NULL) {
nxt_debug(task, "complete buffer %p", b);
nxt_work_queue_add(port->socket.read_work_queue,
b->completion_handler, task, b, b->parent);
+
+ next = b->next;
+ b->next = NULL;
+ b = next;
}
}
@@ -964,7 +971,7 @@ static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
int use_delta;
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_port_t *port;
nxt_work_queue_t *wq;
nxt_port_send_msg_t *msg;
@@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
- for (b = msg->buf; b != NULL; b = b->next) {
+ for (b = msg->buf; b != NULL; b = next) {
+ next = b->next;
+ b->next = NULL;
+
if (nxt_buf_is_sync(b)) {
continue;
}
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 0cc9ccc4..b246a58c 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -207,7 +207,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
goto fail;
}
-#if (NXT_HAVE_CLONE_NEWUSER)
+#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) {
ret = nxt_clone_proc_map(task, pid, &init->isolation.clone);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -723,16 +723,35 @@ free:
nxt_int_t
nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
{
- nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL",
- uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid);
+ nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d",
+ uc->user, uc->uid, uc->base_gid);
if (setgid(uc->base_gid) != 0) {
+
+#if (NXT_HAVE_CLONE)
+ if (nxt_errno == EINVAL) {
+ nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the "
+ "application namespace.", uc->base_gid);
+ return NXT_ERROR;
+ }
+#endif
+
nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno);
return NXT_ERROR;
}
if (uc->gids != NULL) {
if (setgroups(uc->ngroups, uc->gids) != 0) {
+
+#if (NXT_HAVE_CLONE)
+ if (nxt_errno == EINVAL) {
+ nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has "
+ "supplementary group ids not valid in the application "
+ "namespace.", uc->user, uc->uid);
+ return NXT_ERROR;
+ }
+#endif
+
nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno);
return NXT_ERROR;
}
@@ -747,6 +766,15 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
}
if (setuid(uc->uid) != 0) {
+
+#if (NXT_HAVE_CLONE)
+ if (nxt_errno == EINVAL) {
+ nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't "
+ "valid in the application namespace.", uc->uid, uc->user);
+ return NXT_ERROR;
+ }
+#endif
+
nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno);
return NXT_ERROR;
}
@@ -756,6 +784,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
void
+nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+{
+ process->use_count += i;
+
+ if (process->use_count == 0) {
+ nxt_runtime_process_release(task->thread->runtime, process);
+ }
+}
+
+
+void
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
nxt_assert(port->process == NULL);
diff --git a/src/nxt_process.h b/src/nxt_process.h
index df9ca038..d67573f1 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
nxt_process_type_t nxt_process_type(nxt_process_t *process);
+void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
+
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c
index a6d5f217..5bb2fb2c 100644
--- a/src/nxt_python_wsgi.c
+++ b/src/nxt_python_wsgi.c
@@ -86,6 +86,7 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
+static void nxt_python_print_exception(void);
static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes);
struct nxt_python_run_ctx_s {
@@ -130,58 +131,17 @@ static PyMethodDef nxt_py_input_methods[] = {
static PyTypeObject nxt_py_input_type = {
PyVarObject_HEAD_INIT(NULL, 0)
- "unit._input", /* tp_name */
- (int) sizeof(nxt_py_input_t), /* tp_basicsize */
- 0, /* tp_itemsize */
- (destructor) nxt_py_input_dealloc, /* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_compare */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- 0, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT, /* tp_flags */
- "unit input object.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- nxt_py_input_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- 0, /* tp_init */
- 0, /* tp_alloc */
- 0, /* tp_new */
- 0, /* tp_free */
- 0, /* tp_is_gc */
- 0, /* tp_bases */
- 0, /* tp_mro - method resolution order */
- 0, /* tp_cache */
- 0, /* tp_subclasses */
- 0, /* tp_weaklist */
- 0, /* tp_del */
- 0, /* tp_version_tag */
-#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION > 3
- 0, /* tp_finalize */
-#endif
+
+ .tp_name = "unit._input",
+ .tp_basicsize = sizeof(nxt_py_input_t),
+ .tp_dealloc = (destructor) nxt_py_input_dealloc,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit input object.",
+ .tp_methods = nxt_py_input_methods,
};
+static PyObject *nxt_py_stderr_flush;
static PyObject *nxt_py_application;
static PyObject *nxt_py_start_resp_obj;
static PyObject *nxt_py_write_obj;
@@ -193,6 +153,7 @@ static wchar_t *nxt_py_home;
static char *nxt_py_home;
#endif
+static PyThreadState *nxt_python_thread_state;
static nxt_python_run_ctx_t *nxt_python_run_ctx;
@@ -279,6 +240,21 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
module = NULL;
+ obj = PySys_GetObject((char *) "stderr");
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_alert(task, "Python failed to get \"sys.stderr\" object");
+ goto fail;
+ }
+
+ nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush");
+ if (nxt_slow_path(nxt_py_stderr_flush == NULL)) {
+ nxt_alert(task, "Python failed to get \"flush\" attribute of "
+ "\"sys.stderr\" object");
+ goto fail;
+ }
+
+ Py_DECREF(obj);
+
if (c->path.length > 0) {
obj = PyString_FromStringAndSize((char *) c->path.start,
c->path.length);
@@ -349,7 +325,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
module = PyImport_ImportModule(nxt_py_module);
if (nxt_slow_path(module == NULL)) {
nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module);
- PyErr_Print();
+ nxt_python_print_exception();
goto fail;
}
@@ -363,7 +339,6 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
nxt_alert(task, "\"application\" in module \"%s\" "
"is not a callable object", nxt_py_module);
- PyErr_Print();
goto fail;
}
@@ -382,10 +357,14 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
goto fail;
}
+ nxt_python_thread_state = PyEval_SaveThread();
+
rc = nxt_unit_run(unit_ctx);
nxt_unit_done(unit_ctx);
+ PyEval_RestoreThread(nxt_python_thread_state);
+
nxt_python_atexit();
exit(rc);
@@ -407,22 +386,26 @@ static void
nxt_python_request_handler(nxt_unit_request_info_t *req)
{
int rc;
- PyObject *result, *iterator, *item, *args, *environ;
+ PyObject *environ, *args, *response, *iterator, *item;
+ PyObject *close, *result;
nxt_python_run_ctx_t run_ctx = {-1, 0, NULL, req};
+ PyEval_RestoreThread(nxt_python_thread_state);
+
environ = nxt_python_get_environ(&run_ctx);
if (nxt_slow_path(environ == NULL)) {
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
-
- return;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
args = PyTuple_New(2);
if (nxt_slow_path(args == NULL)) {
+ Py_DECREF(environ);
+
nxt_unit_req_error(req, "Python failed to create arguments tuple");
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
- return;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
PyTuple_SET_ITEM(args, 0, environ);
@@ -432,103 +415,104 @@ nxt_python_request_handler(nxt_unit_request_info_t *req)
nxt_python_run_ctx = &run_ctx;
- result = PyObject_CallObject(nxt_py_application, args);
+ response = PyObject_CallObject(nxt_py_application, args);
Py_DECREF(args);
- if (nxt_slow_path(result == NULL)) {
+ if (nxt_slow_path(response == NULL)) {
nxt_unit_req_error(req, "Python failed to call the application");
- PyErr_Print();
-
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
- nxt_python_run_ctx = NULL;
+ nxt_python_print_exception();
- return;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
- item = NULL;
- iterator = NULL;
+ /* Shortcut: avoid iterate over response string symbols. */
+ if (PyBytes_Check(response)) {
+ rc = nxt_python_write(&run_ctx, response);
- /* Shortcut: avoid iterate over result string symbols. */
- if (PyBytes_Check(result)) {
+ } else {
+ iterator = PyObject_GetIter(response);
- rc = nxt_python_write(&run_ctx, result);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- goto fail;
- }
+ if (nxt_fast_path(iterator != NULL)) {
+ rc = NXT_UNIT_OK;
- } else {
- iterator = PyObject_GetIter(result);
+ while (run_ctx.bytes_sent < run_ctx.content_length) {
+ item = PyIter_Next(iterator);
- if (nxt_slow_path(iterator == NULL)) {
- nxt_unit_req_error(req, "the application returned "
- "not an iterable object");
+ if (item == NULL) {
+ if (nxt_slow_path(PyErr_Occurred() != NULL)) {
+ nxt_unit_req_error(req, "Python failed to iterate over "
+ "the application response object");
+ nxt_python_print_exception();
- goto fail;
- }
+ rc = NXT_UNIT_ERROR;
+ }
- while (run_ctx.bytes_sent < run_ctx.content_length
- && (item = PyIter_Next(iterator)))
- {
- if (nxt_slow_path(!PyBytes_Check(item))) {
- nxt_unit_req_error(req, "the application returned "
- "not a bytestring object");
+ break;
+ }
- goto fail;
- }
+ if (nxt_fast_path(PyBytes_Check(item))) {
+ rc = nxt_python_write(&run_ctx, item);
- rc = nxt_python_write(&run_ctx, item);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- goto fail;
- }
+ } else {
+ nxt_unit_req_error(req, "the application returned "
+ "not a bytestring object");
+ rc = NXT_UNIT_ERROR;
+ }
- Py_DECREF(item);
- }
+ Py_DECREF(item);
- Py_DECREF(iterator);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ break;
+ }
+ }
- if (PyObject_HasAttrString(result, "close")) {
- PyObject_CallMethod(result, (char *) "close", NULL);
- }
- }
+ Py_DECREF(iterator);
- if (nxt_slow_path(PyErr_Occurred() != NULL)) {
- nxt_unit_req_error(req, "an application error occurred");
- PyErr_Print();
- }
+ } else {
+ nxt_unit_req_error(req,
+ "the application returned not an iterable object");
+ nxt_python_print_exception();
- nxt_unit_request_done(req, NXT_UNIT_OK);
+ rc = NXT_UNIT_ERROR;
+ }
- Py_DECREF(result);
+ close = PyObject_GetAttrString(response, "close");
- nxt_python_run_ctx = NULL;
+ if (close != NULL) {
+ result = PyObject_CallFunction(close, NULL);
+ if (nxt_slow_path(result == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call the close() "
+ "method of the application response");
+ nxt_python_print_exception();
- return;
+ } else {
+ Py_DECREF(result);
+ }
-fail:
+ Py_DECREF(close);
- if (item != NULL) {
- Py_DECREF(item);
+ } else {
+ PyErr_Clear();
+ }
}
- if (iterator != NULL) {
- Py_DECREF(iterator);
- }
+ Py_DECREF(response);
- if (PyObject_HasAttrString(result, "close")) {
- PyObject_CallMethod(result, (char *) "close", NULL);
- }
+done:
- Py_DECREF(result);
- nxt_python_run_ctx = NULL;
+ nxt_python_thread_state = PyEval_SaveThread();
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ nxt_python_run_ctx = NULL;
+ nxt_unit_request_done(req, rc);
}
static void
nxt_python_atexit(void)
{
+ Py_XDECREF(nxt_py_stderr_flush);
Py_XDECREF(nxt_py_application);
Py_XDECREF(nxt_py_start_resp_obj);
Py_XDECREF(nxt_py_write_obj);
@@ -767,7 +751,7 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name,
nxt_unit_req_error(ctx->req,
"Python failed to create value string \"%.*s\"",
(int) size, src);
- PyErr_Print();
+ nxt_python_print_exception();
return NXT_UNIT_ERROR;
}
@@ -802,7 +786,7 @@ nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name,
nxt_unit_req_error(ctx->req,
"Python failed to create value string \"%.*s\"",
(int) size, str);
- PyErr_Print();
+ nxt_python_print_exception();
return NXT_UNIT_ERROR;
}
@@ -1137,6 +1121,28 @@ nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args)
}
+static void
+nxt_python_print_exception(void)
+{
+ PyErr_Print();
+
+#if PY_MAJOR_VERSION == 3
+ /* The backtrace may be buffered in sys.stderr file object. */
+ {
+ PyObject *result;
+
+ result = PyObject_CallFunction(nxt_py_stderr_flush, NULL);
+ if (nxt_slow_path(result == NULL)) {
+ PyErr_Clear();
+ return;
+ }
+
+ Py_DECREF(result);
+ }
+#endif
+}
+
+
static int
nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes)
{
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 28781600..b9f5d921 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
for (b = msg_info->buf; b != NULL; b = next) {
next = b->next;
+ b->next = NULL;
b->completion_handler = msg_info->completion_handler;
@@ -1172,8 +1173,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
- if (skcf->pass != NULL) {
- nxt_http_pass_cleanup(task, skcf->pass);
+ if (skcf->action != NULL) {
+ nxt_http_action_cleanup(task, skcf->action);
}
} nxt_queue_loop;
@@ -1458,7 +1459,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
next = 0;
for ( ;; ) {
- application = nxt_conf_next_object_member(applications, &name, &next);
+ application = nxt_conf_next_object_member(applications,
+ &name, &next);
if (application == NULL) {
break;
}
@@ -1676,10 +1678,16 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->large_header_buffers = 4;
skcf->body_buffer_size = 16 * 1024;
skcf->max_body_size = 8 * 1024 * 1024;
+ skcf->proxy_header_buffer_size = 64 * 1024;
+ skcf->proxy_buffer_size = 4096;
+ skcf->proxy_buffers = 256;
skcf->idle_timeout = 180 * 1000;
skcf->header_read_timeout = 30 * 1000;
skcf->body_read_timeout = 30 * 1000;
skcf->send_timeout = 30 * 1000;
+ skcf->proxy_timeout = 60 * 1000;
+ skcf->proxy_send_timeout = 30 * 1000;
+ skcf->proxy_read_timeout = 30 * 1000;
skcf->websocket_conf.max_frame_size = 1024 * 1024;
skcf->websocket_conf.read_timeout = 60 * 1000;
@@ -1729,12 +1737,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->router_conf->count++;
if (lscf.pass.length != 0) {
- skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
+ skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
/* COMPATIBILITY: listener application. */
} else if (lscf.application.length > 0) {
- skcf->pass = nxt_http_pass_application(task, tmcf,
- &lscf.application);
+ skcf->action = nxt_http_pass_application(task, tmcf,
+ &lscf.application);
}
}
}
@@ -3070,8 +3078,8 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_unlock(lock);
if (skcf != NULL) {
- if (skcf->pass != NULL) {
- nxt_http_pass_cleanup(task, skcf->pass);
+ if (skcf->action != NULL) {
+ nxt_http_action_cleanup(task, skcf->action);
}
#if (NXT_TLS)
@@ -3497,7 +3505,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
@@ -3580,6 +3588,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
field->hash = f->hash;
field->skip = 0;
+ field->hopbyhop = 0;
field->name_length = f->name_length;
field->value_length = f->value_length;
@@ -3612,17 +3621,20 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
if (nxt_buf_mem_used_size(&b->mem) == 0) {
+ next = b->next;
+ b->next = NULL;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
- b = b->next;
+ b = next;
}
if (b != NULL) {
nxt_buf_chain_add(&r->out, b);
}
- nxt_http_request_header_send(task, r, nxt_http_request_send_body);
+ nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
if (r->websocket_handshake
&& r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
@@ -5056,6 +5068,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
+ out->next = NULL;
out->completion_handler(task, out, out->parent);
out = buf;
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index ec18ff48..1517c14b 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -16,12 +16,12 @@ typedef struct nxt_http_request_s nxt_http_request_t;
#include <nxt_application.h>
-typedef struct nxt_http_pass_s nxt_http_pass_t;
+typedef struct nxt_http_action_s nxt_http_action_t;
typedef struct nxt_http_routes_s nxt_http_routes_t;
typedef struct nxt_router_access_log_s nxt_router_access_log_t;
-#define NXT_HTTP_PASS_ERROR ((nxt_http_pass_t *) -1)
+#define NXT_HTTP_ACTION_ERROR ((nxt_http_action_t *) -1)
typedef struct {
@@ -154,7 +154,7 @@ typedef struct {
nxt_queue_link_t link;
nxt_router_conf_t *router_conf;
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
/*
* A listen socket time can be shorter than socket configuration life
@@ -170,10 +170,17 @@ typedef struct {
size_t large_header_buffers;
size_t body_buffer_size;
size_t max_body_size;
+ size_t proxy_header_buffer_size;
+ size_t proxy_buffer_size;
+ size_t proxy_buffers;
+
nxt_msec_t idle_timeout;
nxt_msec_t header_read_timeout;
nxt_msec_t body_read_timeout;
nxt_msec_t send_timeout;
+ nxt_msec_t proxy_timeout;
+ nxt_msec_t proxy_send_timeout;
+ nxt_msec_t proxy_read_timeout;
nxt_websocket_conf_t websocket_conf;
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index de41ba4d..096aabc4 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
static void nxt_runtime_thread_pool_init(void);
static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
void *data);
-static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
+static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
+static void nxt_runtime_process_remove(nxt_runtime_t *rt,
nxt_process_t *process);
-static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
- nxt_pid_t pid);
+static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
nxt_int_t
@@ -459,7 +459,7 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
idle = &engine->idle_connections;
- for (link = nxt_queue_head(idle);
+ for (link = nxt_queue_first(idle);
link != nxt_queue_tail(idle);
link = next)
{
@@ -658,6 +658,8 @@ nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
if (tp == thread_pools[i]) {
nxt_array_remove(rt->thread_pools, &thread_pools[i]);
+ nxt_free(tp);
+
if (n == 1) {
/* The last thread pool. */
rt->continuation(task, 0);
@@ -1296,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
}
-static void
-nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
+void
+nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_port_t *port;
+ if (process->registered == 1) {
+ nxt_runtime_process_remove(rt, process);
+ }
+
nxt_assert(process->use_count == 0);
nxt_assert(process->registered == 0);
@@ -1316,6 +1322,10 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
+ if (process->init != NULL) {
+ nxt_mp_free(rt->mem_pool, process->init);
+ }
+
nxt_mp_free(rt->mem_pool, process);
}
@@ -1379,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
}
-nxt_process_t *
+static nxt_process_t *
nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
{
nxt_process_t *process;
@@ -1489,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
}
-static nxt_process_t *
-nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
+static void
+nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
{
- nxt_process_t *process;
+ nxt_pid_t pid;
nxt_lvlhsh_query_t lhq;
- process = NULL;
+ pid = process->pid;
nxt_runtime_process_lhq_pid(&lhq, &pid);
@@ -1521,40 +1531,49 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
-
- return process;
}
-void
-nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+nxt_process_t *
+nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
{
- nxt_runtime_t *rt;
+ nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
+
+ return nxt_runtime_process_next(rt, lhe);
+}
- process->use_count += i;
- if (process->use_count == 0) {
- rt = task->thread->runtime;
+nxt_port_t *
+nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
+{
+ nxt_port_t *port;
+ nxt_process_t *process;
- if (process->registered == 1) {
- nxt_runtime_process_remove_pid(rt, process->pid);
- }
+ process = nxt_runtime_process_get(rt, pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
- nxt_runtime_process_destroy(rt, process);
+ port = nxt_port_new(task, id, pid, type);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_process_use(task, process, -1);
+ return NULL;
}
-}
+ nxt_process_port_add(task, process, port);
-nxt_process_t *
-nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
-{
- nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
+ nxt_process_use(task, process, -1);
- return nxt_runtime_process_next(rt, lhe);
+ nxt_runtime_port_add(task, port);
+
+ nxt_port_use(task, port, -1);
+
+ return port;
}
-void
+static void
nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
{
nxt_int_t res;
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 0791f8e7..d5b340b6 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
-nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
-
void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
-
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each_t *lhe);
+void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process);
+
#define nxt_runtime_process_next(rt, lhe) \
nxt_lvlhsh_each(&rt->processes, lhe)
-
-void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
+nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type);
void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index 16fe4724..94f8e9eb 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -9,6 +9,8 @@
static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
size_t *copied);
+static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
+ nxt_work_queue_t *wq, nxt_buf_t *start);
nxt_uint_t
@@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
- nxt_prefetch(b->next);
-
if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
break;
}
- nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
-
- b = b->next;
+ b = nxt_sendbuf_coalesce_completion(task, wq, b);
}
return b;
@@ -399,10 +397,49 @@ void
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
- nxt_prefetch(b->next);
+ b = nxt_sendbuf_coalesce_completion(task, wq, b);
+ }
+}
- nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
- b = b->next;
+static nxt_buf_t *
+nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
+ nxt_buf_t *start)
+{
+ nxt_buf_t *b, *next, **last, *rest, **last_rest;
+ nxt_work_handler_t handler;
+
+ rest = NULL;
+ last_rest = &rest;
+ last = &start->next;
+ b = start;
+ handler = b->completion_handler;
+
+ for ( ;; ) {
+ next = b->next;
+ if (next == NULL) {
+ break;
+ }
+
+ b->next = NULL;
+ b = next;
+
+ if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
+ *last_rest = b;
+ break;
+ }
+
+ if (handler == b->completion_handler) {
+ *last = b;
+ last = &b->next;
+
+ } else {
+ *last_rest = b;
+ last_rest = &b->next;
+ }
}
+
+ nxt_work_queue_add(wq, handler, task, start, start->parent);
+
+ return rest;
}
diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c
index 99cf54b4..57dfbfa6 100644
--- a/src/nxt_sockaddr.c
+++ b/src/nxt_sockaddr.c
@@ -23,11 +23,11 @@ static nxt_int_t nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs);
nxt_sockaddr_t *
nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls)
{
- uint8_t hint;
size_t size;
+ uint8_t hint;
nxt_sockaddr_t *sa;
- hint = (uint8_t) -1;
+ hint = NXT_EVENT_ENGINE_NO_MEM_HINT;
size = offsetof(nxt_sockaddr_t, u) + ls->socklen + ls->address_length;
sa = nxt_event_engine_mem_alloc(engine, &hint, size);
@@ -56,7 +56,11 @@ nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls)
void
nxt_sockaddr_cache_free(nxt_event_engine_t *engine, nxt_conn_t *c)
{
- nxt_event_engine_mem_free(engine, &c->remote->cache_hint, c->remote);
+ nxt_sockaddr_t *sa;
+
+ sa = c->remote;
+
+ nxt_event_engine_mem_free(engine, sa->cache_hint, sa, 0);
}
diff --git a/src/nxt_socket.c b/src/nxt_socket.c
index 95a298d8..2a809184 100644
--- a/src/nxt_socket.c
+++ b/src/nxt_socket.c
@@ -300,6 +300,28 @@ nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how)
}
+nxt_err_t
+nxt_socket_error(nxt_socket_t s)
+{
+ int ret, err;
+ socklen_t len;
+
+ err = 0;
+ len = sizeof(int);
+ /*
+ * Linux and BSDs return 0 and store a pending error in the err argument;
+ * Solaris returns -1 and sets the errno.
+ */
+ ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (void *) &err, &len);
+
+ if (nxt_slow_path(ret == -1)) {
+ err = nxt_errno;
+ }
+
+ return err;
+}
+
+
nxt_uint_t
nxt_socket_error_level(nxt_err_t err)
{
@@ -315,6 +337,9 @@ nxt_socket_error_level(nxt_err_t err)
case NXT_EHOSTUNREACH:
return NXT_LOG_INFO;
+ case NXT_ECONNREFUSED:
+ return NXT_LOG_ERR;
+
default:
return NXT_LOG_ALERT;
}
diff --git a/src/nxt_socket.h b/src/nxt_socket.h
index 3f00648d..6a450f83 100644
--- a/src/nxt_socket.h
+++ b/src/nxt_socket.h
@@ -106,6 +106,7 @@ NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_task_t *task, nxt_socket_t s,
nxt_sockaddr_t *sa);
NXT_EXPORT void nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s,
nxt_uint_t how);
+nxt_err_t nxt_socket_error(nxt_socket_t s);
nxt_uint_t nxt_socket_error_level(nxt_err_t err);
NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task,
diff --git a/src/nxt_string.c b/src/nxt_string.c
index b89e9555..d567883f 100644
--- a/src/nxt_string.c
+++ b/src/nxt_string.c
@@ -188,10 +188,14 @@ nxt_strncasecmp(const u_char *s1, const u_char *s2, size_t length)
nxt_int_t
-nxt_memcasecmp(const u_char *s1, const u_char *s2, size_t length)
+nxt_memcasecmp(const void *p1, const void *p2, size_t length)
{
- u_char c1, c2;
- nxt_int_t n;
+ u_char c1, c2;
+ nxt_int_t n;
+ const u_char *s1, *s2;
+
+ s1 = p1;
+ s2 = p2;
while (length-- != 0) {
c1 = *s1++;
diff --git a/src/nxt_string.h b/src/nxt_string.h
index 8d7b3b73..de498048 100644
--- a/src/nxt_string.h
+++ b/src/nxt_string.h
@@ -87,7 +87,7 @@ NXT_EXPORT u_char *nxt_cpystrn(u_char *dst, const u_char *src, size_t length);
NXT_EXPORT nxt_int_t nxt_strcasecmp(const u_char *s1, const u_char *s2);
NXT_EXPORT nxt_int_t nxt_strncasecmp(const u_char *s1, const u_char *s2,
size_t length);
-NXT_EXPORT nxt_int_t nxt_memcasecmp(const u_char *s1, const u_char *s2,
+NXT_EXPORT nxt_int_t nxt_memcasecmp(const void *p1, const void *p2,
size_t length);
NXT_EXPORT u_char *nxt_memstrn(const u_char *s, const u_char *end,
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 9ccd1fd9..0cf32916 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -1316,8 +1316,12 @@ nxt_unit_response_init(nxt_unit_request_info_t *req,
nxt_unit_req_debug(req, "duplicate response init");
}
+ /*
+ * Each field name and value 0-terminated by libunit,
+ * this is the reason of '+ 2' below.
+ */
buf_size = sizeof(nxt_unit_response_t)
- + max_fields_count * sizeof(nxt_unit_field_t)
+ + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
+ max_fields_size;
if (nxt_slow_path(req->response_buf != NULL)) {
@@ -1391,8 +1395,12 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req,
return NXT_UNIT_ERROR;
}
+ /*
+ * Each field name and value 0-terminated by libunit,
+ * this is the reason of '+ 2' below.
+ */
buf_size = sizeof(nxt_unit_response_t)
- + max_fields_count * sizeof(nxt_unit_field_t)
+ + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
+ max_fields_size;
nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
@@ -1458,7 +1466,8 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req,
goto fail;
}
- resp->piggyback_content_length = req->response->piggyback_content_length;
+ resp->piggyback_content_length =
+ req->response->piggyback_content_length;
nxt_unit_sptr_set(&resp->piggyback_content, p);
p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
@@ -1953,7 +1962,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
if (hdr != NULL) {
m.mmap_msg.mmap_id = hdr->id;
- m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+ m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
+ (u_char *) buf->start);
}
nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
diff --git a/src/nxt_unit_field.h b/src/nxt_unit_field.h
index d19db0f0..b07d3046 100644
--- a/src/nxt_unit_field.h
+++ b/src/nxt_unit_field.h
@@ -21,7 +21,8 @@ enum {
/* Name and Value field aka HTTP header. */
struct nxt_unit_field_s {
uint16_t hash;
- uint8_t skip;
+ uint8_t skip:1;
+ uint8_t hopbyhop:1;
uint8_t name_length;
uint32_t value_length;
diff --git a/src/nxt_websocket.c b/src/nxt_websocket.c
index 9a099760..91002237 100644
--- a/src/nxt_websocket.c
+++ b/src/nxt_websocket.c
@@ -19,7 +19,7 @@ nxt_inline void
nxt_hton16(uint8_t *b, uint16_t v)
{
b[0] = (v >> 8);
- b[1] = (v & 0xFFu);
+ b[1] = (v & 0xFFu);
}
diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c
index ab9f7020..45d7a7aa 100644
--- a/src/ruby/nxt_ruby.c
+++ b/src/ruby/nxt_ruby.c
@@ -85,14 +85,16 @@ static nxt_int_t
nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
{
int state, rc;
- VALUE dummy, res;
+ VALUE res;
nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t ruby_unit_init;
nxt_ruby_rack_init_t rack_init;
+ static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
+
+ RUBY_INIT_STACK
ruby_init();
- Init_stack(&dummy);
- ruby_init_loadpath();
+ ruby_options(2, argv);
ruby_script("NGINX_Unit");
rack_init.task = task;
@@ -707,7 +709,8 @@ nxt_ruby_rack_result_body(VALUE result)
}
} else if (rb_respond_to(body, rb_intern("each"))) {
- rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0);
+ rb_block_call(body, rb_intern("each"), 0, 0,
+ nxt_ruby_rack_result_body_each, 0);
} else {
nxt_unit_req_error(nxt_ruby_run_ctx.req,
diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c
index 3f6cac89..fcfcf5dd 100644
--- a/src/ruby/nxt_ruby_stream_io.c
+++ b/src/ruby/nxt_ruby_stream_io.c
@@ -31,7 +31,8 @@ nxt_ruby_stream_io_input_init(void)
rb_gc_register_address(&stream_io);
rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
- rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1);
+ rb_define_method(stream_io, "initialize",
+ nxt_ruby_stream_io_initialize, -1);
rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0);
rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0);
rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2);
@@ -51,7 +52,8 @@ nxt_ruby_stream_io_error_init(void)
rb_gc_register_address(&stream_io);
rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
- rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1);
+ rb_define_method(stream_io, "initialize",
+ nxt_ruby_stream_io_initialize, -1);
rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2);
rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2);
rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0);
diff --git a/src/test/nxt_unit_websocket_chat.c b/src/test/nxt_unit_websocket_chat.c
index ecc9a243..6e274722 100644
--- a/src/test/nxt_unit_websocket_chat.c
+++ b/src/test/nxt_unit_websocket_chat.c
@@ -104,10 +104,10 @@ ws_chat_root(nxt_unit_request_info_t *req)
rc = nxt_unit_response_init(req, 200 /* Status code. */,
2 /* Number of response headers. */,
- nxt_length(CONTENT_TYPE) + 1
- + nxt_length(TEXT_HTML) + 1
- + nxt_length(CONTENT_LENGTH) + 1
- + ws_chat_index_content_length_size + 1
+ nxt_length(CONTENT_TYPE)
+ + nxt_length(TEXT_HTML)
+ + nxt_length(CONTENT_LENGTH)
+ + ws_chat_index_content_length_size
+ ws_chat_index_html_size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
diff --git a/test/python/delayed/wsgi.py b/test/python/delayed/wsgi.py
new file mode 100644
index 00000000..d25e2765
--- /dev/null
+++ b/test/python/delayed/wsgi.py
@@ -0,0 +1,25 @@
+import time
+
+
+def application(environ, start_response):
+ parts = int(environ.get('HTTP_X_PARTS', 1))
+ delay = int(environ.get('HTTP_X_DELAY', 0))
+
+ content_length = int(environ.get('CONTENT_LENGTH', 0))
+ body = bytes(environ['wsgi.input'].read(content_length))
+
+ write = start_response('200', [('Content-Length', str(len(body)))])
+
+ if not body:
+ return []
+
+ step = int(len(body) / parts)
+ for i in range(0, len(body), step):
+ try:
+ write(body[i : i + step])
+ except:
+ break
+
+ time.sleep(delay)
+
+ return []
diff --git a/test/python/errors_write/wsgi.py b/test/python/errors_write/wsgi.py
index b1a9d2ee..148bce9e 100644
--- a/test/python/errors_write/wsgi.py
+++ b/test/python/errors_write/wsgi.py
@@ -1,5 +1,6 @@
def application(environ, start_response):
environ['wsgi.errors'].write('Error in application.')
+ environ['wsgi.errors'].flush()
start_response('200', [('Content-Length', '0')])
return []
diff --git a/test/python/iter_exception/wsgi.py b/test/python/iter_exception/wsgi.py
new file mode 100644
index 00000000..66a09af7
--- /dev/null
+++ b/test/python/iter_exception/wsgi.py
@@ -0,0 +1,45 @@
+class application:
+ def __init__(self, environ, start_response):
+ self.environ = environ
+ self.start = start_response
+
+ self.next = self.__next__
+
+ def __iter__(self):
+ self.__i = 0
+ self._skip_level = int(self.environ.get('HTTP_X_SKIP', 0))
+ self._not_skip_close = int(self.environ.get('HTTP_X_NOT_SKIP_CLOSE', 0))
+ self._is_chunked = self.environ.get('HTTP_X_CHUNKED')
+
+ headers = [(('Content-Length', '10'))]
+ if self._is_chunked is not None:
+ headers = []
+
+ if self._skip_level < 1:
+ raise Exception('first exception')
+
+ write = self.start('200', headers)
+
+ if self._skip_level < 2:
+ raise Exception('second exception')
+
+ write(b'XXXXX')
+
+ if self._skip_level < 3:
+ raise Exception('third exception')
+
+ return self
+
+ def __next__(self):
+ if self._skip_level < 4:
+ raise Exception('next exception')
+
+ self.__i += 1
+ if self.__i > 2:
+ raise StopIteration
+
+ return b'X'
+
+ def close(self):
+ if self._not_skip_close == 1:
+ raise Exception('close exception')
diff --git a/test/python/log_body/wsgi.py b/test/python/log_body/wsgi.py
new file mode 100644
index 00000000..9dcb1b0c
--- /dev/null
+++ b/test/python/log_body/wsgi.py
@@ -0,0 +1,9 @@
+def application(environ, start_response):
+ content_length = int(environ.get('CONTENT_LENGTH', 0))
+ body = bytes(environ['wsgi.input'].read(content_length))
+
+ environ['wsgi.errors'].write(body)
+ environ['wsgi.errors'].flush()
+
+ start_response('200', [('Content-Length', '0')])
+ return []
diff --git a/test/python/threading/wsgi.py b/test/python/threading/wsgi.py
new file mode 100644
index 00000000..adaa2a37
--- /dev/null
+++ b/test/python/threading/wsgi.py
@@ -0,0 +1,33 @@
+import sys
+import time
+import threading
+
+
+class Foo(threading.Thread):
+ num = 10
+
+ def __init__(self, x):
+ self.__x = x
+ threading.Thread.__init__(self)
+
+ def log_index(self, index):
+ sys.stderr.write(
+ "(" + str(index) + ") Thread: " + str(self.__x) + "\n"
+ )
+ sys.stderr.flush()
+
+ def run(self):
+ i = 0
+ for _ in range(3):
+ self.log_index(i)
+ i += 1
+ time.sleep(1)
+ self.log_index(i)
+ i += 1
+
+
+def application(environ, start_response):
+ Foo(Foo.num).start()
+ Foo.num += 10
+ start_response('200 OK', [('Content-Length', '0')])
+ return []
diff --git a/test/ruby/constants/config.ru b/test/ruby/constants/config.ru
new file mode 100644
index 00000000..e0951bf4
--- /dev/null
+++ b/test/ruby/constants/config.ru
@@ -0,0 +1,15 @@
+app = Proc.new do |env|
+ ['200', {
+ 'X-Copyright' => RUBY_COPYRIGHT,
+ 'X-Description' => RUBY_DESCRIPTION,
+ 'X-Engine' => RUBY_ENGINE,
+ 'X-Engine-Version' => RUBY_ENGINE_VERSION,
+ 'X-Patchlevel' => RUBY_PATCHLEVEL.to_s,
+ 'X-Platform' => RUBY_PLATFORM,
+ 'X-Release-Date' => RUBY_RELEASE_DATE,
+ 'X-Revision' => RUBY_REVISION.to_s,
+ 'X-Version' => RUBY_VERSION,
+ }, []]
+end
+
+run app
diff --git a/test/test_access_log.py b/test/test_access_log.py
index 8dc87524..94f6e7bf 100644
--- a/test/test_access_log.py
+++ b/test/test_access_log.py
@@ -12,7 +12,11 @@ class TestAccessLog(TestApplicationPython):
def load(self, script):
super().load(script)
- self.conf('"' + self.testdir + '/access.log"', 'access_log')
+ self.assertIn(
+ 'success',
+ self.conf('"' + self.testdir + '/access.log"', 'access_log'),
+ 'access_log configure',
+ )
def wait_for_record(self, pattern, name='access.log'):
return super().wait_for_record(pattern, name)
@@ -111,7 +115,9 @@ Connection: close
addr = self.testdir + '/sock'
- self.conf({"unix:" + addr: {"pass": "applications/empty"}}, 'listeners')
+ self.conf(
+ {"unix:" + addr: {"pass": "applications/empty"}}, 'listeners'
+ )
self.get(sock_type='unix', addr=addr)
@@ -292,42 +298,5 @@ Connection: close
'change',
)
- def test_access_log_reopen(self):
- self.load('empty')
-
- log_path = self.testdir + '/access.log'
-
- self.assertTrue(self.waitforfiles(log_path), 'open')
-
- log_path_new = self.testdir + '/new.log'
-
- os.rename(log_path, log_path_new)
-
- self.get()
-
- self.assertIsNotNone(
- self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', 'new.log'),
- 'rename new',
- )
- self.assertFalse(os.path.isfile(log_path), 'rename old')
-
- with open(self.testdir + '/unit.pid', 'r') as f:
- pid = f.read().rstrip()
-
- call(['kill', '-s', 'USR1', pid])
-
- self.assertTrue(self.waitforfiles(log_path), 'reopen')
-
- self.get(url='/usr1')
-
- self.assertIsNotNone(
- self.wait_for_record(r'"GET /usr1 HTTP/1.1" 200 0 "-" "-"'),
- 'reopen 2',
- )
- self.assertIsNone(
- self.search_in_log(r'/usr1', 'new.log'), 'rename new 2'
- )
-
-
if __name__ == '__main__':
TestAccessLog.main()
diff --git a/test/test_configuration.py b/test/test_configuration.py
index 69647858..186e037d 100644
--- a/test/test_configuration.py
+++ b/test/test_configuration.py
@@ -321,7 +321,7 @@ class TestConfiguration(TestControl):
}
for a in range(999)
},
- "listeners": {"*:7001": {"pass": "applications/app-1"}},
+ "listeners": {"*:7080": {"pass": "applications/app-1"}},
}
self.assertIn('success', self.conf(conf))
diff --git a/test/test_go_isolation.py b/test/test_go_isolation.py
index 780c2b03..ee5ddf47 100644
--- a/test/test_go_isolation.py
+++ b/test/test_go_isolation.py
@@ -130,6 +130,38 @@ class TestGoIsolation(TestApplicationGo):
self.assertEqual(obj['PID'], 1, 'pid of container is 1')
+ def test_isolation_namespace_false(self):
+ self.load('ns_inspect')
+ allns = list(self.available['features']['isolation'].keys())
+
+ remove_list = ['unprivileged_userns_clone', 'ipc', 'cgroup']
+ allns = [ns for ns in allns if ns not in remove_list]
+
+ namespaces = {}
+ for ns in allns:
+ if ns == 'user':
+ namespaces['credential'] = False
+ elif ns == 'mnt':
+ namespaces['mount'] = False
+ elif ns == 'net':
+ namespaces['network'] = False
+ elif ns == 'uts':
+ namespaces['uname'] = False
+ else:
+ namespaces[ns] = False
+
+ self.conf_isolation({"namespaces": namespaces})
+
+ obj = self.isolation.parsejson(self.get()['body'])
+
+ for ns in allns:
+ if ns.upper() in obj['NS']:
+ self.assertEqual(
+ obj['NS'][ns.upper()],
+ self.available['features']['isolation'][ns],
+ '%s match' % ns,
+ )
+
if __name__ == '__main__':
TestGoIsolation.main()
diff --git a/test/test_java_websockets.py b/test/test_java_websockets.py
index 3f2c0a8a..d75ee3a6 100644
--- a/test/test_java_websockets.py
+++ b/test/test_java_websockets.py
@@ -8,7 +8,7 @@ from unit.applications.websockets import TestApplicationWebsocket
class TestJavaWebsockets(TestApplicationJava):
prerequisites = {'modules': ['java']}
- ws = TestApplicationWebsocket(True)
+ ws = TestApplicationWebsocket()
def setUp(self):
super().setUp()
@@ -179,18 +179,14 @@ class TestJavaWebsockets(TestApplicationJava):
): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'upgrade absent')
@@ -198,20 +194,17 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_case_insensitive(self):
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
- resp = self.get(
+ resp, sock, _ = self.ws.upgrade(
headers={
'Host': 'localhost',
'Upgrade': 'WEBSOCKET',
'Connection': 'UPGRADE',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
- },
- read_timeout=1,
+ }
)
+ sock.close()
self.assertEqual(resp['status'], 101, 'status')
@@ -219,18 +212,14 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_connection_absent(self): # FAIL
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -238,18 +227,14 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_version_absent(self):
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 426, 'status')
@@ -258,8 +243,6 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_key_invalid(self):
self.load('websockets_mirror')
- self.get()
-
resp = self.get(
headers={
'Host': 'localhost',
@@ -269,7 +252,6 @@ class TestJavaWebsockets(TestApplicationJava):
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'key length')
@@ -284,7 +266,6 @@ class TestJavaWebsockets(TestApplicationJava):
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(
@@ -294,19 +275,15 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_method_invalid(self):
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
resp = self.post(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -314,20 +291,16 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_http_10(self):
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
http_10=True,
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -335,20 +308,16 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_handshake_uri_invalid(self):
self.load('websockets_mirror')
- self.get()
-
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
url='!',
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -356,19 +325,17 @@ class TestJavaWebsockets(TestApplicationJava):
def test_java_websockets_protocol_absent(self):
self.load('websockets_mirror')
- self.get()
-
key = self.ws.key()
- resp = self.get(
+ resp, sock, _ = self.ws.upgrade(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Key': key,
'Sec-WebSocket-Version': 13,
- },
- read_timeout=1,
+ }
)
+ sock.close()
self.assertEqual(resp['status'], 101, 'status')
self.assertEqual(resp['headers']['Upgrade'], 'websocket', 'upgrade')
@@ -1165,7 +1132,7 @@ class TestJavaWebsockets(TestApplicationJava):
sock.close()
- # 7_3_1 # FAIL
+ # 7_3_1
_, sock, _ = self.ws.upgrade()
diff --git a/test/test_node_websockets.py b/test/test_node_websockets.py
index b24bee75..bb189552 100644
--- a/test/test_node_websockets.py
+++ b/test/test_node_websockets.py
@@ -198,16 +198,14 @@ class TestNodeWebsockets(TestApplicationNode):
): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1
self.load('websockets/mirror')
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'upgrade absent')
@@ -215,18 +213,17 @@ class TestNodeWebsockets(TestApplicationNode):
def test_node_websockets_handshake_case_insensitive(self):
self.load('websockets/mirror')
- key = self.ws.key()
- resp = self.get(
+ resp, sock, _ = self.ws.upgrade(
headers={
'Host': 'localhost',
'Upgrade': 'WEBSOCKET',
'Connection': 'UPGRADE',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
- },
- read_timeout=1,
+ }
)
+ sock.close()
self.assertEqual(resp['status'], 101, 'status')
@@ -234,16 +231,14 @@ class TestNodeWebsockets(TestApplicationNode):
def test_node_websockets_handshake_connection_absent(self): # FAIL
self.load('websockets/mirror')
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -251,16 +246,14 @@ class TestNodeWebsockets(TestApplicationNode):
def test_node_websockets_handshake_version_absent(self):
self.load('websockets/mirror')
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 426, 'status')
@@ -278,7 +271,6 @@ class TestNodeWebsockets(TestApplicationNode):
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'key length')
@@ -293,7 +285,6 @@ class TestNodeWebsockets(TestApplicationNode):
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(
@@ -303,17 +294,15 @@ class TestNodeWebsockets(TestApplicationNode):
def test_node_websockets_handshake_method_invalid(self):
self.load('websockets/mirror')
- key = self.ws.key()
resp = self.post(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -321,18 +310,16 @@ class TestNodeWebsockets(TestApplicationNode):
def test_node_websockets_handshake_http_10(self):
self.load('websockets/mirror')
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
http_10=True,
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -340,18 +327,16 @@ class TestNodeWebsockets(TestApplicationNode):
def test_node_websockets_handshake_uri_invalid(self):
self.load('websockets/mirror')
- key = self.ws.key()
resp = self.get(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
- 'Sec-WebSocket-Key': key,
+ 'Sec-WebSocket-Key': self.ws.key(),
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
},
url='!',
- read_timeout=1,
)
self.assertEqual(resp['status'], 400, 'status')
@@ -360,16 +345,16 @@ class TestNodeWebsockets(TestApplicationNode):
self.load('websockets/mirror')
key = self.ws.key()
- resp = self.get(
+ resp, sock, _ = self.ws.upgrade(
headers={
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Key': key,
'Sec-WebSocket-Version': 13,
- },
- read_timeout=1,
+ }
)
+ sock.close()
self.assertEqual(resp['status'], 101, 'status')
self.assertEqual(resp['headers']['Upgrade'], 'websocket', 'upgrade')
@@ -1166,7 +1151,7 @@ class TestNodeWebsockets(TestApplicationNode):
sock.close()
- # 7_3_1 # FAIL
+ # 7_3_1
_, sock, _ = self.ws.upgrade()
diff --git a/test/test_proxy.py b/test/test_proxy.py
new file mode 100644
index 00000000..4697b88f
--- /dev/null
+++ b/test/test_proxy.py
@@ -0,0 +1,622 @@
+import re
+import time
+import socket
+import unittest
+from unit.applications.lang.python import TestApplicationPython
+
+
+class TestProxy(TestApplicationPython):
+ prerequisites = {'modules': ['python']}
+
+ SERVER_PORT = 7999
+
+ def run_server(self):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ server_address = ('', self.SERVER_PORT)
+ sock.bind(server_address)
+ sock.listen(5)
+
+ def recvall(sock):
+ buff_size = 4096
+ data = b''
+ while True:
+ part = sock.recv(buff_size)
+ data += part
+ if len(part) < buff_size:
+ break
+ return data
+
+ req = b"""HTTP/1.1 200 OK
+Content-Length: 10
+
+"""
+
+ while True:
+ connection, client_address = sock.accept()
+
+ data = recvall(connection).decode()
+
+ to_send = req
+
+ m = re.search('X-Len: (\d+)', data)
+ if m:
+ to_send += b'X' * int(m.group(1))
+
+ connection.sendall(to_send)
+
+ connection.close()
+
+ def get_http10(self, *args, **kwargs):
+ return self.get(*args, http_10=True, **kwargs)
+
+ def post_http10(self, *args, **kwargs):
+ return self.post(*args, http_10=True, **kwargs)
+
+ def setUp(self):
+ super().setUp()
+
+ self.run_process(self.run_server)
+ self.waitforsocket(self.SERVER_PORT)
+
+ self.assertIn(
+ 'success',
+ self.conf(
+ {
+ "listeners": {
+ "*:7080": {"pass": "routes"},
+ "*:7081": {"pass": "applications/mirror"},
+ },
+ "routes": [{"action": {"proxy": "http://127.0.0.1:7081"}}],
+ "applications": {
+ "mirror": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + "/python/mirror",
+ "working_directory": self.current_dir
+ + "/python/mirror",
+ "module": "wsgi",
+ },
+ "custom_header": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + "/python/custom_header",
+ "working_directory": self.current_dir
+ + "/python/custom_header",
+ "module": "wsgi",
+ },
+ "delayed": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + "/python/delayed",
+ "working_directory": self.current_dir
+ + "/python/delayed",
+ "module": "wsgi",
+ },
+ },
+ }
+ ),
+ 'proxy initial configuration',
+ )
+
+ def test_proxy_http10(self):
+ for _ in range(10):
+ self.assertEqual(self.get_http10()['status'], 200, 'status')
+
+ def test_proxy_chain(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ {
+ "listeners": {
+ "*:7080": {"pass": "routes/first"},
+ "*:7081": {"pass": "routes/second"},
+ "*:7082": {"pass": "routes/third"},
+ "*:7083": {"pass": "routes/fourth"},
+ "*:7084": {"pass": "routes/fifth"},
+ "*:7085": {"pass": "applications/mirror"},
+ },
+ "routes": {
+ "first": [
+ {"action": {"proxy": "http://127.0.0.1:7081"}}
+ ],
+ "second": [
+ {"action": {"proxy": "http://127.0.0.1:7082"}}
+ ],
+ "third": [
+ {"action": {"proxy": "http://127.0.0.1:7083"}}
+ ],
+ "fourth": [
+ {"action": {"proxy": "http://127.0.0.1:7084"}}
+ ],
+ "fifth": [
+ {"action": {"proxy": "http://127.0.0.1:7085"}}
+ ],
+ },
+ "applications": {
+ "mirror": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + "/python/mirror",
+ "working_directory": self.current_dir
+ + "/python/mirror",
+ "module": "wsgi",
+ }
+ },
+ }
+ ),
+ 'proxy chain configuration',
+ )
+
+ self.assertEqual(self.get_http10()['status'], 200, 'status')
+
+ def test_proxy_body(self):
+ payload = '0123456789'
+ for _ in range(10):
+ resp = self.post_http10(body=payload)
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], payload, 'body')
+
+ payload = 'X' * 4096
+ for _ in range(10):
+ resp = self.post_http10(body=payload)
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], payload, 'body')
+
+ payload = 'X' * 4097
+ for _ in range(10):
+ resp = self.post_http10(body=payload)
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], payload, 'body')
+
+ payload = 'X' * 4096 * 256
+ for _ in range(10):
+ resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], payload, 'body')
+
+ payload = 'X' * 4096 * 257
+ for _ in range(10):
+ resp = self.post_http10(body=payload, read_buffer_size=4096 * 128)
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], payload, 'body')
+
+ def test_proxy_parallel(self):
+ payload = 'X' * 4096 * 257
+ buff_size = 4096 * 258
+
+ socks = []
+ for i in range(10):
+ _, sock = self.post_http10(
+ body=payload + str(i),
+ start=True,
+ no_recv=True,
+ read_buffer_size=buff_size,
+ )
+ socks.append(sock)
+
+ for i in range(10):
+ resp = self.recvall(socks[i], buff_size=buff_size).decode()
+ socks[i].close()
+
+ resp = self._resp_to_dict(resp)
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], payload + str(i), 'body')
+
+ def test_proxy_header(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ {"pass": "applications/custom_header"}, 'listeners/*:7081'
+ ),
+ 'custom_header configure',
+ )
+
+ header_value = 'blah'
+ self.assertEqual(
+ self.get_http10(
+ headers={'Host': 'localhost', 'Custom-Header': header_value}
+ )['headers']['Custom-Header'],
+ header_value,
+ 'custom header',
+ )
+
+ header_value = '(),/:;<=>?@[\]{}\t !#$%&\'*+-.^_`|~'
+ self.assertEqual(
+ self.get_http10(
+ headers={'Host': 'localhost', 'Custom-Header': header_value}
+ )['headers']['Custom-Header'],
+ header_value,
+ 'custom header 2',
+ )
+
+ header_value = 'X' * 4096
+ self.assertEqual(
+ self.get_http10(
+ headers={'Host': 'localhost', 'Custom-Header': header_value}
+ )['headers']['Custom-Header'],
+ header_value,
+ 'custom header 3',
+ )
+
+ header_value = 'X' * 8191
+ self.assertEqual(
+ self.get_http10(
+ headers={'Host': 'localhost', 'Custom-Header': header_value}
+ )['headers']['Custom-Header'],
+ header_value,
+ 'custom header 4',
+ )
+
+ header_value = 'X' * 8192
+ self.assertEqual(
+ self.get_http10(
+ headers={'Host': 'localhost', 'Custom-Header': header_value}
+ )['status'],
+ 431,
+ 'custom header 5',
+ )
+
+ def test_proxy_fragmented(self):
+ _, sock = self.http(
+ b"""GET / HTT""", raw=True, start=True, no_recv=True
+ )
+
+ time.sleep(1)
+
+ sock.sendall("P/1.0\r\nHost: localhos".encode())
+
+ time.sleep(1)
+
+ sock.sendall("t\r\n\r\n".encode())
+
+ self.assertRegex(
+ self.recvall(sock).decode(), '200 OK', 'fragmented send'
+ )
+ sock.close()
+
+ def test_proxy_fragmented_close(self):
+ _, sock = self.http(
+ b"""GET / HTT""", raw=True, start=True, no_recv=True
+ )
+
+ time.sleep(1)
+
+ sock.sendall("P/1.0\r\nHo".encode())
+
+ sock.close()
+
+ def test_proxy_fragmented_body(self):
+ _, sock = self.http(
+ b"""GET / HTT""", raw=True, start=True, no_recv=True
+ )
+
+ time.sleep(1)
+
+ sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
+ sock.sendall("Content-Length: 30000\r\n".encode())
+
+ time.sleep(1)
+
+ sock.sendall("\r\n".encode())
+ sock.sendall(("X" * 10000).encode())
+
+ time.sleep(1)
+
+ sock.sendall(("X" * 10000).encode())
+
+ time.sleep(1)
+
+ sock.sendall(("X" * 10000).encode())
+
+ resp = self._resp_to_dict(self.recvall(sock).decode())
+ sock.close()
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], "X" * 30000, 'body')
+
+ def test_proxy_fragmented_body_close(self):
+ _, sock = self.http(
+ b"""GET / HTT""", raw=True, start=True, no_recv=True
+ )
+
+ time.sleep(1)
+
+ sock.sendall("P/1.0\r\nHost: localhost\r\n".encode())
+ sock.sendall("Content-Length: 30000\r\n".encode())
+
+ time.sleep(1)
+
+ sock.sendall("\r\n".encode())
+ sock.sendall(("X" * 10000).encode())
+
+ sock.close()
+
+ def test_proxy_nowhere(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ [{"action": {"proxy": "http://127.0.0.1:7082"}}], 'routes'
+ ),
+ 'proxy path changed',
+ )
+
+ self.assertEqual(self.get_http10()['status'], 502, 'status')
+
+ def test_proxy_ipv6(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ {
+ "*:7080": {"pass": "routes"},
+ "[::1]:7081": {'application': 'mirror'},
+ },
+ 'listeners',
+ ),
+ 'add ipv6 listener configure',
+ )
+
+ self.assertIn(
+ 'success',
+ self.conf([{"action": {"proxy": "http://[::1]:7081"}}], 'routes'),
+ 'proxy ipv6 configure',
+ )
+
+ self.assertEqual(self.get_http10()['status'], 200, 'status')
+
+ def test_proxy_unix(self):
+ addr = self.testdir + '/sock'
+
+ self.assertIn(
+ 'success',
+ self.conf(
+ {
+ "*:7080": {"pass": "routes"},
+ "unix:" + addr: {'application': 'mirror'},
+ },
+ 'listeners',
+ ),
+ 'add unix listener configure',
+ )
+
+ self.assertIn(
+ 'success',
+ self.conf(
+ [{"action": {"proxy": 'http://unix:' + addr}}], 'routes'
+ ),
+ 'proxy unix configure',
+ )
+
+ self.assertEqual(self.get_http10()['status'], 200, 'status')
+
+ def test_proxy_delayed(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ {"pass": "applications/delayed"}, 'listeners/*:7081'
+ ),
+ 'delayed configure',
+ )
+
+ body = '0123456789' * 1000
+ resp = self.post_http10(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Content-Length': str(len(body)),
+ 'X-Parts': '2',
+ 'X-Delay': '1',
+ },
+ body=body,
+ )
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], body, 'body')
+
+ resp = self.post_http10(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Content-Length': str(len(body)),
+ 'X-Parts': '2',
+ 'X-Delay': '1',
+ },
+ body=body,
+ )
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'], body, 'body')
+
+ def test_proxy_delayed_close(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ {"pass": "applications/delayed"}, 'listeners/*:7081'
+ ),
+ 'delayed configure',
+ )
+
+ _, sock = self.post_http10(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Content-Length': '10000',
+ 'X-Parts': '3',
+ 'X-Delay': '1',
+ },
+ body='0123456789' * 1000,
+ start=True,
+ no_recv=True,
+ )
+
+ self.assertRegex(
+ sock.recv(100).decode(), '200 OK', 'first'
+ )
+ sock.close()
+
+ _, sock = self.post_http10(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Content-Length': '10000',
+ 'X-Parts': '3',
+ 'X-Delay': '1',
+ },
+ body='0123456789' * 1000,
+ start=True,
+ no_recv=True,
+ )
+
+ self.assertRegex(
+ sock.recv(100).decode(), '200 OK', 'second'
+ )
+ sock.close()
+
+ @unittest.skip('not yet')
+ def test_proxy_content_length(self):
+ self.assertIn(
+ 'success',
+ self.conf(
+ [
+ {
+ "action": {
+ "proxy": "http://127.0.0.1:"
+ + str(self.SERVER_PORT)
+ }
+ }
+ ],
+ 'routes',
+ ),
+ 'proxy backend configure',
+ )
+
+ resp = self.get_http10()
+ self.assertEqual(len(resp['body']), 0, 'body lt Content-Length 0')
+
+ resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '5'})
+ self.assertEqual(len(resp['body']), 5, 'body lt Content-Length 5')
+
+ resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '9'})
+ self.assertEqual(len(resp['body']), 9, 'body lt Content-Length 9')
+
+ resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '11'})
+ self.assertEqual(len(resp['body']), 10, 'body gt Content-Length 11')
+
+ resp = self.get_http10(headers={'Host': 'localhost', 'X-Len': '15'})
+ self.assertEqual(len(resp['body']), 10, 'body gt Content-Length 15')
+
+ def test_proxy_invalid(self):
+ self.assertIn(
+ 'error',
+ self.conf([{"action": {"proxy": 'blah'}}], 'routes'),
+ 'proxy invalid',
+ )
+ self.assertIn(
+ 'error',
+ self.conf([{"action": {"proxy": '/blah'}}], 'routes'),
+ 'proxy invalid 2',
+ )
+ self.assertIn(
+ 'error',
+ self.conf([{"action": {"proxy": 'unix:/blah'}}], 'routes'),
+ 'proxy unix invalid 2',
+ )
+ self.assertIn(
+ 'error',
+ self.conf([{"action": {"proxy": 'http://blah'}}], 'routes'),
+ 'proxy unix invalid 3',
+ )
+ self.assertIn(
+ 'error',
+ self.conf([{"action": {"proxy": 'http://127.0.0.1'}}], 'routes'),
+ 'proxy ipv4 invalid',
+ )
+ self.assertIn(
+ 'error',
+ self.conf([{"action": {"proxy": 'http://127.0.0.1:'}}], 'routes'),
+ 'proxy ipv4 invalid 2',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://127.0.0.1:blah'}}], 'routes'
+ ),
+ 'proxy ipv4 invalid 3',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://127.0.0.1:-1'}}], 'routes'
+ ),
+ 'proxy ipv4 invalid 4',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://127.0.0.1:7080b'}}], 'routes'
+ ),
+ 'proxy ipv4 invalid 5',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://[]'}}], 'routes'
+ ),
+ 'proxy ipv6 invalid',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://[]:7080'}}], 'routes'
+ ),
+ 'proxy ipv6 invalid 2',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://[:]:7080'}}], 'routes'
+ ),
+ 'proxy ipv6 invalid 3',
+ )
+ self.assertIn(
+ 'error',
+ self.conf(
+ [{"action": {"proxy": 'http://[::7080'}}], 'routes'
+ ),
+ 'proxy ipv6 invalid 4',
+ )
+
+ @unittest.skip('not yet')
+ def test_proxy_loop(self):
+ self.conf(
+ {
+ "listeners": {
+ "*:7080": {"pass": "routes"},
+ "*:7081": {"pass": "applications/mirror"},
+ "*:7082": {"pass": "routes"},
+ },
+ "routes": [{"action": {"proxy": "http://127.0.0.1:7082"}}],
+ "applications": {
+ "mirror": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + "/python/mirror",
+ "working_directory": self.current_dir
+ + "/python/mirror",
+ "module": "wsgi",
+ },
+ },
+ }
+ )
+
+ self.get_http10(no_recv=True)
+
+if __name__ == '__main__':
+ TestProxy.main()
diff --git a/test/test_python_application.py b/test/test_python_application.py
index 5b6e2089..ae8f01ca 100644
--- a/test/test_python_application.py
+++ b/test/test_python_application.py
@@ -1,3 +1,4 @@
+import re
import time
import unittest
from unit.applications.lang.python import TestApplicationPython
@@ -6,6 +7,10 @@ from unit.applications.lang.python import TestApplicationPython
class TestPythonApplication(TestApplicationPython):
prerequisites = {'modules': ['python']}
+ def findall(self, pattern):
+ with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
+ return re.findall(pattern, f.read())
+
def test_python_application_variables(self):
self.load('variables')
@@ -130,6 +135,18 @@ class TestPythonApplication(TestApplicationPython):
self.get()['headers']['Server-Port'], '7080', 'Server-Port header'
)
+ @unittest.skip('not yet')
+ def test_python_application_working_directory_invalid(self):
+ self.load('empty')
+
+ self.assertIn(
+ 'success',
+ self.conf('"/blah"', 'applications/empty/working_directory'),
+ 'configure invalid working_directory',
+ )
+
+ self.assertEqual(self.get()['status'], 500, 'status')
+
def test_python_application_204_transfer_encoding(self):
self.load('204_no_content')
@@ -495,6 +512,171 @@ Connection: close
self.assertEqual(self.get()['body'], '0123456789', 'write')
+ def test_python_application_threading(self):
+ """wait_for_record() timeouts after 5s while every thread works at
+ least 3s. So without releasing GIL test should fail.
+ """
+
+ self.load('threading')
+
+ for _ in range(10):
+ self.get(no_recv=True)
+
+ self.assertIsNotNone(
+ self.wait_for_record(r'\(5\) Thread: 100'), 'last thread finished'
+ )
+
+ def test_python_application_iter_exception(self):
+ self.load('iter_exception')
+
+ # Default request doesn't lead to the exception.
+
+ resp = self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Skip': '9',
+ 'X-Chunked': '1',
+ 'Connection': 'close',
+ }
+ )
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertEqual(resp['body'][-5:], '0\r\n\r\n', 'body')
+
+ # Exception before start_response().
+
+ self.assertEqual(self.get()['status'], 503, 'error')
+
+ self.assertIsNotNone(self.wait_for_record(r'Traceback'), 'traceback')
+ self.assertIsNotNone(
+ self.wait_for_record(r'raise Exception\(\'first exception\'\)'),
+ 'first exception raise',
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 1, 'traceback count 1'
+ )
+
+ # Exception after start_response(), before first write().
+
+ self.assertEqual(
+ self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Skip': '1',
+ 'Connection': 'close',
+ }
+ )['status'],
+ 503,
+ 'error 2',
+ )
+
+ self.assertIsNotNone(
+ self.wait_for_record(r'raise Exception\(\'second exception\'\)'),
+ 'exception raise second',
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 2, 'traceback count 2'
+ )
+
+ # Exception after first write(), before first __next__().
+
+ _, sock = self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Skip': '2',
+ 'Connection': 'keep-alive',
+ },
+ start=True,
+ )
+
+ self.assertIsNotNone(
+ self.wait_for_record(r'raise Exception\(\'third exception\'\)'),
+ 'exception raise third',
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 3, 'traceback count 3'
+ )
+
+ self.assertDictEqual(self.get(sock=sock), {}, 'closed connection')
+
+ # Exception after first write(), before first __next__(),
+ # chunked (incomplete body).
+
+ resp = self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Skip': '2',
+ 'X-Chunked': '1',
+ 'Connection': 'close',
+ }
+ )
+ if 'body' in resp:
+ self.assertNotEqual(
+ resp['body'][-5:], '0\r\n\r\n', 'incomplete body'
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 4, 'traceback count 4'
+ )
+
+ # Exception in __next__().
+
+ _, sock = self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Skip': '3',
+ 'Connection': 'keep-alive',
+ },
+ start=True,
+ )
+
+ self.assertIsNotNone(
+ self.wait_for_record(r'raise Exception\(\'next exception\'\)'),
+ 'exception raise next',
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 5, 'traceback count 5'
+ )
+
+ self.assertDictEqual(self.get(sock=sock), {}, 'closed connection 2')
+
+ # Exception in __next__(), chunked (incomplete body).
+
+ resp = self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Skip': '3',
+ 'X-Chunked': '1',
+ 'Connection': 'close',
+ }
+ )
+ if 'body' in resp:
+ self.assertNotEqual(
+ resp['body'][-5:], '0\r\n\r\n', 'incomplete body 2'
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 6, 'traceback count 6'
+ )
+
+ # Exception before start_response() and in close().
+
+ self.assertEqual(
+ self.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Not-Skip-Close': '1',
+ 'Connection': 'close',
+ }
+ )['status'],
+ 503,
+ 'error',
+ )
+
+ self.assertIsNotNone(
+ self.wait_for_record(r'raise Exception\(\'close exception\'\)'),
+ 'exception raise close',
+ )
+ self.assertEqual(
+ len(self.findall(r'Traceback')), 8, 'traceback count 8'
+ )
if __name__ == '__main__':
TestPythonApplication.main()
diff --git a/test/test_routing.py b/test/test_routing.py
index 20e3a1c4..2960f978 100644
--- a/test/test_routing.py
+++ b/test/test_routing.py
@@ -8,34 +8,38 @@ class TestRouting(TestApplicationProto):
def setUp(self):
super().setUp()
- self.conf(
- {
- "listeners": {"*:7080": {"pass": "routes"}},
- "routes": [
- {
- "match": {"method": "GET"},
- "action": {"pass": "applications/empty"},
- }
- ],
- "applications": {
- "empty": {
- "type": "python",
- "processes": {"spare": 0},
- "path": self.current_dir + '/python/empty',
- "working_directory": self.current_dir
- + '/python/empty',
- "module": "wsgi",
- },
- "mirror": {
- "type": "python",
- "processes": {"spare": 0},
- "path": self.current_dir + '/python/mirror',
- "working_directory": self.current_dir
- + '/python/mirror',
- "module": "wsgi",
+ self.assertIn(
+ 'success',
+ self.conf(
+ {
+ "listeners": {"*:7080": {"pass": "routes"}},
+ "routes": [
+ {
+ "match": {"method": "GET"},
+ "action": {"pass": "applications/empty"},
+ }
+ ],
+ "applications": {
+ "empty": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + '/python/empty',
+ "working_directory": self.current_dir
+ + '/python/empty',
+ "module": "wsgi",
+ },
+ "mirror": {
+ "type": "python",
+ "processes": {"spare": 0},
+ "path": self.current_dir + '/python/mirror',
+ "working_directory": self.current_dir
+ + '/python/mirror',
+ "module": "wsgi",
+ },
},
- },
- }
+ }
+ ),
+ 'routing configure',
)
def route(self, route):
@@ -897,31 +901,75 @@ class TestRouting(TestApplicationProto):
'success',
self.route(
{
- "match": {"uri": "/"},
+ "match": {"uri": ["/blah", "/slash/"]},
"action": {"pass": "applications/empty"},
}
),
'match uri positive configure',
)
- self.assertEqual(self.get()['status'], 200, 'match uri positive')
+ self.assertEqual(self.get()['status'], 404, 'match uri positive')
+ self.assertEqual(
+ self.get(url='/blah')['status'], 200, 'match uri positive blah'
+ )
+ self.assertEqual(
+ self.get(url='/blah#foo')['status'],
+ 200,
+ 'match uri positive #foo',
+ )
+ self.assertEqual(
+ self.get(url='/blah?var')['status'], 200, 'match uri args'
+ )
+ self.assertEqual(
+ self.get(url='//blah')['status'], 200, 'match uri adjacent slashes'
+ )
self.assertEqual(
- self.get(url='/blah')['status'], 404, 'match uri positive blah'
+ self.get(url='/slash/foo/../')['status'],
+ 200,
+ 'match uri relative path',
+ )
+ self.assertEqual(
+ self.get(url='/slash/./')['status'],
+ 200,
+ 'match uri relative path 2',
+ )
+ self.assertEqual(
+ self.get(url='/slash//.//')['status'],
+ 200,
+ 'match uri adjacent slashes 2',
+ )
+ self.assertEqual(
+ self.get(url='/%')['status'], 400, 'match uri percent'
)
self.assertEqual(
- self.get(url='/#blah')['status'], 200, 'match uri positive #blah'
+ self.get(url='/%1')['status'], 400, 'match uri percent digit'
)
self.assertEqual(
- self.get(url='/?var')['status'], 200, 'match uri params'
+ self.get(url='/%A')['status'], 400, 'match uri percent letter'
)
self.assertEqual(
- self.get(url='//')['status'], 200, 'match uri adjacent slashes'
+ self.get(url='/slash/.?args')['status'], 200, 'match uri dot args'
)
self.assertEqual(
- self.get(url='/blah/../')['status'], 200, 'match uri relative path'
+ self.get(url='/slash/.#frag')['status'], 200, 'match uri dot frag'
)
self.assertEqual(
- self.get(url='/./')['status'], 200, 'match uri relative path'
+ self.get(url='/slash/foo/..?args')['status'],
+ 200,
+ 'match uri dot dot args',
+ )
+ self.assertEqual(
+ self.get(url='/slash/foo/..#frag')['status'],
+ 200,
+ 'match uri dot dot frag',
+ )
+ self.assertEqual(
+ self.get(url='/slash/.')['status'], 200, 'match uri trailing dot'
+ )
+ self.assertEqual(
+ self.get(url='/slash/foo/..')['status'],
+ 200,
+ 'match uri trailing dot dot',
)
def test_routes_match_uri_case_sensitive(self):
diff --git a/test/test_ruby_application.py b/test/test_ruby_application.py
index 6f82ae81..bbb252d7 100644
--- a/test/test_ruby_application.py
+++ b/test/test_ruby_application.py
@@ -347,6 +347,29 @@ class TestRubyApplication(TestApplicationRuby):
self.assertEqual(resp['body'], '0123456789', 'keep-alive 2')
+ def test_ruby_application_constants(self):
+ self.load('constants')
+
+ resp = self.get()
+
+ self.assertEqual(resp['status'], 200, 'status')
+
+ headers = resp['headers']
+ self.assertGreater(len(headers['X-Copyright']), 0, 'RUBY_COPYRIGHT')
+ self.assertGreater(
+ len(headers['X-Description']), 0, 'RUBY_DESCRIPTION'
+ )
+ self.assertGreater(len(headers['X-Engine']), 0, 'RUBY_ENGINE')
+ self.assertGreater(
+ len(headers['X-Engine-Version']), 0, 'RUBY_ENGINE_VERSION'
+ )
+ self.assertGreater(len(headers['X-Patchlevel']), 0, 'RUBY_PATCHLEVEL')
+ self.assertGreater(len(headers['X-Platform']), 0, 'RUBY_PLATFORM')
+ self.assertGreater(
+ len(headers['X-Release-Date']), 0, 'RUBY_RELEASE_DATE'
+ )
+ self.assertGreater(len(headers['X-Revision']), 0, 'RUBY_REVISION')
+ self.assertGreater(len(headers['X-Version']), 0, 'RUBY_VERSION')
if __name__ == '__main__':
TestRubyApplication.main()
diff --git a/test/test_static.py b/test/test_static.py
index 4bdd83ed..f9dcb7dd 100644
--- a/test/test_static.py
+++ b/test/test_static.py
@@ -40,6 +40,12 @@ class TestStatic(TestApplicationProto):
)
self.assertEqual(self.get(url='/')['body'], '0123456789', 'index 2')
self.assertEqual(
+ self.get(url='/?blah')['body'], '0123456789', 'index vars'
+ )
+ self.assertEqual(
+ self.get(url='/#blah')['body'], '0123456789', 'index anchor'
+ )
+ self.assertEqual(
self.get(url='/dir/')['status'], 404, 'index not found'
)
diff --git a/test/test_usr1.py b/test/test_usr1.py
new file mode 100644
index 00000000..dd9292c7
--- /dev/null
+++ b/test/test_usr1.py
@@ -0,0 +1,92 @@
+import os
+import unittest
+from subprocess import call
+from unit.applications.lang.python import TestApplicationPython
+
+
+class TestUSR1(TestApplicationPython):
+ prerequisites = {'modules': ['python']}
+
+ def test_usr1_access_log(self):
+ self.load('empty')
+
+ log_path = self.testdir + '/access.log'
+
+ self.assertIn(
+ 'success',
+ self.conf('"' + log_path + '"', 'access_log'),
+ 'access log configure',
+ )
+
+ self.assertTrue(self.waitforfiles(log_path), 'open')
+
+ log_path_new = self.testdir + '/new.log'
+
+ os.rename(log_path, log_path_new)
+
+ self.get()
+
+ self.assertIsNotNone(
+ self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', 'new.log'),
+ 'rename new',
+ )
+ self.assertFalse(os.path.isfile(log_path), 'rename old')
+
+ with open(self.testdir + '/unit.pid', 'r') as f:
+ pid = f.read().rstrip()
+
+ call(['kill', '-s', 'USR1', pid])
+
+ self.assertTrue(self.waitforfiles(log_path), 'reopen')
+
+ self.get(url='/usr1')
+
+ self.assertIsNotNone(
+ self.wait_for_record(
+ r'"GET /usr1 HTTP/1.1" 200 0 "-" "-"', 'access.log'
+ ),
+ 'reopen 2',
+ )
+ self.assertIsNone(
+ self.search_in_log(r'/usr1', 'new.log'), 'rename new 2'
+ )
+
+ @unittest.skip('not yet')
+ def test_usr1_unit_log(self):
+ self.load('log_body')
+
+ log_path = self.testdir + '/unit.log'
+ log_path_new = self.testdir + '/new.log'
+
+ os.rename(log_path, log_path_new)
+
+ body = 'body_for_a_log_new'
+ self.post(body=body)
+
+ self.assertIsNotNone(
+ self.wait_for_record(body, 'new.log'), 'rename new'
+ )
+ self.assertFalse(os.path.isfile(log_path), 'rename old')
+
+ with open(self.testdir + '/unit.pid', 'r') as f:
+ pid = f.read().rstrip()
+
+ call(['kill', '-s', 'USR1', pid])
+
+ self.assertTrue(self.waitforfiles(log_path), 'reopen')
+
+ body = 'body_for_a_log_unit'
+ self.post(body=body)
+
+ self.assertIsNotNone(self.wait_for_record(body), 'rename new')
+ self.assertIsNone(self.search_in_log(body, 'new.log'), 'rename new 2')
+
+ # merge two log files into unit.log to check alerts
+
+ with open(log_path, 'w') as unit_log, \
+ open(log_path_new, 'r') as new_log:
+ unit_log.write(new_log.read())
+
+
+if __name__ == '__main__':
+ TestUSR1.main()
diff --git a/test/unit/applications/websockets.py b/test/unit/applications/websockets.py
index 50ff2797..ef16f433 100644
--- a/test/unit/applications/websockets.py
+++ b/test/unit/applications/websockets.py
@@ -1,3 +1,4 @@
+import re
import random
import base64
import struct
@@ -30,25 +31,37 @@ class TestApplicationWebsocket(TestApplicationProto):
sha1 = hashlib.sha1((key + GUID).encode()).digest()
return base64.b64encode(sha1).decode()
- def upgrade(self):
- key = self.key()
+ def upgrade(self, headers=None):
+ key = None
- if self.preinit:
- self.get()
-
- resp, sock = self.get(
- headers={
+ if headers is None:
+ key = self.key()
+ headers = {
'Host': 'localhost',
'Upgrade': 'websocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Key': key,
'Sec-WebSocket-Protocol': 'chat',
'Sec-WebSocket-Version': 13,
- },
- read_timeout=1,
+ }
+
+ _, sock = self.get(
+ headers=headers,
+ no_recv=True,
start=True,
)
+ resp = ''
+ while select.select([sock], [], [], 30)[0]:
+ resp += sock.recv(4096).decode()
+
+ if (
+ re.search('101 Switching Protocols', resp)
+ and resp[-4:] == '\r\n\r\n'
+ ):
+ resp = self._resp_to_dict(resp)
+ break
+
return (resp, sock, key)
def apply_mask(self, data, mask):
diff --git a/test/unit/http.py b/test/unit/http.py
index 82a6bd6a..c7e3e36d 100644
--- a/test/unit/http.py
+++ b/test/unit/http.py
@@ -1,4 +1,5 @@
import re
+import time
import socket
import select
from unit.main import TestUnit
@@ -63,7 +64,7 @@ class TestHTTP(TestUnit):
if 'raw' not in kwargs:
req = ' '.join([start_str, url, http]) + crlf
- if body is not b'':
+ if body != b'':
if isinstance(body, str):
body = body.encode()
@@ -178,3 +179,20 @@ class TestHTTP(TestUnit):
headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
return {'status': int(status), 'headers': headers, 'body': body}
+
+ def waitforsocket(self, port):
+ ret = False
+
+ for i in range(50):
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(('127.0.0.1', port))
+ ret = True
+ break
+ except:
+ sock.close()
+ time.sleep(0.1)
+
+ sock.close()
+
+ self.assertTrue(ret, 'socket connected')
diff --git a/test/unit/main.py b/test/unit/main.py
index 873f1815..094fdb0e 100644
--- a/test/unit/main.py
+++ b/test/unit/main.py
@@ -185,6 +185,8 @@ class TestUnit(unittest.TestCase):
if self._started:
self._stop()
+ self.stop_processes()
+
def _run(self):
self.unitd = self.pardir + '/build/unitd'
@@ -240,24 +242,24 @@ class TestUnit(unittest.TestCase):
break
time.sleep(0.1)
- if os.path.exists(self.testdir + '/unit.pid'):
- exit("Could not terminate unit")
+ self._p.join(timeout=5)
- self._started = False
+ if self._p.is_alive():
+ self._p.terminate()
+ self._p.join(timeout=5)
- self._p.join(timeout=1)
- self._terminate_process(self._p)
+ if self._p.is_alive():
+ self.fail("Could not terminate process " + str(self._p.pid))
- def _terminate_process(self, process):
- if process.is_alive():
- process.terminate()
- process.join(timeout=5)
+ if os.path.exists(self.testdir + '/unit.pid'):
+ self.fail("Could not terminate unit")
- if process.is_alive():
- exit("Could not terminate process " + process.pid)
+ self._started = False
- if process.exitcode:
- exit("Child process terminated with code " + str(process.exitcode))
+ if self._p.exitcode:
+ self.fail(
+ "Child process terminated with code " + str(self._p.exitcode)
+ )
def _check_alerts(self, log):
found = False
@@ -287,6 +289,26 @@ class TestUnit(unittest.TestCase):
if found:
print('skipped.')
+ def run_process(self, target):
+ if not hasattr(self, '_processes'):
+ self._processes = []
+
+ process = Process(target=target)
+ process.start()
+
+ self._processes.append(process)
+
+ def stop_processes(self):
+ if not hasattr(self, '_processes'):
+ return
+
+ for process in self._processes:
+ process.terminate()
+ process.join(timeout=5)
+
+ if process.is_alive():
+ self.fail('Fail to stop process')
+
def waitforfiles(self, *files):
for i in range(50):
wait = False
diff --git a/version b/version
index 33ee1242..2e798f2b 100644
--- a/version
+++ b/version
@@ -1,5 +1,5 @@
# Copyright (C) NGINX, Inc.
-NXT_VERSION=1.12.0
-NXT_VERNUM=11200
+NXT_VERSION=1.13.0
+NXT_VERNUM=11300