summaryrefslogtreecommitdiffhomepage
path: root/src/go/unit
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/unit')
-rw-r--r--src/go/unit/nxt_cgo_lib.c207
-rw-r--r--src/go/unit/nxt_cgo_lib.h40
-rw-r--r--src/go/unit/nxt_go_array.c62
-rw-r--r--src/go/unit/nxt_go_array.h36
-rw-r--r--src/go/unit/nxt_go_lib.c143
-rw-r--r--src/go/unit/nxt_go_lib.h40
-rw-r--r--src/go/unit/nxt_go_log.h34
-rw-r--r--src/go/unit/nxt_go_mutex.h21
-rw-r--r--src/go/unit/nxt_go_port.c216
-rw-r--r--src/go/unit/nxt_go_port.h18
-rw-r--r--src/go/unit/nxt_go_port_memory.c217
-rw-r--r--src/go/unit/nxt_go_port_memory.h30
-rw-r--r--src/go/unit/nxt_go_process.c148
-rw-r--r--src/go/unit/nxt_go_process.h33
-rw-r--r--src/go/unit/nxt_go_run_ctx.c554
-rw-r--r--src/go/unit/nxt_go_run_ctx.h78
-rw-r--r--src/go/unit/port.go143
-rw-r--r--src/go/unit/request.go51
-rw-r--r--src/go/unit/response.go36
-rw-r--r--src/go/unit/unit.go123
20 files changed, 388 insertions, 1842 deletions
diff --git a/src/go/unit/nxt_cgo_lib.c b/src/go/unit/nxt_cgo_lib.c
new file mode 100644
index 00000000..9c080730
--- /dev/null
+++ b/src/go/unit/nxt_cgo_lib.c
@@ -0,0 +1,207 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include "_cgo_export.h"
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+
+
+static void nxt_cgo_request_handler(nxt_unit_request_info_t *req);
+static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
+ nxt_unit_sptr_t *sptr, uint32_t length);
+static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
+static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
+static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size);
+static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ void *buf, size_t buf_size, void *oob, size_t oob_size);
+
+int
+nxt_cgo_run(uintptr_t handler)
+{
+ int rc;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_init_t init;
+
+ memset(&init, 0, sizeof(init));
+
+ init.callbacks.request_handler = nxt_cgo_request_handler;
+ init.callbacks.add_port = nxt_cgo_add_port;
+ init.callbacks.remove_port = nxt_cgo_remove_port;
+ init.callbacks.port_send = nxt_cgo_port_send;
+ init.callbacks.port_recv = nxt_cgo_port_recv;
+
+ init.data = (void *) handler;
+
+ ctx = nxt_unit_init(&init);
+ if (nxt_slow_path(ctx == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_run(ctx);
+
+ nxt_unit_done(ctx);
+
+ return rc;
+}
+
+
+static void
+nxt_cgo_request_handler(nxt_unit_request_info_t *req)
+{
+ uint32_t i;
+ uintptr_t go_req;
+ nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr;
+ nxt_unit_field_t *f;
+ nxt_unit_request_t *r;
+
+ r = req->request;
+
+ go_req = nxt_go_request_create((uintptr_t) req,
+ nxt_cgo_str_init(&method, &r->method, r->method_length),
+ nxt_cgo_str_init(&uri, &r->target, r->target_length));
+
+ nxt_go_request_set_proto(go_req,
+ nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1);
+
+ for (i = 0; i < r->fields_count; i++) {
+ f = &r->fields[i];
+
+ nxt_go_request_add_header(go_req,
+ nxt_cgo_str_init(&name, &f->name, f->name_length),
+ nxt_cgo_str_init(&value, &f->value, f->value_length));
+
+ if (f->hash == NXT_UNIT_HASH_HOST) {
+ host = value;
+ }
+ }
+
+ nxt_go_request_set_content_length(go_req, r->content_length);
+ nxt_go_request_set_host(go_req, &host);
+ nxt_go_request_set_remote_addr(go_req,
+ nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length));
+
+ nxt_go_request_handler(go_req, (uintptr_t) req->unit->data);
+}
+
+
+static nxt_cgo_str_t *
+nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length)
+{
+ dst->length = length;
+ dst->start = nxt_unit_sptr_get(sptr);
+
+ return dst;
+}
+
+
+static int
+nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ nxt_go_add_port(port->id.pid, port->id.id,
+ port->in_fd, port->out_fd);
+
+ return nxt_unit_add_port(ctx, port);
+}
+
+
+static void
+nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ nxt_go_remove_port(port_id->pid, port_id->id);
+
+ nxt_unit_remove_port(ctx, port_id);
+}
+
+
+static ssize_t
+nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size)
+{
+ return nxt_go_port_send(port_id->pid, port_id->id,
+ (void *) buf, buf_size, (void *) oob, oob_size);
+}
+
+
+static ssize_t
+nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ void *buf, size_t buf_size, void *oob, size_t oob_size)
+{
+ return nxt_go_port_recv(port_id->pid, port_id->id,
+ buf, buf_size, oob, oob_size);
+}
+
+
+int
+nxt_cgo_response_create(uintptr_t req, int status, int fields,
+ uint32_t fields_size)
+{
+ return nxt_unit_response_init((nxt_unit_request_info_t *) req,
+ status, fields, fields_size);
+}
+
+
+int
+nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
+ uintptr_t value, uint32_t value_len)
+{
+ return nxt_unit_response_add_field((nxt_unit_request_info_t *) req,
+ (char *) name, name_len,
+ (char *) value, value_len);
+}
+
+
+int
+nxt_cgo_response_send(uintptr_t req)
+{
+ return nxt_unit_response_send((nxt_unit_request_info_t *) req);
+}
+
+
+ssize_t
+nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len)
+{
+ int rc;
+
+ rc = nxt_unit_response_write((nxt_unit_request_info_t *) req,
+ (void *) start, len);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return -1;
+ }
+
+ return len;
+}
+
+
+ssize_t
+nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len)
+{
+ return nxt_unit_request_read((nxt_unit_request_info_t *) req,
+ (void *) dst, dst_len);
+}
+
+
+int
+nxt_cgo_request_close(uintptr_t req)
+{
+ return 0;
+}
+
+
+void
+nxt_cgo_request_done(uintptr_t req, int res)
+{
+ nxt_unit_request_done((nxt_unit_request_info_t *) req, res);
+}
+
+
+void
+nxt_cgo_warn(uintptr_t msg, uint32_t msg_len)
+{
+ nxt_unit_warn(NULL, ".*s", (int) msg_len, (char *) msg);
+}
diff --git a/src/go/unit/nxt_cgo_lib.h b/src/go/unit/nxt_cgo_lib.h
new file mode 100644
index 00000000..5317380b
--- /dev/null
+++ b/src/go/unit/nxt_cgo_lib.h
@@ -0,0 +1,40 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_CGO_LIB_H_INCLUDED_
+#define _NXT_CGO_LIB_H_INCLUDED_
+
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/types.h>
+
+typedef struct {
+ int length;
+ char *start;
+} nxt_cgo_str_t;
+
+int nxt_cgo_run(uintptr_t handler);
+
+int nxt_cgo_response_create(uintptr_t req, int code, int fields,
+ uint32_t fields_size);
+
+int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
+ uintptr_t value, uint32_t value_len);
+
+int nxt_cgo_response_send(uintptr_t req);
+
+ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len);
+
+ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len);
+
+int nxt_cgo_request_close(uintptr_t req);
+
+void nxt_cgo_request_done(uintptr_t req, int res);
+
+void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len);
+
+#endif /* _NXT_CGO_LIB_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_array.c b/src/go/unit/nxt_go_array.c
deleted file mode 100644
index fea97faf..00000000
--- a/src/go/unit/nxt_go_array.c
+++ /dev/null
@@ -1,62 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#include <stdint.h>
-#include <sys/types.h>
-
-#include <nxt_main.h>
-
-#include "nxt_go_array.h"
-
-void
-nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size)
-{
- array->elts = malloc(n * size);
-
- if (nxt_slow_path(n != 0 && array->elts == NULL)) {
- return;
- }
-
- array->nelts = 0;
- array->size = size;
- array->nalloc = n;
- array->mem_pool = NULL;
-}
-
-void *
-nxt_go_array_add(nxt_array_t *array)
-{
- void *p;
- uint32_t nalloc, new_alloc;
-
- nalloc = array->nalloc;
-
- if (array->nelts == nalloc) {
-
- if (nalloc < 16) {
- /* Allocate new array twice larger than current. */
- new_alloc = nalloc * 2;
-
- } else {
- /* Allocate new array 1.5 times larger than current. */
- new_alloc = nalloc + nalloc / 2;
- }
-
- p = realloc(array->elts, array->size * new_alloc);
-
- if (nxt_slow_path(p == NULL)) {
- return NULL;
- }
-
- array->elts = p;
- array->nalloc = new_alloc;
- }
-
- p = nxt_pointer_to(array->elts, array->size * array->nelts);
- array->nelts++;
-
- return p;
-}
diff --git a/src/go/unit/nxt_go_array.h b/src/go/unit/nxt_go_array.h
deleted file mode 100644
index d96db663..00000000
--- a/src/go/unit/nxt_go_array.h
+++ /dev/null
@@ -1,36 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_ARRAY_H_INCLUDED_
-#define _NXT_GO_ARRAY_H_INCLUDED_
-
-
-#include <nxt_array.h>
-
-void nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size);
-
-void *nxt_go_array_add(nxt_array_t *array);
-
-nxt_inline void *
-nxt_go_array_zero_add(nxt_array_t *array)
-{
- void *p;
-
- p = nxt_go_array_add(array);
-
- if (nxt_fast_path(p != NULL)) {
- nxt_memzero(p, array->size);
- }
-
- return p;
-}
-
-#define \
-nxt_go_array_at(array, n) \
- nxt_pointer_to((array)->elts, (array)->size * (n))
-
-
-#endif /* _NXT_GO_ARRAY_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c
deleted file mode 100644
index eeb7aa50..00000000
--- a/src/go/unit/nxt_go_lib.c
+++ /dev/null
@@ -1,143 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#include "nxt_go_run_ctx.h"
-#include "nxt_go_log.h"
-#include "nxt_go_port.h"
-
-#include "_cgo_export.h"
-
-#include <nxt_main.h>
-
-int
-nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len)
-{
- nxt_int_t rc;
- nxt_go_run_ctx_t *ctx;
-
- if (nxt_slow_path(r == 0)) {
- return 0;
- }
-
- nxt_go_debug("write: %d", (int) len);
-
- ctx = (nxt_go_run_ctx_t *) r;
- rc = nxt_go_ctx_write(ctx, (void *) buf, len);
-
- return rc == NXT_OK ? len : -1;
-}
-
-
-void
-nxt_go_response_flush(nxt_go_request_t r)
-{
- nxt_go_run_ctx_t *ctx;
-
- if (nxt_slow_path(r == 0)) {
- return;
- }
-
- ctx = (nxt_go_run_ctx_t *) r;
-
- if (ctx->nwbuf > 0) {
- nxt_go_ctx_flush(ctx, 0);
- }
-}
-
-
-int
-nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len)
-{
- size_t res;
- nxt_go_run_ctx_t *ctx;
-
- if (nxt_slow_path(r == 0)) {
- return 0;
- }
-
- ctx = (nxt_go_run_ctx_t *) r;
-
- dst_len = nxt_min(dst_len, ctx->request.body.preread_size);
-
- res = nxt_go_ctx_read_raw(ctx, (void *) dst, dst_len);
-
- ctx->request.body.preread_size -= res;
-
- return res;
-}
-
-
-int
-nxt_go_request_close(nxt_go_request_t r)
-{
- return 0;
-}
-
-
-int
-nxt_go_request_done(nxt_go_request_t r)
-{
- nxt_int_t res;
- nxt_go_run_ctx_t *ctx;
- nxt_go_msg_t *msg, *b;
-
- if (nxt_slow_path(r == 0)) {
- return 0;
- }
-
- ctx = (nxt_go_run_ctx_t *) r;
-
- res = nxt_go_ctx_flush(ctx, 1);
-
- nxt_go_ctx_release_msg(ctx, &ctx->msg);
-
- msg = ctx->msg.next;
- while (msg != NULL) {
- nxt_go_ctx_release_msg(ctx, msg);
-
- b = msg;
- msg = b->next;
-
- free(b);
- }
-
- free(ctx);
-
- return res;
-}
-
-
-void
-nxt_go_ready(uint32_t stream)
-{
- nxt_port_msg_t port_msg;
-
- port_msg.stream = stream;
- port_msg.pid = getpid();
- port_msg.reply_port = 0;
- port_msg.type = _NXT_PORT_MSG_PROCESS_READY;
- port_msg.last = 1;
- port_msg.mmap = 0;
- port_msg.nf = 0;
- port_msg.mf = 0;
- port_msg.tracking = 0;
-
- nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0);
-}
-
-
-nxt_go_request_t
-nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, uintptr_t oob, size_t oob_len)
-{
- return nxt_go_port_on_read((void *) buf, buf_len, (void *) oob, oob_len);
-}
-
-
-const char *
-nxt_go_version()
-{
- return NXT_VERSION;
-}
diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h
deleted file mode 100644
index 0621a4dc..00000000
--- a/src/go/unit/nxt_go_lib.h
+++ /dev/null
@@ -1,40 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_LIB_H_INCLUDED_
-#define _NXT_GO_LIB_H_INCLUDED_
-
-
-#include <stdint.h>
-#include <stdlib.h>
-#include <sys/types.h>
-
-typedef struct {
- int length;
- char *start;
-} nxt_go_str_t;
-
-typedef uintptr_t nxt_go_request_t;
-
-int nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len);
-
-void nxt_go_response_flush(nxt_go_request_t r);
-
-int nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len);
-
-int nxt_go_request_close(nxt_go_request_t r);
-
-int nxt_go_request_done(nxt_go_request_t r);
-
-void nxt_go_ready(uint32_t stream);
-
-nxt_go_request_t nxt_go_process_port_msg(uintptr_t buf, size_t buf_len,
- uintptr_t oob, size_t oob_len);
-
-const char *nxt_go_version();
-
-
-#endif /* _NXT_GO_LIB_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_log.h b/src/go/unit/nxt_go_log.h
deleted file mode 100644
index d596cfb3..00000000
--- a/src/go/unit/nxt_go_log.h
+++ /dev/null
@@ -1,34 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_LOG_H_INCLUDED_
-#define _NXT_GO_LOG_H_INCLUDED_
-
-
-#include <stdio.h>
-#include <pthread.h>
-
-#include <nxt_auto_config.h>
-
-#if (NXT_DEBUG)
-
-#define nxt_go_debug(fmt, ARGS...) \
- fprintf(stderr, "[go debug] " fmt "\n", ##ARGS)
-
-#else
-
-#define nxt_go_debug(fmt, ARGS...)
-
-#endif
-
-#define nxt_go_warn(fmt, ARGS...) \
- fprintf(stderr, "[go warn] " fmt "\n", ##ARGS)
-
-#define nxt_go_error(fmt, ARGS...) \
- fprintf(stderr, "[go error] " fmt "\n", ##ARGS)
-
-
-#endif /* _NXT_GO_LOG_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_mutex.h b/src/go/unit/nxt_go_mutex.h
deleted file mode 100644
index 98bd27f0..00000000
--- a/src/go/unit/nxt_go_mutex.h
+++ /dev/null
@@ -1,21 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_MUTEX_H_INCLUDED_
-#define _NXT_GO_MUTEX_H_INCLUDED_
-
-
-#include <pthread.h>
-
-typedef pthread_mutex_t nxt_go_mutex_t;
-
-#define nxt_go_mutex_create(mutex) pthread_mutex_init(mutex, NULL)
-#define nxt_go_mutex_destroy(mutex) pthread_mutex_destroy(mutex)
-#define nxt_go_mutex_lock(mutex) pthread_mutex_lock(mutex)
-#define nxt_go_mutex_unlock(mutex) pthread_mutex_unlock(mutex)
-
-
-#endif /* _NXT_GO_MUTEX_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c
deleted file mode 100644
index f2f1fc5a..00000000
--- a/src/go/unit/nxt_go_port.c
+++ /dev/null
@@ -1,216 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#include "nxt_go_port.h"
-#include "nxt_go_log.h"
-#include "nxt_go_process.h"
-#include "nxt_go_run_ctx.h"
-
-#include "_cgo_export.h"
-
-#include <nxt_main.h>
-
-
-#define nxt_go_str(p) ((nxt_go_str_t *)(p))
-
-static nxt_go_request_t
-nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
-{
- size_t s;
- nxt_str_t n, v;
- nxt_int_t rc;
- nxt_uint_t i;
- nxt_go_run_ctx_t *ctx;
- nxt_go_request_t r;
- nxt_app_request_header_t *h;
-
- ctx = malloc(sizeof(nxt_go_run_ctx_t) + size);
-
- memcpy(ctx->port_msg, port_msg, size);
- port_msg = ctx->port_msg;
-
- size -= sizeof(nxt_port_msg_t);
-
- nxt_go_ctx_init(ctx, port_msg, size);
-
- if (nxt_slow_path(ctx->cancelled)) {
- nxt_go_debug("request already cancelled by router");
- free(ctx);
- return 0;
- }
-
- r = (nxt_go_request_t)(ctx);
- h = &ctx->request.header;
-
- nxt_go_ctx_read_str(ctx, &h->method);
- nxt_go_ctx_read_str(ctx, &h->target);
- nxt_go_ctx_read_str(ctx, &h->path);
-
- nxt_go_ctx_read_size(ctx, &s);
- if (s > 0) {
- s--;
- h->query.start = h->target.start + s;
- h->query.length = h->target.length - s;
-
- if (h->path.start == NULL) {
- h->path.start = h->target.start;
- h->path.length = s - 1;
- }
- }
-
- if (h->path.start == NULL) {
- h->path = h->target;
- }
-
- ctx->go_request = nxt_go_new_request(r, port_msg->stream,
- nxt_go_str(&h->method),
- nxt_go_str(&h->target));
-
- nxt_go_ctx_read_str(ctx, &h->version);
-
- nxt_go_request_set_proto(ctx->go_request, nxt_go_str(&h->version),
- h->version.start[5] - '0',
- h->version.start[7] - '0');
-
- nxt_go_ctx_read_str(ctx, &ctx->request.remote);
- if (ctx->request.remote.start != NULL) {
- nxt_go_request_set_remote_addr(ctx->go_request,
- nxt_go_str(&ctx->request.remote));
- }
-
- nxt_go_ctx_read_str(ctx, &h->host);
- nxt_go_ctx_read_str(ctx, &h->cookie);
- nxt_go_ctx_read_str(ctx, &h->content_type);
- nxt_go_ctx_read_str(ctx, &h->content_length);
-
- if (h->host.start != NULL) {
- nxt_go_request_set_host(ctx->go_request, nxt_go_str(&h->host));
- }
-
- nxt_go_ctx_read_size(ctx, &s);
- h->parsed_content_length = s;
-
- do {
- rc = nxt_go_ctx_read_str(ctx, &n);
-
- if (n.length == 0) {
- break;
- }
-
- rc = nxt_go_ctx_read_str(ctx, &v);
- nxt_go_request_add_header(ctx->go_request, nxt_go_str(&n),
- nxt_go_str(&v));
- } while(1);
-
- nxt_go_ctx_read_size(ctx, &s);
- ctx->request.body.preread_size = s;
-
- if (h->parsed_content_length > 0) {
- nxt_go_request_set_content_length(ctx->go_request,
- h->parsed_content_length);
- }
-
- if (ctx->request.body.preread_size < h->parsed_content_length) {
- nxt_go_warn("preread_size < content_length");
- }
-
- return ctx->go_request;
-}
-
-nxt_go_request_t
-nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size)
-{
- void *buf_end;
- void *payload;
- size_t payload_size;
- nxt_fd_t fd;
- struct cmsghdr *cm;
- nxt_port_msg_t *port_msg;
- nxt_port_msg_new_port_t *new_port_msg;
-
- fd = -1;
- nxt_go_debug("on read: %d (%d)", (int) buf_size, (int) oob_size);
-
- cm = oob;
- if (oob_size >= CMSG_SPACE(sizeof(int))
- && cm->cmsg_len == CMSG_LEN(sizeof(int))
- && cm->cmsg_level == SOL_SOCKET
- && cm->cmsg_type == SCM_RIGHTS) {
-
- nxt_memcpy(&fd, CMSG_DATA(cm), sizeof(int));
- nxt_go_debug("fd = %d", fd);
- }
-
- port_msg = buf;
- if (buf_size < sizeof(nxt_port_msg_t)) {
- nxt_go_warn("message too small (%d bytes)", (int) buf_size);
- goto fail;
- }
-
- buf_end = ((char *) buf) + buf_size;
-
- payload = port_msg + 1;
- payload_size = buf_size - sizeof(nxt_port_msg_t);
-
- if (port_msg->mmap) {
- nxt_go_debug("using data in shared memory");
- }
-
- if (port_msg->type >= NXT_PORT_MSG_MAX) {
- nxt_go_warn("unknown message type (%d)", (int) port_msg->type);
- goto fail;
- }
-
- switch (port_msg->type) {
- case _NXT_PORT_MSG_QUIT:
- nxt_go_debug("quit");
-
- nxt_go_set_quit();
- break;
-
- case _NXT_PORT_MSG_NEW_PORT:
- nxt_go_debug("new port");
- new_port_msg = payload;
-
- nxt_go_new_port(new_port_msg->pid, new_port_msg->id, new_port_msg->type,
- -1, fd);
- break;
-
- case _NXT_PORT_MSG_CHANGE_FILE:
- nxt_go_debug("change file");
- break;
-
- case _NXT_PORT_MSG_MMAP:
- nxt_go_debug("mmap");
-
- nxt_go_new_incoming_mmap(port_msg->pid, fd);
- break;
-
- case _NXT_PORT_MSG_DATA:
- nxt_go_debug("data");
-
- return nxt_go_data_handler(port_msg, buf_size);
-
- case _NXT_PORT_MSG_REMOVE_PID:
- nxt_go_debug("remove pid");
-
- /* TODO remove all ports for this pid in Go */
- /* TODO remove incoming & outgoing mmaps for this pid */
- break;
-
- default:
- goto fail;
- }
-
-
-fail:
-
- if (fd != -1) {
- close(fd);
- }
-
- return 0;
-}
diff --git a/src/go/unit/nxt_go_port.h b/src/go/unit/nxt_go_port.h
deleted file mode 100644
index ce9dbcc3..00000000
--- a/src/go/unit/nxt_go_port.h
+++ /dev/null
@@ -1,18 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_PORT_H_INCLUDED_
-#define _NXT_GO_PORT_H_INCLUDED_
-
-
-#include <sys/types.h>
-#include "nxt_go_lib.h"
-
-nxt_go_request_t
-nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size);
-
-
-#endif /* _NXT_GO_PORT_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c
deleted file mode 100644
index 85b46bed..00000000
--- a/src/go/unit/nxt_go_port_memory.c
+++ /dev/null
@@ -1,217 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#include "nxt_go_port_memory.h"
-#include "nxt_go_process.h"
-#include "nxt_go_array.h"
-#include "nxt_go_log.h"
-
-#include "_cgo_export.h"
-
-#include <nxt_main.h>
-
-#if (NXT_HAVE_MEMFD_CREATE)
-
-#include <linux/memfd.h>
-#include <unistd.h>
-#include <sys/syscall.h>
-
-#endif
-
-
-static nxt_port_mmap_header_t *
-nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id,
- nxt_bool_t tracking)
-{
- int name_len, rc;
- void *mem;
- char name[64];
- nxt_fd_t fd;
- nxt_free_map_t *free_map;
- nxt_port_msg_t port_msg;
- nxt_go_port_mmap_t *port_mmap;
- nxt_port_mmap_header_t *hdr;
-
- fd = -1;
-
- union {
- struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int))];
- } cmsg;
-
- port_mmap = nxt_go_array_zero_add(&process->outgoing);
- if (nxt_slow_path(port_mmap == NULL)) {
- nxt_go_warn("failed to add port mmap to outgoing array");
-
- return NULL;
- }
-
- name_len = snprintf(name, nxt_length(name),
- NXT_SHM_PREFIX "unit.go.%p", name);
-
-#if (NXT_HAVE_MEMFD_CREATE)
-
- fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_go_warn("memfd_create(%s) failed %d", name, errno);
-
- goto remove_fail;
- }
-
- nxt_go_debug("memfd_create(%s): %d", name, fd);
-
-#elif (NXT_HAVE_SHM_OPEN_ANON)
-
- fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
-
- nxt_go_debug("shm_open(SHM_ANON): %d", fd);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_go_warn("shm_open(SHM_ANON) failed %d", errno);
-
- goto remove_fail;
- }
-
-#elif (NXT_HAVE_SHM_OPEN)
-
- /* Just in case. */
- shm_unlink((char *) name);
-
- fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
-
- nxt_go_debug("shm_open(%s): %d", name, fd);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_go_warn("shm_open(%s) failed %d", name, errno);
-
- goto remove_fail;
- }
-
- if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
- nxt_go_warn("shm_unlink(%s) failed %d", name, errno);
- }
-
-#endif
-
- if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
- nxt_go_warn("ftruncate() failed %d", errno);
-
- goto remove_fail;
- }
-
- mem = mmap(NULL, PORT_MMAP_SIZE,
- PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-
- if (nxt_slow_path(mem == MAP_FAILED)) {
- goto remove_fail;
- }
-
- port_mmap->hdr = mem;
-
- /* Init segment header. */
- hdr = port_mmap->hdr;
-
- memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
- memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
-
- hdr->id = process->outgoing.nelts - 1;
- hdr->src_pid = getpid();
- hdr->dst_pid = process->pid;
- hdr->sent_over = id;
-
- /* Mark first chunk as busy */
- free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
-
- nxt_port_mmap_set_chunk_busy(free_map, 0);
-
- /* Mark as busy chunk followed the last available chunk. */
- nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
- nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
-
- port_msg.stream = 0;
- port_msg.pid = getpid();
- port_msg.reply_port = 0;
- port_msg.type = _NXT_PORT_MSG_MMAP;
- port_msg.last = 1;
- port_msg.mmap = 0;
- port_msg.nf = 0;
- port_msg.mf = 0;
- port_msg.tracking = 0;
-
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
- cmsg.cm.cmsg_level = SOL_SOCKET;
- cmsg.cm.cmsg_type = SCM_RIGHTS;
-
- /*
- * nxt_memcpy() is used instead of simple
- * *(int *) CMSG_DATA(&cmsg.cm) = fd;
- * because GCC 4.4 with -O2/3/s optimization may issue a warning:
- * dereferencing type-punned pointer will break strict-aliasing rules
- *
- * Fortunately, GCC with -O1 compiles this nxt_memcpy()
- * in the same simple assignment as in the code above.
- */
- memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
-
- rc = nxt_go_port_send(hdr->dst_pid, id, &port_msg, sizeof(port_msg),
- &cmsg, sizeof(cmsg));
-
- nxt_go_debug("new mmap #%d created for %d -> %d",
- (int) hdr->id, (int) getpid(), (int) process->pid);
-
- close(fd);
-
- return hdr;
-
-remove_fail:
-
- if (fd != -1) {
- close(fd);
- }
-
- process->outgoing.nelts--;
-
- return NULL;
-}
-
-nxt_port_mmap_header_t *
-nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id,
- nxt_chunk_id_t *c, nxt_bool_t tracking)
-{
- nxt_free_map_t *free_map;
- nxt_go_port_mmap_t *port_mmap;
- nxt_go_port_mmap_t *end_port_mmap;
- nxt_port_mmap_header_t *hdr;
-
- nxt_go_mutex_lock(&process->outgoing_mutex);
-
- port_mmap = process->outgoing.elts;
- end_port_mmap = port_mmap + process->outgoing.nelts;
-
- for (; port_mmap < end_port_mmap; port_mmap++)
- {
- hdr = port_mmap->hdr;
-
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id) {
- continue;
- }
-
- free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
-
- if (nxt_port_mmap_get_free_chunk(free_map, c)) {
- goto unlock_return;
- }
- }
-
- hdr = nxt_go_new_port_mmap(process, port_id, tracking);
-
-unlock_return:
-
- nxt_go_mutex_unlock(&process->outgoing_mutex);
-
- return hdr;
-}
diff --git a/src/go/unit/nxt_go_port_memory.h b/src/go/unit/nxt_go_port_memory.h
deleted file mode 100644
index 558eb68f..00000000
--- a/src/go/unit/nxt_go_port_memory.h
+++ /dev/null
@@ -1,30 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_PORT_MEMORY_H_INCLUDED_
-#define _NXT_GO_PORT_MEMORY_H_INCLUDED_
-
-
-#include <nxt_main.h>
-#include <nxt_port_memory_int.h>
-
-#ifndef _NXT_GO_PROCESS_T_DEFINED_
-#define _NXT_GO_PROCESS_T_DEFINED_
-typedef struct nxt_go_process_s nxt_go_process_t;
-#endif
-
-typedef struct nxt_go_port_mmap_s nxt_go_port_mmap_t;
-
-struct nxt_go_port_mmap_s {
- nxt_port_mmap_header_t *hdr;
-};
-
-struct nxt_port_mmap_header_s *
-nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id,
- nxt_chunk_id_t *c, nxt_bool_t tracking);
-
-
-#endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/go/unit/nxt_go_process.c b/src/go/unit/nxt_go_process.c
deleted file mode 100644
index bb2d279c..00000000
--- a/src/go/unit/nxt_go_process.c
+++ /dev/null
@@ -1,148 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#include "nxt_go_process.h"
-#include "nxt_go_array.h"
-#include "nxt_go_mutex.h"
-#include "nxt_go_log.h"
-#include "nxt_go_port_memory.h"
-
-#include <nxt_port_memory_int.h>
-
-
-static nxt_array_t processes; /* of nxt_go_process_t */
-
-static nxt_go_process_t *
-nxt_go_find_process(nxt_pid_t pid, uint32_t *pos)
-{
- uint32_t l, r, i;
- nxt_go_process_t *process;
-
- if (nxt_slow_path(processes.size == 0)) {
- nxt_go_array_init(&processes, 1, sizeof(nxt_go_process_t));
- }
-
- l = 0;
- r = processes.nelts;
- i = (l + r) / 2;
-
- while (r > l) {
- process = nxt_go_array_at(&processes, i);
-
- nxt_go_debug("compare process #%d (%p) at %d",
- (int) process->pid, process, (int) i);
-
- if (pid == process->pid) {
- nxt_go_debug("found process %d at %d", (int) pid, (int) i);
-
- if (pos != NULL) {
- *pos = i;
- }
-
- return process;
- }
-
- if (pid < process->pid) {
- r = i;
-
- } else {
- l = i + 1;
- }
-
- i = (l + r) / 2;
- }
-
- if (pos != NULL) {
- *pos = i;
- }
-
- nxt_go_debug("process %d not found, best pos %d", (int) pid, (int) i);
-
- return NULL;
-}
-
-
-nxt_go_process_t *
-nxt_go_get_process(nxt_pid_t pid)
-{
- uint32_t pos;
- nxt_go_process_t *process;
-
- process = nxt_go_find_process(pid, &pos);
-
- if (process == NULL) {
- nxt_go_array_add(&processes);
- process = nxt_go_array_at(&processes, pos);
-
- nxt_go_debug("init process #%d (%p) at %d",
- (int) pid, process, (int) pos);
-
- if (pos < processes.nelts - 1) {
- memmove(process + 1, process,
- processes.size * (processes.nelts - 1 - pos));
- }
-
- process->pid = pid;
- nxt_go_mutex_create(&process->incoming_mutex);
- nxt_go_array_init(&process->incoming, 1, sizeof(nxt_go_port_mmap_t));
- nxt_go_mutex_create(&process->outgoing_mutex);
- nxt_go_array_init(&process->outgoing, 1, sizeof(nxt_go_port_mmap_t));
- }
-
- return process;
-}
-
-
-void
-nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd)
-{
- void *mem;
- struct stat mmap_stat;
- nxt_go_process_t *process;
- nxt_go_port_mmap_t *port_mmap;
-
- process = nxt_go_get_process(pid);
-
- nxt_go_debug("got new mmap fd #%d from process %d",
- (int) fd, (int) pid);
-
- if (fstat(fd, &mmap_stat) == -1) {
- nxt_go_warn("fstat(%d) failed %d", (int) fd, errno);
-
- return;
- }
-
- nxt_go_mutex_lock(&process->incoming_mutex);
-
- port_mmap = nxt_go_array_zero_add(&process->incoming);
- if (nxt_slow_path(port_mmap == NULL)) {
- nxt_go_warn("failed to add mmap to incoming array");
-
- goto fail;
- }
-
- mem = mmap(NULL, mmap_stat.st_size,
- PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-
- if (nxt_slow_path(mem == MAP_FAILED)) {
- nxt_go_warn("mmap() failed %d", errno);
-
- goto fail;
- }
-
- port_mmap->hdr = mem;
-
- if (nxt_slow_path(port_mmap->hdr->id != process->incoming.nelts - 1)) {
- nxt_go_warn("port mmap id mismatch (%d != %d)",
- port_mmap->hdr->id, process->incoming.nelts - 1);
- }
-
- port_mmap->hdr->sent_over = 0xFFFFu;
-
-fail:
-
- nxt_go_mutex_unlock(&process->incoming_mutex);
-}
diff --git a/src/go/unit/nxt_go_process.h b/src/go/unit/nxt_go_process.h
deleted file mode 100644
index 197de1e9..00000000
--- a/src/go/unit/nxt_go_process.h
+++ /dev/null
@@ -1,33 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_PROCESS_H_INCLUDED_
-#define _NXT_GO_PROCESS_H_INCLUDED_
-
-
-#include <nxt_main.h>
-#include "nxt_go_mutex.h"
-
-#ifndef _NXT_GO_PROCESS_T_DEFINED_
-#define _NXT_GO_PROCESS_T_DEFINED_
-typedef struct nxt_go_process_s nxt_go_process_t;
-#endif
-
-struct nxt_go_process_s {
- nxt_pid_t pid;
- nxt_go_mutex_t incoming_mutex;
- nxt_array_t incoming; /* of nxt_go_port_mmap_t */
- nxt_go_mutex_t outgoing_mutex;
- nxt_array_t outgoing; /* of nxt_go_port_mmap_t */
-};
-
-nxt_go_process_t *nxt_go_get_process(nxt_pid_t pid);
-
-void nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd);
-
-
-#endif /* _NXT_GO_PROCESS_H_INCLUDED_ */
-
diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c
deleted file mode 100644
index 6aa5db77..00000000
--- a/src/go/unit/nxt_go_run_ctx.c
+++ /dev/null
@@ -1,554 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#include "nxt_go_run_ctx.h"
-#include "nxt_go_log.h"
-#include "nxt_go_process.h"
-#include "nxt_go_array.h"
-#include "nxt_go_mutex.h"
-#include "nxt_go_port_memory.h"
-
-#include "_cgo_export.h"
-
-#include <nxt_port_memory_int.h>
-#include <nxt_main.h>
-
-
-static nxt_int_t
-nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf,
- uint32_t n)
-{
- size_t nchunks;
- nxt_go_port_mmap_t *port_mmap;
- nxt_port_mmap_msg_t *mmap_msg;
-
- if (nxt_slow_path(msg->mmap_msg == NULL)) {
- if (n > 0) {
- nxt_go_warn("failed to get plain buf #%d", (int) n);
-
- return NXT_ERROR;
- }
-
- buf->mem.start = (u_char *) (msg->port_msg + 1);
- buf->mem.pos = buf->mem.start;
- buf->mem.end = buf->mem.start + msg->raw_size;
- buf->mem.free = buf->mem.end;
-
- return NXT_OK;
- }
-
- mmap_msg = msg->mmap_msg + n;
- if (nxt_slow_path(mmap_msg >= msg->end)) {
- nxt_go_warn("no more data in shm #%d", (int) n);
-
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) {
- nxt_go_warn("incoming shared memory segment #%d not found "
- "for process %d", (int) mmap_msg->mmap_id,
- (int) msg->port_msg->pid);
-
- return NXT_ERROR;
- }
-
- nxt_go_mutex_lock(&ctx->process->incoming_mutex);
-
- port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id);
- buf->mem.start = nxt_port_mmap_chunk_start(port_mmap->hdr,
- mmap_msg->chunk_id);
- buf->mem.pos = buf->mem.start;
- buf->mem.free = buf->mem.start + mmap_msg->size;
-
- nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
-
- nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
- if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
- nchunks++;
- }
-
- buf->mem.end = buf->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
-
- return NXT_OK;
-}
-
-static nxt_int_t
-nxt_go_ctx_init_rbuf(nxt_go_run_ctx_t *ctx)
-{
- return nxt_go_ctx_msg_rbuf(ctx, &ctx->msg, &ctx->rbuf, ctx->nrbuf);
-}
-
-static void
-nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg,
- size_t payload_size)
-{
- void *data, *end;
- nxt_port_mmap_msg_t *mmap_msg;
-
- memset(msg, 0, sizeof(nxt_go_msg_t));
-
- msg->port_msg = port_msg;
- msg->raw_size = payload_size;
-
- data = port_msg + 1;
- end = nxt_pointer_to(data, payload_size);
-
- if (port_msg->tracking) {
- msg->tracking = data;
- data = msg->tracking + 1;
- }
-
- if (nxt_fast_path(port_msg->mmap != 0)) {
- msg->mmap_msg = data;
- msg->end = end;
-
- mmap_msg = msg->mmap_msg;
- while(mmap_msg < msg->end) {
- msg->data_size += mmap_msg->size;
- mmap_msg += 1;
- }
-
- } else {
- msg->mmap_msg = NULL;
- msg->end = NULL;
- msg->data_size = payload_size;
- }
-}
-
-void
-nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg)
-{
- u_char *b, *e;
- nxt_chunk_id_t c;
- nxt_go_port_mmap_t *port_mmap;
- nxt_port_mmap_msg_t *mmap_msg, *end;
-
- if (nxt_slow_path(msg->mmap_msg == NULL)) {
- return;
- }
-
- mmap_msg = msg->mmap_msg;
- end = msg->end;
-
- nxt_go_mutex_lock(&ctx->process->incoming_mutex);
-
- for (; mmap_msg < end; mmap_msg++ ) {
- port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id);
-
- c = mmap_msg->chunk_id;
- b = nxt_port_mmap_chunk_start(port_mmap->hdr, c);
- e = b + mmap_msg->size;
-
- while (b < e) {
- nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_map, c);
-
- b += PORT_MMAP_CHUNK_SIZE;
- c++;
- }
- }
-
- nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
-}
-
-
-nxt_int_t
-nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
- size_t payload_size)
-{
- nxt_atomic_t *val;
- nxt_go_port_mmap_t *port_mmap;
- nxt_port_mmap_tracking_msg_t *tracking;
-
- memset(ctx, 0, sizeof(nxt_go_run_ctx_t));
-
- ctx->process = nxt_go_get_process(port_msg->pid);
- if (nxt_slow_path(ctx->process == NULL)) {
- nxt_go_warn("failed to get process %d", port_msg->pid);
-
- return NXT_ERROR;
- }
-
- nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size);
-
- if (ctx->msg.tracking != NULL) {
- tracking = ctx->msg.tracking;
-
- if (nxt_slow_path(tracking->mmap_id >= ctx->process->incoming.nelts)) {
- nxt_go_warn("incoming shared memory segment #%d not found "
- "for process %d", (int) tracking->mmap_id,
- (int) port_msg->pid);
-
- return NXT_ERROR;
- }
-
- nxt_go_mutex_lock(&ctx->process->incoming_mutex);
-
- port_mmap = nxt_go_array_at(&ctx->process->incoming, tracking->mmap_id);
-
- nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
-
- val = port_mmap->hdr->tracking + tracking->tracking_id;
-
- ctx->cancelled = nxt_atomic_cmp_set(val, port_msg->stream, 0) == 0;
-
- if (ctx->cancelled) {
- nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_tracking_map,
- tracking->tracking_id);
-
- return NXT_OK;
- }
- }
-
- ctx->msg_last = &ctx->msg;
-
- ctx->wport_msg.stream = port_msg->stream;
- ctx->wport_msg.pid = getpid();
- ctx->wport_msg.type = _NXT_PORT_MSG_DATA;
- ctx->wport_msg.mmap = 1;
-
- ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 );
-
- return nxt_go_ctx_init_rbuf(ctx);
-}
-
-
-nxt_int_t
-nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
-{
- int i;
- nxt_int_t rc;
-
- if (last != 0) {
- ctx->wport_msg.last = 1;
- }
-
- nxt_go_debug("flush buffers (%d)", last);
-
- for (i = 0; i < ctx->nwbuf; i++) {
- nxt_port_mmap_msg_t *m = ctx->wmmap_msg + i;
-
- nxt_go_debug(" mmap_msg[%d]={%d, %d, %d}", i,
- m->mmap_id, m->chunk_id, m->size);
- }
-
- rc = nxt_go_port_send(ctx->msg.port_msg->pid, ctx->msg.port_msg->reply_port,
- &ctx->wport_msg, sizeof(nxt_port_msg_t) +
- ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0);
-
- nxt_go_debug(" port send res = %d", rc);
-
- ctx->nwbuf = 0;
-
- memset(&ctx->wbuf, 0, sizeof(ctx->wbuf));
-
- return rc;
-}
-
-
-nxt_buf_t *
-nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
-{
- size_t nchunks;
- nxt_buf_t *buf;
- nxt_chunk_id_t c;
- nxt_go_port_mmap_t *port_mmap;
- nxt_port_mmap_msg_t *mmap_msg;
- nxt_port_mmap_header_t *hdr;
-
- c = 0;
-
- buf = &ctx->wbuf;
-
- hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c,
- 0);
- if (nxt_slow_path(hdr == NULL)) {
- nxt_go_warn("failed to get port_mmap");
-
- return NULL;
- }
-
- buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
- buf->mem.pos = buf->mem.start;
- buf->mem.free = buf->mem.start;
- buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
-
- buf->parent = hdr;
-
- mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
- mmap_msg->mmap_id = hdr->id;
- mmap_msg->chunk_id = c;
- mmap_msg->size = 0;
-
- nchunks = size / PORT_MMAP_CHUNK_SIZE;
- if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
- nchunks++;
- }
-
- c++;
- nchunks--;
-
- /* Try to acquire as much chunks as required. */
- while (nchunks > 0) {
-
- if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
- break;
- }
-
- buf->mem.end += PORT_MMAP_CHUNK_SIZE;
- c++;
- nchunks--;
- }
-
- ctx->nwbuf++;
-
- return buf;
-}
-
-
-nxt_int_t
-nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
-{
- size_t nchunks, free_size;
- nxt_chunk_id_t c, start;
- nxt_port_mmap_header_t *hdr;
-
- free_size = nxt_buf_mem_free_size(&b->mem);
-
- if (nxt_slow_path(size <= free_size)) {
- return NXT_OK;
- }
-
- hdr = b->parent;
-
- start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
-
- size -= free_size;
-
- nchunks = size / PORT_MMAP_CHUNK_SIZE;
- if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
- nchunks++;
- }
-
- c = start;
-
- /* Try to acquire as much chunks as required. */
- while (nchunks > 0) {
-
- if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
- break;
- }
-
- c++;
- nchunks--;
- }
-
- if (nchunks != 0
- && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start))
- {
- c--;
- while (c >= start) {
- nxt_port_mmap_set_chunk_free(hdr->free_map, c);
- c--;
- }
-
- return NXT_ERROR;
-
- } else {
- b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
-
- return NXT_OK;
- }
-}
-
-
-nxt_int_t
-nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
-{
- size_t free_size, copy_size;
- nxt_buf_t *buf;
- nxt_port_mmap_msg_t *mmap_msg;
-
- buf = &ctx->wbuf;
-
- while (len > 0) {
- if (ctx->nwbuf == 0) {
- buf = nxt_go_port_mmap_get_buf(ctx, len);
-
- if (nxt_slow_path(buf == NULL)) {
- return NXT_ERROR;
- }
- }
-
- do {
- free_size = nxt_buf_mem_free_size(&buf->mem);
-
- if (free_size > 0) {
- copy_size = nxt_min(free_size, len);
-
- buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size);
-
- mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
- mmap_msg->size += copy_size;
-
- len -= copy_size;
- data = nxt_pointer_to(data, copy_size);
-
- if (len == 0) {
- return NXT_OK;
- }
- }
-
- } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK);
-
- if (ctx->nwbuf >= 8) {
- nxt_go_ctx_flush(ctx, 0);
- }
-
- buf = nxt_go_port_mmap_get_buf(ctx, len);
-
- if (nxt_slow_path(buf == NULL)) {
- return NXT_ERROR;
- }
- }
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
-nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size)
-{
- nxt_buf_t *buf;
- nxt_int_t rc;
-
- do {
- buf = &ctx->rbuf;
-
- if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
- if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
-
- ctx->nrbuf++;
- rc = nxt_go_ctx_init_rbuf(ctx);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_go_warn("read size: init rbuf failed");
- return rc;
- }
-
- continue;
- }
- nxt_go_warn("read size: used size is not 0");
- return NXT_ERROR;
- }
-
- if (buf->mem.pos[0] >= 128) {
- if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
- nxt_go_warn("read size: used size < 4");
- return NXT_ERROR;
- }
- }
-
- break;
- } while (1);
-
- buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
-
- return NXT_OK;
-}
-
-
-nxt_int_t
-nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size)
-{
- nxt_int_t rc;
-
- rc = nxt_go_ctx_read_size_(ctx, size);
-
- if (nxt_fast_path(rc == NXT_OK)) {
- nxt_go_debug("read_size: %d", (int)*size);
- }
-
- return rc;
-}
-
-
-nxt_int_t
-nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
-{
- size_t length;
- nxt_int_t rc;
- nxt_buf_t *buf;
-
- rc = nxt_go_ctx_read_size_(ctx, &length);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_go_warn("read str: read size failed");
- return rc;
- }
-
- buf = &ctx->rbuf;
-
- if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) {
- nxt_go_warn("read str: used size too small %d < %d",
- (int) nxt_buf_mem_used_size(&buf->mem), (int) length);
- return NXT_ERROR;
- }
-
- if (length > 0) {
- str->start = buf->mem.pos;
- str->length = length - 1;
-
- buf->mem.pos += length;
-
- nxt_go_debug("read_str: %d %.*s",
- (int) length - 1, (int) length - 1, str->start);
-
- } else {
- str->start = NULL;
- str->length = 0;
-
- nxt_go_debug("read_str: NULL");
- }
-
- return NXT_OK;
-}
-
-
-size_t
-nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size)
-{
- size_t res, read_size;
- nxt_int_t rc;
- nxt_buf_t *buf;
-
- res = 0;
-
- while (size > 0) {
- buf = &ctx->rbuf;
-
- if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
- ctx->nrbuf++;
- rc = nxt_go_ctx_init_rbuf(ctx);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_go_warn("read raw: init rbuf failed");
- return res;
- }
-
- continue;
- }
-
- read_size = nxt_buf_mem_used_size(&buf->mem);
- read_size = nxt_min(read_size, size);
-
- dst = nxt_cpymem(dst, buf->mem.pos, read_size);
-
- size -= read_size;
- buf->mem.pos += read_size;
- res += read_size;
- }
-
- nxt_go_debug("read_raw: %d", (int) res);
-
- return res;
-}
diff --git a/src/go/unit/nxt_go_run_ctx.h b/src/go/unit/nxt_go_run_ctx.h
deleted file mode 100644
index 42163295..00000000
--- a/src/go/unit/nxt_go_run_ctx.h
+++ /dev/null
@@ -1,78 +0,0 @@
-
-/*
- * Copyright (C) Max Romanov
- * Copyright (C) NGINX, Inc.
- */
-
-#ifndef _NXT_GO_RUN_CTX_H_INCLUDED_
-#define _NXT_GO_RUN_CTX_H_INCLUDED_
-
-
-#include <nxt_main.h>
-#include <nxt_router.h>
-#include <nxt_port_memory_int.h>
-
-#ifndef _NXT_GO_PROCESS_T_DEFINED_
-#define _NXT_GO_PROCESS_T_DEFINED_
-typedef struct nxt_go_process_s nxt_go_process_t;
-#endif
-
-typedef struct nxt_go_msg_s nxt_go_msg_t;
-
-struct nxt_go_msg_s {
- off_t start_offset;
-
- nxt_port_msg_t *port_msg;
- size_t raw_size;
- size_t data_size;
-
- nxt_port_mmap_msg_t *mmap_msg;
- nxt_port_mmap_msg_t *end;
-
- nxt_port_mmap_tracking_msg_t *tracking;
-
- nxt_go_msg_t *next;
-};
-
-
-typedef struct {
- nxt_go_msg_t msg;
-
- nxt_go_process_t *process;
- nxt_port_mmap_msg_t *wmmap_msg;
- nxt_bool_t cancelled;
-
- uint32_t nrbuf;
- nxt_buf_t rbuf;
-
- uint32_t nwbuf;
- nxt_buf_t wbuf;
- nxt_port_msg_t wport_msg;
- char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ];
-
- nxt_app_request_t request;
- uintptr_t go_request;
-
- nxt_go_msg_t *msg_last;
-
- nxt_port_msg_t port_msg[];
-} nxt_go_run_ctx_t;
-
-
-void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg);
-
-nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
- size_t payload_size);
-
-nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last);
-
-nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len);
-
-nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size);
-
-nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str);
-
-size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size);
-
-
-#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */
diff --git a/src/go/unit/port.go b/src/go/unit/port.go
index 51733176..f716c9ec 100644
--- a/src/go/unit/port.go
+++ b/src/go/unit/port.go
@@ -6,14 +6,12 @@
package unit
/*
-#include "nxt_go_lib.h"
-#include "nxt_process_type.h"
+#include "nxt_cgo_lib.h"
*/
import "C"
import (
"net"
- "net/http"
"os"
"sync"
"unsafe"
@@ -26,7 +24,6 @@ type port_key struct {
type port struct {
key port_key
- t int
rcv *net.UnixConn
snd *net.UnixConn
}
@@ -34,7 +31,6 @@ type port struct {
type port_registry struct {
sync.RWMutex
m map[port_key]*port
- t [C.NXT_PROCESS_MAX]*port
}
var port_registry_ port_registry
@@ -47,42 +43,14 @@ func find_port(key port_key) *port {
return res
}
-func remove_by_pid(pid int) {
- port_registry_.Lock()
- if port_registry_.m != nil {
- for k, p := range port_registry_.m {
- if k.pid == pid {
- if port_registry_.t[p.t] == p {
- port_registry_.t[p.t] = nil
- }
-
- delete(port_registry_.m, k)
- }
- }
- }
-
- port_registry_.Unlock()
-}
-
-func main_port() *port {
- port_registry_.RLock()
- res := port_registry_.t[C.NXT_PROCESS_MAIN]
- port_registry_.RUnlock()
-
- return res
-}
-
func add_port(p *port) {
- nxt_go_debug("add_port: %d:%d", p.key.pid, p.key.id);
-
port_registry_.Lock()
if port_registry_.m == nil {
port_registry_.m = make(map[port_key]*port)
}
port_registry_.m[p.key] = p
- port_registry_.t[p.t] = p
port_registry_.Unlock()
}
@@ -120,14 +88,38 @@ func getUnixConn(fd int) *net.UnixConn {
return uc
}
-//export nxt_go_new_port
-func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) {
- new_port(int(pid), int(id), int(t), int(rcv), int(snd))
+//export nxt_go_add_port
+func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) {
+ p := &port{
+ key: port_key{
+ pid: int(pid),
+ id: int(id),
+ },
+ rcv: getUnixConn(int(rcv)),
+ snd: getUnixConn(int(snd)),
+ }
+
+ add_port(p)
+}
+
+//export nxt_go_remove_port
+func nxt_go_remove_port(pid C.int, id C.int) {
+ key := port_key{
+ pid: int(pid),
+ id: int(id),
+ }
+
+ port_registry_.Lock()
+ if port_registry_.m != nil {
+ delete(port_registry_.m, key)
+ }
+
+ port_registry_.Unlock()
}
//export nxt_go_port_send
func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
- oob unsafe.Pointer, oob_size C.int) C.int {
+ oob unsafe.Pointer, oob_size C.int) C.ssize_t {
key := port_key{
pid: int(pid),
@@ -141,83 +133,38 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
return 0
}
- n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size),
- C.GoBytes(oob, oob_size), nil)
+ n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
+ GoBytes(oob, oob_size), nil)
if err != nil {
nxt_go_warn("write result %d (%d), %s", n, oobn, err)
}
- return C.int(n)
-
+ return C.ssize_t(n)
}
-//export nxt_go_main_send
-func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer,
- oob_size C.int) C.int {
+//export nxt_go_port_recv
+func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
+ oob unsafe.Pointer, oob_size C.int) C.ssize_t {
+
+ key := port_key{
+ pid: int(pid),
+ id: int(id),
+ }
- p := main_port()
+ p := find_port(key)
if p == nil {
+ nxt_go_warn("port %d:%d not found", pid, id)
return 0
}
- n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size),
- C.GoBytes(oob, oob_size), nil)
+ n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
+ GoBytes(oob, oob_size))
if err != nil {
nxt_go_warn("write result %d (%d), %s", n, oobn, err)
}
- return C.int(n)
-}
-
-func new_port(pid int, id int, t int, rcv int, snd int) *port {
- p := &port{
- key: port_key{
- pid: pid,
- id: id,
- },
- t: t,
- rcv: getUnixConn(rcv),
- snd: getUnixConn(snd),
- }
-
- add_port(p)
-
- return p
-}
-
-func (p *port) read(handler http.Handler) error {
- var buf [16384]byte
- var oob [1024]byte
- var c_buf, c_oob cbuf
-
- n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:])
-
- if err != nil {
- return err
- }
-
- c_buf.init(buf[:n])
- c_oob.init(oob[:oobn])
-
- go_req := C.nxt_go_process_port_msg(c_buf.b, c_buf.s, c_oob.b, c_oob.s)
-
- if go_req == 0 {
- return nil
- }
-
- r := get_request(go_req)
-
- go func(r *request) {
- if handler == nil {
- handler = http.DefaultServeMux
- }
-
- handler.ServeHTTP(r.response(), &r.req)
- r.done()
- }(r)
-
- return nil
+ return C.ssize_t(n)
}
diff --git a/src/go/unit/request.go b/src/go/unit/request.go
index 7d99edaa..829a2c64 100644
--- a/src/go/unit/request.go
+++ b/src/go/unit/request.go
@@ -6,7 +6,7 @@
package unit
/*
-#include "nxt_go_lib.h"
+#include "nxt_cgo_lib.h"
*/
import "C"
@@ -20,15 +20,11 @@ import (
type request struct {
req http.Request
resp *response
- c_req C.nxt_go_request_t
- id C.uint32_t
+ c_req C.uintptr_t
}
func (r *request) Read(p []byte) (n int, err error) {
- c := C.size_t(len(p))
- b := C.uintptr_t(uintptr(unsafe.Pointer(&p[0])))
-
- res := C.nxt_go_request_read(r.c_req, b, c)
+ res := C.nxt_cgo_request_read(r.c_req, buf_ref(p), C.uint32_t(len(p)))
if res == 0 && len(p) > 0 {
return 0, io.EOF
@@ -38,7 +34,7 @@ func (r *request) Read(p []byte) (n int, err error) {
}
func (r *request) Close() error {
- C.nxt_go_request_close(r.c_req)
+ C.nxt_cgo_request_close(r.c_req)
return nil
}
@@ -55,16 +51,16 @@ func (r *request) done() {
if !resp.headerSent {
resp.WriteHeader(http.StatusOK)
}
- C.nxt_go_request_done(r.c_req)
+ C.nxt_cgo_request_done(r.c_req, 0)
}
-func get_request(go_req C.nxt_go_request_t) *request {
- return (*request)(unsafe.Pointer(uintptr(go_req)))
+func get_request(go_req uintptr) *request {
+ return (*request)(unsafe.Pointer(go_req))
}
-//export nxt_go_new_request
-func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t,
- c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) uintptr {
+//export nxt_go_request_create
+func nxt_go_request_create(c_req C.uintptr_t,
+ c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr {
uri := C.GoStringN(c_uri.start, c_uri.length)
@@ -83,7 +79,6 @@ func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t,
RequestURI: uri,
},
c_req: c_req,
- id: id,
}
r.req.Body = r
@@ -91,7 +86,7 @@ func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t,
}
//export nxt_go_request_set_proto
-func nxt_go_request_set_proto(go_req C.nxt_go_request_t, proto *C.nxt_go_str_t,
+func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t,
maj C.int, min C.int) {
r := get_request(go_req)
@@ -101,8 +96,8 @@ func nxt_go_request_set_proto(go_req C.nxt_go_request_t, proto *C.nxt_go_str_t,
}
//export nxt_go_request_add_header
-func nxt_go_request_add_header(go_req C.nxt_go_request_t, name *C.nxt_go_str_t,
- value *C.nxt_go_str_t) {
+func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t,
+ value *C.nxt_cgo_str_t) {
r := get_request(go_req)
r.req.Header.Add(C.GoStringN(name.start, name.length),
@@ -110,23 +105,33 @@ func nxt_go_request_add_header(go_req C.nxt_go_request_t, name *C.nxt_go_str_t,
}
//export nxt_go_request_set_content_length
-func nxt_go_request_set_content_length(go_req C.nxt_go_request_t, l C.int64_t) {
+func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) {
get_request(go_req).req.ContentLength = int64(l)
}
//export nxt_go_request_set_host
-func nxt_go_request_set_host(go_req C.nxt_go_request_t, host *C.nxt_go_str_t) {
+func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) {
get_request(go_req).req.Host = C.GoStringN(host.start, host.length)
}
//export nxt_go_request_set_url
-func nxt_go_request_set_url(go_req C.nxt_go_request_t, scheme *C.char) {
+func nxt_go_request_set_url(go_req uintptr, scheme *C.char) {
get_request(go_req).req.URL.Scheme = C.GoString(scheme)
}
//export nxt_go_request_set_remote_addr
-func nxt_go_request_set_remote_addr(go_req C.nxt_go_request_t,
- addr *C.nxt_go_str_t) {
+func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) {
get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
}
+
+//export nxt_go_request_handler
+func nxt_go_request_handler(go_req uintptr, h uintptr) {
+ r := get_request(go_req)
+ handler := *(*http.Handler)(unsafe.Pointer(h))
+
+ go func(r *request) {
+ handler.ServeHTTP(r.response(), &r.req)
+ r.done()
+ }(r)
+}
diff --git a/src/go/unit/response.go b/src/go/unit/response.go
index 801a52e5..767d66b7 100644
--- a/src/go/unit/response.go
+++ b/src/go/unit/response.go
@@ -6,12 +6,11 @@
package unit
/*
-#include "nxt_go_lib.h"
+#include "nxt_cgo_lib.h"
*/
import "C"
import (
- "fmt"
"net/http"
)
@@ -19,10 +18,10 @@ type response struct {
header http.Header
headerSent bool
req *http.Request
- c_req C.nxt_go_request_t
+ c_req C.uintptr_t
}
-func new_response(c_req C.nxt_go_request_t, req *http.Request) *response {
+func new_response(c_req C.uintptr_t, req *http.Request) *response {
resp := &response{
header: http.Header{},
req: req,
@@ -41,9 +40,7 @@ func (r *response) Write(p []byte) (n int, err error) {
r.WriteHeader(http.StatusOK)
}
- l := C.size_t(len(p))
- b := buf_ref(p)
- res := C.nxt_go_response_write(r.c_req, b, l)
+ res := C.nxt_cgo_response_write(r.c_req, buf_ref(p), C.uint32_t(len(p)))
return int(res), nil
}
@@ -54,22 +51,37 @@ func (r *response) WriteHeader(code int) {
return
}
r.headerSent = true
- fmt.Fprintf(r, "Status: %d\r\n", code)
// Set a default Content-Type
if _, hasType := r.header["Content-Type"]; !hasType {
r.header.Add("Content-Type", "text/html; charset=utf-8")
}
- r.header.Write(r)
+ fields := 0
+ fields_size := 0
- r.Write([]byte("\r\n"))
+ for k, vv := range r.header {
+ for _, v := range vv {
+ fields++
+ fields_size += len(k) + len(v)
+ }
+ }
+
+ C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields),
+ C.uint32_t(fields_size))
+
+ for k, vv := range r.header {
+ for _, v := range vv {
+ C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)),
+ str_ref(v), C.uint32_t(len(v)))
+ }
+ }
+
+ C.nxt_cgo_response_send(r.c_req)
}
func (r *response) Flush() {
if !r.headerSent {
r.WriteHeader(http.StatusOK)
}
-
- C.nxt_go_response_flush(r.c_req)
}
diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go
index 77be7612..06257768 100644
--- a/src/go/unit/unit.go
+++ b/src/go/unit/unit.go
@@ -6,17 +6,13 @@
package unit
/*
-#include "nxt_go_lib.h"
+#include "nxt_cgo_lib.h"
*/
import "C"
import (
- "errors"
"fmt"
"net/http"
- "os"
- "strconv"
- "strings"
"unsafe"
)
@@ -33,104 +29,73 @@ func buf_ref(buf []byte) C.uintptr_t {
return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0])))
}
-func (buf *cbuf) init(b []byte) {
- buf.b = buf_ref(b)
- buf.s = C.size_t(len(b))
+type StringHeader struct {
+ Data unsafe.Pointer
+ Len int
}
-func (buf *cbuf) GoBytes() []byte {
- if buf == nil {
- var b [0]byte
- return b[:0]
- }
+func str_ref(s string) C.uintptr_t {
+ header := (*StringHeader)(unsafe.Pointer(&s))
- return C.GoBytes(unsafe.Pointer(uintptr(buf.b)), C.int(buf.s))
+ return C.uintptr_t(uintptr(unsafe.Pointer(header.Data)))
}
-var nxt_go_quit bool = false
-
-//export nxt_go_set_quit
-func nxt_go_set_quit() {
- nxt_go_quit = true
+func (buf *cbuf) init_bytes(b []byte) {
+ buf.b = buf_ref(b)
+ buf.s = C.size_t(len(b))
}
-func nxt_go_warn(format string, args ...interface{}) {
- fmt.Fprintf(os.Stderr, "[go warn] " + format + "\n", args...)
+func (buf *cbuf) init_string(s string) {
+ buf.b = str_ref(s)
+ buf.s = C.size_t(len(s))
}
-func nxt_go_debug(format string, args ...interface{}) {
- // fmt.Fprintf(os.Stderr, "[go debug] " + format + "\n", args...)
+type SliceHeader struct {
+ Data unsafe.Pointer
+ Len int
+ Cap int
}
-func ListenAndServe(addr string, handler http.Handler) error {
- var read_port *port
-
- go_ports_env := os.Getenv("NXT_GO_PORTS")
- if go_ports_env == "" {
- return http.ListenAndServe(addr, handler)
+func (buf *cbuf) GoBytes() []byte {
+ if buf == nil {
+ var b [0]byte
+ return b[:0]
}
- nxt_go_debug("NXT_GO_PORTS=%s", go_ports_env)
-
- ports := strings.Split(go_ports_env, ";")
- pid := os.Getpid()
-
- if len(ports) != 4 {
- return errors.New("Invalid NXT_GO_PORTS format")
+ bytesHeader := &SliceHeader{
+ Data: unsafe.Pointer(uintptr(buf.b)),
+ Len: int(buf.s),
+ Cap: int(buf.s),
}
- nxt_go_debug("version=%s", ports[0])
-
- builtin_version := C.GoString(C.nxt_go_version())
-
- if ports[0] != builtin_version {
- return fmt.Errorf("Versions mismatch: Unit %s, while application is built with %s",
- ports[0], builtin_version)
- }
+ return *(*[]byte)(unsafe.Pointer(bytesHeader))
+}
- stream, stream_err := strconv.Atoi(ports[1])
- if stream_err != nil {
- return stream_err
+func GoBytes(buf unsafe.Pointer, size C.int) []byte {
+ bytesHeader := &SliceHeader{
+ Data: buf,
+ Len: int(size),
+ Cap: int(size),
}
- read_port = nil
-
- for _, port_str := range ports[2:] {
- attrs := strings.Split(port_str, ",")
-
- if len(attrs) != 5 {
- return fmt.Errorf("Invalid port format: unexpected port attributes number %d, while 5 expected",
- len(attrs))
- }
-
- var attrsN [5]int
- var err error
- for i, attr := range attrs {
- attrsN[i], err = strconv.Atoi(attr)
- if err != nil {
- return fmt.Errorf("Invalid port format: number attribute expected at %d position instead of '%s'",
- i, attr);
- }
- }
+ return *(*[]byte)(unsafe.Pointer(bytesHeader))
+}
- p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4])
+func nxt_go_warn(format string, args ...interface{}) {
+ str := fmt.Sprintf("[go] " + format, args...)
- if attrsN[0] == pid {
- read_port = p
- }
- }
+ C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str)))
+}
- if read_port == nil {
- return errors.New("Application read port not found");
+func ListenAndServe(addr string, handler http.Handler) error {
+ if handler == nil {
+ handler = http.DefaultServeMux
}
- C.nxt_go_ready(C.uint32_t(stream))
+ rc := C.nxt_cgo_run(C.uintptr_t(uintptr(unsafe.Pointer(&handler))))
- for !nxt_go_quit {
- err := read_port.read(handler)
- if err != nil {
- return err
- }
+ if rc != 0 {
+ return http.ListenAndServe(addr, handler)
}
return nil