From f0e9e3ace94c82fab78ab1d4ee8c3042f3e94fdf Mon Sep 17 00:00:00 2001 From: Igor Sysoev Date: Thu, 31 Aug 2017 00:42:16 +0300 Subject: nginext has been renamed to unit. --- auto/make | 4 +- auto/modules/go | 15 +- auto/modules/php | 8 +- auto/modules/python | 8 +- auto/shmem | 4 +- auto/summary | 8 +- configure | 6 +- src/go/unit/cbytes-1.6.go | 15 ++ src/go/unit/cbytes-1.7.go | 15 ++ src/go/unit/nxt_go_array.c | 66 +++++ src/go/unit/nxt_go_array.h | 36 +++ src/go/unit/nxt_go_lib.c | 193 ++++++++++++++ src/go/unit/nxt_go_lib.h | 39 +++ src/go/unit/nxt_go_log.h | 34 +++ src/go/unit/nxt_go_mutex.h | 21 ++ src/go/unit/nxt_go_port.c | 210 ++++++++++++++++ src/go/unit/nxt_go_port.h | 18 ++ src/go/unit/nxt_go_port_memory.c | 192 ++++++++++++++ src/go/unit/nxt_go_port_memory.h | 24 ++ src/go/unit/nxt_go_process.c | 150 +++++++++++ src/go/unit/nxt_go_process.h | 33 +++ src/go/unit/nxt_go_run_ctx.c | 531 +++++++++++++++++++++++++++++++++++++++ src/go/unit/nxt_go_run_ctx.h | 75 ++++++ src/go/unit/port.go | 219 ++++++++++++++++ src/go/unit/request.go | 211 ++++++++++++++++ src/go/unit/response.go | 69 +++++ src/go/unit/unit.go | 134 ++++++++++ src/nginext/cbytes-1.6.go | 15 -- src/nginext/cbytes-1.7.go | 15 -- src/nginext/nginext.go | 134 ---------- src/nginext/nxt_go_array.c | 66 ----- src/nginext/nxt_go_array.h | 36 --- src/nginext/nxt_go_lib.c | 193 -------------- src/nginext/nxt_go_lib.h | 39 --- src/nginext/nxt_go_log.h | 34 --- src/nginext/nxt_go_mutex.h | 21 -- src/nginext/nxt_go_port.c | 210 ---------------- src/nginext/nxt_go_port.h | 18 -- src/nginext/nxt_go_port_memory.c | 192 -------------- src/nginext/nxt_go_port_memory.h | 24 -- src/nginext/nxt_go_process.c | 150 ----------- src/nginext/nxt_go_process.h | 33 --- src/nginext/nxt_go_run_ctx.c | 531 --------------------------------------- src/nginext/nxt_go_run_ctx.h | 75 ------ src/nginext/port.go | 219 ---------------- src/nginext/request.go | 211 ---------------- src/nginext/response.go | 69 ----- src/nxt_controller.c | 4 +- src/nxt_main.c | 4 +- src/nxt_main_process.c | 2 +- src/nxt_php_sapi.c | 6 +- src/nxt_port_memory.c | 2 +- src/nxt_process.c | 2 +- src/nxt_python_wsgi.c | 10 +- src/nxt_runtime.c | 10 +- 55 files changed, 2331 insertions(+), 2332 deletions(-) create mode 100644 src/go/unit/cbytes-1.6.go create mode 100644 src/go/unit/cbytes-1.7.go create mode 100644 src/go/unit/nxt_go_array.c create mode 100644 src/go/unit/nxt_go_array.h create mode 100644 src/go/unit/nxt_go_lib.c create mode 100644 src/go/unit/nxt_go_lib.h create mode 100644 src/go/unit/nxt_go_log.h create mode 100644 src/go/unit/nxt_go_mutex.h create mode 100644 src/go/unit/nxt_go_port.c create mode 100644 src/go/unit/nxt_go_port.h create mode 100644 src/go/unit/nxt_go_port_memory.c create mode 100644 src/go/unit/nxt_go_port_memory.h create mode 100644 src/go/unit/nxt_go_process.c create mode 100644 src/go/unit/nxt_go_process.h create mode 100644 src/go/unit/nxt_go_run_ctx.c create mode 100644 src/go/unit/nxt_go_run_ctx.h create mode 100644 src/go/unit/port.go create mode 100644 src/go/unit/request.go create mode 100644 src/go/unit/response.go create mode 100644 src/go/unit/unit.go delete mode 100644 src/nginext/cbytes-1.6.go delete mode 100644 src/nginext/cbytes-1.7.go delete mode 100644 src/nginext/nginext.go delete mode 100644 src/nginext/nxt_go_array.c delete mode 100644 src/nginext/nxt_go_array.h delete mode 100644 src/nginext/nxt_go_lib.c delete mode 100644 src/nginext/nxt_go_lib.h delete mode 100644 src/nginext/nxt_go_log.h delete mode 100644 src/nginext/nxt_go_mutex.h delete mode 100644 src/nginext/nxt_go_port.c delete mode 100644 src/nginext/nxt_go_port.h delete mode 100644 src/nginext/nxt_go_port_memory.c delete mode 100644 src/nginext/nxt_go_port_memory.h delete mode 100644 src/nginext/nxt_go_process.c delete mode 100644 src/nginext/nxt_go_process.h delete mode 100644 src/nginext/nxt_go_run_ctx.c delete mode 100644 src/nginext/nxt_go_run_ctx.h delete mode 100644 src/nginext/port.go delete mode 100644 src/nginext/request.go delete mode 100644 src/nginext/response.go diff --git a/auto/make b/auto/make index e977210f..907a97d0 100644 --- a/auto/make +++ b/auto/make @@ -180,7 +180,7 @@ done $echo >> $NXT_MAKEFILE -# nginext executable. +# unit executable. cat << END >> $NXT_MAKEFILE @@ -193,7 +193,7 @@ $NXT_BUILD_DIR/$NXT_BIN: $NXT_BUILD_DIR/$NXT_LIB_STATIC \\ END -# nginext object files. +# unit object files. for nxt_src in $NXT_MAKE_SRCS do diff --git a/auto/modules/go b/auto/modules/go index 748b8c8d..f8a3ea6c 100644 --- a/auto/modules/go +++ b/auto/modules/go @@ -97,24 +97,23 @@ GOARCH = `${NXT_GO} env GOARCH` ${NXT_GO}: $NXT_BUILD_DIR/nxt_go_gen.h -$NXT_BUILD_DIR/nxt_go_gen.h: +$NXT_BUILD_DIR/nxt_go_gen.h: src/go/unit/*.go GOPATH=`pwd` \\ CGO_CPPFLAGS='-DNXT_CONFIGURE \\ -I`pwd`/src' \\ ${NXT_GO} build -o $NXT_BUILD_DIR/nxt_go_gen.a \\ - --buildmode=c-archive nginext + --buildmode=c-archive go/unit ${NXT_GO}-install: ${NXT_GO} - install -d \$(GOPATH)/src/nginext - install -p ./src/nginext/*.c ./src/nginext/*.h \\ - ./src/nginext/*.go \$(GOPATH)/src/nginext/ + install -d \$(GOPATH)/src/unit + install -p ./src/go/unit/* \$(GOPATH)/src/unit/ CGO_CFLAGS="-I\$(NXT_ROOT)/$NXT_BUILD_DIR -I\$(NXT_ROOT)/src" \\ CGO_LDFLAGS="-L\$(NXT_ROOT)/$NXT_BUILD_DIR ${NXT_LIBRT}" \\ GOPATH=$NXT_GO_PATH \\ - ${NXT_GO} install -v nginext + ${NXT_GO} install -v unit ${NXT_GO}-uninstall: - rm -rf \$(GOPATH)/src/nginext - rm -f \$(GOPATH)/pkg/\$(GOOS)_\$(GOARCH)/nginext.a + rm -rf \$(GOPATH)/src/unit + rm -f \$(GOPATH)/pkg/\$(GOOS)_\$(GOARCH)/unit.a END diff --git a/auto/modules/php b/auto/modules/php index bb9fa848..a941b658 100644 --- a/auto/modules/php +++ b/auto/modules/php @@ -118,7 +118,7 @@ if grep ^$NXT_PHP_MODULE: $NXT_MAKEFILE 2>&1 > /dev/null; then exit 1; fi -$echo " + PHP module: nginext.${NXT_PHP_MODULE}" +$echo " + PHP module: unit.${NXT_PHP_MODULE}" $echo >> $NXT_MAKEFILE @@ -152,10 +152,10 @@ cat << END >> $NXT_MAKEFILE all: ${NXT_PHP_MODULE} -${NXT_PHP_MODULE}: $NXT_BUILD_DIR/nginext.${NXT_PHP_MODULE} +${NXT_PHP_MODULE}: $NXT_BUILD_DIR/unit.${NXT_PHP_MODULE} -$NXT_BUILD_DIR/nginext.${NXT_PHP_MODULE}: $nxt_objs - $NXT_MODULE_LINK -o $NXT_BUILD_DIR/nginext.${NXT_PHP_MODULE} \\ +$NXT_BUILD_DIR/unit.${NXT_PHP_MODULE}: $nxt_objs + $NXT_MODULE_LINK -o $NXT_BUILD_DIR/unit.${NXT_PHP_MODULE} \\ $nxt_objs ${NXT_PHP_LIB} ${NXT_PHP_LDFLAGS} END diff --git a/auto/modules/python b/auto/modules/python index 91d0ed79..1034be82 100644 --- a/auto/modules/python +++ b/auto/modules/python @@ -115,7 +115,7 @@ if grep ^$NXT_PYTHON_MODULE: $NXT_MAKEFILE 2>&1 > /dev/null; then exit 1; fi -$echo " + Python module: nginext.${NXT_PYTHON_MODULE}" +$echo " + Python module: unit.${NXT_PYTHON_MODULE}" $echo >> $NXT_MAKEFILE @@ -149,10 +149,10 @@ cat << END >> $NXT_MAKEFILE all: ${NXT_PYTHON_MODULE} -${NXT_PYTHON_MODULE}: $NXT_BUILD_DIR/nginext.${NXT_PYTHON_MODULE} +${NXT_PYTHON_MODULE}: $NXT_BUILD_DIR/unit.${NXT_PYTHON_MODULE} -$NXT_BUILD_DIR/nginext.${NXT_PYTHON_MODULE}: $nxt_objs - $NXT_MODULE_LINK -o $NXT_BUILD_DIR/nginext.${NXT_PYTHON_MODULE} \\ +$NXT_BUILD_DIR/unit.${NXT_PYTHON_MODULE}: $nxt_objs + $NXT_MODULE_LINK -o $NXT_BUILD_DIR/unit.${NXT_PYTHON_MODULE} \\ $nxt_objs $NXT_PYTHON_LIBS END diff --git a/auto/shmem b/auto/shmem index 8fd5c3f4..86962ec7 100644 --- a/auto/shmem +++ b/auto/shmem @@ -22,7 +22,7 @@ nxt_feature_test="#include #include int main() { - static char name[] = \"/nginext.configure\"; + static char name[] = \"/unit.configure\"; shm_unlink(name); @@ -49,7 +49,7 @@ nxt_feature_test="#include #include int main() { - static char name[] = \"/nginext.configure\"; + static char name[] = \"/unit.configure\"; int fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); if (fd == -1) diff --git a/auto/summary b/auto/summary index cb50ae31..93324869 100644 --- a/auto/summary +++ b/auto/summary @@ -7,11 +7,11 @@ cat << END Configuration summary: - nginext pid file: "$NXT_PID" - nginext log file: "$NXT_LOG" - nginext modules path: "$NXT_MODULES" + unit pid file: "$NXT_PID" + unit log file: "$NXT_LOG" + unit modules path: "$NXT_MODULES" - nginext control API socket: "$NXT_CONTROL" + unit control API socket: "$NXT_CONTROL" non-privileged user: "$NXT_USER" non-privileged group: "$NXT_GROUP" diff --git a/configure b/configure index b845df54..4a78e944 100755 --- a/configure +++ b/configure @@ -27,13 +27,13 @@ NXT_AUTOCONF_ERR=$NXT_BUILD_DIR/autoconf.err NXT_AUTOCONF_DATA=$NXT_BUILD_DIR/autoconf.data NXT_AUTO_CONFIG_H=$NXT_BUILD_DIR/nxt_auto_config.h NXT_MAKEFILE=$NXT_BUILD_DIR/Makefile -NXT_BIN=nginext +NXT_BIN=unitd CC=${CC:-cc} -NXT_PID="nginext.pid" -NXT_LOG="nginext.log" +NXT_PID="unit.pid" +NXT_LOG="unit.log" NXT_MODULES="modules/" NXT_CONTROL="unix:control.unit.sock" NXT_USER="nobody" diff --git a/src/go/unit/cbytes-1.6.go b/src/go/unit/cbytes-1.6.go new file mode 100644 index 00000000..f756b1de --- /dev/null +++ b/src/go/unit/cbytes-1.6.go @@ -0,0 +1,15 @@ +// +build !go1.7 + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +import "C" +import "unsafe" + +func getCBytes(p []byte) unsafe.Pointer { + return unsafe.Pointer(C.CString(string(p))) // go <= 1.6 +} diff --git a/src/go/unit/cbytes-1.7.go b/src/go/unit/cbytes-1.7.go new file mode 100644 index 00000000..e0de283d --- /dev/null +++ b/src/go/unit/cbytes-1.7.go @@ -0,0 +1,15 @@ +// +build go1.7 + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +import "C" +import "unsafe" + +func getCBytes(p []byte) unsafe.Pointer { + return C.CBytes(p) // go >= 1.7 +} diff --git a/src/go/unit/nxt_go_array.c b/src/go/unit/nxt_go_array.c new file mode 100644 index 00000000..f409cb10 --- /dev/null +++ b/src/go/unit/nxt_go_array.c @@ -0,0 +1,66 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef NXT_CONFIGURE + +#include +#include + +#include + +#include "nxt_go_array.h" + +void +nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size) +{ + array->elts = malloc(n * size); + + if (nxt_slow_path(n != 0 && array->elts == NULL)) { + return; + } + + array->nelts = 0; + array->size = size; + array->nalloc = n; + array->mem_pool = NULL; +} + +void * +nxt_go_array_add(nxt_array_t *array) +{ + void *p; + uint32_t nalloc, new_alloc; + + nalloc = array->nalloc; + + if (array->nelts == nalloc) { + + if (nalloc < 16) { + /* Allocate new array twice larger than current. */ + new_alloc = nalloc * 2; + + } else { + /* Allocate new array 1.5 times larger than current. */ + new_alloc = nalloc + nalloc / 2; + } + + p = realloc(array->elts, array->size * new_alloc); + + if (nxt_slow_path(p == NULL)) { + return NULL; + } + + array->elts = p; + array->nalloc = new_alloc; + } + + p = nxt_pointer_to(array->elts, array->size * array->nelts); + array->nelts++; + + return p; +} + +#endif /* NXT_CONFIGURE */ diff --git a/src/go/unit/nxt_go_array.h b/src/go/unit/nxt_go_array.h new file mode 100644 index 00000000..d96db663 --- /dev/null +++ b/src/go/unit/nxt_go_array.h @@ -0,0 +1,36 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_ARRAY_H_INCLUDED_ +#define _NXT_GO_ARRAY_H_INCLUDED_ + + +#include + +void nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size); + +void *nxt_go_array_add(nxt_array_t *array); + +nxt_inline void * +nxt_go_array_zero_add(nxt_array_t *array) +{ + void *p; + + p = nxt_go_array_add(array); + + if (nxt_fast_path(p != NULL)) { + nxt_memzero(p, array->size); + } + + return p; +} + +#define \ +nxt_go_array_at(array, n) \ + nxt_pointer_to((array)->elts, (array)->size * (n)) + + +#endif /* _NXT_GO_ARRAY_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c new file mode 100644 index 00000000..474d313b --- /dev/null +++ b/src/go/unit/nxt_go_lib.c @@ -0,0 +1,193 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifdef NXT_CONFIGURE + +#include +#include "nxt_go_lib.h" + +// Stubs to compile during configure process. +int +nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) +{ + return -1; +} + +int +nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) +{ + return -1; +} + +int +nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len) +{ + return -1; +} + +int +nxt_go_request_close(nxt_go_request_t r) +{ + return -1; +} + +int +nxt_go_request_done(nxt_go_request_t r) +{ + return -1; +} + +void +nxt_go_ready() +{ +} + +nxt_go_request_t +nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len) +{ + return 0; +} + +#else + +#include "nxt_go_run_ctx.h" +#include "nxt_go_log.h" +#include "nxt_go_port.h" + +#include +#include + +int +nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) +{ + nxt_int_t rc; + nxt_go_run_ctx_t *ctx; + + if (nxt_slow_path(r == 0)) { + return 0; + } + + nxt_go_debug("write: %d", (int) len); + + ctx = (nxt_go_run_ctx_t *) r; + rc = nxt_go_ctx_write(ctx, buf, len); + + return rc == NXT_OK ? len : -1; +} + + +int +nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) +{ + size_t res; + nxt_go_run_ctx_t *ctx; + + if (nxt_slow_path(r == 0)) { + return 0; + } + + ctx = (nxt_go_run_ctx_t *) r; + + dst_len = nxt_min(dst_len, ctx->r.body.preread_size); + + res = nxt_go_ctx_read_raw(ctx, dst, dst_len); + + ctx->r.body.preread_size -= res; + + return res; +} + + +int +nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len) +{ + nxt_go_run_ctx_t *ctx; + + if (nxt_slow_path(r == 0)) { + return 0; + } + + ctx = (nxt_go_run_ctx_t *) r; + + nxt_go_ctx_add_msg(ctx, src, src_len); + + return nxt_go_request_read(r, dst, dst_len); +} + + +int +nxt_go_request_close(nxt_go_request_t r) +{ + return 0; +} + + +int +nxt_go_request_done(nxt_go_request_t r) +{ + nxt_int_t res; + nxt_go_run_ctx_t *ctx; + nxt_go_msg_t *msg, *b; + + if (nxt_slow_path(r == 0)) { + return 0; + } + + ctx = (nxt_go_run_ctx_t *) r; + + res = nxt_go_ctx_flush(ctx, 1); + + nxt_go_ctx_release_msg(ctx, &ctx->msg); + + msg = ctx->msg.next; + while (msg != NULL) { + nxt_go_ctx_release_msg(ctx, msg); + + b = msg; + msg = b->next; + + free(b); + } + + free(ctx); + + return res; +} + + +void +nxt_go_ready() +{ + char *go_stream; + nxt_port_msg_t port_msg; + + go_stream = getenv("NXT_GO_STREAM"); + + if (go_stream == NULL) { + return; + } + + port_msg.stream = atol(go_stream); + port_msg.pid = getpid(); + port_msg.reply_port = 0; + port_msg.type = _NXT_PORT_MSG_READY; + port_msg.last = 1; + port_msg.mmap = 0; + + nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0); +} + + +nxt_go_request_t +nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len) +{ + return nxt_go_port_on_read(buf, buf_len, oob, oob_len); +} + + +#endif /* NXT_CONFIGURE */ diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h new file mode 100644 index 00000000..b3a86be9 --- /dev/null +++ b/src/go/unit/nxt_go_lib.h @@ -0,0 +1,39 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_LIB_H_INCLUDED_ +#define _NXT_GO_LIB_H_INCLUDED_ + + +#include +#include +#include + +typedef struct { + int length; + char *start; +} nxt_go_str_t; + +typedef uintptr_t nxt_go_request_t; + +int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len); + +int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len); + +int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len); + +int nxt_go_request_close(nxt_go_request_t r); + +int nxt_go_request_done(nxt_go_request_t r); + +void nxt_go_ready(); + +nxt_go_request_t nxt_go_process_port_msg(void *buf, size_t buf_len, + void *oob, size_t oob_len); + + +#endif /* _NXT_GO_LIB_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_log.h b/src/go/unit/nxt_go_log.h new file mode 100644 index 00000000..d596cfb3 --- /dev/null +++ b/src/go/unit/nxt_go_log.h @@ -0,0 +1,34 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_LOG_H_INCLUDED_ +#define _NXT_GO_LOG_H_INCLUDED_ + + +#include +#include + +#include + +#if (NXT_DEBUG) + +#define nxt_go_debug(fmt, ARGS...) \ + fprintf(stderr, "[go debug] " fmt "\n", ##ARGS) + +#else + +#define nxt_go_debug(fmt, ARGS...) + +#endif + +#define nxt_go_warn(fmt, ARGS...) \ + fprintf(stderr, "[go warn] " fmt "\n", ##ARGS) + +#define nxt_go_error(fmt, ARGS...) \ + fprintf(stderr, "[go error] " fmt "\n", ##ARGS) + + +#endif /* _NXT_GO_LOG_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_mutex.h b/src/go/unit/nxt_go_mutex.h new file mode 100644 index 00000000..98bd27f0 --- /dev/null +++ b/src/go/unit/nxt_go_mutex.h @@ -0,0 +1,21 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_MUTEX_H_INCLUDED_ +#define _NXT_GO_MUTEX_H_INCLUDED_ + + +#include + +typedef pthread_mutex_t nxt_go_mutex_t; + +#define nxt_go_mutex_create(mutex) pthread_mutex_init(mutex, NULL) +#define nxt_go_mutex_destroy(mutex) pthread_mutex_destroy(mutex) +#define nxt_go_mutex_lock(mutex) pthread_mutex_lock(mutex) +#define nxt_go_mutex_unlock(mutex) pthread_mutex_unlock(mutex) + + +#endif /* _NXT_GO_MUTEX_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c new file mode 100644 index 00000000..a46a33d1 --- /dev/null +++ b/src/go/unit/nxt_go_port.c @@ -0,0 +1,210 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef NXT_CONFIGURE + + +#include "nxt_go_port.h" +#include "nxt_go_log.h" +#include "nxt_go_process.h" +#include "nxt_go_run_ctx.h" + +#include +#include + + +#define nxt_go_str(p) ((nxt_go_str_t *)(p)) + +static nxt_go_request_t +nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) +{ + size_t s; + nxt_str_t n, v; + nxt_int_t rc; + nxt_uint_t i; + nxt_go_run_ctx_t *ctx; + nxt_go_request_t r; + nxt_app_request_header_t *h; + + r = nxt_go_find_request(port_msg->stream); + if (r != 0) { + return r; + } + + ctx = malloc(sizeof(nxt_go_run_ctx_t)); + nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t)); + + r = (nxt_go_request_t)(ctx); + h = &ctx->r.header; + + nxt_go_ctx_read_str(ctx, &h->method); + nxt_go_ctx_read_str(ctx, &h->target); + nxt_go_ctx_read_str(ctx, &h->path); + + nxt_go_ctx_read_size(ctx, &s); + if (s > 0) { + s--; + h->query.start = h->target.start + s; + h->query.length = h->target.length - s; + + if (h->path.start == NULL) { + h->path.start = h->target.start; + h->path.length = s - 1; + } + } + + if (h->path.start == NULL) { + h->path = h->target; + } + + nxt_go_new_request(r, port_msg->stream, nxt_go_str(&h->method), + nxt_go_str(&h->target)); + + nxt_go_ctx_read_str(ctx, &h->version); + + nxt_go_request_set_proto(r, nxt_go_str(&h->version), + h->version.start[5] - '0', + h->version.start[7] - '0'); + + nxt_go_ctx_read_str(ctx, &ctx->r.remote); + if (ctx->r.remote.start != NULL) { + nxt_go_request_set_remote_addr(r, nxt_go_str(&ctx->r.remote)); + } + + nxt_go_ctx_read_str(ctx, &h->host); + nxt_go_ctx_read_str(ctx, &h->cookie); + nxt_go_ctx_read_str(ctx, &h->content_type); + nxt_go_ctx_read_str(ctx, &h->content_length); + + if (h->host.start != NULL) { + nxt_go_request_set_host(r, nxt_go_str(&h->host)); + } + + nxt_go_ctx_read_size(ctx, &s); + h->parsed_content_length = s; + + do { + rc = nxt_go_ctx_read_str(ctx, &n); + + if (n.length == 0) { + break; + } + + rc = nxt_go_ctx_read_str(ctx, &v); + nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v)); + } while(1); + + nxt_go_ctx_read_size(ctx, &s); + ctx->r.body.preread_size = s; + + if (h->parsed_content_length > 0) { + nxt_go_request_set_content_length(r, h->parsed_content_length); + } + + if (ctx->r.body.preread_size < h->parsed_content_length) { + nxt_go_request_create_channel(r); + } + + return r; +} + +nxt_go_request_t +nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) +{ + void *buf_end; + void *payload; + size_t payload_size; + nxt_fd_t fd; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_port_msg_new_port_t *new_port_msg; + + fd = -1; + nxt_go_debug("on read: %d (%d)", (int)buf_size, (int)oob_size); + + cm = oob; + if (oob_size >= CMSG_SPACE(sizeof(int)) + && cm->cmsg_len == CMSG_LEN(sizeof(int)) + && cm->cmsg_level == SOL_SOCKET + && cm->cmsg_type == SCM_RIGHTS) { + + nxt_memcpy(&fd, CMSG_DATA(cm), sizeof(int)); + nxt_go_debug("fd = %d", fd); + } + + port_msg = buf; + if (buf_size < sizeof(nxt_port_msg_t)) { + nxt_go_warn("message too small (%d bytes)", (int)buf_size); + goto fail; + } + + buf_end = ((char *)buf) + buf_size; + + payload = port_msg + 1; + payload_size = buf_size - sizeof(nxt_port_msg_t); + + if (port_msg->mmap) { + nxt_go_debug("using data in shared memory"); + } + + if (port_msg->type >= NXT_PORT_MSG_MAX) { + nxt_go_warn("unknown message type (%d)", (int)port_msg->type); + goto fail; + } + + switch (port_msg->type) { + case _NXT_PORT_MSG_QUIT: + nxt_go_debug("quit"); + + nxt_go_set_quit(); + break; + + case _NXT_PORT_MSG_NEW_PORT: + nxt_go_debug("new port"); + new_port_msg = payload; + + nxt_go_new_port(new_port_msg->pid, new_port_msg->id, new_port_msg->type, + -1, fd); + break; + + case _NXT_PORT_MSG_CHANGE_FILE: + nxt_go_debug("change file"); + break; + + case _NXT_PORT_MSG_MMAP: + nxt_go_debug("mmap"); + + nxt_go_new_incoming_mmap(port_msg->pid, fd); + break; + + case _NXT_PORT_MSG_DATA: + nxt_go_debug("data"); + + return nxt_go_data_handler(port_msg, buf_size); + + case _NXT_PORT_MSG_REMOVE_PID: + nxt_go_debug("remove pid"); + + /* TODO remove all ports for this pid in Go */ + /* TODO remove incoming & outgoing mmaps for this pid */ + break; + + default: + goto fail; + } + + +fail: + + if (fd != -1) { + close(fd); + } + + return 0; +} + + +#endif /* NXT_CONFIGURE */ diff --git a/src/go/unit/nxt_go_port.h b/src/go/unit/nxt_go_port.h new file mode 100644 index 00000000..ce9dbcc3 --- /dev/null +++ b/src/go/unit/nxt_go_port.h @@ -0,0 +1,18 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_PORT_H_INCLUDED_ +#define _NXT_GO_PORT_H_INCLUDED_ + + +#include +#include "nxt_go_lib.h" + +nxt_go_request_t +nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size); + + +#endif /* _NXT_GO_PORT_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c new file mode 100644 index 00000000..e63ca16c --- /dev/null +++ b/src/go/unit/nxt_go_port_memory.c @@ -0,0 +1,192 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef NXT_CONFIGURE + + +#include "nxt_go_port_memory.h" +#include "nxt_go_process.h" +#include "nxt_go_array.h" +#include "nxt_go_log.h" + +#include +#include + +#if (NXT_HAVE_MEMFD_CREATE) + +#include +#include +#include + +#endif + + +static nxt_port_mmap_header_t * +nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) +{ + int name_len, rc; + void *mem; + char name[64]; + nxt_fd_t fd; + nxt_port_msg_t port_msg; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + + fd = -1; + + union { + struct cmsghdr cm; + char space[CMSG_SPACE(sizeof(int))]; + } cmsg; + + port_mmap = nxt_go_array_zero_add(&process->outgoing); + if (nxt_slow_path(port_mmap == NULL)) { + nxt_go_warn("failed to add port mmap to outgoing array"); + + return NULL; + } + + name_len = snprintf(name, sizeof(name) - 1, "/unit.go.%p", name); + +#if (NXT_HAVE_MEMFD_CREATE) + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + + if (nxt_slow_path(fd == -1)) { + nxt_go_warn("memfd_create(%s) failed %d", name, errno); + + goto remove_fail; + } + + nxt_go_debug("memfd_create(%s): %d", name, fd); + +#elif (NXT_HAVE_SHM_OPEN) + shm_unlink((char *) name); // just in case + + fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + + nxt_go_debug("shm_open(%s): %d", name, fd); + + if (nxt_slow_path(fd == -1)) { + nxt_go_warn("shm_open(%s) failed %d", name, errno); + + goto remove_fail; + } + + if (nxt_slow_path(shm_unlink((char *) name) == -1)) { + nxt_go_warn("shm_unlink(%s) failed %d", name, errno); + } +#endif + + if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { + nxt_go_warn("ftruncate() failed %d", errno); + + goto remove_fail; + } + + mem = mmap(NULL, PORT_MMAP_SIZE, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + + if (nxt_slow_path(mem == MAP_FAILED)) { + goto remove_fail; + } + + port_mmap->hdr = mem; + + /* Init segment header. */ + hdr = port_mmap->hdr; + + memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + + hdr->id = process->outgoing.nelts - 1; + hdr->pid = process->pid; + + /* Mark first chunk as busy */ + nxt_port_mmap_set_chunk_busy(hdr, 0); + + /* Mark as busy chunk followed the last available chunk. */ + nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); + + port_msg.stream = 0; + port_msg.pid = getpid(); + port_msg.reply_port = 0; + port_msg.type = _NXT_PORT_MSG_MMAP; + port_msg.last = 1; + port_msg.mmap = 0; + + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_level = SOL_SOCKET; + cmsg.cm.cmsg_type = SCM_RIGHTS; + + /* + * nxt_memcpy() is used instead of simple + * *(int *) CMSG_DATA(&cmsg.cm) = fd; + * because GCC 4.4 with -O2/3/s optimization may issue a warning: + * dereferencing type-punned pointer will break strict-aliasing rules + * + * Fortunately, GCC with -O1 compiles this nxt_memcpy() + * in the same simple assignment as in the code above. + */ + memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); + + rc = nxt_go_port_send(hdr->pid, id, &port_msg, sizeof(port_msg), + &cmsg, sizeof(cmsg)); + + nxt_go_debug("new mmap #%d created for %d -> %d", + (int)hdr->id, (int)getpid(), (int)process->pid); + + close(fd); + + return hdr; + +remove_fail: + + if (fd != -1) { + close(fd); + } + + process->outgoing.nelts--; + + return NULL; +} + +nxt_port_mmap_header_t * +nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, + nxt_chunk_id_t *c) +{ + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_t *end_port_mmap; + nxt_port_mmap_header_t *hdr; + + port_mmap = NULL; + hdr = NULL; + + nxt_go_mutex_lock(&process->outgoing_mutex); + + port_mmap = process->outgoing.elts; + end_port_mmap = port_mmap + process->outgoing.nelts; + + while (port_mmap < end_port_mmap) { + + if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { + hdr = port_mmap->hdr; + + goto unlock_return; + } + + port_mmap++; + } + + hdr = nxt_go_new_port_mmap(process, port_id); + +unlock_return: + + nxt_go_mutex_unlock(&process->outgoing_mutex); + + return hdr; +} + + +#endif /* NXT_CONFIGURE */ diff --git a/src/go/unit/nxt_go_port_memory.h b/src/go/unit/nxt_go_port_memory.h new file mode 100644 index 00000000..f6fe94e3 --- /dev/null +++ b/src/go/unit/nxt_go_port_memory.h @@ -0,0 +1,24 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_PORT_MEMORY_H_INCLUDED_ +#define _NXT_GO_PORT_MEMORY_H_INCLUDED_ + + +#include +#include + +#ifndef _NXT_GO_PROCESS_T_DEFINED_ +#define _NXT_GO_PROCESS_T_DEFINED_ +typedef struct nxt_go_process_s nxt_go_process_t; +#endif + +struct nxt_port_mmap_header_s * +nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, + nxt_chunk_id_t *c); + + +#endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_process.c b/src/go/unit/nxt_go_process.c new file mode 100644 index 00000000..c7ce052d --- /dev/null +++ b/src/go/unit/nxt_go_process.c @@ -0,0 +1,150 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef NXT_CONFIGURE + + +#include "nxt_go_process.h" +#include "nxt_go_array.h" +#include "nxt_go_mutex.h" +#include "nxt_go_log.h" + +#include + + +static nxt_array_t processes; /* of nxt_go_process_t */ + +static nxt_go_process_t * +nxt_go_find_process(nxt_pid_t pid, uint32_t *pos) +{ + uint32_t l, r, i; + nxt_go_process_t *process; + + if (nxt_slow_path(processes.size == 0)) { + nxt_go_array_init(&processes, 1, sizeof(nxt_go_process_t)); + } + + l = 0; + r = processes.nelts; + i = (l + r) / 2; + + while (r > l) { + process = nxt_go_array_at(&processes, i); + + nxt_go_debug("compare process #%d (%p) at %d", + (int)process->pid, process, (int)i); + + if (pid == process->pid) { + nxt_go_debug("found process %d at %d", (int)pid, (int)i); + + if (pos != NULL) { + *pos = i; + } + + return process; + } + + if (pid < process->pid) { + r = i; + } else { + l = i + 1; + } + + i = (l + r) / 2; + } + + if (pos != NULL) { + *pos = i; + } + + nxt_go_debug("process %d not found, best pos %d", (int)pid, (int)i); + + return NULL; +} + + +nxt_go_process_t * +nxt_go_get_process(nxt_pid_t pid) +{ + uint32_t pos; + nxt_go_process_t *process; + + process = nxt_go_find_process(pid, &pos); + + if (process == NULL) { + nxt_go_array_add(&processes); + process = nxt_go_array_at(&processes, pos); + + nxt_go_debug("init process #%d (%p) at %d", (int)pid, process, + (int)pos); + + if (pos < processes.nelts - 1) { + memmove(process + 1, process, + processes.size * (processes.nelts - 1 - pos)); + } + + process->pid = pid; + nxt_go_mutex_create(&process->incoming_mutex); + nxt_go_array_init(&process->incoming, 1, sizeof(nxt_port_mmap_t)); + nxt_go_mutex_create(&process->outgoing_mutex); + nxt_go_array_init(&process->outgoing, 1, sizeof(nxt_port_mmap_t)); + } + + return process; +} + + +void +nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd) +{ + void *mem; + struct stat mmap_stat; + nxt_port_mmap_t *port_mmap; + nxt_go_process_t *process; + + process = nxt_go_get_process(pid); + + nxt_go_debug("got new mmap fd #%d from process %d", + (int)fd, (int)pid); + + if (fstat(fd, &mmap_stat) == -1) { + nxt_go_warn("fstat(%d) failed %d", (int)fd, errno); + + return; + } + + nxt_go_mutex_lock(&process->incoming_mutex); + + port_mmap = nxt_go_array_zero_add(&process->incoming); + if (nxt_slow_path(port_mmap == NULL)) { + nxt_go_warn("failed to add mmap to incoming array"); + + goto fail; + } + + mem = mmap(NULL, mmap_stat.st_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_go_warn("mmap() failed %d", errno); + + goto fail; + } + + port_mmap->hdr = mem; + + if (nxt_slow_path(port_mmap->hdr->id != process->incoming.nelts - 1)) { + nxt_go_warn("port mmap id mismatch (%d != %d)", + port_mmap->hdr->id, process->incoming.nelts - 1); + } + +fail: + + nxt_go_mutex_unlock(&process->incoming_mutex); +} + + +#endif /* NXT_CONFIGURE */ diff --git a/src/go/unit/nxt_go_process.h b/src/go/unit/nxt_go_process.h new file mode 100644 index 00000000..037844ce --- /dev/null +++ b/src/go/unit/nxt_go_process.h @@ -0,0 +1,33 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_PROCESS_H_INCLUDED_ +#define _NXT_GO_PROCESS_H_INCLUDED_ + + +#include +#include "nxt_go_mutex.h" + +#ifndef _NXT_GO_PROCESS_T_DEFINED_ +#define _NXT_GO_PROCESS_T_DEFINED_ +typedef struct nxt_go_process_s nxt_go_process_t; +#endif + +struct nxt_go_process_s { + nxt_pid_t pid; + nxt_go_mutex_t incoming_mutex; + nxt_array_t incoming; /* of nxt_port_mmap_t */ + nxt_go_mutex_t outgoing_mutex; + nxt_array_t outgoing; /* of nxt_port_mmap_t */ +}; + +nxt_go_process_t *nxt_go_get_process(nxt_pid_t pid); + +void nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd); + + +#endif /* _NXT_GO_PROCESS_H_INCLUDED_ */ + diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c new file mode 100644 index 00000000..cca8273e --- /dev/null +++ b/src/go/unit/nxt_go_run_ctx.c @@ -0,0 +1,531 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef NXT_CONFIGURE + + +#include "nxt_go_run_ctx.h" +#include "nxt_go_log.h" +#include "nxt_go_process.h" +#include "nxt_go_array.h" +#include "nxt_go_mutex.h" +#include "nxt_go_port_memory.h" + +#include +#include +#include + + +static nxt_int_t +nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf, + uint32_t n) +{ + size_t nchunks; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_msg_t *mmap_msg; + + if (nxt_slow_path(msg->mmap_msg == NULL)) { + if (n > 0) { + nxt_go_warn("failed to get plain buf #%d", (int)n); + + return NXT_ERROR; + } + + buf->mem.start = (u_char *) (msg->port_msg + 1); + buf->mem.pos = buf->mem.start; + buf->mem.end = buf->mem.start + msg->raw_size; + buf->mem.free = buf->mem.end; + + return NXT_OK; + } + + mmap_msg = msg->mmap_msg + n; + if (nxt_slow_path(mmap_msg >= msg->end)) { + nxt_go_warn("no more data in shm #%d", (int)n); + + return NXT_ERROR; + } + + if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) { + nxt_go_warn("incoming shared memory segment #%d not found " + "for process %d", (int)mmap_msg->mmap_id, + (int)msg->port_msg->pid); + + return NXT_ERROR; + } + + nxt_go_mutex_lock(&ctx->process->incoming_mutex); + + port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id); + buf->mem.start = nxt_port_mmap_chunk_start(port_mmap->hdr, + mmap_msg->chunk_id); + buf->mem.pos = buf->mem.start; + buf->mem.free = buf->mem.start + mmap_msg->size; + + nxt_go_mutex_unlock(&ctx->process->incoming_mutex); + + nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; + if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { + nchunks++; + } + + buf->mem.end = buf->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; + + return NXT_OK; +} + +static nxt_int_t +nxt_go_ctx_init_rbuf(nxt_go_run_ctx_t *ctx) +{ + return nxt_go_ctx_msg_rbuf(ctx, &ctx->msg, &ctx->rbuf, ctx->nrbuf); +} + +static void +nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, + size_t payload_size) +{ + nxt_port_mmap_msg_t *mmap_msg; + + memset(msg, 0, sizeof(nxt_go_msg_t)); + + msg->port_msg = port_msg; + msg->raw_size = payload_size; + + if (nxt_fast_path(port_msg->mmap != 0)) { + msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1); + msg->end = nxt_pointer_to(msg->mmap_msg, payload_size); + + mmap_msg = msg->mmap_msg; + while(mmap_msg < msg->end) { + msg->data_size += mmap_msg->size; + mmap_msg += 1; + } + } else { + msg->mmap_msg = NULL; + msg->end = NULL; + msg->data_size = payload_size; + } +} + +void +nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg) +{ + u_char *b, *e; + nxt_chunk_id_t c; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_msg_t *mmap_msg, *end; + + if (nxt_slow_path(msg->mmap_msg == NULL)) { + return; + } + + mmap_msg = msg->mmap_msg; + end = msg->end; + + nxt_go_mutex_lock(&ctx->process->incoming_mutex); + + for (; mmap_msg < end; mmap_msg++ ) { + port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id); + + c = mmap_msg->chunk_id; + b = nxt_port_mmap_chunk_start(port_mmap->hdr, c); + e = b + mmap_msg->size; + + while (b < e) { + nxt_port_mmap_set_chunk_free(port_mmap->hdr, c); + + b += PORT_MMAP_CHUNK_SIZE; + c++; + } + } + + nxt_go_mutex_unlock(&ctx->process->incoming_mutex); +} + + +nxt_int_t +nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, + size_t payload_size) +{ + memset(ctx, 0, sizeof(nxt_go_run_ctx_t)); + + ctx->process = nxt_go_get_process(port_msg->pid); + if (nxt_slow_path(ctx->process == NULL)) { + nxt_go_warn("failed to get process %d", port_msg->pid); + + return NXT_ERROR; + } + + nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size); + + ctx->msg_last = &ctx->msg; + + ctx->wport_msg.stream = port_msg->stream; + ctx->wport_msg.pid = getpid(); + ctx->wport_msg.type = _NXT_PORT_MSG_DATA; + ctx->wport_msg.mmap = 1; + + ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + + return nxt_go_ctx_init_rbuf(ctx); +} + + +void +nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) +{ + nxt_go_msg_t *msg; + + msg = malloc(sizeof(nxt_go_msg_t)); + + nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t)); + + msg->start_offset = ctx->msg_last->start_offset; + + if (ctx->msg_last == &ctx->msg) { + msg->start_offset += ctx->r.body.preread_size; + } else { + msg->start_offset += ctx->msg_last->data_size; + } + + ctx->msg_last->next = msg; + ctx->msg_last = msg; +} + + +nxt_int_t +nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) +{ + int i; + nxt_int_t rc; + + if (last != 0) { + ctx->wport_msg.last = 1; + } + + nxt_go_debug("flush buffers (%d)", last); + + for (i = 0; i < ctx->nwbuf; i++) { + nxt_port_mmap_msg_t *m = ctx->wmmap_msg + i; + + nxt_go_debug(" mmap_msg[%d]={%d, %d, %d}", i, + m->mmap_id, m->chunk_id, m->size); + } + + rc = nxt_go_port_send(ctx->msg.port_msg->pid, ctx->msg.port_msg->reply_port, + &ctx->wport_msg, sizeof(nxt_port_msg_t) + + ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0); + + ctx->nwbuf = 0; + + memset(&ctx->wbuf, 0, sizeof(ctx->wbuf)); + + return rc; +} + + +nxt_buf_t * +nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) +{ + size_t nchunks; + nxt_buf_t *buf; + nxt_chunk_id_t c; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_msg_t *mmap_msg; + nxt_port_mmap_header_t *hdr; + + c = 0; + + buf = &ctx->wbuf; + + hdr = nxt_go_port_mmap_get(ctx->process, + ctx->msg.port_msg->reply_port, &c); + if (nxt_slow_path(hdr == NULL)) { + nxt_go_warn("failed to get port_mmap"); + + return NULL; + } + + buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); + buf->mem.pos = buf->mem.start; + buf->mem.free = buf->mem.start; + buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; + + buf->parent = hdr; + + mmap_msg = ctx->wmmap_msg + ctx->nwbuf; + mmap_msg->mmap_id = hdr->id; + mmap_msg->chunk_id = c; + mmap_msg->size = 0; + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c++; + nchunks--; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + break; + } + + buf->mem.end += PORT_MMAP_CHUNK_SIZE; + c++; + nchunks--; + } + + ctx->nwbuf++; + + return buf; +} + + +nxt_int_t +nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) +{ + size_t nchunks, free_size; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + + free_size = nxt_buf_mem_free_size(&b->mem); + + if (nxt_slow_path(size <= free_size)) { + return NXT_OK; + } + + hdr = b->parent; + + start = nxt_port_mmap_chunk_id(hdr, b->mem.end); + + size -= free_size; + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c = start; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + break; + } + + c++; + nchunks--; + } + + if (nchunks != 0 && + min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { + + c--; + while (c >= start) { + nxt_port_mmap_set_chunk_free(hdr, c); + c--; + } + + return NXT_ERROR; + } else { + b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); + + return NXT_OK; + } +} + + +nxt_int_t +nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +{ + size_t free_size, copy_size; + nxt_buf_t *buf; + nxt_port_mmap_msg_t *mmap_msg; + + buf = &ctx->wbuf; + + while (len > 0) { + if (ctx->nwbuf == 0) { + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } + + do { + free_size = nxt_buf_mem_free_size(&buf->mem); + + if (free_size > 0) { + copy_size = nxt_min(free_size, len); + + buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size); + + mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; + mmap_msg->size += copy_size; + + len -= copy_size; + data = nxt_pointer_to(data, copy_size); + + if (len == 0) { + return NXT_OK; + } + } + } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); + + if (ctx->nwbuf >= 8) { + nxt_go_ctx_flush(ctx, 0); + } + + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size) +{ + nxt_buf_t *buf; + nxt_int_t rc; + + do { + buf = &ctx->rbuf; + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { + if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + + ctx->nrbuf++; + rc = nxt_go_ctx_init_rbuf(ctx); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_go_warn("read size: init rbuf failed"); + return rc; + } + + continue; + } + nxt_go_warn("read size: used size is not 0"); + return NXT_ERROR; + } + + if (buf->mem.pos[0] >= 128) { + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { + nxt_go_warn("read size: used size < 4"); + return NXT_ERROR; + } + } + + break; + } while (1); + + buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); + + return NXT_OK; +} + +nxt_int_t +nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) +{ + nxt_int_t rc; + + rc = nxt_go_ctx_read_size_(ctx, size); + + if (nxt_fast_path(rc == NXT_OK)) { + nxt_go_debug("read_size: %d", (int)*size); + } + + return rc; +} + +nxt_int_t +nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) +{ + size_t length; + nxt_int_t rc; + nxt_buf_t *buf; + + rc = nxt_go_ctx_read_size_(ctx, &length); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_go_warn("read str: read size failed"); + return rc; + } + + buf = &ctx->rbuf; + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) { + nxt_go_warn("read str: used size too small %d < %d", + (int)nxt_buf_mem_used_size(&buf->mem), (int)length); + return NXT_ERROR; + } + + if (length > 0) { + str->start = buf->mem.pos; + str->length = length - 1; + + buf->mem.pos += length; + + nxt_go_debug("read_str: %d %.*s", (int)length - 1, (int)length - 1, + str->start); + } else { + str->start = NULL; + str->length = 0; + + nxt_go_debug("read_str: NULL"); + } + + return NXT_OK; +} + + +size_t +nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size) +{ + size_t res, read_size; + nxt_int_t rc; + nxt_buf_t *buf; + + res = 0; + + while (size > 0) { + buf = &ctx->rbuf; + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + ctx->nrbuf++; + rc = nxt_go_ctx_init_rbuf(ctx); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_go_warn("read raw: init rbuf failed"); + return res; + } + + continue; + } + + read_size = nxt_buf_mem_used_size(&buf->mem); + read_size = nxt_min(read_size, size); + + dst = nxt_cpymem(dst, buf->mem.pos, read_size); + + size -= read_size; + buf->mem.pos += read_size; + res += read_size; + } + + nxt_go_debug("read_raw: %d", (int) res); + + return res; +} + + +#endif /* NXT_CONFIGURE */ diff --git a/src/go/unit/nxt_go_run_ctx.h b/src/go/unit/nxt_go_run_ctx.h new file mode 100644 index 00000000..a5a972c6 --- /dev/null +++ b/src/go/unit/nxt_go_run_ctx.h @@ -0,0 +1,75 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_GO_RUN_CTX_H_INCLUDED_ +#define _NXT_GO_RUN_CTX_H_INCLUDED_ + + +#include +#include +#include + +#ifndef _NXT_GO_PROCESS_T_DEFINED_ +#define _NXT_GO_PROCESS_T_DEFINED_ +typedef struct nxt_go_process_s nxt_go_process_t; +#endif + +typedef struct nxt_go_msg_s nxt_go_msg_t; + +struct nxt_go_msg_s { + off_t start_offset; + + nxt_port_msg_t *port_msg; + size_t raw_size; + size_t data_size; + + nxt_port_mmap_msg_t *mmap_msg; + nxt_port_mmap_msg_t *end; + + nxt_go_msg_t *next; +}; + + +typedef struct { + nxt_go_msg_t msg; + + nxt_go_process_t *process; + nxt_port_mmap_msg_t *wmmap_msg; + + uint32_t nrbuf; + nxt_buf_t rbuf; + + uint32_t nwbuf; + nxt_buf_t wbuf; + nxt_port_msg_t wport_msg; + char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ]; + + nxt_app_request_t r; + + nxt_go_msg_t *msg_last; +} nxt_go_run_ctx_t; + + +void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg); + +nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, + size_t payload_size); + +void nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, + size_t size); + +nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last); + +nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len); + +nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size); + +nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str); + +size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size); + + +#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */ diff --git a/src/go/unit/port.go b/src/go/unit/port.go new file mode 100644 index 00000000..a8faa2a0 --- /dev/null +++ b/src/go/unit/port.go @@ -0,0 +1,219 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_go_lib.h" +#include "nxt_process_type.h" +*/ +import "C" + +import ( + "fmt" + "net" + "net/http" + "os" + "sync" + "unsafe" +) + +type port_key struct { + pid int + id int +} + +type port struct { + key port_key + t int + rcv *net.UnixConn + snd *net.UnixConn +} + +type port_registry struct { + sync.RWMutex + m map[port_key]*port + t [C.NXT_PROCESS_MAX]*port +} + +var port_registry_ port_registry + +func find_port(key port_key) *port { + port_registry_.RLock() + res := port_registry_.m[key] + port_registry_.RUnlock() + + return res +} + +func remove_by_pid(pid int) { + port_registry_.Lock() + if port_registry_.m != nil { + for k, p := range port_registry_.m { + if k.pid == pid { + if port_registry_.t[p.t] == p { + port_registry_.t[p.t] = nil + } + + delete(port_registry_.m, k) + } + } + } + + port_registry_.Unlock() +} + +func main_port() *port { + port_registry_.RLock() + res := port_registry_.t[C.NXT_PROCESS_MAIN] + port_registry_.RUnlock() + + return res +} + +func add_port(p *port) { + port_registry_.Lock() + if port_registry_.m == nil { + port_registry_.m = make(map[port_key]*port) + } + + port_registry_.m[p.key] = p + port_registry_.t[p.t] = p + + port_registry_.Unlock() +} + +func (p *port) Close() { + if p.rcv != nil { + p.rcv.Close() + } + + if p.snd != nil { + p.snd.Close() + } +} + +func getUnixConn(fd int) *net.UnixConn { + if fd < 0 { + return nil + } + + f := os.NewFile(uintptr(fd), "sock") + defer f.Close() + + c, err := net.FileConn(f) + if err != nil { + fmt.Printf("FileConn error %s\n", err) + return nil + } + + uc, ok := c.(*net.UnixConn) + if !ok { + fmt.Printf("Not a Unix-domain socket %d\n", fd) + return nil + } + + return uc +} + +//export nxt_go_new_port +func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) { + new_port(int(pid), int(id), int(t), int(rcv), int(snd)) +} + +//export nxt_go_port_send +func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int { + key := port_key{ + pid: int(pid), + id: int(id), + } + + p := find_port(key) + + if p != nil { + n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil) + + if err != nil { + fmt.Printf("write result %d (%d), %s\n", n, oobn, err) + } + + return C.int(n) + } + + return 0 +} + +//export nxt_go_main_send +func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int { + p := main_port() + + if p != nil { + n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil) + + if err != nil { + fmt.Printf("write result %d (%d), %s\n", n, oobn, err) + } + + return C.int(n) + } + + return 0 +} + +func new_port(pid int, id int, t int, rcv int, snd int) *port { + p := &port{ + key: port_key{ + pid: pid, + id: id, + }, + t: t, + rcv: getUnixConn(rcv), + snd: getUnixConn(snd), + } + + add_port(p) + + return p +} + +func (p *port) read(handler http.Handler) error { + var buf [16384]byte + var oob [1024]byte + + n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:]) + + if err != nil { + return err + } + + m := new_cmsg(buf[:n], oob[:oobn]) + + c_req := C.nxt_go_process_port_msg(m.buf.b, m.buf.s, m.oob.b, m.oob.s) + + if c_req == 0 { + m.Close() + } else { + r := find_request(c_req) + + go func(r *request) { + if handler == nil { + handler = http.DefaultServeMux + } + + handler.ServeHTTP(r.response(), &r.req) + r.done() + }(r) + + if len(r.msgs) == 0 { + r.push(m) + } else if r.ch != nil { + r.ch <- m + } else { + m.Close() + } + } + + return err +} diff --git a/src/go/unit/request.go b/src/go/unit/request.go new file mode 100644 index 00000000..aa6d1145 --- /dev/null +++ b/src/go/unit/request.go @@ -0,0 +1,211 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_go_lib.h" +*/ +import "C" + +import ( + "net/http" + "net/url" + "sync" +) + +type request struct { + req http.Request + resp *response + c_req C.nxt_go_request_t + id C.uint32_t + msgs []*cmsg + ch chan *cmsg +} + +func (r *request) Read(p []byte) (n int, err error) { + c := C.size_t(cap(p)) + b := C.malloc(c) + res := C.nxt_go_request_read(r.c_req, b, c) + + if res == -2 /* NXT_AGAIN */ { + m := <-r.ch + + res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s) + r.push(m) + } + + if res > 0 { + copy(p, C.GoBytes(b, res)) + } + + C.free(b) + return int(res), nil +} + +func (r *request) Close() error { + C.nxt_go_request_close(r.c_req) + return nil +} + +type request_registry struct { + sync.RWMutex + m map[C.nxt_go_request_t]*request + id map[C.uint32_t]*request +} + +var request_registry_ request_registry + +func find_request(c_req C.nxt_go_request_t) *request { + request_registry_.RLock() + res := request_registry_.m[c_req] + request_registry_.RUnlock() + + return res +} + +func find_request_by_id(id C.uint32_t) *request { + request_registry_.RLock() + res := request_registry_.id[id] + request_registry_.RUnlock() + + return res +} + +func add_request(r *request) { + request_registry_.Lock() + if request_registry_.m == nil { + request_registry_.m = make(map[C.nxt_go_request_t]*request) + request_registry_.id = make(map[C.uint32_t]*request) + } + + request_registry_.m[r.c_req] = r + request_registry_.id[r.id] = r + + request_registry_.Unlock() +} + +func remove_request(r *request) { + request_registry_.Lock() + if request_registry_.m != nil { + delete(request_registry_.m, r.c_req) + delete(request_registry_.id, r.id) + } + + request_registry_.Unlock() +} + +func (r *request) response() *response { + if r.resp == nil { + r.resp = new_response(r.c_req, &r.req) + } + + return r.resp +} + +func (r *request) done() { + C.nxt_go_request_done(r.c_req) + + remove_request(r) + + for _, m := range r.msgs { + m.Close() + } + + if r.ch != nil { + close(r.ch) + } +} + +func (r *request) push(m *cmsg) { + r.msgs = append(r.msgs, m) +} + +//export nxt_go_new_request +func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) { + uri := C.GoStringN(c_uri.start, c_uri.length) + + var URL *url.URL + var err error + if URL, err = url.ParseRequestURI(uri); err != nil { + return + } + + r := &request{ + req: http.Request{ + Method: C.GoStringN(c_method.start, c_method.length), + URL: URL, + Header: http.Header{}, + Body: nil, + RequestURI: uri, + }, + c_req: c_req, + id: id, + msgs: make([]*cmsg, 0, 1), + } + r.req.Body = r + + add_request(r) +} + +//export nxt_go_find_request +func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t { + r := find_request_by_id(id) + + if r != nil { + return r.c_req + } + + return 0 +} + +//export nxt_go_request_set_proto +func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, maj C.int, min C.int) { + r := find_request(c_req) + r.req.Proto = C.GoStringN(proto.start, proto.length) + r.req.ProtoMajor = int(maj) + r.req.ProtoMinor = int(min) +} + +//export nxt_go_request_add_header +func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, value *C.nxt_go_str_t) { + r := find_request(c_req) + r.req.Header.Add(C.GoStringN(name.start, name.length), C.GoStringN(value.start, value.length)) +} + +//export nxt_go_request_set_content_length +func nxt_go_request_set_content_length(c_req C.nxt_go_request_t, l C.int64_t) { + find_request(c_req).req.ContentLength = int64(l) +} + +//export nxt_go_request_create_channel +func nxt_go_request_create_channel(c_req C.nxt_go_request_t) { + find_request(c_req).ch = make(chan *cmsg) +} + +//export nxt_go_request_set_host +func nxt_go_request_set_host(c_req C.nxt_go_request_t, host *C.nxt_go_str_t) { + find_request(c_req).req.Host = C.GoStringN(host.start, host.length) +} + +//export nxt_go_request_set_url +func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) { + find_request(c_req).req.URL.Scheme = C.GoString(scheme) +} + +//export nxt_go_request_set_remote_addr +func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, addr *C.nxt_go_str_t) { + find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) +} + +//export nxt_go_request_serve +func nxt_go_request_serve(c_req C.nxt_go_request_t) { + r := find_request(c_req) + + go func(r *request) { + http.DefaultServeMux.ServeHTTP(r.response(), &r.req) + r.done() + }(r) +} diff --git a/src/go/unit/response.go b/src/go/unit/response.go new file mode 100644 index 00000000..18daa2f3 --- /dev/null +++ b/src/go/unit/response.go @@ -0,0 +1,69 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_go_lib.h" +*/ +import "C" + +import ( + "fmt" + "net/http" + "os" +) + +type response struct { + header http.Header + headerSent bool + req *http.Request + c_req C.nxt_go_request_t +} + +func new_response(c_req C.nxt_go_request_t, req *http.Request) *response { + resp := &response{ + header: http.Header{}, + req: req, + c_req: c_req, + } + + return resp +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) Write(p []byte) (n int, err error) { + if !r.headerSent { + r.WriteHeader(http.StatusOK) + } + + l := C.size_t(len(p)) + b := getCBytes(p) + res := C.nxt_go_response_write(r.c_req, b, l) + C.free(b) + return int(res), nil +} + +func (r *response) WriteHeader(code int) { + if r.headerSent { + // Note: explicitly using Stderr, as Stdout is our HTTP output. + fmt.Fprintf(os.Stderr, "CGI attempted to write header twice") + return + } + r.headerSent = true + fmt.Fprintf(r, "%s %d %s\r\n", r.req.Proto, code, http.StatusText(code)) + + // Set a default Content-Type + if _, hasType := r.header["Content-Type"]; !hasType { + r.header.Add("Content-Type", "text/html; charset=utf-8") + } + + r.header.Write(r) + + r.Write([]byte("\r\n")) +} diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go new file mode 100644 index 00000000..74e53ecf --- /dev/null +++ b/src/go/unit/unit.go @@ -0,0 +1,134 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_go_lib.h" +*/ +import "C" + +import ( + "fmt" + "net/http" + "os" + "strconv" + "strings" + "unsafe" +) + +type cbuf struct { + b unsafe.Pointer + s C.size_t + f bool +} + +func new_cbuf(buf []byte) *cbuf { + if len(buf) == 0 { + return nil + } + + return &cbuf{ + getCBytes(buf), C.size_t(len(buf)), true, + } +} + +func (buf *cbuf) Close() { + if buf == nil { + return + } + + if buf.f && buf.s > 0 { + C.free(buf.b) + buf.f = false + buf.b = nil + buf.s = 0 + } +} + +func (buf *cbuf) GoBytes() []byte { + if buf == nil { + var b [0]byte + return b[:0] + } + + return C.GoBytes(buf.b, C.int(buf.s)) +} + +type cmsg struct { + buf cbuf + oob cbuf +} + +func new_cmsg(buf []byte, oob []byte) *cmsg { + return &cmsg{ + buf: cbuf{getCBytes(buf), C.size_t(len(buf)), true}, + oob: cbuf{getCBytes(oob), C.size_t(len(oob)), true}, + } +} + +func (msg *cmsg) Close() { + msg.buf.Close() + msg.oob.Close() +} + +var nxt_go_quit bool = false + +//export nxt_go_set_quit +func nxt_go_set_quit() { + nxt_go_quit = true +} + +func ListenAndServe(addr string, handler http.Handler) error { + var read_port *port + + go_ports_env := os.Getenv("NXT_GO_PORTS") + + ports := strings.Split(go_ports_env, ";") + pid := os.Getpid() + + for _, port_str := range ports { + if len(port_str) <= 0 { + continue + } + + attrs := strings.Split(port_str, ",") + + var attrsN [5]int + var err error + for i, attr := range attrs { + attrsN[i], err = strconv.Atoi(attr) + if err != nil { + fmt.Printf("err %s\n", err) + break + } + } + + if err != nil { + continue + } + + p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4]) + + if attrsN[0] == pid { + read_port = p + } + } + + if read_port != nil { + C.nxt_go_ready() + + for !nxt_go_quit { + err := read_port.read(handler) + if err != nil { + return err + } + } + } else { + return http.ListenAndServe(addr, handler) + } + + return nil +} diff --git a/src/nginext/cbytes-1.6.go b/src/nginext/cbytes-1.6.go deleted file mode 100644 index 7686dad8..00000000 --- a/src/nginext/cbytes-1.6.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build !go1.7 - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package nginext - -import "C" -import "unsafe" - -func getCBytes(p []byte) unsafe.Pointer { - return unsafe.Pointer(C.CString(string(p))) // go <= 1.6 -} diff --git a/src/nginext/cbytes-1.7.go b/src/nginext/cbytes-1.7.go deleted file mode 100644 index e7d92f04..00000000 --- a/src/nginext/cbytes-1.7.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build go1.7 - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package nginext - -import "C" -import "unsafe" - -func getCBytes(p []byte) unsafe.Pointer { - return C.CBytes(p) // go >= 1.7 -} diff --git a/src/nginext/nginext.go b/src/nginext/nginext.go deleted file mode 100644 index 895d6b28..00000000 --- a/src/nginext/nginext.go +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package nginext - -/* -#include "nxt_go_lib.h" -*/ -import "C" - -import ( - "fmt" - "net/http" - "os" - "strconv" - "strings" - "unsafe" -) - -type cbuf struct { - b unsafe.Pointer - s C.size_t - f bool -} - -func new_cbuf(buf []byte) *cbuf { - if len(buf) == 0 { - return nil - } - - return &cbuf{ - getCBytes(buf), C.size_t(len(buf)), true, - } -} - -func (buf *cbuf) Close() { - if buf == nil { - return - } - - if buf.f && buf.s > 0 { - C.free(buf.b) - buf.f = false - buf.b = nil - buf.s = 0 - } -} - -func (buf *cbuf) GoBytes() []byte { - if buf == nil { - var b [0]byte - return b[:0] - } - - return C.GoBytes(buf.b, C.int(buf.s)) -} - -type cmsg struct { - buf cbuf - oob cbuf -} - -func new_cmsg(buf []byte, oob []byte) *cmsg { - return &cmsg{ - buf: cbuf{getCBytes(buf), C.size_t(len(buf)), true}, - oob: cbuf{getCBytes(oob), C.size_t(len(oob)), true}, - } -} - -func (msg *cmsg) Close() { - msg.buf.Close() - msg.oob.Close() -} - -var nxt_go_quit bool = false - -//export nxt_go_set_quit -func nxt_go_set_quit() { - nxt_go_quit = true -} - -func ListenAndServe(addr string, handler http.Handler) error { - var read_port *port - - go_ports_env := os.Getenv("NXT_GO_PORTS") - - ports := strings.Split(go_ports_env, ";") - pid := os.Getpid() - - for _, port_str := range ports { - if len(port_str) <= 0 { - continue - } - - attrs := strings.Split(port_str, ",") - - var attrsN [5]int - var err error - for i, attr := range attrs { - attrsN[i], err = strconv.Atoi(attr) - if err != nil { - fmt.Printf("err %s\n", err) - break - } - } - - if err != nil { - continue - } - - p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4]) - - if attrsN[0] == pid { - read_port = p - } - } - - if read_port != nil { - C.nxt_go_ready() - - for !nxt_go_quit { - err := read_port.read(handler) - if err != nil { - return err - } - } - } else { - return http.ListenAndServe(addr, handler) - } - - return nil -} diff --git a/src/nginext/nxt_go_array.c b/src/nginext/nxt_go_array.c deleted file mode 100644 index f409cb10..00000000 --- a/src/nginext/nxt_go_array.c +++ /dev/null @@ -1,66 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef NXT_CONFIGURE - -#include -#include - -#include - -#include "nxt_go_array.h" - -void -nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size) -{ - array->elts = malloc(n * size); - - if (nxt_slow_path(n != 0 && array->elts == NULL)) { - return; - } - - array->nelts = 0; - array->size = size; - array->nalloc = n; - array->mem_pool = NULL; -} - -void * -nxt_go_array_add(nxt_array_t *array) -{ - void *p; - uint32_t nalloc, new_alloc; - - nalloc = array->nalloc; - - if (array->nelts == nalloc) { - - if (nalloc < 16) { - /* Allocate new array twice larger than current. */ - new_alloc = nalloc * 2; - - } else { - /* Allocate new array 1.5 times larger than current. */ - new_alloc = nalloc + nalloc / 2; - } - - p = realloc(array->elts, array->size * new_alloc); - - if (nxt_slow_path(p == NULL)) { - return NULL; - } - - array->elts = p; - array->nalloc = new_alloc; - } - - p = nxt_pointer_to(array->elts, array->size * array->nelts); - array->nelts++; - - return p; -} - -#endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_array.h b/src/nginext/nxt_go_array.h deleted file mode 100644 index d96db663..00000000 --- a/src/nginext/nxt_go_array.h +++ /dev/null @@ -1,36 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_ARRAY_H_INCLUDED_ -#define _NXT_GO_ARRAY_H_INCLUDED_ - - -#include - -void nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size); - -void *nxt_go_array_add(nxt_array_t *array); - -nxt_inline void * -nxt_go_array_zero_add(nxt_array_t *array) -{ - void *p; - - p = nxt_go_array_add(array); - - if (nxt_fast_path(p != NULL)) { - nxt_memzero(p, array->size); - } - - return p; -} - -#define \ -nxt_go_array_at(array, n) \ - nxt_pointer_to((array)->elts, (array)->size * (n)) - - -#endif /* _NXT_GO_ARRAY_H_INCLUDED_ */ diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c deleted file mode 100644 index 474d313b..00000000 --- a/src/nginext/nxt_go_lib.c +++ /dev/null @@ -1,193 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifdef NXT_CONFIGURE - -#include -#include "nxt_go_lib.h" - -// Stubs to compile during configure process. -int -nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) -{ - return -1; -} - -int -nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) -{ - return -1; -} - -int -nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, - void *src, size_t src_len) -{ - return -1; -} - -int -nxt_go_request_close(nxt_go_request_t r) -{ - return -1; -} - -int -nxt_go_request_done(nxt_go_request_t r) -{ - return -1; -} - -void -nxt_go_ready() -{ -} - -nxt_go_request_t -nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len) -{ - return 0; -} - -#else - -#include "nxt_go_run_ctx.h" -#include "nxt_go_log.h" -#include "nxt_go_port.h" - -#include -#include - -int -nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) -{ - nxt_int_t rc; - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - nxt_go_debug("write: %d", (int) len); - - ctx = (nxt_go_run_ctx_t *) r; - rc = nxt_go_ctx_write(ctx, buf, len); - - return rc == NXT_OK ? len : -1; -} - - -int -nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) -{ - size_t res; - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - dst_len = nxt_min(dst_len, ctx->r.body.preread_size); - - res = nxt_go_ctx_read_raw(ctx, dst, dst_len); - - ctx->r.body.preread_size -= res; - - return res; -} - - -int -nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, - void *src, size_t src_len) -{ - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - nxt_go_ctx_add_msg(ctx, src, src_len); - - return nxt_go_request_read(r, dst, dst_len); -} - - -int -nxt_go_request_close(nxt_go_request_t r) -{ - return 0; -} - - -int -nxt_go_request_done(nxt_go_request_t r) -{ - nxt_int_t res; - nxt_go_run_ctx_t *ctx; - nxt_go_msg_t *msg, *b; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - res = nxt_go_ctx_flush(ctx, 1); - - nxt_go_ctx_release_msg(ctx, &ctx->msg); - - msg = ctx->msg.next; - while (msg != NULL) { - nxt_go_ctx_release_msg(ctx, msg); - - b = msg; - msg = b->next; - - free(b); - } - - free(ctx); - - return res; -} - - -void -nxt_go_ready() -{ - char *go_stream; - nxt_port_msg_t port_msg; - - go_stream = getenv("NXT_GO_STREAM"); - - if (go_stream == NULL) { - return; - } - - port_msg.stream = atol(go_stream); - port_msg.pid = getpid(); - port_msg.reply_port = 0; - port_msg.type = _NXT_PORT_MSG_READY; - port_msg.last = 1; - port_msg.mmap = 0; - - nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0); -} - - -nxt_go_request_t -nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len) -{ - return nxt_go_port_on_read(buf, buf_len, oob, oob_len); -} - - -#endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_lib.h b/src/nginext/nxt_go_lib.h deleted file mode 100644 index b3a86be9..00000000 --- a/src/nginext/nxt_go_lib.h +++ /dev/null @@ -1,39 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_LIB_H_INCLUDED_ -#define _NXT_GO_LIB_H_INCLUDED_ - - -#include -#include -#include - -typedef struct { - int length; - char *start; -} nxt_go_str_t; - -typedef uintptr_t nxt_go_request_t; - -int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len); - -int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len); - -int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, - void *src, size_t src_len); - -int nxt_go_request_close(nxt_go_request_t r); - -int nxt_go_request_done(nxt_go_request_t r); - -void nxt_go_ready(); - -nxt_go_request_t nxt_go_process_port_msg(void *buf, size_t buf_len, - void *oob, size_t oob_len); - - -#endif /* _NXT_GO_LIB_H_INCLUDED_ */ diff --git a/src/nginext/nxt_go_log.h b/src/nginext/nxt_go_log.h deleted file mode 100644 index d596cfb3..00000000 --- a/src/nginext/nxt_go_log.h +++ /dev/null @@ -1,34 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_LOG_H_INCLUDED_ -#define _NXT_GO_LOG_H_INCLUDED_ - - -#include -#include - -#include - -#if (NXT_DEBUG) - -#define nxt_go_debug(fmt, ARGS...) \ - fprintf(stderr, "[go debug] " fmt "\n", ##ARGS) - -#else - -#define nxt_go_debug(fmt, ARGS...) - -#endif - -#define nxt_go_warn(fmt, ARGS...) \ - fprintf(stderr, "[go warn] " fmt "\n", ##ARGS) - -#define nxt_go_error(fmt, ARGS...) \ - fprintf(stderr, "[go error] " fmt "\n", ##ARGS) - - -#endif /* _NXT_GO_LOG_H_INCLUDED_ */ diff --git a/src/nginext/nxt_go_mutex.h b/src/nginext/nxt_go_mutex.h deleted file mode 100644 index 98bd27f0..00000000 --- a/src/nginext/nxt_go_mutex.h +++ /dev/null @@ -1,21 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_MUTEX_H_INCLUDED_ -#define _NXT_GO_MUTEX_H_INCLUDED_ - - -#include - -typedef pthread_mutex_t nxt_go_mutex_t; - -#define nxt_go_mutex_create(mutex) pthread_mutex_init(mutex, NULL) -#define nxt_go_mutex_destroy(mutex) pthread_mutex_destroy(mutex) -#define nxt_go_mutex_lock(mutex) pthread_mutex_lock(mutex) -#define nxt_go_mutex_unlock(mutex) pthread_mutex_unlock(mutex) - - -#endif /* _NXT_GO_MUTEX_H_INCLUDED_ */ diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c deleted file mode 100644 index a46a33d1..00000000 --- a/src/nginext/nxt_go_port.c +++ /dev/null @@ -1,210 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef NXT_CONFIGURE - - -#include "nxt_go_port.h" -#include "nxt_go_log.h" -#include "nxt_go_process.h" -#include "nxt_go_run_ctx.h" - -#include -#include - - -#define nxt_go_str(p) ((nxt_go_str_t *)(p)) - -static nxt_go_request_t -nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) -{ - size_t s; - nxt_str_t n, v; - nxt_int_t rc; - nxt_uint_t i; - nxt_go_run_ctx_t *ctx; - nxt_go_request_t r; - nxt_app_request_header_t *h; - - r = nxt_go_find_request(port_msg->stream); - if (r != 0) { - return r; - } - - ctx = malloc(sizeof(nxt_go_run_ctx_t)); - nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t)); - - r = (nxt_go_request_t)(ctx); - h = &ctx->r.header; - - nxt_go_ctx_read_str(ctx, &h->method); - nxt_go_ctx_read_str(ctx, &h->target); - nxt_go_ctx_read_str(ctx, &h->path); - - nxt_go_ctx_read_size(ctx, &s); - if (s > 0) { - s--; - h->query.start = h->target.start + s; - h->query.length = h->target.length - s; - - if (h->path.start == NULL) { - h->path.start = h->target.start; - h->path.length = s - 1; - } - } - - if (h->path.start == NULL) { - h->path = h->target; - } - - nxt_go_new_request(r, port_msg->stream, nxt_go_str(&h->method), - nxt_go_str(&h->target)); - - nxt_go_ctx_read_str(ctx, &h->version); - - nxt_go_request_set_proto(r, nxt_go_str(&h->version), - h->version.start[5] - '0', - h->version.start[7] - '0'); - - nxt_go_ctx_read_str(ctx, &ctx->r.remote); - if (ctx->r.remote.start != NULL) { - nxt_go_request_set_remote_addr(r, nxt_go_str(&ctx->r.remote)); - } - - nxt_go_ctx_read_str(ctx, &h->host); - nxt_go_ctx_read_str(ctx, &h->cookie); - nxt_go_ctx_read_str(ctx, &h->content_type); - nxt_go_ctx_read_str(ctx, &h->content_length); - - if (h->host.start != NULL) { - nxt_go_request_set_host(r, nxt_go_str(&h->host)); - } - - nxt_go_ctx_read_size(ctx, &s); - h->parsed_content_length = s; - - do { - rc = nxt_go_ctx_read_str(ctx, &n); - - if (n.length == 0) { - break; - } - - rc = nxt_go_ctx_read_str(ctx, &v); - nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v)); - } while(1); - - nxt_go_ctx_read_size(ctx, &s); - ctx->r.body.preread_size = s; - - if (h->parsed_content_length > 0) { - nxt_go_request_set_content_length(r, h->parsed_content_length); - } - - if (ctx->r.body.preread_size < h->parsed_content_length) { - nxt_go_request_create_channel(r); - } - - return r; -} - -nxt_go_request_t -nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) -{ - void *buf_end; - void *payload; - size_t payload_size; - nxt_fd_t fd; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_port_msg_new_port_t *new_port_msg; - - fd = -1; - nxt_go_debug("on read: %d (%d)", (int)buf_size, (int)oob_size); - - cm = oob; - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) - && cm->cmsg_level == SOL_SOCKET - && cm->cmsg_type == SCM_RIGHTS) { - - nxt_memcpy(&fd, CMSG_DATA(cm), sizeof(int)); - nxt_go_debug("fd = %d", fd); - } - - port_msg = buf; - if (buf_size < sizeof(nxt_port_msg_t)) { - nxt_go_warn("message too small (%d bytes)", (int)buf_size); - goto fail; - } - - buf_end = ((char *)buf) + buf_size; - - payload = port_msg + 1; - payload_size = buf_size - sizeof(nxt_port_msg_t); - - if (port_msg->mmap) { - nxt_go_debug("using data in shared memory"); - } - - if (port_msg->type >= NXT_PORT_MSG_MAX) { - nxt_go_warn("unknown message type (%d)", (int)port_msg->type); - goto fail; - } - - switch (port_msg->type) { - case _NXT_PORT_MSG_QUIT: - nxt_go_debug("quit"); - - nxt_go_set_quit(); - break; - - case _NXT_PORT_MSG_NEW_PORT: - nxt_go_debug("new port"); - new_port_msg = payload; - - nxt_go_new_port(new_port_msg->pid, new_port_msg->id, new_port_msg->type, - -1, fd); - break; - - case _NXT_PORT_MSG_CHANGE_FILE: - nxt_go_debug("change file"); - break; - - case _NXT_PORT_MSG_MMAP: - nxt_go_debug("mmap"); - - nxt_go_new_incoming_mmap(port_msg->pid, fd); - break; - - case _NXT_PORT_MSG_DATA: - nxt_go_debug("data"); - - return nxt_go_data_handler(port_msg, buf_size); - - case _NXT_PORT_MSG_REMOVE_PID: - nxt_go_debug("remove pid"); - - /* TODO remove all ports for this pid in Go */ - /* TODO remove incoming & outgoing mmaps for this pid */ - break; - - default: - goto fail; - } - - -fail: - - if (fd != -1) { - close(fd); - } - - return 0; -} - - -#endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_port.h b/src/nginext/nxt_go_port.h deleted file mode 100644 index ce9dbcc3..00000000 --- a/src/nginext/nxt_go_port.h +++ /dev/null @@ -1,18 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_PORT_H_INCLUDED_ -#define _NXT_GO_PORT_H_INCLUDED_ - - -#include -#include "nxt_go_lib.h" - -nxt_go_request_t -nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size); - - -#endif /* _NXT_GO_PORT_H_INCLUDED_ */ diff --git a/src/nginext/nxt_go_port_memory.c b/src/nginext/nxt_go_port_memory.c deleted file mode 100644 index dbdf5f73..00000000 --- a/src/nginext/nxt_go_port_memory.c +++ /dev/null @@ -1,192 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef NXT_CONFIGURE - - -#include "nxt_go_port_memory.h" -#include "nxt_go_process.h" -#include "nxt_go_array.h" -#include "nxt_go_log.h" - -#include -#include - -#if (NXT_HAVE_MEMFD_CREATE) - -#include -#include -#include - -#endif - - -static nxt_port_mmap_header_t * -nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) -{ - int name_len, rc; - void *mem; - char name[64]; - nxt_fd_t fd; - nxt_port_msg_t port_msg; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_header_t *hdr; - - fd = -1; - - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; - } cmsg; - - port_mmap = nxt_go_array_zero_add(&process->outgoing); - if (nxt_slow_path(port_mmap == NULL)) { - nxt_go_warn("failed to add port mmap to outgoing array"); - - return NULL; - } - - name_len = snprintf(name, sizeof(name) - 1, "/nginext.go.%p", name); - -#if (NXT_HAVE_MEMFD_CREATE) - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); - - if (nxt_slow_path(fd == -1)) { - nxt_go_warn("memfd_create(%s) failed %d", name, errno); - - goto remove_fail; - } - - nxt_go_debug("memfd_create(%s): %d", name, fd); - -#elif (NXT_HAVE_SHM_OPEN) - shm_unlink((char *) name); // just in case - - fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - - nxt_go_debug("shm_open(%s): %d", name, fd); - - if (nxt_slow_path(fd == -1)) { - nxt_go_warn("shm_open(%s) failed %d", name, errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink((char *) name) == -1)) { - nxt_go_warn("shm_unlink(%s) failed %d", name, errno); - } -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_go_warn("ftruncate() failed %d", errno); - - goto remove_fail; - } - - mem = mmap(NULL, PORT_MMAP_SIZE, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - goto remove_fail; - } - - port_mmap->hdr = mem; - - /* Init segment header. */ - hdr = port_mmap->hdr; - - memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); - - hdr->id = process->outgoing.nelts - 1; - hdr->pid = process->pid; - - /* Mark first chunk as busy */ - nxt_port_mmap_set_chunk_busy(hdr, 0); - - /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); - - port_msg.stream = 0; - port_msg.pid = getpid(); - port_msg.reply_port = 0; - port_msg.type = _NXT_PORT_MSG_MMAP; - port_msg.last = 1; - port_msg.mmap = 0; - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * nxt_memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - - rc = nxt_go_port_send(hdr->pid, id, &port_msg, sizeof(port_msg), - &cmsg, sizeof(cmsg)); - - nxt_go_debug("new mmap #%d created for %d -> %d", - (int)hdr->id, (int)getpid(), (int)process->pid); - - close(fd); - - return hdr; - -remove_fail: - - if (fd != -1) { - close(fd); - } - - process->outgoing.nelts--; - - return NULL; -} - -nxt_port_mmap_header_t * -nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c) -{ - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_t *end_port_mmap; - nxt_port_mmap_header_t *hdr; - - port_mmap = NULL; - hdr = NULL; - - nxt_go_mutex_lock(&process->outgoing_mutex); - - port_mmap = process->outgoing.elts; - end_port_mmap = port_mmap + process->outgoing.nelts; - - while (port_mmap < end_port_mmap) { - - if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { - hdr = port_mmap->hdr; - - goto unlock_return; - } - - port_mmap++; - } - - hdr = nxt_go_new_port_mmap(process, port_id); - -unlock_return: - - nxt_go_mutex_unlock(&process->outgoing_mutex); - - return hdr; -} - - -#endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_port_memory.h b/src/nginext/nxt_go_port_memory.h deleted file mode 100644 index f6fe94e3..00000000 --- a/src/nginext/nxt_go_port_memory.h +++ /dev/null @@ -1,24 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_PORT_MEMORY_H_INCLUDED_ -#define _NXT_GO_PORT_MEMORY_H_INCLUDED_ - - -#include -#include - -#ifndef _NXT_GO_PROCESS_T_DEFINED_ -#define _NXT_GO_PROCESS_T_DEFINED_ -typedef struct nxt_go_process_s nxt_go_process_t; -#endif - -struct nxt_port_mmap_header_s * -nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c); - - -#endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/nginext/nxt_go_process.c b/src/nginext/nxt_go_process.c deleted file mode 100644 index c7ce052d..00000000 --- a/src/nginext/nxt_go_process.c +++ /dev/null @@ -1,150 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef NXT_CONFIGURE - - -#include "nxt_go_process.h" -#include "nxt_go_array.h" -#include "nxt_go_mutex.h" -#include "nxt_go_log.h" - -#include - - -static nxt_array_t processes; /* of nxt_go_process_t */ - -static nxt_go_process_t * -nxt_go_find_process(nxt_pid_t pid, uint32_t *pos) -{ - uint32_t l, r, i; - nxt_go_process_t *process; - - if (nxt_slow_path(processes.size == 0)) { - nxt_go_array_init(&processes, 1, sizeof(nxt_go_process_t)); - } - - l = 0; - r = processes.nelts; - i = (l + r) / 2; - - while (r > l) { - process = nxt_go_array_at(&processes, i); - - nxt_go_debug("compare process #%d (%p) at %d", - (int)process->pid, process, (int)i); - - if (pid == process->pid) { - nxt_go_debug("found process %d at %d", (int)pid, (int)i); - - if (pos != NULL) { - *pos = i; - } - - return process; - } - - if (pid < process->pid) { - r = i; - } else { - l = i + 1; - } - - i = (l + r) / 2; - } - - if (pos != NULL) { - *pos = i; - } - - nxt_go_debug("process %d not found, best pos %d", (int)pid, (int)i); - - return NULL; -} - - -nxt_go_process_t * -nxt_go_get_process(nxt_pid_t pid) -{ - uint32_t pos; - nxt_go_process_t *process; - - process = nxt_go_find_process(pid, &pos); - - if (process == NULL) { - nxt_go_array_add(&processes); - process = nxt_go_array_at(&processes, pos); - - nxt_go_debug("init process #%d (%p) at %d", (int)pid, process, - (int)pos); - - if (pos < processes.nelts - 1) { - memmove(process + 1, process, - processes.size * (processes.nelts - 1 - pos)); - } - - process->pid = pid; - nxt_go_mutex_create(&process->incoming_mutex); - nxt_go_array_init(&process->incoming, 1, sizeof(nxt_port_mmap_t)); - nxt_go_mutex_create(&process->outgoing_mutex); - nxt_go_array_init(&process->outgoing, 1, sizeof(nxt_port_mmap_t)); - } - - return process; -} - - -void -nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd) -{ - void *mem; - struct stat mmap_stat; - nxt_port_mmap_t *port_mmap; - nxt_go_process_t *process; - - process = nxt_go_get_process(pid); - - nxt_go_debug("got new mmap fd #%d from process %d", - (int)fd, (int)pid); - - if (fstat(fd, &mmap_stat) == -1) { - nxt_go_warn("fstat(%d) failed %d", (int)fd, errno); - - return; - } - - nxt_go_mutex_lock(&process->incoming_mutex); - - port_mmap = nxt_go_array_zero_add(&process->incoming); - if (nxt_slow_path(port_mmap == NULL)) { - nxt_go_warn("failed to add mmap to incoming array"); - - goto fail; - } - - mem = mmap(NULL, mmap_stat.st_size, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_go_warn("mmap() failed %d", errno); - - goto fail; - } - - port_mmap->hdr = mem; - - if (nxt_slow_path(port_mmap->hdr->id != process->incoming.nelts - 1)) { - nxt_go_warn("port mmap id mismatch (%d != %d)", - port_mmap->hdr->id, process->incoming.nelts - 1); - } - -fail: - - nxt_go_mutex_unlock(&process->incoming_mutex); -} - - -#endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_process.h b/src/nginext/nxt_go_process.h deleted file mode 100644 index 037844ce..00000000 --- a/src/nginext/nxt_go_process.h +++ /dev/null @@ -1,33 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_PROCESS_H_INCLUDED_ -#define _NXT_GO_PROCESS_H_INCLUDED_ - - -#include -#include "nxt_go_mutex.h" - -#ifndef _NXT_GO_PROCESS_T_DEFINED_ -#define _NXT_GO_PROCESS_T_DEFINED_ -typedef struct nxt_go_process_s nxt_go_process_t; -#endif - -struct nxt_go_process_s { - nxt_pid_t pid; - nxt_go_mutex_t incoming_mutex; - nxt_array_t incoming; /* of nxt_port_mmap_t */ - nxt_go_mutex_t outgoing_mutex; - nxt_array_t outgoing; /* of nxt_port_mmap_t */ -}; - -nxt_go_process_t *nxt_go_get_process(nxt_pid_t pid); - -void nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd); - - -#endif /* _NXT_GO_PROCESS_H_INCLUDED_ */ - diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c deleted file mode 100644 index cca8273e..00000000 --- a/src/nginext/nxt_go_run_ctx.c +++ /dev/null @@ -1,531 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef NXT_CONFIGURE - - -#include "nxt_go_run_ctx.h" -#include "nxt_go_log.h" -#include "nxt_go_process.h" -#include "nxt_go_array.h" -#include "nxt_go_mutex.h" -#include "nxt_go_port_memory.h" - -#include -#include -#include - - -static nxt_int_t -nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf, - uint32_t n) -{ - size_t nchunks; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg; - - if (nxt_slow_path(msg->mmap_msg == NULL)) { - if (n > 0) { - nxt_go_warn("failed to get plain buf #%d", (int)n); - - return NXT_ERROR; - } - - buf->mem.start = (u_char *) (msg->port_msg + 1); - buf->mem.pos = buf->mem.start; - buf->mem.end = buf->mem.start + msg->raw_size; - buf->mem.free = buf->mem.end; - - return NXT_OK; - } - - mmap_msg = msg->mmap_msg + n; - if (nxt_slow_path(mmap_msg >= msg->end)) { - nxt_go_warn("no more data in shm #%d", (int)n); - - return NXT_ERROR; - } - - if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) { - nxt_go_warn("incoming shared memory segment #%d not found " - "for process %d", (int)mmap_msg->mmap_id, - (int)msg->port_msg->pid); - - return NXT_ERROR; - } - - nxt_go_mutex_lock(&ctx->process->incoming_mutex); - - port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id); - buf->mem.start = nxt_port_mmap_chunk_start(port_mmap->hdr, - mmap_msg->chunk_id); - buf->mem.pos = buf->mem.start; - buf->mem.free = buf->mem.start + mmap_msg->size; - - nxt_go_mutex_unlock(&ctx->process->incoming_mutex); - - nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; - if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { - nchunks++; - } - - buf->mem.end = buf->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; - - return NXT_OK; -} - -static nxt_int_t -nxt_go_ctx_init_rbuf(nxt_go_run_ctx_t *ctx) -{ - return nxt_go_ctx_msg_rbuf(ctx, &ctx->msg, &ctx->rbuf, ctx->nrbuf); -} - -static void -nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, - size_t payload_size) -{ - nxt_port_mmap_msg_t *mmap_msg; - - memset(msg, 0, sizeof(nxt_go_msg_t)); - - msg->port_msg = port_msg; - msg->raw_size = payload_size; - - if (nxt_fast_path(port_msg->mmap != 0)) { - msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1); - msg->end = nxt_pointer_to(msg->mmap_msg, payload_size); - - mmap_msg = msg->mmap_msg; - while(mmap_msg < msg->end) { - msg->data_size += mmap_msg->size; - mmap_msg += 1; - } - } else { - msg->mmap_msg = NULL; - msg->end = NULL; - msg->data_size = payload_size; - } -} - -void -nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg) -{ - u_char *b, *e; - nxt_chunk_id_t c; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg, *end; - - if (nxt_slow_path(msg->mmap_msg == NULL)) { - return; - } - - mmap_msg = msg->mmap_msg; - end = msg->end; - - nxt_go_mutex_lock(&ctx->process->incoming_mutex); - - for (; mmap_msg < end; mmap_msg++ ) { - port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id); - - c = mmap_msg->chunk_id; - b = nxt_port_mmap_chunk_start(port_mmap->hdr, c); - e = b + mmap_msg->size; - - while (b < e) { - nxt_port_mmap_set_chunk_free(port_mmap->hdr, c); - - b += PORT_MMAP_CHUNK_SIZE; - c++; - } - } - - nxt_go_mutex_unlock(&ctx->process->incoming_mutex); -} - - -nxt_int_t -nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t payload_size) -{ - memset(ctx, 0, sizeof(nxt_go_run_ctx_t)); - - ctx->process = nxt_go_get_process(port_msg->pid); - if (nxt_slow_path(ctx->process == NULL)) { - nxt_go_warn("failed to get process %d", port_msg->pid); - - return NXT_ERROR; - } - - nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size); - - ctx->msg_last = &ctx->msg; - - ctx->wport_msg.stream = port_msg->stream; - ctx->wport_msg.pid = getpid(); - ctx->wport_msg.type = _NXT_PORT_MSG_DATA; - ctx->wport_msg.mmap = 1; - - ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); - - return nxt_go_ctx_init_rbuf(ctx); -} - - -void -nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) -{ - nxt_go_msg_t *msg; - - msg = malloc(sizeof(nxt_go_msg_t)); - - nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t)); - - msg->start_offset = ctx->msg_last->start_offset; - - if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->r.body.preread_size; - } else { - msg->start_offset += ctx->msg_last->data_size; - } - - ctx->msg_last->next = msg; - ctx->msg_last = msg; -} - - -nxt_int_t -nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) -{ - int i; - nxt_int_t rc; - - if (last != 0) { - ctx->wport_msg.last = 1; - } - - nxt_go_debug("flush buffers (%d)", last); - - for (i = 0; i < ctx->nwbuf; i++) { - nxt_port_mmap_msg_t *m = ctx->wmmap_msg + i; - - nxt_go_debug(" mmap_msg[%d]={%d, %d, %d}", i, - m->mmap_id, m->chunk_id, m->size); - } - - rc = nxt_go_port_send(ctx->msg.port_msg->pid, ctx->msg.port_msg->reply_port, - &ctx->wport_msg, sizeof(nxt_port_msg_t) + - ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0); - - ctx->nwbuf = 0; - - memset(&ctx->wbuf, 0, sizeof(ctx->wbuf)); - - return rc; -} - - -nxt_buf_t * -nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) -{ - size_t nchunks; - nxt_buf_t *buf; - nxt_chunk_id_t c; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg; - nxt_port_mmap_header_t *hdr; - - c = 0; - - buf = &ctx->wbuf; - - hdr = nxt_go_port_mmap_get(ctx->process, - ctx->msg.port_msg->reply_port, &c); - if (nxt_slow_path(hdr == NULL)) { - nxt_go_warn("failed to get port_mmap"); - - return NULL; - } - - buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); - buf->mem.pos = buf->mem.start; - buf->mem.free = buf->mem.start; - buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; - - buf->parent = hdr; - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf; - mmap_msg->mmap_id = hdr->id; - mmap_msg->chunk_id = c; - mmap_msg->size = 0; - - nchunks = size / PORT_MMAP_CHUNK_SIZE; - if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { - nchunks++; - } - - c++; - nchunks--; - - /* Try to acquire as much chunks as required. */ - while (nchunks > 0) { - - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { - break; - } - - buf->mem.end += PORT_MMAP_CHUNK_SIZE; - c++; - nchunks--; - } - - ctx->nwbuf++; - - return buf; -} - - -nxt_int_t -nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) -{ - size_t nchunks, free_size; - nxt_chunk_id_t c, start; - nxt_port_mmap_header_t *hdr; - - free_size = nxt_buf_mem_free_size(&b->mem); - - if (nxt_slow_path(size <= free_size)) { - return NXT_OK; - } - - hdr = b->parent; - - start = nxt_port_mmap_chunk_id(hdr, b->mem.end); - - size -= free_size; - - nchunks = size / PORT_MMAP_CHUNK_SIZE; - if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { - nchunks++; - } - - c = start; - - /* Try to acquire as much chunks as required. */ - while (nchunks > 0) { - - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { - break; - } - - c++; - nchunks--; - } - - if (nchunks != 0 && - min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { - - c--; - while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr, c); - c--; - } - - return NXT_ERROR; - } else { - b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); - - return NXT_OK; - } -} - - -nxt_int_t -nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) -{ - size_t free_size, copy_size; - nxt_buf_t *buf; - nxt_port_mmap_msg_t *mmap_msg; - - buf = &ctx->wbuf; - - while (len > 0) { - if (ctx->nwbuf == 0) { - buf = nxt_go_port_mmap_get_buf(ctx, len); - - if (nxt_slow_path(buf == NULL)) { - return NXT_ERROR; - } - } - - do { - free_size = nxt_buf_mem_free_size(&buf->mem); - - if (free_size > 0) { - copy_size = nxt_min(free_size, len); - - buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size); - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; - mmap_msg->size += copy_size; - - len -= copy_size; - data = nxt_pointer_to(data, copy_size); - - if (len == 0) { - return NXT_OK; - } - } - } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); - - if (ctx->nwbuf >= 8) { - nxt_go_ctx_flush(ctx, 0); - } - - buf = nxt_go_port_mmap_get_buf(ctx, len); - - if (nxt_slow_path(buf == NULL)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size) -{ - nxt_buf_t *buf; - nxt_int_t rc; - - do { - buf = &ctx->rbuf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { - if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - - ctx->nrbuf++; - rc = nxt_go_ctx_init_rbuf(ctx); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_go_warn("read size: init rbuf failed"); - return rc; - } - - continue; - } - nxt_go_warn("read size: used size is not 0"); - return NXT_ERROR; - } - - if (buf->mem.pos[0] >= 128) { - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { - nxt_go_warn("read size: used size < 4"); - return NXT_ERROR; - } - } - - break; - } while (1); - - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); - - return NXT_OK; -} - -nxt_int_t -nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) -{ - nxt_int_t rc; - - rc = nxt_go_ctx_read_size_(ctx, size); - - if (nxt_fast_path(rc == NXT_OK)) { - nxt_go_debug("read_size: %d", (int)*size); - } - - return rc; -} - -nxt_int_t -nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) -{ - size_t length; - nxt_int_t rc; - nxt_buf_t *buf; - - rc = nxt_go_ctx_read_size_(ctx, &length); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_go_warn("read str: read size failed"); - return rc; - } - - buf = &ctx->rbuf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) { - nxt_go_warn("read str: used size too small %d < %d", - (int)nxt_buf_mem_used_size(&buf->mem), (int)length); - return NXT_ERROR; - } - - if (length > 0) { - str->start = buf->mem.pos; - str->length = length - 1; - - buf->mem.pos += length; - - nxt_go_debug("read_str: %d %.*s", (int)length - 1, (int)length - 1, - str->start); - } else { - str->start = NULL; - str->length = 0; - - nxt_go_debug("read_str: NULL"); - } - - return NXT_OK; -} - - -size_t -nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size) -{ - size_t res, read_size; - nxt_int_t rc; - nxt_buf_t *buf; - - res = 0; - - while (size > 0) { - buf = &ctx->rbuf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - ctx->nrbuf++; - rc = nxt_go_ctx_init_rbuf(ctx); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_go_warn("read raw: init rbuf failed"); - return res; - } - - continue; - } - - read_size = nxt_buf_mem_used_size(&buf->mem); - read_size = nxt_min(read_size, size); - - dst = nxt_cpymem(dst, buf->mem.pos, read_size); - - size -= read_size; - buf->mem.pos += read_size; - res += read_size; - } - - nxt_go_debug("read_raw: %d", (int) res); - - return res; -} - - -#endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_run_ctx.h b/src/nginext/nxt_go_run_ctx.h deleted file mode 100644 index a5a972c6..00000000 --- a/src/nginext/nxt_go_run_ctx.h +++ /dev/null @@ -1,75 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_RUN_CTX_H_INCLUDED_ -#define _NXT_GO_RUN_CTX_H_INCLUDED_ - - -#include -#include -#include - -#ifndef _NXT_GO_PROCESS_T_DEFINED_ -#define _NXT_GO_PROCESS_T_DEFINED_ -typedef struct nxt_go_process_s nxt_go_process_t; -#endif - -typedef struct nxt_go_msg_s nxt_go_msg_t; - -struct nxt_go_msg_s { - off_t start_offset; - - nxt_port_msg_t *port_msg; - size_t raw_size; - size_t data_size; - - nxt_port_mmap_msg_t *mmap_msg; - nxt_port_mmap_msg_t *end; - - nxt_go_msg_t *next; -}; - - -typedef struct { - nxt_go_msg_t msg; - - nxt_go_process_t *process; - nxt_port_mmap_msg_t *wmmap_msg; - - uint32_t nrbuf; - nxt_buf_t rbuf; - - uint32_t nwbuf; - nxt_buf_t wbuf; - nxt_port_msg_t wport_msg; - char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ]; - - nxt_app_request_t r; - - nxt_go_msg_t *msg_last; -} nxt_go_run_ctx_t; - - -void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg); - -nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t payload_size); - -void nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t size); - -nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last); - -nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len); - -nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size); - -nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str); - -size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size); - - -#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */ diff --git a/src/nginext/port.go b/src/nginext/port.go deleted file mode 100644 index 768fbf84..00000000 --- a/src/nginext/port.go +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package nginext - -/* -#include "nxt_go_lib.h" -#include "nxt_process_type.h" -*/ -import "C" - -import ( - "fmt" - "net" - "net/http" - "os" - "sync" - "unsafe" -) - -type port_key struct { - pid int - id int -} - -type port struct { - key port_key - t int - rcv *net.UnixConn - snd *net.UnixConn -} - -type port_registry struct { - sync.RWMutex - m map[port_key]*port - t [C.NXT_PROCESS_MAX]*port -} - -var port_registry_ port_registry - -func find_port(key port_key) *port { - port_registry_.RLock() - res := port_registry_.m[key] - port_registry_.RUnlock() - - return res -} - -func remove_by_pid(pid int) { - port_registry_.Lock() - if port_registry_.m != nil { - for k, p := range port_registry_.m { - if k.pid == pid { - if port_registry_.t[p.t] == p { - port_registry_.t[p.t] = nil - } - - delete(port_registry_.m, k) - } - } - } - - port_registry_.Unlock() -} - -func main_port() *port { - port_registry_.RLock() - res := port_registry_.t[C.NXT_PROCESS_MAIN] - port_registry_.RUnlock() - - return res -} - -func add_port(p *port) { - port_registry_.Lock() - if port_registry_.m == nil { - port_registry_.m = make(map[port_key]*port) - } - - port_registry_.m[p.key] = p - port_registry_.t[p.t] = p - - port_registry_.Unlock() -} - -func (p *port) Close() { - if p.rcv != nil { - p.rcv.Close() - } - - if p.snd != nil { - p.snd.Close() - } -} - -func getUnixConn(fd int) *net.UnixConn { - if fd < 0 { - return nil - } - - f := os.NewFile(uintptr(fd), "sock") - defer f.Close() - - c, err := net.FileConn(f) - if err != nil { - fmt.Printf("FileConn error %s\n", err) - return nil - } - - uc, ok := c.(*net.UnixConn) - if !ok { - fmt.Printf("Not a Unix-domain socket %d\n", fd) - return nil - } - - return uc -} - -//export nxt_go_new_port -func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) { - new_port(int(pid), int(id), int(t), int(rcv), int(snd)) -} - -//export nxt_go_port_send -func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int { - key := port_key{ - pid: int(pid), - id: int(id), - } - - p := find_port(key) - - if p != nil { - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil) - - if err != nil { - fmt.Printf("write result %d (%d), %s\n", n, oobn, err) - } - - return C.int(n) - } - - return 0 -} - -//export nxt_go_main_send -func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int { - p := main_port() - - if p != nil { - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil) - - if err != nil { - fmt.Printf("write result %d (%d), %s\n", n, oobn, err) - } - - return C.int(n) - } - - return 0 -} - -func new_port(pid int, id int, t int, rcv int, snd int) *port { - p := &port{ - key: port_key{ - pid: pid, - id: id, - }, - t: t, - rcv: getUnixConn(rcv), - snd: getUnixConn(snd), - } - - add_port(p) - - return p -} - -func (p *port) read(handler http.Handler) error { - var buf [16384]byte - var oob [1024]byte - - n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:]) - - if err != nil { - return err - } - - m := new_cmsg(buf[:n], oob[:oobn]) - - c_req := C.nxt_go_process_port_msg(m.buf.b, m.buf.s, m.oob.b, m.oob.s) - - if c_req == 0 { - m.Close() - } else { - r := find_request(c_req) - - go func(r *request) { - if handler == nil { - handler = http.DefaultServeMux - } - - handler.ServeHTTP(r.response(), &r.req) - r.done() - }(r) - - if len(r.msgs) == 0 { - r.push(m) - } else if r.ch != nil { - r.ch <- m - } else { - m.Close() - } - } - - return err -} diff --git a/src/nginext/request.go b/src/nginext/request.go deleted file mode 100644 index 1679f1c7..00000000 --- a/src/nginext/request.go +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package nginext - -/* -#include "nxt_go_lib.h" -*/ -import "C" - -import ( - "net/http" - "net/url" - "sync" -) - -type request struct { - req http.Request - resp *response - c_req C.nxt_go_request_t - id C.uint32_t - msgs []*cmsg - ch chan *cmsg -} - -func (r *request) Read(p []byte) (n int, err error) { - c := C.size_t(cap(p)) - b := C.malloc(c) - res := C.nxt_go_request_read(r.c_req, b, c) - - if res == -2 /* NXT_AGAIN */ { - m := <-r.ch - - res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s) - r.push(m) - } - - if res > 0 { - copy(p, C.GoBytes(b, res)) - } - - C.free(b) - return int(res), nil -} - -func (r *request) Close() error { - C.nxt_go_request_close(r.c_req) - return nil -} - -type request_registry struct { - sync.RWMutex - m map[C.nxt_go_request_t]*request - id map[C.uint32_t]*request -} - -var request_registry_ request_registry - -func find_request(c_req C.nxt_go_request_t) *request { - request_registry_.RLock() - res := request_registry_.m[c_req] - request_registry_.RUnlock() - - return res -} - -func find_request_by_id(id C.uint32_t) *request { - request_registry_.RLock() - res := request_registry_.id[id] - request_registry_.RUnlock() - - return res -} - -func add_request(r *request) { - request_registry_.Lock() - if request_registry_.m == nil { - request_registry_.m = make(map[C.nxt_go_request_t]*request) - request_registry_.id = make(map[C.uint32_t]*request) - } - - request_registry_.m[r.c_req] = r - request_registry_.id[r.id] = r - - request_registry_.Unlock() -} - -func remove_request(r *request) { - request_registry_.Lock() - if request_registry_.m != nil { - delete(request_registry_.m, r.c_req) - delete(request_registry_.id, r.id) - } - - request_registry_.Unlock() -} - -func (r *request) response() *response { - if r.resp == nil { - r.resp = new_response(r.c_req, &r.req) - } - - return r.resp -} - -func (r *request) done() { - C.nxt_go_request_done(r.c_req) - - remove_request(r) - - for _, m := range r.msgs { - m.Close() - } - - if r.ch != nil { - close(r.ch) - } -} - -func (r *request) push(m *cmsg) { - r.msgs = append(r.msgs, m) -} - -//export nxt_go_new_request -func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) { - uri := C.GoStringN(c_uri.start, c_uri.length) - - var URL *url.URL - var err error - if URL, err = url.ParseRequestURI(uri); err != nil { - return - } - - r := &request{ - req: http.Request{ - Method: C.GoStringN(c_method.start, c_method.length), - URL: URL, - Header: http.Header{}, - Body: nil, - RequestURI: uri, - }, - c_req: c_req, - id: id, - msgs: make([]*cmsg, 0, 1), - } - r.req.Body = r - - add_request(r) -} - -//export nxt_go_find_request -func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t { - r := find_request_by_id(id) - - if r != nil { - return r.c_req - } - - return 0 -} - -//export nxt_go_request_set_proto -func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, maj C.int, min C.int) { - r := find_request(c_req) - r.req.Proto = C.GoStringN(proto.start, proto.length) - r.req.ProtoMajor = int(maj) - r.req.ProtoMinor = int(min) -} - -//export nxt_go_request_add_header -func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, value *C.nxt_go_str_t) { - r := find_request(c_req) - r.req.Header.Add(C.GoStringN(name.start, name.length), C.GoStringN(value.start, value.length)) -} - -//export nxt_go_request_set_content_length -func nxt_go_request_set_content_length(c_req C.nxt_go_request_t, l C.int64_t) { - find_request(c_req).req.ContentLength = int64(l) -} - -//export nxt_go_request_create_channel -func nxt_go_request_create_channel(c_req C.nxt_go_request_t) { - find_request(c_req).ch = make(chan *cmsg) -} - -//export nxt_go_request_set_host -func nxt_go_request_set_host(c_req C.nxt_go_request_t, host *C.nxt_go_str_t) { - find_request(c_req).req.Host = C.GoStringN(host.start, host.length) -} - -//export nxt_go_request_set_url -func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) { - find_request(c_req).req.URL.Scheme = C.GoString(scheme) -} - -//export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, addr *C.nxt_go_str_t) { - find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) -} - -//export nxt_go_request_serve -func nxt_go_request_serve(c_req C.nxt_go_request_t) { - r := find_request(c_req) - - go func(r *request) { - http.DefaultServeMux.ServeHTTP(r.response(), &r.req) - r.done() - }(r) -} diff --git a/src/nginext/response.go b/src/nginext/response.go deleted file mode 100644 index dc864f6e..00000000 --- a/src/nginext/response.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package nginext - -/* -#include "nxt_go_lib.h" -*/ -import "C" - -import ( - "fmt" - "net/http" - "os" -) - -type response struct { - header http.Header - headerSent bool - req *http.Request - c_req C.nxt_go_request_t -} - -func new_response(c_req C.nxt_go_request_t, req *http.Request) *response { - resp := &response{ - header: http.Header{}, - req: req, - c_req: c_req, - } - - return resp -} - -func (r *response) Header() http.Header { - return r.header -} - -func (r *response) Write(p []byte) (n int, err error) { - if !r.headerSent { - r.WriteHeader(http.StatusOK) - } - - l := C.size_t(len(p)) - b := getCBytes(p) - res := C.nxt_go_response_write(r.c_req, b, l) - C.free(b) - return int(res), nil -} - -func (r *response) WriteHeader(code int) { - if r.headerSent { - // Note: explicitly using Stderr, as Stdout is our HTTP output. - fmt.Fprintf(os.Stderr, "CGI attempted to write header twice") - return - } - r.headerSent = true - fmt.Fprintf(r, "%s %d %s\r\n", r.req.Proto, code, http.StatusText(code)) - - // Set a default Content-Type - if _, hasType := r.header["Content-Type"]; !hasType { - r.header.Add("Content-Type", "text/html; charset=utf-8") - } - - r.header.Write(r) - - r.Write([]byte("\r\n")) -} diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 74142c2e..7305b3d5 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -1076,7 +1076,7 @@ nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length - + sizeof("Server: nginext/0.1\r\n") - 1 + + sizeof("Server: unit/" NXT_VERSION "\r\n") - 1 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 + sizeof("Content-Type: application/json\r\n") - 1 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN @@ -1098,7 +1098,7 @@ nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, status_line.length); nxt_str_set(&str, "\r\n" - "Server: nginext/0.1\r\n" + "Server: unit/" NXT_VERSION "\r\n" "Date: "); b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); diff --git a/src/nxt_main.c b/src/nxt_main.c index 6f7aec39..03403991 100644 --- a/src/nxt_main.c +++ b/src/nxt_main.c @@ -16,7 +16,7 @@ main(int argc, char **argv) { nxt_int_t ret; - if (nxt_lib_start("nginext", argv, &environ) != NXT_OK) { + if (nxt_lib_start("unit", argv, &environ) != NXT_OK) { return 1; } @@ -30,7 +30,7 @@ main(int argc, char **argv) return 1; } - nxt_log(&nxt_main_task, NXT_LOG_INFO, "nginext started"); + nxt_log(&nxt_main_task, NXT_LOG_INFO, "unit started"); nxt_event_engine_start(nxt_main_task.thread->engine); diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 4043d026..34fc7ea0 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -270,7 +270,7 @@ nxt_main_process_title(nxt_task_t *task) end = title + sizeof(title) - 1; - p = nxt_sprintf(title, end, "nginext: main [%s", nxt_process_argv[0]); + p = nxt_sprintf(title, end, "unit: main [%s", nxt_process_argv[0]); for (i = 1; nxt_process_argv[i] != NULL; i++) { p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]); diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 5655dcbc..cb2f6458 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -59,7 +59,7 @@ static void nxt_php_flush(void *server_context); static sapi_module_struct nxt_php_sapi_module = { (char *) "cli-server", - (char *) "nginext", + (char *) "unit", nxt_php_startup, /* startup */ php_module_shutdown_wrapper, /* shutdown */ @@ -497,13 +497,13 @@ nxt_php_send_headers(sapi_headers_struct *sapi_headers TSRMLS_DC) static const u_char default_repsonse[] = "HTTP/1.1 200 OK\r\n" - "Server: nginext/0.1\r\n" + "Server: unit/" NXT_VERSION "\r\n" "Content-Type: text/html; charset=UTF-8\r\n" "Connection: close\r\n" "\r\n"; static const u_char default_headers[] - = "Server: nginext/0.1\r\n" + - "Server: unit/" NXT_VERSION "\r\n" "Connection: close\r\n"; static const u_char http_11[] = "HTTP/1.1 "; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index afb2f4a4..7cdd7c58 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -236,7 +236,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD", + p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD", nxt_pid, nxt_random(&task->thread->random)); *p = '\0'; diff --git a/src/nxt_process.c b/src/nxt_process.c index 473e45ea..f7430e5a 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -104,7 +104,7 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_log(task, NXT_LOG_INFO, "%s started", init->name); - nxt_process_title(task, "nginext: %s", init->name); + nxt_process_title(task, "unit: %s", init->name); thread = task->thread; diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index fd718a95..6c188066 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -107,7 +107,7 @@ NXT_EXPORT nxt_application_module_t nxt_app_module = { static PyMethodDef nxt_py_start_resp_method[] = { - {"nginext_start_response", nxt_py_start_resp, METH_VARARGS, ""} + {"unit_start_response", nxt_py_start_resp, METH_VARARGS, ""} }; @@ -121,7 +121,7 @@ static PyMethodDef nxt_py_input_methods[] = { static PyTypeObject nxt_py_input_type = { PyVarObject_HEAD_INIT(NULL, 0) - "nginext._input", /* tp_name */ + "unit._input", /* tp_name */ (int) sizeof(nxt_py_input_t), /* tp_basicsize */ 0, /* tp_itemsize */ (destructor) nxt_py_input_dealloc, /* tp_dealloc */ @@ -140,7 +140,7 @@ static PyTypeObject nxt_py_input_type = { 0, /* tp_setattro */ 0, /* tp_as_buffer */ Py_TPFLAGS_DEFAULT, /* tp_flags */ - "nginext input object.", /* tp_doc */ + "unit input object.", /* tp_doc */ 0, /* tp_traverse */ 0, /* tp_clear */ 0, /* tp_richcompare */ @@ -247,7 +247,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_py_environ_ptyp = obj; - obj = Py_BuildValue("[s]", "nginext"); + obj = Py_BuildValue("[s]", "unit"); if (obj == NULL) { nxt_log_alert(task->log, "Python failed to create the \"sys.argv\" list"); @@ -730,7 +730,7 @@ nxt_py_start_resp(PyObject *self, PyObject *args) static const u_char resp[] = "HTTP/1.1 "; static const u_char default_headers[] - = "Server: nginext/0.1\r\n" + = "Server: unit/" NXT_VERSION "\r\n" "Connection: close\r\n"; static const u_char cr_lf[] = "\r\n"; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index cdaa72b6..d9dfa3a5 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -763,7 +763,7 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) slash = "/"; } - ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%snginext.*%Z", + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sunit.*%Z", rt->modules, slash); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; @@ -797,7 +797,7 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) u_char buf[1024]; static const char version[] = - "nginext version: " NXT_VERSION "\n" + "unit version: " NXT_VERSION "\n" "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n"; static const char no_control[] = @@ -811,11 +811,11 @@ nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt) static const char help[] = "\n" - "nginext options:\n" + "unit options:\n" "\n" - " --version print nginext version and configure options\n" + " --version print unit version and configure options\n" "\n" - " --no-daemon run nginext in non-daemon mode\n" + " --no-daemon run unit in non-daemon mode\n" "\n" " --control ADDRESS set address of control API socket\n" " default: \"" NXT_CONTROL_SOCK "\"\n" -- cgit