summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-06-23 19:20:08 +0300
committerMax Romanov <max.romanov@nginx.com>2017-06-23 19:20:08 +0300
commit4a1b59c27a8e85fc3b03c420fbc1642ce52e96cf (patch)
treec72ab253541c53dd918afc86973192416078fceb /src
parent5a43bd0bfd1eaa60dede7beb3206a53e8d008fa4 (diff)
downloadunit-4a1b59c27a8e85fc3b03c420fbc1642ce52e96cf.tar.gz
unit-4a1b59c27a8e85fc3b03c420fbc1642ce52e96cf.tar.bz2
External Go app request processing.
Diffstat (limited to 'src')
-rw-r--r--src/nginext/cbytes-1.6.go15
-rw-r--r--src/nginext/cbytes-1.7.go15
-rw-r--r--src/nginext/nginext.go127
-rw-r--r--src/nginext/nxt_go_array.c66
-rw-r--r--src/nginext/nxt_go_array.h36
-rw-r--r--src/nginext/nxt_go_lib.c194
-rw-r--r--src/nginext/nxt_go_lib.h40
-rw-r--r--src/nginext/nxt_go_log.h34
-rw-r--r--src/nginext/nxt_go_mutex.h21
-rw-r--r--src/nginext/nxt_go_port.c194
-rw-r--r--src/nginext/nxt_go_port.h18
-rw-r--r--src/nginext/nxt_go_port_memory.c192
-rw-r--r--src/nginext/nxt_go_port_memory.h22
-rw-r--r--src/nginext/nxt_go_process.c150
-rw-r--r--src/nginext/nxt_go_process.h30
-rw-r--r--src/nginext/nxt_go_run_ctx.c397
-rw-r--r--src/nginext/nxt_go_run_ctx.h71
-rw-r--r--src/nginext/port.go161
-rw-r--r--src/nginext/request.go213
-rw-r--r--src/nginext/response.go69
-rw-r--r--src/nxt_go.c205
21 files changed, 2270 insertions, 0 deletions
diff --git a/src/nginext/cbytes-1.6.go b/src/nginext/cbytes-1.6.go
new file mode 100644
index 00000000..7686dad8
--- /dev/null
+++ b/src/nginext/cbytes-1.6.go
@@ -0,0 +1,15 @@
+// +build !go1.7
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package nginext
+
+import "C"
+import "unsafe"
+
+func getCBytes(p []byte) unsafe.Pointer {
+ return unsafe.Pointer(C.CString(string(p))) // go <= 1.6
+}
diff --git a/src/nginext/cbytes-1.7.go b/src/nginext/cbytes-1.7.go
new file mode 100644
index 00000000..e7d92f04
--- /dev/null
+++ b/src/nginext/cbytes-1.7.go
@@ -0,0 +1,15 @@
+// +build go1.7
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package nginext
+
+import "C"
+import "unsafe"
+
+func getCBytes(p []byte) unsafe.Pointer {
+ return C.CBytes(p) // go >= 1.7
+}
diff --git a/src/nginext/nginext.go b/src/nginext/nginext.go
new file mode 100644
index 00000000..2bde80a6
--- /dev/null
+++ b/src/nginext/nginext.go
@@ -0,0 +1,127 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package nginext
+
+/*
+#include "nxt_go_lib.h"
+*/
+import "C"
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "unsafe"
+)
+
+type cbuf struct {
+ b unsafe.Pointer
+ s C.size_t
+ f bool
+}
+
+func new_cbuf(buf []byte) *cbuf {
+ if len(buf) == 0 {
+ return nil
+ }
+
+ return &cbuf{
+ getCBytes(buf), C.size_t(len(buf)), true,
+ }
+}
+
+func (buf *cbuf) Close() {
+ if buf == nil {
+ return
+ }
+
+ if buf.f && buf.s > 0 {
+ C.free(buf.b)
+ buf.f = false
+ buf.b = nil
+ buf.s = 0
+ }
+}
+
+func (buf *cbuf) GoBytes() []byte {
+ if buf == nil {
+ var b [0]byte
+ return b[:0]
+ }
+
+ return C.GoBytes(buf.b, C.int(buf.s))
+}
+
+type cmsg struct {
+ buf cbuf
+ oob cbuf
+}
+
+func new_cmsg(buf []byte, oob []byte) *cmsg {
+ return &cmsg{
+ buf: cbuf{getCBytes(buf), C.size_t(len(buf)), true},
+ oob: cbuf{getCBytes(oob), C.size_t(len(oob)), true},
+ }
+}
+
+func (msg *cmsg) Close() {
+ msg.buf.Close()
+ msg.oob.Close()
+}
+
+var nxt_go_quit bool = false
+
+//export nxt_go_set_quit
+func nxt_go_set_quit() {
+ nxt_go_quit = true
+}
+
+func ListenAndServe() {
+ var read_port *port
+
+ go_ports_env := os.Getenv("NXT_GO_PORTS")
+ fmt.Printf("NXT_GO_PORTS = %s\n", go_ports_env)
+
+ ports := strings.Split(go_ports_env, ";")
+ pid := os.Getpid()
+
+ for _, port_str := range ports {
+ if len(port_str) <= 0 {
+ continue
+ }
+
+ attrs := strings.Split(port_str, ",")
+ fmt.Printf("Port = %q\n", attrs)
+
+ var attrsN [5]int
+ var err error
+ for i, attr := range attrs {
+ attrsN[i], err = strconv.Atoi(attr)
+ if err != nil {
+ fmt.Printf("err %s\n", err)
+ break
+ }
+ }
+
+ if err != nil {
+ continue
+ }
+
+ p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4])
+
+ if attrsN[0] == pid {
+ read_port = p
+ }
+ }
+
+ if read_port != nil {
+ for !nxt_go_quit {
+ read_port.read()
+ }
+ }
+
+}
diff --git a/src/nginext/nxt_go_array.c b/src/nginext/nxt_go_array.c
new file mode 100644
index 00000000..a788811a
--- /dev/null
+++ b/src/nginext/nxt_go_array.c
@@ -0,0 +1,66 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef NXT_CONFIGURE
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#include <nxt_main.h>
+
+#include "nxt_go_array.h"
+
+void
+nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size)
+{
+ array->elts = malloc(n * size);
+
+ if (nxt_slow_path(n != 0 && array->elts == NULL)) {
+ return;
+ }
+
+ array->nelts = 0;
+ array->size = size;
+ array->nalloc = n;
+ array->mem_pool = NULL;
+}
+
+void *
+nxt_go_array_add(nxt_array_t *array)
+{
+ void *p;
+ uint32_t nalloc, new_alloc;
+
+ nalloc = array->nalloc;
+
+ if (array->nelts == nalloc) {
+
+ if (nalloc < 16) {
+ /* Allocate new array twice larger than current. */
+ new_alloc = nalloc * 2;
+
+ } else {
+ /* Allocate new array 1.5 times larger than current. */
+ new_alloc = nalloc + nalloc / 2;
+ }
+
+ p = realloc(array->elts, array->size * new_alloc);
+
+ if (nxt_slow_path(p == NULL)) {
+ return NULL;
+ }
+
+ array->elts = p;
+ array->nalloc = new_alloc;
+ }
+
+ p = (char *) array->elts + array->size * array->nelts;
+ array->nelts++;
+
+ return p;
+}
+
+#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_array.h b/src/nginext/nxt_go_array.h
new file mode 100644
index 00000000..0c3ba336
--- /dev/null
+++ b/src/nginext/nxt_go_array.h
@@ -0,0 +1,36 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_ARRAY_H_INCLUDED_
+#define _NXT_GO_ARRAY_H_INCLUDED_
+
+
+#include <nxt_array.h>
+
+void nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size);
+
+void *nxt_go_array_add(nxt_array_t *array);
+
+nxt_inline void *
+nxt_go_array_zero_add(nxt_array_t *array)
+{
+ void *p;
+
+ p = nxt_go_array_add(array);
+
+ if (nxt_fast_path(p != NULL)) {
+ nxt_memzero(p, array->size);
+ }
+
+ return p;
+}
+
+#define \
+nxt_go_array_at(array, n) \
+ ((void *) ((char *) (array)->elts + (array)->size * (n)))
+
+
+#endif /* _NXT_GO_ARRAY_H_INCLUDED_ */
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c
new file mode 100644
index 00000000..86d5b619
--- /dev/null
+++ b/src/nginext/nxt_go_lib.c
@@ -0,0 +1,194 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifdef NXT_CONFIGURE
+
+#include <stdio.h>
+#include "nxt_go_lib.h"
+
+// Stubs to compile during configure process.
+int
+nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
+{
+ return -1;
+}
+
+int
+nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len)
+{
+ return -1;
+}
+
+int
+nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
+ size_t dst_len, void *src, size_t src_len)
+{
+ return -1;
+}
+
+int
+nxt_go_request_close(nxt_go_request_t r)
+{
+ return -1;
+}
+
+int
+nxt_go_request_done(nxt_go_request_t r)
+{
+ return -1;
+}
+
+void
+nxt_go_listen_and_serve()
+{
+}
+
+nxt_go_request_t
+nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len)
+{
+ return 0;
+}
+
+#else
+
+#if 0
+
+#include <nxt_runtime.h>
+#include <nxt_master_process.h>
+#include <nxt_application.h>
+
+#include "nxt_go_port.h"
+
+#endif
+
+#include "nxt_go_run_ctx.h"
+#include "nxt_go_log.h"
+#include "nxt_go_port.h"
+
+#include <nxt_main.h>
+#include <nxt_go_gen.h>
+
+int
+nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
+{
+ nxt_int_t rc;
+ nxt_go_run_ctx_t *ctx;
+
+ if (nxt_slow_path(r == 0)) {
+ return 0;
+ }
+
+ nxt_go_debug("write: %d %.*s", (int) len, (int) len, (char *) buf);
+
+ ctx = (nxt_go_run_ctx_t *) r;
+ rc = nxt_go_ctx_write(ctx, buf, len);
+
+ return rc == NXT_OK ? len : -1;
+}
+
+
+int
+nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len)
+{
+ nxt_go_msg_t *msg;
+ nxt_go_run_ctx_t *ctx;
+ nxt_app_request_body_t *b;
+ nxt_app_request_header_t *h;
+
+ if (nxt_slow_path(r == 0)) {
+ return 0;
+ }
+
+ ctx = (nxt_go_run_ctx_t *) r;
+ b = &ctx->r.body;
+ h = &ctx->r.header;
+
+ if (off >= h->parsed_content_length) {
+ return 0;
+ }
+
+ if (off < b->preread.length) {
+ dst_len = nxt_min(b->preread.length - off, dst_len);
+
+ if (dst_len != 0) {
+ nxt_memcpy(dst, b->preread.start + off, dst_len);
+ }
+
+ return dst_len;
+ }
+
+ /* TODO find msg to read */
+
+ return NXT_AGAIN;
+}
+
+
+int
+nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
+ size_t dst_len, void *src, size_t src_len)
+{
+ nxt_go_run_ctx_t *ctx;
+
+ if (nxt_slow_path(r == 0)) {
+ return 0;
+ }
+
+ ctx = (nxt_go_run_ctx_t *) r;
+
+ nxt_go_ctx_add_msg(ctx, src, src_len);
+
+ return nxt_go_request_read(r, off, dst, dst_len);
+}
+
+
+int
+nxt_go_request_close(nxt_go_request_t r)
+{
+ return 0;
+}
+
+
+int
+nxt_go_request_done(nxt_go_request_t r)
+{
+ nxt_int_t res;
+ nxt_go_run_ctx_t *ctx;
+ nxt_go_msg_t *msg, *b;
+
+ if (nxt_slow_path(r == 0)) {
+ return 0;
+ }
+
+ ctx = (nxt_go_run_ctx_t *) r;
+
+ res = nxt_go_ctx_flush(ctx, 1);
+
+ nxt_go_ctx_release_msg(ctx, &ctx->msg);
+
+ msg = ctx->msg.next;
+ while (msg != NULL) {
+ nxt_go_ctx_release_msg(ctx, msg);
+
+ b = msg;
+ msg = b->next;
+
+ free(b);
+ }
+
+ free(ctx);
+
+ return res;
+}
+
+
+nxt_go_request_t
+nxt_go_process_port_msg(void *buf, size_t buf_len, void *oob, size_t oob_len)
+{
+ return nxt_go_port_on_read(buf, buf_len, oob, oob_len);
+}
+
+
+#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_lib.h b/src/nginext/nxt_go_lib.h
new file mode 100644
index 00000000..1c7f61c0
--- /dev/null
+++ b/src/nginext/nxt_go_lib.h
@@ -0,0 +1,40 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_LIB_H_INCLUDED_
+#define _NXT_GO_LIB_H_INCLUDED_
+
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/types.h>
+
+typedef struct {
+ int length;
+ char *start;
+} nxt_go_str_t;
+
+typedef uintptr_t nxt_go_request_t;
+
+int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len);
+
+int nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst,
+ size_t dst_len);
+
+int nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
+ size_t dst_len, void *src, size_t src_len);
+
+int nxt_go_request_close(nxt_go_request_t r);
+
+int nxt_go_request_done(nxt_go_request_t r);
+
+void nxt_go_listen_and_serve();
+
+nxt_go_request_t nxt_go_process_port_msg(void *buf, size_t buf_len,
+ void *oob, size_t oob_len);
+
+
+#endif /* _NXT_GO_LIB_H_INCLUDED_ */
diff --git a/src/nginext/nxt_go_log.h b/src/nginext/nxt_go_log.h
new file mode 100644
index 00000000..f1291716
--- /dev/null
+++ b/src/nginext/nxt_go_log.h
@@ -0,0 +1,34 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_LOG_H_INCLUDED_
+#define _NXT_GO_LOG_H_INCLUDED_
+
+
+#include <stdio.h>
+#include <pthread.h>
+
+#include <nxt_auto_config.h>
+
+#if (NXT_DEBUG)
+
+#define nxt_go_debug(fmt, ARGS...) \
+ fprintf(stdout, "go debug[%p]: " fmt "\n", (void *) pthread_self(), ##ARGS)
+
+#else
+
+#define nxt_go_debug(fmt, ARGS...)
+
+#endif
+
+#define nxt_go_warn(fmt, ARGS...) \
+ fprintf(stdout, "go warn: " fmt "\n", ##ARGS)
+
+#define nxt_go_error(fmt, ARGS...) \
+ fprintf(stdout, "go error: " fmt "\n", ##ARGS)
+
+
+#endif /* _NXT_GO_LOG_H_INCLUDED_ */
diff --git a/src/nginext/nxt_go_mutex.h b/src/nginext/nxt_go_mutex.h
new file mode 100644
index 00000000..98bd27f0
--- /dev/null
+++ b/src/nginext/nxt_go_mutex.h
@@ -0,0 +1,21 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_MUTEX_H_INCLUDED_
+#define _NXT_GO_MUTEX_H_INCLUDED_
+
+
+#include <pthread.h>
+
+typedef pthread_mutex_t nxt_go_mutex_t;
+
+#define nxt_go_mutex_create(mutex) pthread_mutex_init(mutex, NULL)
+#define nxt_go_mutex_destroy(mutex) pthread_mutex_destroy(mutex)
+#define nxt_go_mutex_lock(mutex) pthread_mutex_lock(mutex)
+#define nxt_go_mutex_unlock(mutex) pthread_mutex_unlock(mutex)
+
+
+#endif /* _NXT_GO_MUTEX_H_INCLUDED_ */
diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c
new file mode 100644
index 00000000..00d13a38
--- /dev/null
+++ b/src/nginext/nxt_go_port.c
@@ -0,0 +1,194 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef NXT_CONFIGURE
+
+
+#include "nxt_go_port.h"
+#include "nxt_go_log.h"
+#include "nxt_go_process.h"
+#include "nxt_go_run_ctx.h"
+
+#include <nxt_main.h>
+#include <nxt_go_gen.h>
+
+
+#define nxt_go_str(p) ((nxt_go_str_t *)(p))
+
+static nxt_go_request_t
+nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
+{
+ size_t s;
+ nxt_str_t n, v;
+ nxt_int_t rc;
+ nxt_uint_t i;
+ nxt_go_run_ctx_t *ctx;
+ nxt_go_request_t r;
+ nxt_app_request_header_t *h;
+
+ r = nxt_go_find_request(port_msg->stream);
+ if (r != 0) {
+ return r;
+ }
+
+ ctx = malloc(sizeof(nxt_go_run_ctx_t));
+ nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t));
+
+ r = (nxt_go_request_t)(ctx);
+ h = &ctx->r.header;
+
+ nxt_go_ctx_read_str(ctx, &h->method);
+ nxt_go_ctx_read_str(ctx, &h->path);
+ h->path_no_query = h->path;
+
+ nxt_go_ctx_read_size(ctx, &s);
+ if (s > 0) {
+ s--;
+ h->query.start = h->path.start + s;
+ h->query.length = h->path.length - s;
+
+ if (s > 0) {
+ h->path_no_query.length = s - 1;
+ }
+ }
+
+ nxt_go_new_request(r, port_msg->stream, nxt_go_str(&h->method),
+ nxt_go_str(&h->path));
+
+ nxt_go_ctx_read_str(ctx, &h->version);
+
+ nxt_go_request_set_proto(r, nxt_go_str(&h->version),
+ h->version.start[5] - '0',
+ h->version.start[7] - '0');
+
+ nxt_go_ctx_read_str(ctx, &h->host);
+ nxt_go_ctx_read_str(ctx, &h->cookie);
+ nxt_go_ctx_read_str(ctx, &h->content_type);
+ nxt_go_ctx_read_str(ctx, &h->content_length);
+
+ if (h->host.start != NULL) {
+ nxt_go_request_set_host(r, nxt_go_str(&h->host));
+ }
+
+ nxt_go_ctx_read_size(ctx, &s);
+ h->parsed_content_length = s;
+
+ do {
+ rc = nxt_go_ctx_read_str(ctx, &n);
+ rc = nxt_go_ctx_read_str(ctx, &v);
+
+ if (n.length == 0) {
+ break;
+ }
+
+ nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v));
+ } while(1);
+
+ ctx->r.body.preread = v;
+
+ if (h->parsed_content_length > 0) {
+ nxt_go_request_set_content_length(r, h->parsed_content_length);
+ }
+
+ if (v.length < h->parsed_content_length) {
+ nxt_go_request_create_channel(r);
+ }
+
+ nxt_go_request_serve(r);
+
+ return r;
+}
+
+nxt_go_request_t
+nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
+{
+ void *buf_end;
+ void *payload;
+ size_t payload_size;
+ nxt_fd_t fd;
+ struct cmsghdr *cm;
+ nxt_port_msg_t *port_msg;
+ nxt_port_msg_new_port_t *new_port_msg;
+
+ fd = -1;
+ nxt_go_debug("on read: %d (%d)", (int)buf_size, (int)oob_size);
+
+ cm = oob;
+ if (oob_size >= CMSG_SPACE(sizeof(int))
+ && cm->cmsg_len == CMSG_LEN(sizeof(int))
+ && cm->cmsg_level == SOL_SOCKET
+ && cm->cmsg_type == SCM_RIGHTS) {
+
+ nxt_memcpy(&fd, CMSG_DATA(cm), sizeof(int));
+ nxt_go_debug("fd = %d", fd);
+ }
+
+ port_msg = buf;
+ if (buf_size < sizeof(nxt_port_msg_t)) {
+ nxt_go_warn("message too small (%d bytes)", (int)buf_size);
+ goto fail;
+ }
+
+ buf_end = ((char *)buf) + buf_size;
+
+ payload = port_msg + 1;
+ payload_size = buf_size - sizeof(nxt_port_msg_t);
+
+ if (port_msg->mmap) {
+ nxt_go_debug("using data in shared memory");
+ }
+
+ if (port_msg->type > NXT_PORT_MSG_MAX) {
+ nxt_go_warn("unknown message type (%d)", (int)port_msg->type);
+ goto fail;
+ }
+
+ switch (port_msg->type) {
+ case NXT_PORT_MSG_QUIT:
+ nxt_go_debug("quit");
+
+ nxt_go_set_quit();
+ break;
+
+ case NXT_PORT_MSG_NEW_PORT:
+ nxt_go_debug("new port");
+ new_port_msg = payload;
+
+ nxt_go_new_port(new_port_msg->pid, new_port_msg->id, new_port_msg->type,
+ -1, fd);
+ break;
+
+ case NXT_PORT_MSG_CHANGE_FILE:
+ nxt_go_debug("change file");
+ break;
+
+ case NXT_PORT_MSG_MMAP:
+ nxt_go_debug("mmap");
+
+ nxt_go_new_incoming_mmap(port_msg->pid, fd);
+ break;
+
+ case NXT_PORT_MSG_DATA:
+ nxt_go_debug("data");
+
+ return nxt_go_data_handler(port_msg, buf_size);
+
+ default:
+ goto fail;
+ }
+
+
+fail:
+
+ if (fd != -1) {
+ close(fd);
+ }
+
+ return 0;
+}
+
+
+#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_port.h b/src/nginext/nxt_go_port.h
new file mode 100644
index 00000000..ce9dbcc3
--- /dev/null
+++ b/src/nginext/nxt_go_port.h
@@ -0,0 +1,18 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_PORT_H_INCLUDED_
+#define _NXT_GO_PORT_H_INCLUDED_
+
+
+#include <sys/types.h>
+#include "nxt_go_lib.h"
+
+nxt_go_request_t
+nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size);
+
+
+#endif /* _NXT_GO_PORT_H_INCLUDED_ */
diff --git a/src/nginext/nxt_go_port_memory.c b/src/nginext/nxt_go_port_memory.c
new file mode 100644
index 00000000..7e3f7a1a
--- /dev/null
+++ b/src/nginext/nxt_go_port_memory.c
@@ -0,0 +1,192 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef NXT_CONFIGURE
+
+
+#include "nxt_go_port_memory.h"
+#include "nxt_go_process.h"
+#include "nxt_go_array.h"
+#include "nxt_go_log.h"
+
+#include <nxt_go_gen.h>
+#include <nxt_main.h>
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+#include <linux/memfd.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#endif
+
+
+static nxt_port_mmap_header_t *
+nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id)
+{
+ int name_len, rc;
+ void *mem;
+ char name[64];
+ nxt_fd_t fd;
+ nxt_port_msg_t port_msg;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ fd = -1;
+
+ union {
+ struct cmsghdr cm;
+ char space[CMSG_SPACE(sizeof(int))];
+ } cmsg;
+
+ port_mmap = nxt_go_array_zero_add(&process->outgoing);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_go_warn("failed to add port mmap to outgoing array");
+
+ return NULL;
+ }
+
+ name_len = snprintf(name, sizeof(name) - 1, "/nginext.go.%p", name);
+
+#if (NXT_HAVE_MEMFD_CREATE)
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_go_warn("memfd_create(%s) failed %d", name, errno);
+
+ goto remove_fail;
+ }
+
+ nxt_go_debug("memfd_create(%s): %d", name, fd);
+
+#elif (NXT_HAVE_SHM_OPEN)
+ shm_unlink((char *) name); // just in case
+
+ fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
+
+ nxt_go_debug("shm_open(%s): %d", name, fd);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_go_warn("shm_open(%s) failed %d", name, errno);
+
+ goto remove_fail;
+ }
+
+ if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
+ nxt_go_warn("shm_unlink(%s) failed %d", name, errno);
+ }
+#endif
+
+ if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
+ nxt_go_warn("ftruncate() failed %d", errno);
+
+ goto remove_fail;
+ }
+
+ mem = mmap(NULL, PORT_MMAP_SIZE,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ goto remove_fail;
+ }
+
+ port_mmap->hdr = mem;
+
+ /* Init segment header. */
+ hdr = port_mmap->hdr;
+
+ memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
+
+ hdr->id = process->outgoing.nelts - 1;
+ hdr->pid = process->pid;
+
+ /* Mark first chunk as busy */
+ nxt_port_mmap_set_chunk_busy(hdr, 0);
+
+ /* Mark as busy chunk followed the last available chunk. */
+ nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
+
+ port_msg.stream = 0;
+ port_msg.pid = getpid();
+ port_msg.reply_port = 0;
+ port_msg.type = NXT_PORT_MSG_MMAP;
+ port_msg.last = 0;
+ port_msg.mmap = 0;
+
+ cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
+ cmsg.cm.cmsg_level = SOL_SOCKET;
+ cmsg.cm.cmsg_type = SCM_RIGHTS;
+
+ /*
+ * nxt_memcpy() is used instead of simple
+ * *(int *) CMSG_DATA(&cmsg.cm) = fd;
+ * because GCC 4.4 with -O2/3/s optimization may issue a warning:
+ * dereferencing type-punned pointer will break strict-aliasing rules
+ *
+ * Fortunately, GCC with -O1 compiles this nxt_memcpy()
+ * in the same simple assignment as in the code above.
+ */
+ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
+
+ rc = nxt_go_port_send(hdr->pid, id, &port_msg, sizeof(port_msg),
+ &cmsg, sizeof(cmsg));
+
+ nxt_go_debug("new mmap #%d created for %d -> %d",
+ (int)hdr->id, (int)getpid(), (int)process->pid);
+
+ close(fd);
+
+ return hdr;
+
+remove_fail:
+
+ if (fd != -1) {
+ close(fd);
+ }
+
+ process->outgoing.nelts--;
+
+ return NULL;
+}
+
+nxt_port_mmap_header_t *
+nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id,
+ nxt_chunk_id_t *c)
+{
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_t *end_port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ port_mmap = NULL;
+ hdr = NULL;
+
+ nxt_go_mutex_lock(&process->outgoing_mutex);
+
+ port_mmap = process->outgoing.elts;
+ end_port_mmap = port_mmap + process->outgoing.nelts;
+
+ while (port_mmap < end_port_mmap) {
+
+ if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
+ hdr = port_mmap->hdr;
+
+ goto unlock_return;
+ }
+
+ port_mmap++;
+ }
+
+ hdr = nxt_go_new_port_mmap(process, port_id);
+
+unlock_return:
+
+ nxt_go_mutex_unlock(&process->outgoing_mutex);
+
+ return hdr;
+}
+
+
+#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_port_memory.h b/src/nginext/nxt_go_port_memory.h
new file mode 100644
index 00000000..0a1644ef
--- /dev/null
+++ b/src/nginext/nxt_go_port_memory.h
@@ -0,0 +1,22 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_PORT_MEMORY_H_INCLUDED_
+#define _NXT_GO_PORT_MEMORY_H_INCLUDED_
+
+
+#include <nxt_main.h>
+#include <nxt_port_memory_int.h>
+
+typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
+typedef struct nxt_go_process_s nxt_go_process_t;
+
+nxt_port_mmap_header_t *
+nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id,
+ nxt_chunk_id_t *c);
+
+
+#endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nginext/nxt_go_process.c b/src/nginext/nxt_go_process.c
new file mode 100644
index 00000000..c7ce052d
--- /dev/null
+++ b/src/nginext/nxt_go_process.c
@@ -0,0 +1,150 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef NXT_CONFIGURE
+
+
+#include "nxt_go_process.h"
+#include "nxt_go_array.h"
+#include "nxt_go_mutex.h"
+#include "nxt_go_log.h"
+
+#include <nxt_port_memory_int.h>
+
+
+static nxt_array_t processes; /* of nxt_go_process_t */
+
+static nxt_go_process_t *
+nxt_go_find_process(nxt_pid_t pid, uint32_t *pos)
+{
+ uint32_t l, r, i;
+ nxt_go_process_t *process;
+
+ if (nxt_slow_path(processes.size == 0)) {
+ nxt_go_array_init(&processes, 1, sizeof(nxt_go_process_t));
+ }
+
+ l = 0;
+ r = processes.nelts;
+ i = (l + r) / 2;
+
+ while (r > l) {
+ process = nxt_go_array_at(&processes, i);
+
+ nxt_go_debug("compare process #%d (%p) at %d",
+ (int)process->pid, process, (int)i);
+
+ if (pid == process->pid) {
+ nxt_go_debug("found process %d at %d", (int)pid, (int)i);
+
+ if (pos != NULL) {
+ *pos = i;
+ }
+
+ return process;
+ }
+
+ if (pid < process->pid) {
+ r = i;
+ } else {
+ l = i + 1;
+ }
+
+ i = (l + r) / 2;
+ }
+
+ if (pos != NULL) {
+ *pos = i;
+ }
+
+ nxt_go_debug("process %d not found, best pos %d", (int)pid, (int)i);
+
+ return NULL;
+}
+
+
+nxt_go_process_t *
+nxt_go_get_process(nxt_pid_t pid)
+{
+ uint32_t pos;
+ nxt_go_process_t *process;
+
+ process = nxt_go_find_process(pid, &pos);
+
+ if (process == NULL) {
+ nxt_go_array_add(&processes);
+ process = nxt_go_array_at(&processes, pos);
+
+ nxt_go_debug("init process #%d (%p) at %d", (int)pid, process,
+ (int)pos);
+
+ if (pos < processes.nelts - 1) {
+ memmove(process + 1, process,
+ processes.size * (processes.nelts - 1 - pos));
+ }
+
+ process->pid = pid;
+ nxt_go_mutex_create(&process->incoming_mutex);
+ nxt_go_array_init(&process->incoming, 1, sizeof(nxt_port_mmap_t));
+ nxt_go_mutex_create(&process->outgoing_mutex);
+ nxt_go_array_init(&process->outgoing, 1, sizeof(nxt_port_mmap_t));
+ }
+
+ return process;
+}
+
+
+void
+nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd)
+{
+ void *mem;
+ struct stat mmap_stat;
+ nxt_port_mmap_t *port_mmap;
+ nxt_go_process_t *process;
+
+ process = nxt_go_get_process(pid);
+
+ nxt_go_debug("got new mmap fd #%d from process %d",
+ (int)fd, (int)pid);
+
+ if (fstat(fd, &mmap_stat) == -1) {
+ nxt_go_warn("fstat(%d) failed %d", (int)fd, errno);
+
+ return;
+ }
+
+ nxt_go_mutex_lock(&process->incoming_mutex);
+
+ port_mmap = nxt_go_array_zero_add(&process->incoming);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_go_warn("failed to add mmap to incoming array");
+
+ goto fail;
+ }
+
+ mem = mmap(NULL, mmap_stat.st_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_go_warn("mmap() failed %d", errno);
+
+ goto fail;
+ }
+
+ port_mmap->hdr = mem;
+
+ if (nxt_slow_path(port_mmap->hdr->id != process->incoming.nelts - 1)) {
+ nxt_go_warn("port mmap id mismatch (%d != %d)",
+ port_mmap->hdr->id, process->incoming.nelts - 1);
+ }
+
+fail:
+
+ nxt_go_mutex_unlock(&process->incoming_mutex);
+}
+
+
+#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_process.h b/src/nginext/nxt_go_process.h
new file mode 100644
index 00000000..5e31fa6e
--- /dev/null
+++ b/src/nginext/nxt_go_process.h
@@ -0,0 +1,30 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_PROCESS_H_INCLUDED_
+#define _NXT_GO_PROCESS_H_INCLUDED_
+
+
+#include <nxt_main.h>
+#include "nxt_go_mutex.h"
+
+typedef struct nxt_go_process_s nxt_go_process_t;
+
+struct nxt_go_process_s {
+ nxt_pid_t pid;
+ nxt_go_mutex_t incoming_mutex;
+ nxt_array_t incoming; /* of nxt_port_mmap_t */
+ nxt_go_mutex_t outgoing_mutex;
+ nxt_array_t outgoing; /* of nxt_port_mmap_t */
+};
+
+nxt_go_process_t *nxt_go_get_process(nxt_pid_t pid);
+
+void nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd);
+
+
+#endif /* _NXT_GO_PROCESS_H_INCLUDED_ */
+
diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c
new file mode 100644
index 00000000..6a5f8d6b
--- /dev/null
+++ b/src/nginext/nxt_go_run_ctx.c
@@ -0,0 +1,397 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef NXT_CONFIGURE
+
+
+#include "nxt_go_run_ctx.h"
+#include "nxt_go_log.h"
+#include "nxt_go_process.h"
+#include "nxt_go_array.h"
+#include "nxt_go_mutex.h"
+#include "nxt_go_port_memory.h"
+
+#include <nxt_port_memory_int.h>
+#include <nxt_main.h>
+#include <nxt_go_gen.h>
+
+
+static nxt_int_t
+nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf,
+ uint32_t n)
+{
+ size_t nchunks;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ if (nxt_slow_path(msg->mmap_msg == NULL)) {
+ if (n > 0) {
+ nxt_go_warn("failed to get plain buf #%d", (int)n);
+
+ return NXT_ERROR;
+ }
+
+ buf->mem.start = (u_char *) (msg->port_msg + 1);
+ buf->mem.pos = buf->mem.start;
+ buf->mem.end = buf->mem.start + msg->raw_size;
+ buf->mem.free = buf->mem.end;
+
+ return NXT_OK;
+ }
+
+ mmap_msg = msg->mmap_msg + n;
+ if (nxt_slow_path(mmap_msg >= msg->end)) {
+ nxt_go_warn("no more data in shm #%d", (int)n);
+
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) {
+ nxt_go_warn("incoming shared memory segment #%d not found "
+ "for process %d", (int)mmap_msg->mmap_id,
+ (int)msg->port_msg->pid);
+
+ return NXT_ERROR;
+ }
+
+ nxt_go_mutex_lock(&ctx->process->incoming_mutex);
+
+ port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id);
+ buf->mem.start = nxt_port_mmap_chunk_start(port_mmap->hdr,
+ mmap_msg->chunk_id);
+ buf->mem.pos = buf->mem.start;
+ buf->mem.free = buf->mem.start + mmap_msg->size;
+
+ nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
+
+ nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
+ if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
+ nchunks++;
+ }
+
+ buf->mem.end = buf->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
+
+ return NXT_OK;
+}
+
+static nxt_int_t
+nxt_go_ctx_init_rbuf(nxt_go_run_ctx_t *ctx)
+{
+ return nxt_go_ctx_msg_rbuf(ctx, &ctx->msg, &ctx->rbuf, ctx->nrbuf);
+}
+
+static void
+nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg,
+ size_t payload_size)
+{
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ memset(msg, 0, sizeof(nxt_go_msg_t));
+
+ msg->port_msg = port_msg;
+ msg->raw_size = payload_size;
+
+ if (nxt_fast_path(port_msg->mmap != 0)) {
+ msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1);
+ msg->end = (nxt_port_mmap_msg_t *) ( (char *) msg->mmap_msg +
+ payload_size );
+
+ mmap_msg = msg->mmap_msg;
+ while(mmap_msg < msg->end) {
+ msg->data_size += mmap_msg->size;
+ mmap_msg += 1;
+ }
+ } else {
+ msg->mmap_msg = NULL;
+ msg->end = NULL;
+ msg->data_size = payload_size;
+ }
+}
+
+void
+nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg)
+{
+ u_char *b, *e;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_msg_t *mmap_msg, *end;
+
+ if (nxt_slow_path(msg->mmap_msg == NULL)) {
+ return;
+ }
+
+ mmap_msg = msg->mmap_msg;
+ end = msg->end;
+
+ nxt_go_mutex_lock(&ctx->process->incoming_mutex);
+
+ for (; mmap_msg < end; mmap_msg++ ) {
+ port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id);
+
+ c = mmap_msg->chunk_id;
+ b = nxt_port_mmap_chunk_start(port_mmap->hdr, c);
+ e = b + mmap_msg->size;
+
+ while (b < e) {
+ nxt_port_mmap_set_chunk_free(port_mmap->hdr, c);
+
+ b += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ }
+ }
+
+ nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
+}
+
+
+nxt_int_t
+nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
+ size_t payload_size)
+{
+ memset(ctx, 0, sizeof(nxt_go_run_ctx_t));
+
+ ctx->process = nxt_go_get_process(port_msg->pid);
+ if (nxt_slow_path(ctx->process == NULL)) {
+ nxt_go_warn("failed to get process %d", port_msg->pid);
+
+ return NXT_ERROR;
+ }
+
+ nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size);
+
+ ctx->msg_last = &ctx->msg;
+
+ ctx->wport_msg.stream = port_msg->stream;
+ ctx->wport_msg.pid = getpid();
+ ctx->wport_msg.type = NXT_PORT_MSG_DATA;
+ ctx->wport_msg.mmap = 1;
+
+ return nxt_go_ctx_init_rbuf(ctx);
+}
+
+
+void
+nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
+{
+ nxt_go_msg_t *msg;
+
+ msg = malloc(sizeof(nxt_go_msg_t));
+
+ nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t));
+
+ msg->start_offset = ctx->msg_last->start_offset;
+
+ if (ctx->msg_last == &ctx->msg) {
+ msg->start_offset += ctx->r.body.preread.length;
+ } else {
+ msg->start_offset += ctx->msg_last->data_size;
+ }
+
+ ctx->msg_last->next = msg;
+ ctx->msg_last = msg;
+}
+
+
+nxt_int_t
+nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
+{
+ nxt_int_t rc;
+
+ if (last != 0) {
+ ctx->wport_msg.last = 1;
+ }
+
+ nxt_go_debug("flush buffers (%d)", last);
+
+ rc = nxt_go_port_send(ctx->msg.port_msg->pid, ctx->msg.port_msg->reply_port,
+ &ctx->wport_msg, sizeof(nxt_port_msg_t) +
+ ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0);
+
+ ctx->nwbuf = 0;
+
+ memset(&ctx->wbuf, 0, sizeof(ctx->wbuf));
+
+ return rc;
+}
+
+
+nxt_int_t
+nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+{
+ size_t nchunks;
+ nxt_buf_t *buf;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_msg_t *mmap_msg;
+ nxt_port_mmap_header_t *hdr;
+
+ buf = &ctx->wbuf;
+
+ if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) {
+ memcpy(buf->mem.free, data, len);
+ buf->mem.free += len;
+
+ mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
+ mmap_msg->size += len;
+
+ return NXT_OK;
+ }
+
+ if (ctx->nwbuf >= 8) {
+ nxt_go_ctx_flush(ctx, 0);
+ }
+
+ c = 0;
+
+ hdr = nxt_go_port_mmap_get(ctx->process,
+ ctx->msg.port_msg->reply_port, &c);
+ if (nxt_slow_path(hdr == NULL)) {
+ nxt_go_warn("failed to get port_mmap");
+
+ return NXT_ERROR;
+ }
+
+ buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
+ buf->mem.pos = buf->mem.start;
+ buf->mem.free = buf->mem.start;
+ buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
+
+ mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
+ mmap_msg->mmap_id = hdr->id;
+ mmap_msg->chunk_id = c;
+
+ nchunks = len / PORT_MMAP_CHUNK_SIZE;
+ if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c++;
+ nchunks--;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ break;
+ }
+ nxt_port_mmap_set_chunk_busy(hdr, c);
+
+ buf->mem.end += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ nchunks--;
+ }
+
+ if (nxt_buf_mem_free_size(&buf->mem) < len) {
+ len = nxt_buf_mem_free_size(&buf->mem);
+
+ }
+
+ memcpy(buf->mem.free, data, len);
+ buf->mem.free += len;
+
+ mmap_msg->size = len;
+
+ ctx->nwbuf++;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size)
+{
+ nxt_buf_t *buf;
+ nxt_int_t rc;
+
+ do {
+ buf = &ctx->rbuf;
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
+ if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+
+ ctx->nrbuf++;
+ rc = nxt_go_ctx_init_rbuf(ctx);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_go_warn("read size: init rbuf failed");
+ return rc;
+ }
+
+ continue;
+ }
+ nxt_go_warn("read size: used size is not 0");
+ return NXT_ERROR;
+ }
+
+ if (buf->mem.pos[0] >= 128) {
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
+ nxt_go_warn("read size: used size < 4");
+ return NXT_ERROR;
+ }
+ }
+
+ break;
+ } while (1);
+
+ buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
+
+ return NXT_OK;
+}
+
+nxt_int_t
+nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size)
+{
+ nxt_int_t rc;
+
+ rc = nxt_go_ctx_read_size_(ctx, size);
+
+ if (nxt_fast_path(rc == NXT_OK)) {
+ nxt_go_debug("read_size: %d", (int)*size);
+ }
+
+ return rc;
+}
+
+nxt_int_t
+nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
+{
+ size_t length;
+ nxt_int_t rc;
+ nxt_buf_t *buf;
+
+ rc = nxt_go_ctx_read_size_(ctx, &length);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_go_warn("read str: read size failed");
+ return rc;
+ }
+
+ buf = &ctx->rbuf;
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) {
+ nxt_go_warn("read str: used size too small %d < %d",
+ (int)nxt_buf_mem_used_size(&buf->mem), (int)length);
+ return NXT_ERROR;
+ }
+
+ if (length > 0) {
+ str->start = buf->mem.pos;
+ str->length = length - 1;
+
+ buf->mem.pos += length;
+
+ nxt_go_debug("read_str: %d %.*s", (int)length - 1, (int)length - 1,
+ str->start);
+ } else {
+ str->start = NULL;
+ str->length = 0;
+
+ nxt_go_debug("read_str: NULL");
+ }
+
+ return NXT_OK;
+}
+
+
+#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_run_ctx.h b/src/nginext/nxt_go_run_ctx.h
new file mode 100644
index 00000000..244bd7c3
--- /dev/null
+++ b/src/nginext/nxt_go_run_ctx.h
@@ -0,0 +1,71 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_GO_RUN_CTX_H_INCLUDED_
+#define _NXT_GO_RUN_CTX_H_INCLUDED_
+
+
+#include <nxt_main.h>
+#include <nxt_application.h>
+#include <nxt_port_memory_int.h>
+
+
+typedef struct nxt_go_process_s nxt_go_process_t;
+typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t;
+
+typedef struct nxt_go_msg_s nxt_go_msg_t;
+
+struct nxt_go_msg_s {
+ off_t start_offset;
+
+ nxt_port_msg_t *port_msg;
+ size_t raw_size;
+ size_t data_size;
+
+ nxt_port_mmap_msg_t *mmap_msg;
+ nxt_port_mmap_msg_t *end;
+
+ nxt_go_msg_t *next;
+};
+
+
+typedef struct {
+ nxt_go_msg_t msg;
+
+ nxt_go_process_t *process;
+
+ uint32_t nrbuf;
+ nxt_buf_t rbuf;
+
+ uint32_t nwbuf;
+ nxt_buf_t wbuf;
+ nxt_port_msg_t wport_msg;
+ nxt_port_mmap_msg_t wmmap_msg[8];
+
+ nxt_app_request_t r;
+
+ nxt_go_msg_t *msg_last;
+} nxt_go_run_ctx_t;
+
+
+void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg);
+
+nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
+ size_t payload_size);
+
+void nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
+ size_t size);
+
+nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last);
+
+nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len);
+
+nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size);
+
+nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str);
+
+
+#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */
diff --git a/src/nginext/port.go b/src/nginext/port.go
new file mode 100644
index 00000000..01921b5a
--- /dev/null
+++ b/src/nginext/port.go
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package nginext
+
+/*
+#include "nxt_go_lib.h"
+*/
+import "C"
+
+import (
+ "fmt"
+ "net"
+ "os"
+ "sync"
+ "unsafe"
+)
+
+type port_key struct {
+ pid int
+ id int
+}
+
+type port struct {
+ key port_key
+ rcv *net.UnixConn
+ snd *net.UnixConn
+}
+
+type port_registry struct {
+ sync.RWMutex
+ m map[port_key]*port
+}
+
+var port_registry_ port_registry
+
+func find_port(key port_key) *port {
+ port_registry_.RLock()
+ res := port_registry_.m[key]
+ port_registry_.RUnlock()
+
+ return res
+}
+
+func add_port(p *port) {
+ port_registry_.Lock()
+ if port_registry_.m == nil {
+ port_registry_.m = make(map[port_key]*port)
+ }
+
+ port_registry_.m[p.key] = p
+
+ port_registry_.Unlock()
+}
+
+func (p *port) Close() {
+ if p.rcv != nil {
+ p.rcv.Close()
+ }
+
+ if p.snd != nil {
+ p.snd.Close()
+ }
+}
+
+func getUnixConn(fd int) *net.UnixConn {
+ if fd < 0 {
+ return nil
+ }
+
+ f := os.NewFile(uintptr(fd), "sock")
+ defer f.Close()
+
+ c, err := net.FileConn(f)
+ if err != nil {
+ fmt.Printf("FileConn error %s\n", err)
+ return nil
+ }
+
+ uc, ok := c.(*net.UnixConn)
+ if !ok {
+ fmt.Printf("Not a Unix-domain socket %d\n", fd)
+ return nil
+ }
+
+ fmt.Printf("Unix-domain socket %d\n", fd)
+ return uc
+}
+
+//export nxt_go_new_port
+func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) {
+ new_port(int(pid), int(id), int(t), int(rcv), int(snd))
+}
+
+//export nxt_go_port_send
+func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int {
+ key := port_key{
+ pid: int(pid),
+ id: int(id),
+ }
+
+ p := find_port(key)
+
+ if p != nil {
+ n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil)
+
+ if err != nil {
+ fmt.Printf("write result %d (%d), %s\n", n, oobn, err)
+ }
+
+ return C.int(n)
+ }
+
+ return 0
+}
+
+func new_port(pid int, id int, t int, rcv int, snd int) *port {
+ p := &port{
+ key: port_key{
+ pid: pid,
+ id: id,
+ },
+ rcv: getUnixConn(rcv),
+ snd: getUnixConn(snd),
+ }
+
+ add_port(p)
+
+ fmt.Printf("new_port: %d, %d, %d, %d\n", pid, id, rcv, snd)
+
+ return p
+}
+
+func (p *port) read() {
+ var buf [16384]byte
+ var oob [1024]byte
+
+ n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:])
+
+ fmt.Printf("read result %d (%d), %s\n", n, oobn, err)
+
+ m := new_cmsg(buf[:n], oob[:oobn])
+
+ c_req := C.nxt_go_process_port_msg(m.buf.b, m.buf.s, m.oob.b, m.oob.s)
+
+ if c_req == 0 {
+ m.Close()
+ } else {
+ r := find_request(c_req)
+ if len(r.msgs) == 0 {
+ r.push(m)
+ } else if r.ch != nil {
+ r.ch <- m
+ } else {
+ m.Close()
+ }
+ }
+
+}
diff --git a/src/nginext/request.go b/src/nginext/request.go
new file mode 100644
index 00000000..8667a074
--- /dev/null
+++ b/src/nginext/request.go
@@ -0,0 +1,213 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package nginext
+
+/*
+#include "nxt_go_lib.h"
+*/
+import "C"
+
+import (
+ "net/http"
+ "net/url"
+ "sync"
+)
+
+type request struct {
+ req http.Request
+ resp *response
+ c_req C.nxt_go_request_t
+ id C.uint32_t
+ read_pos C.off_t
+ msgs []*cmsg
+ ch chan *cmsg
+}
+
+func (r *request) Read(p []byte) (n int, err error) {
+ c := C.size_t(cap(p))
+ b := C.malloc(c)
+ res := C.nxt_go_request_read(r.c_req, r.read_pos, b, c)
+
+ if res == -2 /* NXT_AGAIN */ {
+ m := <-r.ch
+
+ res = C.nxt_go_request_read_from(r.c_req, r.read_pos, b, c, m.buf.b, m.buf.s)
+ r.push(m)
+ }
+
+ if res > 0 {
+ copy(p, C.GoBytes(b, res))
+ r.read_pos += C.off_t(res)
+ }
+
+ C.free(b)
+ return int(res), nil
+}
+
+func (r *request) Close() error {
+ C.nxt_go_request_close(r.c_req)
+ return nil
+}
+
+type request_registry struct {
+ sync.RWMutex
+ m map[C.nxt_go_request_t]*request
+ id map[C.uint32_t]*request
+}
+
+var request_registry_ request_registry
+
+func find_request(c_req C.nxt_go_request_t) *request {
+ request_registry_.RLock()
+ res := request_registry_.m[c_req]
+ request_registry_.RUnlock()
+
+ return res
+}
+
+func find_request_by_id(id C.uint32_t) *request {
+ request_registry_.RLock()
+ res := request_registry_.id[id]
+ request_registry_.RUnlock()
+
+ return res
+}
+
+func add_request(r *request) {
+ request_registry_.Lock()
+ if request_registry_.m == nil {
+ request_registry_.m = make(map[C.nxt_go_request_t]*request)
+ request_registry_.id = make(map[C.uint32_t]*request)
+ }
+
+ request_registry_.m[r.c_req] = r
+ request_registry_.id[r.id] = r
+
+ request_registry_.Unlock()
+}
+
+func remove_request(r *request) {
+ request_registry_.Lock()
+ if request_registry_.m != nil {
+ delete(request_registry_.m, r.c_req)
+ delete(request_registry_.id, r.id)
+ }
+
+ request_registry_.Unlock()
+}
+
+func (r *request) response() *response {
+ if r.resp == nil {
+ r.resp = new_response(r.c_req, &r.req)
+ }
+
+ return r.resp
+}
+
+func (r *request) done() {
+ C.nxt_go_request_done(r.c_req)
+
+ remove_request(r)
+
+ for _, m := range r.msgs {
+ m.Close()
+ }
+
+ if r.ch != nil {
+ close(r.ch)
+ }
+}
+
+func (r *request) push(m *cmsg) {
+ r.msgs = append(r.msgs, m)
+}
+
+//export nxt_go_new_request
+func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) {
+ uri := C.GoStringN(c_uri.start, c_uri.length)
+
+ var URL *url.URL
+ var err error
+ if URL, err = url.ParseRequestURI(uri); err != nil {
+ return
+ }
+
+ r := &request{
+ req: http.Request{
+ Method: C.GoStringN(c_method.start, c_method.length),
+ URL: URL,
+ Header: http.Header{},
+ Body: nil,
+ RequestURI: uri,
+ },
+ c_req: c_req,
+ id: id,
+ msgs: make([]*cmsg, 0, 1),
+ }
+ r.req.Body = r
+
+ add_request(r)
+}
+
+//export nxt_go_find_request
+func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t {
+ r := find_request_by_id(id)
+
+ if r != nil {
+ return r.c_req
+ }
+
+ return 0
+}
+
+//export nxt_go_request_set_proto
+func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, maj C.int, min C.int) {
+ r := find_request(c_req)
+ r.req.Proto = C.GoStringN(proto.start, proto.length)
+ r.req.ProtoMajor = int(maj)
+ r.req.ProtoMinor = int(min)
+}
+
+//export nxt_go_request_add_header
+func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, value *C.nxt_go_str_t) {
+ r := find_request(c_req)
+ r.req.Header.Add(C.GoStringN(name.start, name.length), C.GoStringN(value.start, value.length))
+}
+
+//export nxt_go_request_set_content_length
+func nxt_go_request_set_content_length(c_req C.nxt_go_request_t, l C.int64_t) {
+ find_request(c_req).req.ContentLength = int64(l)
+}
+
+//export nxt_go_request_create_channel
+func nxt_go_request_create_channel(c_req C.nxt_go_request_t) {
+ find_request(c_req).ch = make(chan *cmsg)
+}
+
+//export nxt_go_request_set_host
+func nxt_go_request_set_host(c_req C.nxt_go_request_t, host *C.nxt_go_str_t) {
+ find_request(c_req).req.Host = C.GoStringN(host.start, host.length)
+}
+
+//export nxt_go_request_set_url
+func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) {
+ find_request(c_req).req.URL.Scheme = C.GoString(scheme)
+}
+
+//export nxt_go_request_set_remote_addr
+func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, addr *C.nxt_go_str_t) {
+ find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
+}
+
+//export nxt_go_request_serve
+func nxt_go_request_serve(c_req C.nxt_go_request_t) {
+ r := find_request(c_req)
+
+ go func(r *request) {
+ http.DefaultServeMux.ServeHTTP(r.response(), &r.req)
+ r.done()
+ }(r)
+}
diff --git a/src/nginext/response.go b/src/nginext/response.go
new file mode 100644
index 00000000..dc864f6e
--- /dev/null
+++ b/src/nginext/response.go
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package nginext
+
+/*
+#include "nxt_go_lib.h"
+*/
+import "C"
+
+import (
+ "fmt"
+ "net/http"
+ "os"
+)
+
+type response struct {
+ header http.Header
+ headerSent bool
+ req *http.Request
+ c_req C.nxt_go_request_t
+}
+
+func new_response(c_req C.nxt_go_request_t, req *http.Request) *response {
+ resp := &response{
+ header: http.Header{},
+ req: req,
+ c_req: c_req,
+ }
+
+ return resp
+}
+
+func (r *response) Header() http.Header {
+ return r.header
+}
+
+func (r *response) Write(p []byte) (n int, err error) {
+ if !r.headerSent {
+ r.WriteHeader(http.StatusOK)
+ }
+
+ l := C.size_t(len(p))
+ b := getCBytes(p)
+ res := C.nxt_go_response_write(r.c_req, b, l)
+ C.free(b)
+ return int(res), nil
+}
+
+func (r *response) WriteHeader(code int) {
+ if r.headerSent {
+ // Note: explicitly using Stderr, as Stdout is our HTTP output.
+ fmt.Fprintf(os.Stderr, "CGI attempted to write header twice")
+ return
+ }
+ r.headerSent = true
+ fmt.Fprintf(r, "%s %d %s\r\n", r.req.Proto, code, http.StatusText(code))
+
+ // Set a default Content-Type
+ if _, hasType := r.header["Content-Type"]; !hasType {
+ r.header.Add("Content-Type", "text/html; charset=utf-8")
+ }
+
+ r.header.Write(r)
+
+ r.Write([]byte("\r\n"))
+}
diff --git a/src/nxt_go.c b/src/nxt_go.c
new file mode 100644
index 00000000..703974ba
--- /dev/null
+++ b/src/nxt_go.c
@@ -0,0 +1,205 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_application.h>
+
+
+static nxt_int_t nxt_go_init(nxt_task_t *task);
+
+static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task,
+ nxt_app_request_t *r, nxt_app_wmsg_t *msg);
+
+static nxt_int_t nxt_go_run(nxt_task_t *task,
+ nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *msg);
+
+static nxt_str_t nxt_go_path;
+
+nxt_application_module_t nxt_go_module = {
+ nxt_go_init,
+ nxt_go_prepare_msg,
+ nxt_go_run
+};
+
+
+nxt_int_t
+nxt_go_module_init(nxt_thread_t *thr, nxt_runtime_t *rt);
+
+nxt_int_t
+nxt_go_module_init(nxt_thread_t *thr, nxt_runtime_t *rt)
+{
+ char **argv;
+ u_char *p;
+
+ argv = nxt_process_argv;
+
+ while (*argv != NULL) {
+ p = (u_char *) *argv++;
+
+ if (nxt_strcmp(p, "--go") == 0) {
+ if (*argv == NULL) {
+ nxt_log_error(NXT_LOG_ERR, thr->log,
+ "no argument for option \"--go\"");
+ return NXT_ERROR;
+ }
+
+ p = (u_char *) *argv;
+
+ nxt_go_path.start = p;
+ nxt_go_path.length = nxt_strlen(p);
+
+ nxt_log_error(NXT_LOG_INFO, thr->log,
+ "go program \"%V\"",
+ &nxt_go_path);
+
+ nxt_app = &nxt_go_module;
+
+ return NXT_OK;
+ }
+ }
+
+ nxt_log_error(NXT_LOG_ERR, thr->log, "no option \"--go\" specified");
+
+ return NXT_ERROR;
+}
+
+extern char **environ;
+
+nxt_inline int
+nxt_sock_no_cloexec(nxt_socket_t fd)
+{
+ if (fd == -1) {
+ return 0;
+ }
+ return fcntl(fd, F_SETFD, 0);
+}
+
+static nxt_int_t
+nxt_go_init(nxt_task_t *task)
+{
+ char *go_ports = getenv("NXT_GO_PORTS");
+
+ nxt_debug(task, "initialize go app, NXT_GO_PORTS=%s",
+ go_ports ? go_ports : "NULL");
+
+ if (go_ports == NULL) {
+ u_char buf[256];
+ u_char *p = buf;
+
+ nxt_runtime_t *rt = task->thread->runtime;
+ nxt_port_t *port;
+
+ nxt_runtime_port_each(rt, port) {
+
+ nxt_debug(task, "port %PI, %ud, (%d, %d)", port->pid, port->id,
+ port->pair[0], port->pair[1]);
+
+ p = nxt_sprintf(p, buf + sizeof(buf), "%PI,%ud,%d,%d,%d;",
+ port->pid, port->id, (int)port->type,
+ port->pair[0], port->pair[1]);
+
+ if (nxt_slow_path(nxt_sock_no_cloexec(port->pair[0]))) {
+ nxt_log(task, NXT_LOG_WARN, "fcntl() failed %E", nxt_errno);
+ }
+
+ if (nxt_slow_path(nxt_sock_no_cloexec(port->pair[1]))) {
+ nxt_log(task, NXT_LOG_WARN, "fcntl() failed %E", nxt_errno);
+ }
+
+ } nxt_runtime_port_loop;
+
+ *p = '\0';
+ nxt_debug(task, "update NXT_GO_PORTS=%s", buf);
+
+ setenv("NXT_GO_PORTS", (char *)buf, 1);
+
+ char *argv[] = {
+ (char *)nxt_go_path.start,
+ (char *)"--no-daemonize",
+ (char *)"--app", NULL };
+
+ (void) execve((char *)nxt_go_path.start, argv, environ);
+
+ nxt_log(task, NXT_LOG_WARN, "execve(%V) failed %E", &nxt_go_path,
+ nxt_errno);
+
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
+{
+ nxt_int_t rc;
+ nxt_http_field_t *field;
+ nxt_app_request_header_t *h;
+
+ static const nxt_str_t eof = nxt_null_string;
+
+ h = &r->header;
+
+#define RC(S) \
+ do { \
+ rc = (S); \
+ if (nxt_slow_path(rc != NXT_OK)) { \
+ goto fail; \
+ } \
+ } while(0)
+
+#define NXT_WRITE(N) \
+ RC(nxt_app_msg_write_str(task, wmsg, N))
+
+ /* TODO error handle, async mmap buffer assignment */
+
+ NXT_WRITE(&h->method);
+ NXT_WRITE(&h->path);
+
+ if (h->query.start != NULL) {
+ RC(nxt_app_msg_write_size(task, wmsg,
+ h->query.start - h->path.start + 1));
+ } else {
+ RC(nxt_app_msg_write_size(task, wmsg, 0));
+ }
+
+ NXT_WRITE(&h->version);
+
+ NXT_WRITE(&h->host);
+ NXT_WRITE(&h->cookie);
+ NXT_WRITE(&h->content_type);
+ NXT_WRITE(&h->content_length);
+
+ RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
+
+ nxt_list_each(field, h->fields) {
+ NXT_WRITE(&field->name);
+ NXT_WRITE(&field->value);
+
+ } nxt_list_loop;
+
+ /* end-of-headers mark */
+ NXT_WRITE(&eof);
+ NXT_WRITE(&r->body.preread);
+
+#undef NXT_WRITE
+#undef RC
+
+ return NXT_OK;
+
+fail:
+
+ return NXT_ERROR;
+}
+
+
+static nxt_int_t
+nxt_go_run(nxt_task_t *task,
+ nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *msg)
+{
+ return NXT_ERROR;
+}