summaryrefslogtreecommitdiffhomepage
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/ldflags-lrt.go13
-rw-r--r--go/ldflags.go10
-rw-r--r--go/nxt_cgo_lib.c207
-rw-r--r--go/nxt_cgo_lib.h40
-rw-r--r--go/port.go170
-rw-r--r--go/request.go144
-rw-r--r--go/response.go87
-rw-r--r--go/unit.go149
8 files changed, 820 insertions, 0 deletions
diff --git a/go/ldflags-lrt.go b/go/ldflags-lrt.go
new file mode 100644
index 00000000..f5a63508
--- /dev/null
+++ b/go/ldflags-lrt.go
@@ -0,0 +1,13 @@
+// +build linux netbsd
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package unit
+
+/*
+#cgo LDFLAGS: -lrt
+*/
+import "C"
diff --git a/go/ldflags.go b/go/ldflags.go
new file mode 100644
index 00000000..68f2ab78
--- /dev/null
+++ b/go/ldflags.go
@@ -0,0 +1,10 @@
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+package unit
+
+/*
+#cgo LDFLAGS: -lunit
+*/
+import "C"
diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c
new file mode 100644
index 00000000..5cb31b5a
--- /dev/null
+++ b/go/nxt_cgo_lib.c
@@ -0,0 +1,207 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include "_cgo_export.h"
+
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+
+
+static void nxt_cgo_request_handler(nxt_unit_request_info_t *req);
+static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
+ nxt_unit_sptr_t *sptr, uint32_t length);
+static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
+static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
+static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size);
+static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ void *buf, size_t buf_size, void *oob, size_t oob_size);
+
+int
+nxt_cgo_run(uintptr_t handler)
+{
+ int rc;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_init_t init;
+
+ memset(&init, 0, sizeof(init));
+
+ init.callbacks.request_handler = nxt_cgo_request_handler;
+ init.callbacks.add_port = nxt_cgo_add_port;
+ init.callbacks.remove_port = nxt_cgo_remove_port;
+ init.callbacks.port_send = nxt_cgo_port_send;
+ init.callbacks.port_recv = nxt_cgo_port_recv;
+
+ init.data = (void *) handler;
+
+ ctx = nxt_unit_init(&init);
+ if (ctx == NULL) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_run(ctx);
+
+ nxt_unit_done(ctx);
+
+ return rc;
+}
+
+
+static void
+nxt_cgo_request_handler(nxt_unit_request_info_t *req)
+{
+ uint32_t i;
+ uintptr_t go_req;
+ nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr;
+ nxt_unit_field_t *f;
+ nxt_unit_request_t *r;
+
+ r = req->request;
+
+ go_req = nxt_go_request_create((uintptr_t) req,
+ nxt_cgo_str_init(&method, &r->method, r->method_length),
+ nxt_cgo_str_init(&uri, &r->target, r->target_length));
+
+ nxt_go_request_set_proto(go_req,
+ nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1);
+
+ for (i = 0; i < r->fields_count; i++) {
+ f = &r->fields[i];
+
+ nxt_go_request_add_header(go_req,
+ nxt_cgo_str_init(&name, &f->name, f->name_length),
+ nxt_cgo_str_init(&value, &f->value, f->value_length));
+ }
+
+ nxt_go_request_set_content_length(go_req, r->content_length);
+ nxt_go_request_set_host(go_req,
+ nxt_cgo_str_init(&host, &r->server_name, r->server_name_length));
+ nxt_go_request_set_remote_addr(go_req,
+ nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length));
+
+ if (r->tls) {
+ nxt_go_request_set_tls(go_req);
+ }
+
+ nxt_go_request_handler(go_req, (uintptr_t) req->unit->data);
+}
+
+
+static nxt_cgo_str_t *
+nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length)
+{
+ dst->length = length;
+ dst->start = nxt_unit_sptr_get(sptr);
+
+ return dst;
+}
+
+
+static int
+nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ nxt_go_add_port(port->id.pid, port->id.id,
+ port->in_fd, port->out_fd);
+
+ return nxt_unit_add_port(ctx, port);
+}
+
+
+static void
+nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ nxt_go_remove_port(port_id->pid, port_id->id);
+
+ nxt_unit_remove_port(ctx, port_id);
+}
+
+
+static ssize_t
+nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ const void *buf, size_t buf_size, const void *oob, size_t oob_size)
+{
+ return nxt_go_port_send(port_id->pid, port_id->id,
+ (void *) buf, buf_size, (void *) oob, oob_size);
+}
+
+
+static ssize_t
+nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
+ void *buf, size_t buf_size, void *oob, size_t oob_size)
+{
+ return nxt_go_port_recv(port_id->pid, port_id->id,
+ buf, buf_size, oob, oob_size);
+}
+
+
+int
+nxt_cgo_response_create(uintptr_t req, int status, int fields,
+ uint32_t fields_size)
+{
+ return nxt_unit_response_init((nxt_unit_request_info_t *) req,
+ status, fields, fields_size);
+}
+
+
+int
+nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
+ uintptr_t value, uint32_t value_len)
+{
+ return nxt_unit_response_add_field((nxt_unit_request_info_t *) req,
+ (char *) name, name_len,
+ (char *) value, value_len);
+}
+
+
+int
+nxt_cgo_response_send(uintptr_t req)
+{
+ return nxt_unit_response_send((nxt_unit_request_info_t *) req);
+}
+
+
+ssize_t
+nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len)
+{
+ int rc;
+
+ rc = nxt_unit_response_write((nxt_unit_request_info_t *) req,
+ (void *) start, len);
+ if (rc != NXT_UNIT_OK) {
+ return -1;
+ }
+
+ return len;
+}
+
+
+ssize_t
+nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len)
+{
+ return nxt_unit_request_read((nxt_unit_request_info_t *) req,
+ (void *) dst, dst_len);
+}
+
+
+int
+nxt_cgo_request_close(uintptr_t req)
+{
+ return 0;
+}
+
+
+void
+nxt_cgo_request_done(uintptr_t req, int res)
+{
+ nxt_unit_request_done((nxt_unit_request_info_t *) req, res);
+}
+
+
+void
+nxt_cgo_warn(uintptr_t msg, uint32_t msg_len)
+{
+ nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg);
+}
diff --git a/go/nxt_cgo_lib.h b/go/nxt_cgo_lib.h
new file mode 100644
index 00000000..5317380b
--- /dev/null
+++ b/go/nxt_cgo_lib.h
@@ -0,0 +1,40 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_CGO_LIB_H_INCLUDED_
+#define _NXT_CGO_LIB_H_INCLUDED_
+
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/types.h>
+
+typedef struct {
+ int length;
+ char *start;
+} nxt_cgo_str_t;
+
+int nxt_cgo_run(uintptr_t handler);
+
+int nxt_cgo_response_create(uintptr_t req, int code, int fields,
+ uint32_t fields_size);
+
+int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len,
+ uintptr_t value, uint32_t value_len);
+
+int nxt_cgo_response_send(uintptr_t req);
+
+ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len);
+
+ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len);
+
+int nxt_cgo_request_close(uintptr_t req);
+
+void nxt_cgo_request_done(uintptr_t req, int res);
+
+void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len);
+
+#endif /* _NXT_CGO_LIB_H_INCLUDED_ */
diff --git a/go/port.go b/go/port.go
new file mode 100644
index 00000000..a68cae74
--- /dev/null
+++ b/go/port.go
@@ -0,0 +1,170 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package unit
+
+/*
+#include "nxt_cgo_lib.h"
+*/
+import "C"
+
+import (
+ "net"
+ "os"
+ "sync"
+ "unsafe"
+)
+
+type port_key struct {
+ pid int
+ id int
+}
+
+type port struct {
+ key port_key
+ rcv *net.UnixConn
+ snd *net.UnixConn
+}
+
+type port_registry struct {
+ sync.RWMutex
+ m map[port_key]*port
+}
+
+var port_registry_ port_registry
+
+func find_port(key port_key) *port {
+ port_registry_.RLock()
+ res := port_registry_.m[key]
+ port_registry_.RUnlock()
+
+ return res
+}
+
+func add_port(p *port) {
+
+ port_registry_.Lock()
+ if port_registry_.m == nil {
+ port_registry_.m = make(map[port_key]*port)
+ }
+
+ port_registry_.m[p.key] = p
+
+ port_registry_.Unlock()
+}
+
+func (p *port) Close() {
+ if p.rcv != nil {
+ p.rcv.Close()
+ }
+
+ if p.snd != nil {
+ p.snd.Close()
+ }
+}
+
+func getUnixConn(fd int) *net.UnixConn {
+ if fd < 0 {
+ return nil
+ }
+
+ f := os.NewFile(uintptr(fd), "sock")
+ defer f.Close()
+
+ c, err := net.FileConn(f)
+ if err != nil {
+ nxt_go_warn("FileConn error %s", err)
+ return nil
+ }
+
+ uc, ok := c.(*net.UnixConn)
+ if !ok {
+ nxt_go_warn("Not a Unix-domain socket %d", fd)
+ return nil
+ }
+
+ return uc
+}
+
+//export nxt_go_add_port
+func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) {
+ p := &port{
+ key: port_key{
+ pid: int(pid),
+ id: int(id),
+ },
+ rcv: getUnixConn(int(rcv)),
+ snd: getUnixConn(int(snd)),
+ }
+
+ add_port(p)
+}
+
+//export nxt_go_remove_port
+func nxt_go_remove_port(pid C.int, id C.int) {
+ key := port_key{
+ pid: int(pid),
+ id: int(id),
+ }
+
+ port_registry_.Lock()
+ if port_registry_.m != nil {
+ delete(port_registry_.m, key)
+ }
+
+ port_registry_.Unlock()
+}
+
+//export nxt_go_port_send
+func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
+ oob unsafe.Pointer, oob_size C.int) C.ssize_t {
+
+ key := port_key{
+ pid: int(pid),
+ id: int(id),
+ }
+
+ p := find_port(key)
+
+ if p == nil {
+ nxt_go_warn("port %d:%d not found", pid, id)
+ return 0
+ }
+
+ n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
+ GoBytes(oob, oob_size), nil)
+
+ if err != nil {
+ nxt_go_warn("write result %d (%d), %s", n, oobn, err)
+ }
+
+ return C.ssize_t(n)
+}
+
+//export nxt_go_port_recv
+func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
+ oob unsafe.Pointer, oob_size C.int) C.ssize_t {
+
+ key := port_key{
+ pid: int(pid),
+ id: int(id),
+ }
+
+ p := find_port(key)
+
+ if p == nil {
+ nxt_go_warn("port %d:%d not found", pid, id)
+ return 0
+ }
+
+ n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
+ GoBytes(oob, oob_size))
+
+ if err != nil {
+ nxt_go_warn("read result %d (%d), %s", n, oobn, err)
+ }
+
+ return C.ssize_t(n)
+}
diff --git a/go/request.go b/go/request.go
new file mode 100644
index 00000000..1d8c6702
--- /dev/null
+++ b/go/request.go
@@ -0,0 +1,144 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package unit
+
+/*
+#include "nxt_cgo_lib.h"
+*/
+import "C"
+
+import (
+ "io"
+ "net/http"
+ "net/url"
+ "crypto/tls"
+ "unsafe"
+)
+
+type request struct {
+ req http.Request
+ resp *response
+ c_req C.uintptr_t
+}
+
+func (r *request) Read(p []byte) (n int, err error) {
+ res := C.nxt_cgo_request_read(r.c_req, buf_ref(p), C.uint32_t(len(p)))
+
+ if res == 0 && len(p) > 0 {
+ return 0, io.EOF
+ }
+
+ return int(res), nil
+}
+
+func (r *request) Close() error {
+ C.nxt_cgo_request_close(r.c_req)
+ return nil
+}
+
+func (r *request) response() *response {
+ if r.resp == nil {
+ r.resp = new_response(r.c_req, &r.req)
+ }
+
+ return r.resp
+}
+
+func (r *request) done() {
+ resp := r.response()
+ if !resp.headerSent {
+ resp.WriteHeader(http.StatusOK)
+ }
+ C.nxt_cgo_request_done(r.c_req, 0)
+}
+
+func get_request(go_req uintptr) *request {
+ return (*request)(unsafe.Pointer(go_req))
+}
+
+//export nxt_go_request_create
+func nxt_go_request_create(c_req C.uintptr_t,
+ c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr {
+
+ uri := C.GoStringN(c_uri.start, c_uri.length)
+
+ var URL *url.URL
+ var err error
+ if URL, err = url.ParseRequestURI(uri); err != nil {
+ return 0
+ }
+
+ r := &request{
+ req: http.Request{
+ Method: C.GoStringN(c_method.start, c_method.length),
+ URL: URL,
+ Header: http.Header{},
+ Body: nil,
+ RequestURI: uri,
+ },
+ c_req: c_req,
+ }
+ r.req.Body = r
+
+ return uintptr(unsafe.Pointer(r))
+}
+
+//export nxt_go_request_set_proto
+func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t,
+ maj C.int, min C.int) {
+
+ r := get_request(go_req)
+ r.req.Proto = C.GoStringN(proto.start, proto.length)
+ r.req.ProtoMajor = int(maj)
+ r.req.ProtoMinor = int(min)
+}
+
+//export nxt_go_request_add_header
+func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t,
+ value *C.nxt_cgo_str_t) {
+
+ r := get_request(go_req)
+ r.req.Header.Add(C.GoStringN(name.start, name.length),
+ C.GoStringN(value.start, value.length))
+}
+
+//export nxt_go_request_set_content_length
+func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) {
+ get_request(go_req).req.ContentLength = int64(l)
+}
+
+//export nxt_go_request_set_host
+func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) {
+ get_request(go_req).req.Host = C.GoStringN(host.start, host.length)
+}
+
+//export nxt_go_request_set_url
+func nxt_go_request_set_url(go_req uintptr, scheme *C.char) {
+ get_request(go_req).req.URL.Scheme = C.GoString(scheme)
+}
+
+//export nxt_go_request_set_remote_addr
+func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) {
+
+ get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length)
+}
+
+//export nxt_go_request_set_tls
+func nxt_go_request_set_tls(go_req uintptr) {
+
+ get_request(go_req).req.TLS = &tls.ConnectionState{ }
+}
+
+//export nxt_go_request_handler
+func nxt_go_request_handler(go_req uintptr, h uintptr) {
+ r := get_request(go_req)
+ handler := get_handler(h)
+
+ go func(r *request) {
+ handler.ServeHTTP(r.response(), &r.req)
+ r.done()
+ }(r)
+}
diff --git a/go/response.go b/go/response.go
new file mode 100644
index 00000000..767d66b7
--- /dev/null
+++ b/go/response.go
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package unit
+
+/*
+#include "nxt_cgo_lib.h"
+*/
+import "C"
+
+import (
+ "net/http"
+)
+
+type response struct {
+ header http.Header
+ headerSent bool
+ req *http.Request
+ c_req C.uintptr_t
+}
+
+func new_response(c_req C.uintptr_t, req *http.Request) *response {
+ resp := &response{
+ header: http.Header{},
+ req: req,
+ c_req: c_req,
+ }
+
+ return resp
+}
+
+func (r *response) Header() http.Header {
+ return r.header
+}
+
+func (r *response) Write(p []byte) (n int, err error) {
+ if !r.headerSent {
+ r.WriteHeader(http.StatusOK)
+ }
+
+ res := C.nxt_cgo_response_write(r.c_req, buf_ref(p), C.uint32_t(len(p)))
+ return int(res), nil
+}
+
+func (r *response) WriteHeader(code int) {
+ if r.headerSent {
+ // Note: explicitly using Stderr, as Stdout is our HTTP output.
+ nxt_go_warn("multiple response.WriteHeader calls")
+ return
+ }
+ r.headerSent = true
+
+ // Set a default Content-Type
+ if _, hasType := r.header["Content-Type"]; !hasType {
+ r.header.Add("Content-Type", "text/html; charset=utf-8")
+ }
+
+ fields := 0
+ fields_size := 0
+
+ for k, vv := range r.header {
+ for _, v := range vv {
+ fields++
+ fields_size += len(k) + len(v)
+ }
+ }
+
+ C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields),
+ C.uint32_t(fields_size))
+
+ for k, vv := range r.header {
+ for _, v := range vv {
+ C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)),
+ str_ref(v), C.uint32_t(len(v)))
+ }
+ }
+
+ C.nxt_cgo_response_send(r.c_req)
+}
+
+func (r *response) Flush() {
+ if !r.headerSent {
+ r.WriteHeader(http.StatusOK)
+ }
+}
diff --git a/go/unit.go b/go/unit.go
new file mode 100644
index 00000000..1534479e
--- /dev/null
+++ b/go/unit.go
@@ -0,0 +1,149 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+package unit
+
+/*
+#include "nxt_cgo_lib.h"
+*/
+import "C"
+
+import (
+ "fmt"
+ "net/http"
+ "sync"
+ "unsafe"
+)
+
+type cbuf struct {
+ b C.uintptr_t
+ s C.size_t
+}
+
+func buf_ref(buf []byte) C.uintptr_t {
+ if len(buf) == 0 {
+ return 0
+ }
+
+ return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0])))
+}
+
+type StringHeader struct {
+ Data unsafe.Pointer
+ Len int
+}
+
+func str_ref(s string) C.uintptr_t {
+ header := (*StringHeader)(unsafe.Pointer(&s))
+
+ return C.uintptr_t(uintptr(unsafe.Pointer(header.Data)))
+}
+
+func (buf *cbuf) init_bytes(b []byte) {
+ buf.b = buf_ref(b)
+ buf.s = C.size_t(len(b))
+}
+
+func (buf *cbuf) init_string(s string) {
+ buf.b = str_ref(s)
+ buf.s = C.size_t(len(s))
+}
+
+type SliceHeader struct {
+ Data unsafe.Pointer
+ Len int
+ Cap int
+}
+
+func (buf *cbuf) GoBytes() []byte {
+ if buf == nil {
+ var b [0]byte
+ return b[:0]
+ }
+
+ bytesHeader := &SliceHeader{
+ Data: unsafe.Pointer(uintptr(buf.b)),
+ Len: int(buf.s),
+ Cap: int(buf.s),
+ }
+
+ return *(*[]byte)(unsafe.Pointer(bytesHeader))
+}
+
+func GoBytes(buf unsafe.Pointer, size C.int) []byte {
+ bytesHeader := &SliceHeader{
+ Data: buf,
+ Len: int(size),
+ Cap: int(size),
+ }
+
+ return *(*[]byte)(unsafe.Pointer(bytesHeader))
+}
+
+func nxt_go_warn(format string, args ...interface{}) {
+ str := fmt.Sprintf("[go] " + format, args...)
+
+ C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str)))
+}
+
+type handler_registry struct {
+ sync.RWMutex
+ next uintptr
+ m map[uintptr]*http.Handler
+}
+
+var handler_registry_ handler_registry
+
+func set_handler(handler *http.Handler) uintptr {
+
+ handler_registry_.Lock()
+ if handler_registry_.m == nil {
+ handler_registry_.m = make(map[uintptr]*http.Handler)
+ handler_registry_.next = 1
+ }
+
+ h := handler_registry_.next
+ handler_registry_.next += 1
+ handler_registry_.m[h] = handler
+
+ handler_registry_.Unlock()
+
+ return h
+}
+
+func get_handler(h uintptr) http.Handler {
+ handler_registry_.RLock()
+ defer handler_registry_.RUnlock()
+
+ return *handler_registry_.m[h]
+}
+
+func reset_handler(h uintptr) {
+
+ handler_registry_.Lock()
+ if handler_registry_.m != nil {
+ delete(handler_registry_.m, h)
+ }
+
+ handler_registry_.Unlock()
+}
+
+func ListenAndServe(addr string, handler http.Handler) error {
+ if handler == nil {
+ handler = http.DefaultServeMux
+ }
+
+ h := set_handler(&handler)
+
+ rc := C.nxt_cgo_run(C.uintptr_t(h))
+
+ reset_handler(h)
+
+ if rc != 0 {
+ return http.ListenAndServe(addr, handler)
+ }
+
+ return nil
+}