diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/unit/nxt_cgo_lib.c | 207 | ||||
-rw-r--r-- | src/go/unit/nxt_cgo_lib.h | 40 | ||||
-rw-r--r-- | src/go/unit/nxt_go_array.c | 62 | ||||
-rw-r--r-- | src/go/unit/nxt_go_array.h | 36 | ||||
-rw-r--r-- | src/go/unit/nxt_go_lib.c | 143 | ||||
-rw-r--r-- | src/go/unit/nxt_go_lib.h | 40 | ||||
-rw-r--r-- | src/go/unit/nxt_go_log.h | 34 | ||||
-rw-r--r-- | src/go/unit/nxt_go_mutex.h | 21 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port.c | 216 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port.h | 18 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.c | 217 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.h | 30 | ||||
-rw-r--r-- | src/go/unit/nxt_go_process.c | 148 | ||||
-rw-r--r-- | src/go/unit/nxt_go_process.h | 33 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.c | 554 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.h | 78 | ||||
-rw-r--r-- | src/go/unit/port.go | 143 | ||||
-rw-r--r-- | src/go/unit/request.go | 51 | ||||
-rw-r--r-- | src/go/unit/response.go | 36 | ||||
-rw-r--r-- | src/go/unit/unit.go | 123 |
20 files changed, 388 insertions, 1842 deletions
diff --git a/src/go/unit/nxt_cgo_lib.c b/src/go/unit/nxt_cgo_lib.c new file mode 100644 index 00000000..9c080730 --- /dev/null +++ b/src/go/unit/nxt_cgo_lib.c @@ -0,0 +1,207 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#include "_cgo_export.h" + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> + + +static void nxt_cgo_request_handler(nxt_unit_request_info_t *req); +static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, + nxt_unit_sptr_t *sptr, uint32_t length); +static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); +static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); +static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + void *buf, size_t buf_size, void *oob, size_t oob_size); + +int +nxt_cgo_run(uintptr_t handler) +{ + int rc; + nxt_unit_ctx_t *ctx; + nxt_unit_init_t init; + + memset(&init, 0, sizeof(init)); + + init.callbacks.request_handler = nxt_cgo_request_handler; + init.callbacks.add_port = nxt_cgo_add_port; + init.callbacks.remove_port = nxt_cgo_remove_port; + init.callbacks.port_send = nxt_cgo_port_send; + init.callbacks.port_recv = nxt_cgo_port_recv; + + init.data = (void *) handler; + + ctx = nxt_unit_init(&init); + if (nxt_slow_path(ctx == NULL)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_run(ctx); + + nxt_unit_done(ctx); + + return rc; +} + + +static void +nxt_cgo_request_handler(nxt_unit_request_info_t *req) +{ + uint32_t i; + uintptr_t go_req; + nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + + r = req->request; + + go_req = nxt_go_request_create((uintptr_t) req, + nxt_cgo_str_init(&method, &r->method, r->method_length), + nxt_cgo_str_init(&uri, &r->target, r->target_length)); + + nxt_go_request_set_proto(go_req, + nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1); + + for (i = 0; i < r->fields_count; i++) { + f = &r->fields[i]; + + nxt_go_request_add_header(go_req, + nxt_cgo_str_init(&name, &f->name, f->name_length), + nxt_cgo_str_init(&value, &f->value, f->value_length)); + + if (f->hash == NXT_UNIT_HASH_HOST) { + host = value; + } + } + + nxt_go_request_set_content_length(go_req, r->content_length); + nxt_go_request_set_host(go_req, &host); + nxt_go_request_set_remote_addr(go_req, + nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length)); + + nxt_go_request_handler(go_req, (uintptr_t) req->unit->data); +} + + +static nxt_cgo_str_t * +nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length) +{ + dst->length = length; + dst->start = nxt_unit_sptr_get(sptr); + + return dst; +} + + +static int +nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + nxt_go_add_port(port->id.pid, port->id.id, + port->in_fd, port->out_fd); + + return nxt_unit_add_port(ctx, port); +} + + +static void +nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + nxt_go_remove_port(port_id->pid, port_id->id); + + nxt_unit_remove_port(ctx, port_id); +} + + +static ssize_t +nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + const void *buf, size_t buf_size, const void *oob, size_t oob_size) +{ + return nxt_go_port_send(port_id->pid, port_id->id, + (void *) buf, buf_size, (void *) oob, oob_size); +} + + +static ssize_t +nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + void *buf, size_t buf_size, void *oob, size_t oob_size) +{ + return nxt_go_port_recv(port_id->pid, port_id->id, + buf, buf_size, oob, oob_size); +} + + +int +nxt_cgo_response_create(uintptr_t req, int status, int fields, + uint32_t fields_size) +{ + return nxt_unit_response_init((nxt_unit_request_info_t *) req, + status, fields, fields_size); +} + + +int +nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, + uintptr_t value, uint32_t value_len) +{ + return nxt_unit_response_add_field((nxt_unit_request_info_t *) req, + (char *) name, name_len, + (char *) value, value_len); +} + + +int +nxt_cgo_response_send(uintptr_t req) +{ + return nxt_unit_response_send((nxt_unit_request_info_t *) req); +} + + +ssize_t +nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len) +{ + int rc; + + rc = nxt_unit_response_write((nxt_unit_request_info_t *) req, + (void *) start, len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return -1; + } + + return len; +} + + +ssize_t +nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len) +{ + return nxt_unit_request_read((nxt_unit_request_info_t *) req, + (void *) dst, dst_len); +} + + +int +nxt_cgo_request_close(uintptr_t req) +{ + return 0; +} + + +void +nxt_cgo_request_done(uintptr_t req, int res) +{ + nxt_unit_request_done((nxt_unit_request_info_t *) req, res); +} + + +void +nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) +{ + nxt_unit_warn(NULL, ".*s", (int) msg_len, (char *) msg); +} diff --git a/src/go/unit/nxt_cgo_lib.h b/src/go/unit/nxt_cgo_lib.h new file mode 100644 index 00000000..5317380b --- /dev/null +++ b/src/go/unit/nxt_cgo_lib.h @@ -0,0 +1,40 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_CGO_LIB_H_INCLUDED_ +#define _NXT_CGO_LIB_H_INCLUDED_ + + +#include <stdint.h> +#include <stdlib.h> +#include <sys/types.h> + +typedef struct { + int length; + char *start; +} nxt_cgo_str_t; + +int nxt_cgo_run(uintptr_t handler); + +int nxt_cgo_response_create(uintptr_t req, int code, int fields, + uint32_t fields_size); + +int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, + uintptr_t value, uint32_t value_len); + +int nxt_cgo_response_send(uintptr_t req); + +ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len); + +ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len); + +int nxt_cgo_request_close(uintptr_t req); + +void nxt_cgo_request_done(uintptr_t req, int res); + +void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); + +#endif /* _NXT_CGO_LIB_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_array.c b/src/go/unit/nxt_go_array.c deleted file mode 100644 index fea97faf..00000000 --- a/src/go/unit/nxt_go_array.c +++ /dev/null @@ -1,62 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include <stdint.h> -#include <sys/types.h> - -#include <nxt_main.h> - -#include "nxt_go_array.h" - -void -nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size) -{ - array->elts = malloc(n * size); - - if (nxt_slow_path(n != 0 && array->elts == NULL)) { - return; - } - - array->nelts = 0; - array->size = size; - array->nalloc = n; - array->mem_pool = NULL; -} - -void * -nxt_go_array_add(nxt_array_t *array) -{ - void *p; - uint32_t nalloc, new_alloc; - - nalloc = array->nalloc; - - if (array->nelts == nalloc) { - - if (nalloc < 16) { - /* Allocate new array twice larger than current. */ - new_alloc = nalloc * 2; - - } else { - /* Allocate new array 1.5 times larger than current. */ - new_alloc = nalloc + nalloc / 2; - } - - p = realloc(array->elts, array->size * new_alloc); - - if (nxt_slow_path(p == NULL)) { - return NULL; - } - - array->elts = p; - array->nalloc = new_alloc; - } - - p = nxt_pointer_to(array->elts, array->size * array->nelts); - array->nelts++; - - return p; -} diff --git a/src/go/unit/nxt_go_array.h b/src/go/unit/nxt_go_array.h deleted file mode 100644 index d96db663..00000000 --- a/src/go/unit/nxt_go_array.h +++ /dev/null @@ -1,36 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_ARRAY_H_INCLUDED_ -#define _NXT_GO_ARRAY_H_INCLUDED_ - - -#include <nxt_array.h> - -void nxt_go_array_init(nxt_array_t *array, nxt_uint_t n, size_t size); - -void *nxt_go_array_add(nxt_array_t *array); - -nxt_inline void * -nxt_go_array_zero_add(nxt_array_t *array) -{ - void *p; - - p = nxt_go_array_add(array); - - if (nxt_fast_path(p != NULL)) { - nxt_memzero(p, array->size); - } - - return p; -} - -#define \ -nxt_go_array_at(array, n) \ - nxt_pointer_to((array)->elts, (array)->size * (n)) - - -#endif /* _NXT_GO_ARRAY_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c deleted file mode 100644 index eeb7aa50..00000000 --- a/src/go/unit/nxt_go_lib.c +++ /dev/null @@ -1,143 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "nxt_go_run_ctx.h" -#include "nxt_go_log.h" -#include "nxt_go_port.h" - -#include "_cgo_export.h" - -#include <nxt_main.h> - -int -nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len) -{ - nxt_int_t rc; - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - nxt_go_debug("write: %d", (int) len); - - ctx = (nxt_go_run_ctx_t *) r; - rc = nxt_go_ctx_write(ctx, (void *) buf, len); - - return rc == NXT_OK ? len : -1; -} - - -void -nxt_go_response_flush(nxt_go_request_t r) -{ - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return; - } - - ctx = (nxt_go_run_ctx_t *) r; - - if (ctx->nwbuf > 0) { - nxt_go_ctx_flush(ctx, 0); - } -} - - -int -nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len) -{ - size_t res; - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - dst_len = nxt_min(dst_len, ctx->request.body.preread_size); - - res = nxt_go_ctx_read_raw(ctx, (void *) dst, dst_len); - - ctx->request.body.preread_size -= res; - - return res; -} - - -int -nxt_go_request_close(nxt_go_request_t r) -{ - return 0; -} - - -int -nxt_go_request_done(nxt_go_request_t r) -{ - nxt_int_t res; - nxt_go_run_ctx_t *ctx; - nxt_go_msg_t *msg, *b; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - res = nxt_go_ctx_flush(ctx, 1); - - nxt_go_ctx_release_msg(ctx, &ctx->msg); - - msg = ctx->msg.next; - while (msg != NULL) { - nxt_go_ctx_release_msg(ctx, msg); - - b = msg; - msg = b->next; - - free(b); - } - - free(ctx); - - return res; -} - - -void -nxt_go_ready(uint32_t stream) -{ - nxt_port_msg_t port_msg; - - port_msg.stream = stream; - port_msg.pid = getpid(); - port_msg.reply_port = 0; - port_msg.type = _NXT_PORT_MSG_PROCESS_READY; - port_msg.last = 1; - port_msg.mmap = 0; - port_msg.nf = 0; - port_msg.mf = 0; - port_msg.tracking = 0; - - nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0); -} - - -nxt_go_request_t -nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, uintptr_t oob, size_t oob_len) -{ - return nxt_go_port_on_read((void *) buf, buf_len, (void *) oob, oob_len); -} - - -const char * -nxt_go_version() -{ - return NXT_VERSION; -} diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h deleted file mode 100644 index 0621a4dc..00000000 --- a/src/go/unit/nxt_go_lib.h +++ /dev/null @@ -1,40 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_LIB_H_INCLUDED_ -#define _NXT_GO_LIB_H_INCLUDED_ - - -#include <stdint.h> -#include <stdlib.h> -#include <sys/types.h> - -typedef struct { - int length; - char *start; -} nxt_go_str_t; - -typedef uintptr_t nxt_go_request_t; - -int nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len); - -void nxt_go_response_flush(nxt_go_request_t r); - -int nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len); - -int nxt_go_request_close(nxt_go_request_t r); - -int nxt_go_request_done(nxt_go_request_t r); - -void nxt_go_ready(uint32_t stream); - -nxt_go_request_t nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, - uintptr_t oob, size_t oob_len); - -const char *nxt_go_version(); - - -#endif /* _NXT_GO_LIB_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_log.h b/src/go/unit/nxt_go_log.h deleted file mode 100644 index d596cfb3..00000000 --- a/src/go/unit/nxt_go_log.h +++ /dev/null @@ -1,34 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_LOG_H_INCLUDED_ -#define _NXT_GO_LOG_H_INCLUDED_ - - -#include <stdio.h> -#include <pthread.h> - -#include <nxt_auto_config.h> - -#if (NXT_DEBUG) - -#define nxt_go_debug(fmt, ARGS...) \ - fprintf(stderr, "[go debug] " fmt "\n", ##ARGS) - -#else - -#define nxt_go_debug(fmt, ARGS...) - -#endif - -#define nxt_go_warn(fmt, ARGS...) \ - fprintf(stderr, "[go warn] " fmt "\n", ##ARGS) - -#define nxt_go_error(fmt, ARGS...) \ - fprintf(stderr, "[go error] " fmt "\n", ##ARGS) - - -#endif /* _NXT_GO_LOG_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_mutex.h b/src/go/unit/nxt_go_mutex.h deleted file mode 100644 index 98bd27f0..00000000 --- a/src/go/unit/nxt_go_mutex.h +++ /dev/null @@ -1,21 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_MUTEX_H_INCLUDED_ -#define _NXT_GO_MUTEX_H_INCLUDED_ - - -#include <pthread.h> - -typedef pthread_mutex_t nxt_go_mutex_t; - -#define nxt_go_mutex_create(mutex) pthread_mutex_init(mutex, NULL) -#define nxt_go_mutex_destroy(mutex) pthread_mutex_destroy(mutex) -#define nxt_go_mutex_lock(mutex) pthread_mutex_lock(mutex) -#define nxt_go_mutex_unlock(mutex) pthread_mutex_unlock(mutex) - - -#endif /* _NXT_GO_MUTEX_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c deleted file mode 100644 index f2f1fc5a..00000000 --- a/src/go/unit/nxt_go_port.c +++ /dev/null @@ -1,216 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "nxt_go_port.h" -#include "nxt_go_log.h" -#include "nxt_go_process.h" -#include "nxt_go_run_ctx.h" - -#include "_cgo_export.h" - -#include <nxt_main.h> - - -#define nxt_go_str(p) ((nxt_go_str_t *)(p)) - -static nxt_go_request_t -nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) -{ - size_t s; - nxt_str_t n, v; - nxt_int_t rc; - nxt_uint_t i; - nxt_go_run_ctx_t *ctx; - nxt_go_request_t r; - nxt_app_request_header_t *h; - - ctx = malloc(sizeof(nxt_go_run_ctx_t) + size); - - memcpy(ctx->port_msg, port_msg, size); - port_msg = ctx->port_msg; - - size -= sizeof(nxt_port_msg_t); - - nxt_go_ctx_init(ctx, port_msg, size); - - if (nxt_slow_path(ctx->cancelled)) { - nxt_go_debug("request already cancelled by router"); - free(ctx); - return 0; - } - - r = (nxt_go_request_t)(ctx); - h = &ctx->request.header; - - nxt_go_ctx_read_str(ctx, &h->method); - nxt_go_ctx_read_str(ctx, &h->target); - nxt_go_ctx_read_str(ctx, &h->path); - - nxt_go_ctx_read_size(ctx, &s); - if (s > 0) { - s--; - h->query.start = h->target.start + s; - h->query.length = h->target.length - s; - - if (h->path.start == NULL) { - h->path.start = h->target.start; - h->path.length = s - 1; - } - } - - if (h->path.start == NULL) { - h->path = h->target; - } - - ctx->go_request = nxt_go_new_request(r, port_msg->stream, - nxt_go_str(&h->method), - nxt_go_str(&h->target)); - - nxt_go_ctx_read_str(ctx, &h->version); - - nxt_go_request_set_proto(ctx->go_request, nxt_go_str(&h->version), - h->version.start[5] - '0', - h->version.start[7] - '0'); - - nxt_go_ctx_read_str(ctx, &ctx->request.remote); - if (ctx->request.remote.start != NULL) { - nxt_go_request_set_remote_addr(ctx->go_request, - nxt_go_str(&ctx->request.remote)); - } - - nxt_go_ctx_read_str(ctx, &h->host); - nxt_go_ctx_read_str(ctx, &h->cookie); - nxt_go_ctx_read_str(ctx, &h->content_type); - nxt_go_ctx_read_str(ctx, &h->content_length); - - if (h->host.start != NULL) { - nxt_go_request_set_host(ctx->go_request, nxt_go_str(&h->host)); - } - - nxt_go_ctx_read_size(ctx, &s); - h->parsed_content_length = s; - - do { - rc = nxt_go_ctx_read_str(ctx, &n); - - if (n.length == 0) { - break; - } - - rc = nxt_go_ctx_read_str(ctx, &v); - nxt_go_request_add_header(ctx->go_request, nxt_go_str(&n), - nxt_go_str(&v)); - } while(1); - - nxt_go_ctx_read_size(ctx, &s); - ctx->request.body.preread_size = s; - - if (h->parsed_content_length > 0) { - nxt_go_request_set_content_length(ctx->go_request, - h->parsed_content_length); - } - - if (ctx->request.body.preread_size < h->parsed_content_length) { - nxt_go_warn("preread_size < content_length"); - } - - return ctx->go_request; -} - -nxt_go_request_t -nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) -{ - void *buf_end; - void *payload; - size_t payload_size; - nxt_fd_t fd; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_port_msg_new_port_t *new_port_msg; - - fd = -1; - nxt_go_debug("on read: %d (%d)", (int) buf_size, (int) oob_size); - - cm = oob; - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) - && cm->cmsg_level == SOL_SOCKET - && cm->cmsg_type == SCM_RIGHTS) { - - nxt_memcpy(&fd, CMSG_DATA(cm), sizeof(int)); - nxt_go_debug("fd = %d", fd); - } - - port_msg = buf; - if (buf_size < sizeof(nxt_port_msg_t)) { - nxt_go_warn("message too small (%d bytes)", (int) buf_size); - goto fail; - } - - buf_end = ((char *) buf) + buf_size; - - payload = port_msg + 1; - payload_size = buf_size - sizeof(nxt_port_msg_t); - - if (port_msg->mmap) { - nxt_go_debug("using data in shared memory"); - } - - if (port_msg->type >= NXT_PORT_MSG_MAX) { - nxt_go_warn("unknown message type (%d)", (int) port_msg->type); - goto fail; - } - - switch (port_msg->type) { - case _NXT_PORT_MSG_QUIT: - nxt_go_debug("quit"); - - nxt_go_set_quit(); - break; - - case _NXT_PORT_MSG_NEW_PORT: - nxt_go_debug("new port"); - new_port_msg = payload; - - nxt_go_new_port(new_port_msg->pid, new_port_msg->id, new_port_msg->type, - -1, fd); - break; - - case _NXT_PORT_MSG_CHANGE_FILE: - nxt_go_debug("change file"); - break; - - case _NXT_PORT_MSG_MMAP: - nxt_go_debug("mmap"); - - nxt_go_new_incoming_mmap(port_msg->pid, fd); - break; - - case _NXT_PORT_MSG_DATA: - nxt_go_debug("data"); - - return nxt_go_data_handler(port_msg, buf_size); - - case _NXT_PORT_MSG_REMOVE_PID: - nxt_go_debug("remove pid"); - - /* TODO remove all ports for this pid in Go */ - /* TODO remove incoming & outgoing mmaps for this pid */ - break; - - default: - goto fail; - } - - -fail: - - if (fd != -1) { - close(fd); - } - - return 0; -} diff --git a/src/go/unit/nxt_go_port.h b/src/go/unit/nxt_go_port.h deleted file mode 100644 index ce9dbcc3..00000000 --- a/src/go/unit/nxt_go_port.h +++ /dev/null @@ -1,18 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_PORT_H_INCLUDED_ -#define _NXT_GO_PORT_H_INCLUDED_ - - -#include <sys/types.h> -#include "nxt_go_lib.h" - -nxt_go_request_t -nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size); - - -#endif /* _NXT_GO_PORT_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c deleted file mode 100644 index 85b46bed..00000000 --- a/src/go/unit/nxt_go_port_memory.c +++ /dev/null @@ -1,217 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "nxt_go_port_memory.h" -#include "nxt_go_process.h" -#include "nxt_go_array.h" -#include "nxt_go_log.h" - -#include "_cgo_export.h" - -#include <nxt_main.h> - -#if (NXT_HAVE_MEMFD_CREATE) - -#include <linux/memfd.h> -#include <unistd.h> -#include <sys/syscall.h> - -#endif - - -static nxt_port_mmap_header_t * -nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id, - nxt_bool_t tracking) -{ - int name_len, rc; - void *mem; - char name[64]; - nxt_fd_t fd; - nxt_free_map_t *free_map; - nxt_port_msg_t port_msg; - nxt_go_port_mmap_t *port_mmap; - nxt_port_mmap_header_t *hdr; - - fd = -1; - - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; - } cmsg; - - port_mmap = nxt_go_array_zero_add(&process->outgoing); - if (nxt_slow_path(port_mmap == NULL)) { - nxt_go_warn("failed to add port mmap to outgoing array"); - - return NULL; - } - - name_len = snprintf(name, nxt_length(name), - NXT_SHM_PREFIX "unit.go.%p", name); - -#if (NXT_HAVE_MEMFD_CREATE) - - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); - - if (nxt_slow_path(fd == -1)) { - nxt_go_warn("memfd_create(%s) failed %d", name, errno); - - goto remove_fail; - } - - nxt_go_debug("memfd_create(%s): %d", name, fd); - -#elif (NXT_HAVE_SHM_OPEN_ANON) - - fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); - - nxt_go_debug("shm_open(SHM_ANON): %d", fd); - - if (nxt_slow_path(fd == -1)) { - nxt_go_warn("shm_open(SHM_ANON) failed %d", errno); - - goto remove_fail; - } - -#elif (NXT_HAVE_SHM_OPEN) - - /* Just in case. */ - shm_unlink((char *) name); - - fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - - nxt_go_debug("shm_open(%s): %d", name, fd); - - if (nxt_slow_path(fd == -1)) { - nxt_go_warn("shm_open(%s) failed %d", name, errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink((char *) name) == -1)) { - nxt_go_warn("shm_unlink(%s) failed %d", name, errno); - } - -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_go_warn("ftruncate() failed %d", errno); - - goto remove_fail; - } - - mem = mmap(NULL, PORT_MMAP_SIZE, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - goto remove_fail; - } - - port_mmap->hdr = mem; - - /* Init segment header. */ - hdr = port_mmap->hdr; - - memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); - memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - - hdr->id = process->outgoing.nelts - 1; - hdr->src_pid = getpid(); - hdr->dst_pid = process->pid; - hdr->sent_over = id; - - /* Mark first chunk as busy */ - free_map = tracking ? hdr->free_tracking_map : hdr->free_map; - - nxt_port_mmap_set_chunk_busy(free_map, 0); - - /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); - nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - - port_msg.stream = 0; - port_msg.pid = getpid(); - port_msg.reply_port = 0; - port_msg.type = _NXT_PORT_MSG_MMAP; - port_msg.last = 1; - port_msg.mmap = 0; - port_msg.nf = 0; - port_msg.mf = 0; - port_msg.tracking = 0; - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * nxt_memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - - rc = nxt_go_port_send(hdr->dst_pid, id, &port_msg, sizeof(port_msg), - &cmsg, sizeof(cmsg)); - - nxt_go_debug("new mmap #%d created for %d -> %d", - (int) hdr->id, (int) getpid(), (int) process->pid); - - close(fd); - - return hdr; - -remove_fail: - - if (fd != -1) { - close(fd); - } - - process->outgoing.nelts--; - - return NULL; -} - -nxt_port_mmap_header_t * -nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c, nxt_bool_t tracking) -{ - nxt_free_map_t *free_map; - nxt_go_port_mmap_t *port_mmap; - nxt_go_port_mmap_t *end_port_mmap; - nxt_port_mmap_header_t *hdr; - - nxt_go_mutex_lock(&process->outgoing_mutex); - - port_mmap = process->outgoing.elts; - end_port_mmap = port_mmap + process->outgoing.nelts; - - for (; port_mmap < end_port_mmap; port_mmap++) - { - hdr = port_mmap->hdr; - - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id) { - continue; - } - - free_map = tracking ? hdr->free_tracking_map : hdr->free_map; - - if (nxt_port_mmap_get_free_chunk(free_map, c)) { - goto unlock_return; - } - } - - hdr = nxt_go_new_port_mmap(process, port_id, tracking); - -unlock_return: - - nxt_go_mutex_unlock(&process->outgoing_mutex); - - return hdr; -} diff --git a/src/go/unit/nxt_go_port_memory.h b/src/go/unit/nxt_go_port_memory.h deleted file mode 100644 index 558eb68f..00000000 --- a/src/go/unit/nxt_go_port_memory.h +++ /dev/null @@ -1,30 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_PORT_MEMORY_H_INCLUDED_ -#define _NXT_GO_PORT_MEMORY_H_INCLUDED_ - - -#include <nxt_main.h> -#include <nxt_port_memory_int.h> - -#ifndef _NXT_GO_PROCESS_T_DEFINED_ -#define _NXT_GO_PROCESS_T_DEFINED_ -typedef struct nxt_go_process_s nxt_go_process_t; -#endif - -typedef struct nxt_go_port_mmap_s nxt_go_port_mmap_t; - -struct nxt_go_port_mmap_s { - nxt_port_mmap_header_t *hdr; -}; - -struct nxt_port_mmap_header_s * -nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c, nxt_bool_t tracking); - - -#endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_process.c b/src/go/unit/nxt_go_process.c deleted file mode 100644 index bb2d279c..00000000 --- a/src/go/unit/nxt_go_process.c +++ /dev/null @@ -1,148 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "nxt_go_process.h" -#include "nxt_go_array.h" -#include "nxt_go_mutex.h" -#include "nxt_go_log.h" -#include "nxt_go_port_memory.h" - -#include <nxt_port_memory_int.h> - - -static nxt_array_t processes; /* of nxt_go_process_t */ - -static nxt_go_process_t * -nxt_go_find_process(nxt_pid_t pid, uint32_t *pos) -{ - uint32_t l, r, i; - nxt_go_process_t *process; - - if (nxt_slow_path(processes.size == 0)) { - nxt_go_array_init(&processes, 1, sizeof(nxt_go_process_t)); - } - - l = 0; - r = processes.nelts; - i = (l + r) / 2; - - while (r > l) { - process = nxt_go_array_at(&processes, i); - - nxt_go_debug("compare process #%d (%p) at %d", - (int) process->pid, process, (int) i); - - if (pid == process->pid) { - nxt_go_debug("found process %d at %d", (int) pid, (int) i); - - if (pos != NULL) { - *pos = i; - } - - return process; - } - - if (pid < process->pid) { - r = i; - - } else { - l = i + 1; - } - - i = (l + r) / 2; - } - - if (pos != NULL) { - *pos = i; - } - - nxt_go_debug("process %d not found, best pos %d", (int) pid, (int) i); - - return NULL; -} - - -nxt_go_process_t * -nxt_go_get_process(nxt_pid_t pid) -{ - uint32_t pos; - nxt_go_process_t *process; - - process = nxt_go_find_process(pid, &pos); - - if (process == NULL) { - nxt_go_array_add(&processes); - process = nxt_go_array_at(&processes, pos); - - nxt_go_debug("init process #%d (%p) at %d", - (int) pid, process, (int) pos); - - if (pos < processes.nelts - 1) { - memmove(process + 1, process, - processes.size * (processes.nelts - 1 - pos)); - } - - process->pid = pid; - nxt_go_mutex_create(&process->incoming_mutex); - nxt_go_array_init(&process->incoming, 1, sizeof(nxt_go_port_mmap_t)); - nxt_go_mutex_create(&process->outgoing_mutex); - nxt_go_array_init(&process->outgoing, 1, sizeof(nxt_go_port_mmap_t)); - } - - return process; -} - - -void -nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd) -{ - void *mem; - struct stat mmap_stat; - nxt_go_process_t *process; - nxt_go_port_mmap_t *port_mmap; - - process = nxt_go_get_process(pid); - - nxt_go_debug("got new mmap fd #%d from process %d", - (int) fd, (int) pid); - - if (fstat(fd, &mmap_stat) == -1) { - nxt_go_warn("fstat(%d) failed %d", (int) fd, errno); - - return; - } - - nxt_go_mutex_lock(&process->incoming_mutex); - - port_mmap = nxt_go_array_zero_add(&process->incoming); - if (nxt_slow_path(port_mmap == NULL)) { - nxt_go_warn("failed to add mmap to incoming array"); - - goto fail; - } - - mem = mmap(NULL, mmap_stat.st_size, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_go_warn("mmap() failed %d", errno); - - goto fail; - } - - port_mmap->hdr = mem; - - if (nxt_slow_path(port_mmap->hdr->id != process->incoming.nelts - 1)) { - nxt_go_warn("port mmap id mismatch (%d != %d)", - port_mmap->hdr->id, process->incoming.nelts - 1); - } - - port_mmap->hdr->sent_over = 0xFFFFu; - -fail: - - nxt_go_mutex_unlock(&process->incoming_mutex); -} diff --git a/src/go/unit/nxt_go_process.h b/src/go/unit/nxt_go_process.h deleted file mode 100644 index 197de1e9..00000000 --- a/src/go/unit/nxt_go_process.h +++ /dev/null @@ -1,33 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_PROCESS_H_INCLUDED_ -#define _NXT_GO_PROCESS_H_INCLUDED_ - - -#include <nxt_main.h> -#include "nxt_go_mutex.h" - -#ifndef _NXT_GO_PROCESS_T_DEFINED_ -#define _NXT_GO_PROCESS_T_DEFINED_ -typedef struct nxt_go_process_s nxt_go_process_t; -#endif - -struct nxt_go_process_s { - nxt_pid_t pid; - nxt_go_mutex_t incoming_mutex; - nxt_array_t incoming; /* of nxt_go_port_mmap_t */ - nxt_go_mutex_t outgoing_mutex; - nxt_array_t outgoing; /* of nxt_go_port_mmap_t */ -}; - -nxt_go_process_t *nxt_go_get_process(nxt_pid_t pid); - -void nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd); - - -#endif /* _NXT_GO_PROCESS_H_INCLUDED_ */ - diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c deleted file mode 100644 index 6aa5db77..00000000 --- a/src/go/unit/nxt_go_run_ctx.c +++ /dev/null @@ -1,554 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "nxt_go_run_ctx.h" -#include "nxt_go_log.h" -#include "nxt_go_process.h" -#include "nxt_go_array.h" -#include "nxt_go_mutex.h" -#include "nxt_go_port_memory.h" - -#include "_cgo_export.h" - -#include <nxt_port_memory_int.h> -#include <nxt_main.h> - - -static nxt_int_t -nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf, - uint32_t n) -{ - size_t nchunks; - nxt_go_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg; - - if (nxt_slow_path(msg->mmap_msg == NULL)) { - if (n > 0) { - nxt_go_warn("failed to get plain buf #%d", (int) n); - - return NXT_ERROR; - } - - buf->mem.start = (u_char *) (msg->port_msg + 1); - buf->mem.pos = buf->mem.start; - buf->mem.end = buf->mem.start + msg->raw_size; - buf->mem.free = buf->mem.end; - - return NXT_OK; - } - - mmap_msg = msg->mmap_msg + n; - if (nxt_slow_path(mmap_msg >= msg->end)) { - nxt_go_warn("no more data in shm #%d", (int) n); - - return NXT_ERROR; - } - - if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) { - nxt_go_warn("incoming shared memory segment #%d not found " - "for process %d", (int) mmap_msg->mmap_id, - (int) msg->port_msg->pid); - - return NXT_ERROR; - } - - nxt_go_mutex_lock(&ctx->process->incoming_mutex); - - port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id); - buf->mem.start = nxt_port_mmap_chunk_start(port_mmap->hdr, - mmap_msg->chunk_id); - buf->mem.pos = buf->mem.start; - buf->mem.free = buf->mem.start + mmap_msg->size; - - nxt_go_mutex_unlock(&ctx->process->incoming_mutex); - - nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; - if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { - nchunks++; - } - - buf->mem.end = buf->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; - - return NXT_OK; -} - -static nxt_int_t -nxt_go_ctx_init_rbuf(nxt_go_run_ctx_t *ctx) -{ - return nxt_go_ctx_msg_rbuf(ctx, &ctx->msg, &ctx->rbuf, ctx->nrbuf); -} - -static void -nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, - size_t payload_size) -{ - void *data, *end; - nxt_port_mmap_msg_t *mmap_msg; - - memset(msg, 0, sizeof(nxt_go_msg_t)); - - msg->port_msg = port_msg; - msg->raw_size = payload_size; - - data = port_msg + 1; - end = nxt_pointer_to(data, payload_size); - - if (port_msg->tracking) { - msg->tracking = data; - data = msg->tracking + 1; - } - - if (nxt_fast_path(port_msg->mmap != 0)) { - msg->mmap_msg = data; - msg->end = end; - - mmap_msg = msg->mmap_msg; - while(mmap_msg < msg->end) { - msg->data_size += mmap_msg->size; - mmap_msg += 1; - } - - } else { - msg->mmap_msg = NULL; - msg->end = NULL; - msg->data_size = payload_size; - } -} - -void -nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg) -{ - u_char *b, *e; - nxt_chunk_id_t c; - nxt_go_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg, *end; - - if (nxt_slow_path(msg->mmap_msg == NULL)) { - return; - } - - mmap_msg = msg->mmap_msg; - end = msg->end; - - nxt_go_mutex_lock(&ctx->process->incoming_mutex); - - for (; mmap_msg < end; mmap_msg++ ) { - port_mmap = nxt_go_array_at(&ctx->process->incoming, mmap_msg->mmap_id); - - c = mmap_msg->chunk_id; - b = nxt_port_mmap_chunk_start(port_mmap->hdr, c); - e = b + mmap_msg->size; - - while (b < e) { - nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_map, c); - - b += PORT_MMAP_CHUNK_SIZE; - c++; - } - } - - nxt_go_mutex_unlock(&ctx->process->incoming_mutex); -} - - -nxt_int_t -nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t payload_size) -{ - nxt_atomic_t *val; - nxt_go_port_mmap_t *port_mmap; - nxt_port_mmap_tracking_msg_t *tracking; - - memset(ctx, 0, sizeof(nxt_go_run_ctx_t)); - - ctx->process = nxt_go_get_process(port_msg->pid); - if (nxt_slow_path(ctx->process == NULL)) { - nxt_go_warn("failed to get process %d", port_msg->pid); - - return NXT_ERROR; - } - - nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size); - - if (ctx->msg.tracking != NULL) { - tracking = ctx->msg.tracking; - - if (nxt_slow_path(tracking->mmap_id >= ctx->process->incoming.nelts)) { - nxt_go_warn("incoming shared memory segment #%d not found " - "for process %d", (int) tracking->mmap_id, - (int) port_msg->pid); - - return NXT_ERROR; - } - - nxt_go_mutex_lock(&ctx->process->incoming_mutex); - - port_mmap = nxt_go_array_at(&ctx->process->incoming, tracking->mmap_id); - - nxt_go_mutex_unlock(&ctx->process->incoming_mutex); - - val = port_mmap->hdr->tracking + tracking->tracking_id; - - ctx->cancelled = nxt_atomic_cmp_set(val, port_msg->stream, 0) == 0; - - if (ctx->cancelled) { - nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_tracking_map, - tracking->tracking_id); - - return NXT_OK; - } - } - - ctx->msg_last = &ctx->msg; - - ctx->wport_msg.stream = port_msg->stream; - ctx->wport_msg.pid = getpid(); - ctx->wport_msg.type = _NXT_PORT_MSG_DATA; - ctx->wport_msg.mmap = 1; - - ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); - - return nxt_go_ctx_init_rbuf(ctx); -} - - -nxt_int_t -nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) -{ - int i; - nxt_int_t rc; - - if (last != 0) { - ctx->wport_msg.last = 1; - } - - nxt_go_debug("flush buffers (%d)", last); - - for (i = 0; i < ctx->nwbuf; i++) { - nxt_port_mmap_msg_t *m = ctx->wmmap_msg + i; - - nxt_go_debug(" mmap_msg[%d]={%d, %d, %d}", i, - m->mmap_id, m->chunk_id, m->size); - } - - rc = nxt_go_port_send(ctx->msg.port_msg->pid, ctx->msg.port_msg->reply_port, - &ctx->wport_msg, sizeof(nxt_port_msg_t) + - ctx->nwbuf * sizeof(nxt_port_mmap_msg_t), NULL, 0); - - nxt_go_debug(" port send res = %d", rc); - - ctx->nwbuf = 0; - - memset(&ctx->wbuf, 0, sizeof(ctx->wbuf)); - - return rc; -} - - -nxt_buf_t * -nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) -{ - size_t nchunks; - nxt_buf_t *buf; - nxt_chunk_id_t c; - nxt_go_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg; - nxt_port_mmap_header_t *hdr; - - c = 0; - - buf = &ctx->wbuf; - - hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c, - 0); - if (nxt_slow_path(hdr == NULL)) { - nxt_go_warn("failed to get port_mmap"); - - return NULL; - } - - buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); - buf->mem.pos = buf->mem.start; - buf->mem.free = buf->mem.start; - buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; - - buf->parent = hdr; - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf; - mmap_msg->mmap_id = hdr->id; - mmap_msg->chunk_id = c; - mmap_msg->size = 0; - - nchunks = size / PORT_MMAP_CHUNK_SIZE; - if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { - nchunks++; - } - - c++; - nchunks--; - - /* Try to acquire as much chunks as required. */ - while (nchunks > 0) { - - if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { - break; - } - - buf->mem.end += PORT_MMAP_CHUNK_SIZE; - c++; - nchunks--; - } - - ctx->nwbuf++; - - return buf; -} - - -nxt_int_t -nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) -{ - size_t nchunks, free_size; - nxt_chunk_id_t c, start; - nxt_port_mmap_header_t *hdr; - - free_size = nxt_buf_mem_free_size(&b->mem); - - if (nxt_slow_path(size <= free_size)) { - return NXT_OK; - } - - hdr = b->parent; - - start = nxt_port_mmap_chunk_id(hdr, b->mem.end); - - size -= free_size; - - nchunks = size / PORT_MMAP_CHUNK_SIZE; - if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { - nchunks++; - } - - c = start; - - /* Try to acquire as much chunks as required. */ - while (nchunks > 0) { - - if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { - break; - } - - c++; - nchunks--; - } - - if (nchunks != 0 - && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) - { - c--; - while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr->free_map, c); - c--; - } - - return NXT_ERROR; - - } else { - b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); - - return NXT_OK; - } -} - - -nxt_int_t -nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) -{ - size_t free_size, copy_size; - nxt_buf_t *buf; - nxt_port_mmap_msg_t *mmap_msg; - - buf = &ctx->wbuf; - - while (len > 0) { - if (ctx->nwbuf == 0) { - buf = nxt_go_port_mmap_get_buf(ctx, len); - - if (nxt_slow_path(buf == NULL)) { - return NXT_ERROR; - } - } - - do { - free_size = nxt_buf_mem_free_size(&buf->mem); - - if (free_size > 0) { - copy_size = nxt_min(free_size, len); - - buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size); - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; - mmap_msg->size += copy_size; - - len -= copy_size; - data = nxt_pointer_to(data, copy_size); - - if (len == 0) { - return NXT_OK; - } - } - - } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); - - if (ctx->nwbuf >= 8) { - nxt_go_ctx_flush(ctx, 0); - } - - buf = nxt_go_port_mmap_get_buf(ctx, len); - - if (nxt_slow_path(buf == NULL)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size) -{ - nxt_buf_t *buf; - nxt_int_t rc; - - do { - buf = &ctx->rbuf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { - if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - - ctx->nrbuf++; - rc = nxt_go_ctx_init_rbuf(ctx); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_go_warn("read size: init rbuf failed"); - return rc; - } - - continue; - } - nxt_go_warn("read size: used size is not 0"); - return NXT_ERROR; - } - - if (buf->mem.pos[0] >= 128) { - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { - nxt_go_warn("read size: used size < 4"); - return NXT_ERROR; - } - } - - break; - } while (1); - - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); - - return NXT_OK; -} - - -nxt_int_t -nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) -{ - nxt_int_t rc; - - rc = nxt_go_ctx_read_size_(ctx, size); - - if (nxt_fast_path(rc == NXT_OK)) { - nxt_go_debug("read_size: %d", (int)*size); - } - - return rc; -} - - -nxt_int_t -nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) -{ - size_t length; - nxt_int_t rc; - nxt_buf_t *buf; - - rc = nxt_go_ctx_read_size_(ctx, &length); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_go_warn("read str: read size failed"); - return rc; - } - - buf = &ctx->rbuf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) { - nxt_go_warn("read str: used size too small %d < %d", - (int) nxt_buf_mem_used_size(&buf->mem), (int) length); - return NXT_ERROR; - } - - if (length > 0) { - str->start = buf->mem.pos; - str->length = length - 1; - - buf->mem.pos += length; - - nxt_go_debug("read_str: %d %.*s", - (int) length - 1, (int) length - 1, str->start); - - } else { - str->start = NULL; - str->length = 0; - - nxt_go_debug("read_str: NULL"); - } - - return NXT_OK; -} - - -size_t -nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size) -{ - size_t res, read_size; - nxt_int_t rc; - nxt_buf_t *buf; - - res = 0; - - while (size > 0) { - buf = &ctx->rbuf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - ctx->nrbuf++; - rc = nxt_go_ctx_init_rbuf(ctx); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_go_warn("read raw: init rbuf failed"); - return res; - } - - continue; - } - - read_size = nxt_buf_mem_used_size(&buf->mem); - read_size = nxt_min(read_size, size); - - dst = nxt_cpymem(dst, buf->mem.pos, read_size); - - size -= read_size; - buf->mem.pos += read_size; - res += read_size; - } - - nxt_go_debug("read_raw: %d", (int) res); - - return res; -} diff --git a/src/go/unit/nxt_go_run_ctx.h b/src/go/unit/nxt_go_run_ctx.h deleted file mode 100644 index 42163295..00000000 --- a/src/go/unit/nxt_go_run_ctx.h +++ /dev/null @@ -1,78 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_GO_RUN_CTX_H_INCLUDED_ -#define _NXT_GO_RUN_CTX_H_INCLUDED_ - - -#include <nxt_main.h> -#include <nxt_router.h> -#include <nxt_port_memory_int.h> - -#ifndef _NXT_GO_PROCESS_T_DEFINED_ -#define _NXT_GO_PROCESS_T_DEFINED_ -typedef struct nxt_go_process_s nxt_go_process_t; -#endif - -typedef struct nxt_go_msg_s nxt_go_msg_t; - -struct nxt_go_msg_s { - off_t start_offset; - - nxt_port_msg_t *port_msg; - size_t raw_size; - size_t data_size; - - nxt_port_mmap_msg_t *mmap_msg; - nxt_port_mmap_msg_t *end; - - nxt_port_mmap_tracking_msg_t *tracking; - - nxt_go_msg_t *next; -}; - - -typedef struct { - nxt_go_msg_t msg; - - nxt_go_process_t *process; - nxt_port_mmap_msg_t *wmmap_msg; - nxt_bool_t cancelled; - - uint32_t nrbuf; - nxt_buf_t rbuf; - - uint32_t nwbuf; - nxt_buf_t wbuf; - nxt_port_msg_t wport_msg; - char wmmap_msg_buf[ sizeof(nxt_port_mmap_msg_t) * 8 ]; - - nxt_app_request_t request; - uintptr_t go_request; - - nxt_go_msg_t *msg_last; - - nxt_port_msg_t port_msg[]; -} nxt_go_run_ctx_t; - - -void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg); - -nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t payload_size); - -nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last); - -nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len); - -nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size); - -nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str); - -size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size); - - -#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */ diff --git a/src/go/unit/port.go b/src/go/unit/port.go index 51733176..f716c9ec 100644 --- a/src/go/unit/port.go +++ b/src/go/unit/port.go @@ -6,14 +6,12 @@ package unit /* -#include "nxt_go_lib.h" -#include "nxt_process_type.h" +#include "nxt_cgo_lib.h" */ import "C" import ( "net" - "net/http" "os" "sync" "unsafe" @@ -26,7 +24,6 @@ type port_key struct { type port struct { key port_key - t int rcv *net.UnixConn snd *net.UnixConn } @@ -34,7 +31,6 @@ type port struct { type port_registry struct { sync.RWMutex m map[port_key]*port - t [C.NXT_PROCESS_MAX]*port } var port_registry_ port_registry @@ -47,42 +43,14 @@ func find_port(key port_key) *port { return res } -func remove_by_pid(pid int) { - port_registry_.Lock() - if port_registry_.m != nil { - for k, p := range port_registry_.m { - if k.pid == pid { - if port_registry_.t[p.t] == p { - port_registry_.t[p.t] = nil - } - - delete(port_registry_.m, k) - } - } - } - - port_registry_.Unlock() -} - -func main_port() *port { - port_registry_.RLock() - res := port_registry_.t[C.NXT_PROCESS_MAIN] - port_registry_.RUnlock() - - return res -} - func add_port(p *port) { - nxt_go_debug("add_port: %d:%d", p.key.pid, p.key.id); - port_registry_.Lock() if port_registry_.m == nil { port_registry_.m = make(map[port_key]*port) } port_registry_.m[p.key] = p - port_registry_.t[p.t] = p port_registry_.Unlock() } @@ -120,14 +88,38 @@ func getUnixConn(fd int) *net.UnixConn { return uc } -//export nxt_go_new_port -func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) { - new_port(int(pid), int(id), int(t), int(rcv), int(snd)) +//export nxt_go_add_port +func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { + p := &port{ + key: port_key{ + pid: int(pid), + id: int(id), + }, + rcv: getUnixConn(int(rcv)), + snd: getUnixConn(int(snd)), + } + + add_port(p) +} + +//export nxt_go_remove_port +func nxt_go_remove_port(pid C.int, id C.int) { + key := port_key{ + pid: int(pid), + id: int(id), + } + + port_registry_.Lock() + if port_registry_.m != nil { + delete(port_registry_.m, key) + } + + port_registry_.Unlock() } //export nxt_go_port_send func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, - oob unsafe.Pointer, oob_size C.int) C.int { + oob unsafe.Pointer, oob_size C.int) C.ssize_t { key := port_key{ pid: int(pid), @@ -141,83 +133,38 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, return 0 } - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), - C.GoBytes(oob, oob_size), nil) + n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), + GoBytes(oob, oob_size), nil) if err != nil { nxt_go_warn("write result %d (%d), %s", n, oobn, err) } - return C.int(n) - + return C.ssize_t(n) } -//export nxt_go_main_send -func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, - oob_size C.int) C.int { +//export nxt_go_port_recv +func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, + oob unsafe.Pointer, oob_size C.int) C.ssize_t { + + key := port_key{ + pid: int(pid), + id: int(id), + } - p := main_port() + p := find_port(key) if p == nil { + nxt_go_warn("port %d:%d not found", pid, id) return 0 } - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), - C.GoBytes(oob, oob_size), nil) + n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), + GoBytes(oob, oob_size)) if err != nil { nxt_go_warn("write result %d (%d), %s", n, oobn, err) } - return C.int(n) -} - -func new_port(pid int, id int, t int, rcv int, snd int) *port { - p := &port{ - key: port_key{ - pid: pid, - id: id, - }, - t: t, - rcv: getUnixConn(rcv), - snd: getUnixConn(snd), - } - - add_port(p) - - return p -} - -func (p *port) read(handler http.Handler) error { - var buf [16384]byte - var oob [1024]byte - var c_buf, c_oob cbuf - - n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:]) - - if err != nil { - return err - } - - c_buf.init(buf[:n]) - c_oob.init(oob[:oobn]) - - go_req := C.nxt_go_process_port_msg(c_buf.b, c_buf.s, c_oob.b, c_oob.s) - - if go_req == 0 { - return nil - } - - r := get_request(go_req) - - go func(r *request) { - if handler == nil { - handler = http.DefaultServeMux - } - - handler.ServeHTTP(r.response(), &r.req) - r.done() - }(r) - - return nil + return C.ssize_t(n) } diff --git a/src/go/unit/request.go b/src/go/unit/request.go index 7d99edaa..829a2c64 100644 --- a/src/go/unit/request.go +++ b/src/go/unit/request.go @@ -6,7 +6,7 @@ package unit /* -#include "nxt_go_lib.h" +#include "nxt_cgo_lib.h" */ import "C" @@ -20,15 +20,11 @@ import ( type request struct { req http.Request resp *response - c_req C.nxt_go_request_t - id C.uint32_t + c_req C.uintptr_t } func (r *request) Read(p []byte) (n int, err error) { - c := C.size_t(len(p)) - b := C.uintptr_t(uintptr(unsafe.Pointer(&p[0]))) - - res := C.nxt_go_request_read(r.c_req, b, c) + res := C.nxt_cgo_request_read(r.c_req, buf_ref(p), C.uint32_t(len(p))) if res == 0 && len(p) > 0 { return 0, io.EOF @@ -38,7 +34,7 @@ func (r *request) Read(p []byte) (n int, err error) { } func (r *request) Close() error { - C.nxt_go_request_close(r.c_req) + C.nxt_cgo_request_close(r.c_req) return nil } @@ -55,16 +51,16 @@ func (r *request) done() { if !resp.headerSent { resp.WriteHeader(http.StatusOK) } - C.nxt_go_request_done(r.c_req) + C.nxt_cgo_request_done(r.c_req, 0) } -func get_request(go_req C.nxt_go_request_t) *request { - return (*request)(unsafe.Pointer(uintptr(go_req))) +func get_request(go_req uintptr) *request { + return (*request)(unsafe.Pointer(go_req)) } -//export nxt_go_new_request -func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, - c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) uintptr { +//export nxt_go_request_create +func nxt_go_request_create(c_req C.uintptr_t, + c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr { uri := C.GoStringN(c_uri.start, c_uri.length) @@ -83,7 +79,6 @@ func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, RequestURI: uri, }, c_req: c_req, - id: id, } r.req.Body = r @@ -91,7 +86,7 @@ func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, } //export nxt_go_request_set_proto -func nxt_go_request_set_proto(go_req C.nxt_go_request_t, proto *C.nxt_go_str_t, +func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t, maj C.int, min C.int) { r := get_request(go_req) @@ -101,8 +96,8 @@ func nxt_go_request_set_proto(go_req C.nxt_go_request_t, proto *C.nxt_go_str_t, } //export nxt_go_request_add_header -func nxt_go_request_add_header(go_req C.nxt_go_request_t, name *C.nxt_go_str_t, - value *C.nxt_go_str_t) { +func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t, + value *C.nxt_cgo_str_t) { r := get_request(go_req) r.req.Header.Add(C.GoStringN(name.start, name.length), @@ -110,23 +105,33 @@ func nxt_go_request_add_header(go_req C.nxt_go_request_t, name *C.nxt_go_str_t, } //export nxt_go_request_set_content_length -func nxt_go_request_set_content_length(go_req C.nxt_go_request_t, l C.int64_t) { +func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) { get_request(go_req).req.ContentLength = int64(l) } //export nxt_go_request_set_host -func nxt_go_request_set_host(go_req C.nxt_go_request_t, host *C.nxt_go_str_t) { +func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) { get_request(go_req).req.Host = C.GoStringN(host.start, host.length) } //export nxt_go_request_set_url -func nxt_go_request_set_url(go_req C.nxt_go_request_t, scheme *C.char) { +func nxt_go_request_set_url(go_req uintptr, scheme *C.char) { get_request(go_req).req.URL.Scheme = C.GoString(scheme) } //export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(go_req C.nxt_go_request_t, - addr *C.nxt_go_str_t) { +func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) { get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) } + +//export nxt_go_request_handler +func nxt_go_request_handler(go_req uintptr, h uintptr) { + r := get_request(go_req) + handler := *(*http.Handler)(unsafe.Pointer(h)) + + go func(r *request) { + handler.ServeHTTP(r.response(), &r.req) + r.done() + }(r) +} diff --git a/src/go/unit/response.go b/src/go/unit/response.go index 801a52e5..767d66b7 100644 --- a/src/go/unit/response.go +++ b/src/go/unit/response.go @@ -6,12 +6,11 @@ package unit /* -#include "nxt_go_lib.h" +#include "nxt_cgo_lib.h" */ import "C" import ( - "fmt" "net/http" ) @@ -19,10 +18,10 @@ type response struct { header http.Header headerSent bool req *http.Request - c_req C.nxt_go_request_t + c_req C.uintptr_t } -func new_response(c_req C.nxt_go_request_t, req *http.Request) *response { +func new_response(c_req C.uintptr_t, req *http.Request) *response { resp := &response{ header: http.Header{}, req: req, @@ -41,9 +40,7 @@ func (r *response) Write(p []byte) (n int, err error) { r.WriteHeader(http.StatusOK) } - l := C.size_t(len(p)) - b := buf_ref(p) - res := C.nxt_go_response_write(r.c_req, b, l) + res := C.nxt_cgo_response_write(r.c_req, buf_ref(p), C.uint32_t(len(p))) return int(res), nil } @@ -54,22 +51,37 @@ func (r *response) WriteHeader(code int) { return } r.headerSent = true - fmt.Fprintf(r, "Status: %d\r\n", code) // Set a default Content-Type if _, hasType := r.header["Content-Type"]; !hasType { r.header.Add("Content-Type", "text/html; charset=utf-8") } - r.header.Write(r) + fields := 0 + fields_size := 0 - r.Write([]byte("\r\n")) + for k, vv := range r.header { + for _, v := range vv { + fields++ + fields_size += len(k) + len(v) + } + } + + C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields), + C.uint32_t(fields_size)) + + for k, vv := range r.header { + for _, v := range vv { + C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)), + str_ref(v), C.uint32_t(len(v))) + } + } + + C.nxt_cgo_response_send(r.c_req) } func (r *response) Flush() { if !r.headerSent { r.WriteHeader(http.StatusOK) } - - C.nxt_go_response_flush(r.c_req) } diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go index 77be7612..06257768 100644 --- a/src/go/unit/unit.go +++ b/src/go/unit/unit.go @@ -6,17 +6,13 @@ package unit /* -#include "nxt_go_lib.h" +#include "nxt_cgo_lib.h" */ import "C" import ( - "errors" "fmt" "net/http" - "os" - "strconv" - "strings" "unsafe" ) @@ -33,104 +29,73 @@ func buf_ref(buf []byte) C.uintptr_t { return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0]))) } -func (buf *cbuf) init(b []byte) { - buf.b = buf_ref(b) - buf.s = C.size_t(len(b)) +type StringHeader struct { + Data unsafe.Pointer + Len int } -func (buf *cbuf) GoBytes() []byte { - if buf == nil { - var b [0]byte - return b[:0] - } +func str_ref(s string) C.uintptr_t { + header := (*StringHeader)(unsafe.Pointer(&s)) - return C.GoBytes(unsafe.Pointer(uintptr(buf.b)), C.int(buf.s)) + return C.uintptr_t(uintptr(unsafe.Pointer(header.Data))) } -var nxt_go_quit bool = false - -//export nxt_go_set_quit -func nxt_go_set_quit() { - nxt_go_quit = true +func (buf *cbuf) init_bytes(b []byte) { + buf.b = buf_ref(b) + buf.s = C.size_t(len(b)) } -func nxt_go_warn(format string, args ...interface{}) { - fmt.Fprintf(os.Stderr, "[go warn] " + format + "\n", args...) +func (buf *cbuf) init_string(s string) { + buf.b = str_ref(s) + buf.s = C.size_t(len(s)) } -func nxt_go_debug(format string, args ...interface{}) { - // fmt.Fprintf(os.Stderr, "[go debug] " + format + "\n", args...) +type SliceHeader struct { + Data unsafe.Pointer + Len int + Cap int } -func ListenAndServe(addr string, handler http.Handler) error { - var read_port *port - - go_ports_env := os.Getenv("NXT_GO_PORTS") - if go_ports_env == "" { - return http.ListenAndServe(addr, handler) +func (buf *cbuf) GoBytes() []byte { + if buf == nil { + var b [0]byte + return b[:0] } - nxt_go_debug("NXT_GO_PORTS=%s", go_ports_env) - - ports := strings.Split(go_ports_env, ";") - pid := os.Getpid() - - if len(ports) != 4 { - return errors.New("Invalid NXT_GO_PORTS format") + bytesHeader := &SliceHeader{ + Data: unsafe.Pointer(uintptr(buf.b)), + Len: int(buf.s), + Cap: int(buf.s), } - nxt_go_debug("version=%s", ports[0]) - - builtin_version := C.GoString(C.nxt_go_version()) - - if ports[0] != builtin_version { - return fmt.Errorf("Versions mismatch: Unit %s, while application is built with %s", - ports[0], builtin_version) - } + return *(*[]byte)(unsafe.Pointer(bytesHeader)) +} - stream, stream_err := strconv.Atoi(ports[1]) - if stream_err != nil { - return stream_err +func GoBytes(buf unsafe.Pointer, size C.int) []byte { + bytesHeader := &SliceHeader{ + Data: buf, + Len: int(size), + Cap: int(size), } - read_port = nil - - for _, port_str := range ports[2:] { - attrs := strings.Split(port_str, ",") - - if len(attrs) != 5 { - return fmt.Errorf("Invalid port format: unexpected port attributes number %d, while 5 expected", - len(attrs)) - } - - var attrsN [5]int - var err error - for i, attr := range attrs { - attrsN[i], err = strconv.Atoi(attr) - if err != nil { - return fmt.Errorf("Invalid port format: number attribute expected at %d position instead of '%s'", - i, attr); - } - } + return *(*[]byte)(unsafe.Pointer(bytesHeader)) +} - p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4]) +func nxt_go_warn(format string, args ...interface{}) { + str := fmt.Sprintf("[go] " + format, args...) - if attrsN[0] == pid { - read_port = p - } - } + C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str))) +} - if read_port == nil { - return errors.New("Application read port not found"); +func ListenAndServe(addr string, handler http.Handler) error { + if handler == nil { + handler = http.DefaultServeMux } - C.nxt_go_ready(C.uint32_t(stream)) + rc := C.nxt_cgo_run(C.uintptr_t(uintptr(unsafe.Pointer(&handler)))) - for !nxt_go_quit { - err := read_port.read(handler) - if err != nil { - return err - } + if rc != 0 { + return http.ListenAndServe(addr, handler) } return nil |