diff options
Diffstat (limited to 'src/go')
-rw-r--r-- | src/go/unit/ldflags-lrt.go | 13 | ||||
-rw-r--r-- | src/go/unit/nxt_cgo_lib.c | 207 | ||||
-rw-r--r-- | src/go/unit/nxt_cgo_lib.h | 40 | ||||
-rw-r--r-- | src/go/unit/port.go | 170 | ||||
-rw-r--r-- | src/go/unit/request.go | 144 | ||||
-rw-r--r-- | src/go/unit/response.go | 87 | ||||
-rw-r--r-- | src/go/unit/unit.go | 149 |
7 files changed, 0 insertions, 810 deletions
diff --git a/src/go/unit/ldflags-lrt.go b/src/go/unit/ldflags-lrt.go deleted file mode 100644 index f5a63508..00000000 --- a/src/go/unit/ldflags-lrt.go +++ /dev/null @@ -1,13 +0,0 @@ -// +build linux netbsd - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#cgo LDFLAGS: -lrt -*/ -import "C" diff --git a/src/go/unit/nxt_cgo_lib.c b/src/go/unit/nxt_cgo_lib.c deleted file mode 100644 index 5cb31b5a..00000000 --- a/src/go/unit/nxt_cgo_lib.c +++ /dev/null @@ -1,207 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "_cgo_export.h" - -#include <nxt_unit.h> -#include <nxt_unit_request.h> - - -static void nxt_cgo_request_handler(nxt_unit_request_info_t *req); -static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, - nxt_unit_sptr_t *sptr, uint32_t length); -static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); -static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); -static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size); - -int -nxt_cgo_run(uintptr_t handler) -{ - int rc; - nxt_unit_ctx_t *ctx; - nxt_unit_init_t init; - - memset(&init, 0, sizeof(init)); - - init.callbacks.request_handler = nxt_cgo_request_handler; - init.callbacks.add_port = nxt_cgo_add_port; - init.callbacks.remove_port = nxt_cgo_remove_port; - init.callbacks.port_send = nxt_cgo_port_send; - init.callbacks.port_recv = nxt_cgo_port_recv; - - init.data = (void *) handler; - - ctx = nxt_unit_init(&init); - if (ctx == NULL) { - return NXT_UNIT_ERROR; - } - - rc = nxt_unit_run(ctx); - - nxt_unit_done(ctx); - - return rc; -} - - -static void -nxt_cgo_request_handler(nxt_unit_request_info_t *req) -{ - uint32_t i; - uintptr_t go_req; - nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr; - nxt_unit_field_t *f; - nxt_unit_request_t *r; - - r = req->request; - - go_req = nxt_go_request_create((uintptr_t) req, - nxt_cgo_str_init(&method, &r->method, r->method_length), - nxt_cgo_str_init(&uri, &r->target, r->target_length)); - - nxt_go_request_set_proto(go_req, - nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1); - - for (i = 0; i < r->fields_count; i++) { - f = &r->fields[i]; - - nxt_go_request_add_header(go_req, - nxt_cgo_str_init(&name, &f->name, f->name_length), - nxt_cgo_str_init(&value, &f->value, f->value_length)); - } - - nxt_go_request_set_content_length(go_req, r->content_length); - nxt_go_request_set_host(go_req, - nxt_cgo_str_init(&host, &r->server_name, r->server_name_length)); - nxt_go_request_set_remote_addr(go_req, - nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length)); - - if (r->tls) { - nxt_go_request_set_tls(go_req); - } - - nxt_go_request_handler(go_req, (uintptr_t) req->unit->data); -} - - -static nxt_cgo_str_t * -nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length) -{ - dst->length = length; - dst->start = nxt_unit_sptr_get(sptr); - - return dst; -} - - -static int -nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) -{ - nxt_go_add_port(port->id.pid, port->id.id, - port->in_fd, port->out_fd); - - return nxt_unit_add_port(ctx, port); -} - - -static void -nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) -{ - nxt_go_remove_port(port_id->pid, port_id->id); - - nxt_unit_remove_port(ctx, port_id); -} - - -static ssize_t -nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - const void *buf, size_t buf_size, const void *oob, size_t oob_size) -{ - return nxt_go_port_send(port_id->pid, port_id->id, - (void *) buf, buf_size, (void *) oob, oob_size); -} - - -static ssize_t -nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size) -{ - return nxt_go_port_recv(port_id->pid, port_id->id, - buf, buf_size, oob, oob_size); -} - - -int -nxt_cgo_response_create(uintptr_t req, int status, int fields, - uint32_t fields_size) -{ - return nxt_unit_response_init((nxt_unit_request_info_t *) req, - status, fields, fields_size); -} - - -int -nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, - uintptr_t value, uint32_t value_len) -{ - return nxt_unit_response_add_field((nxt_unit_request_info_t *) req, - (char *) name, name_len, - (char *) value, value_len); -} - - -int -nxt_cgo_response_send(uintptr_t req) -{ - return nxt_unit_response_send((nxt_unit_request_info_t *) req); -} - - -ssize_t -nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len) -{ - int rc; - - rc = nxt_unit_response_write((nxt_unit_request_info_t *) req, - (void *) start, len); - if (rc != NXT_UNIT_OK) { - return -1; - } - - return len; -} - - -ssize_t -nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len) -{ - return nxt_unit_request_read((nxt_unit_request_info_t *) req, - (void *) dst, dst_len); -} - - -int -nxt_cgo_request_close(uintptr_t req) -{ - return 0; -} - - -void -nxt_cgo_request_done(uintptr_t req, int res) -{ - nxt_unit_request_done((nxt_unit_request_info_t *) req, res); -} - - -void -nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) -{ - nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); -} diff --git a/src/go/unit/nxt_cgo_lib.h b/src/go/unit/nxt_cgo_lib.h deleted file mode 100644 index 5317380b..00000000 --- a/src/go/unit/nxt_cgo_lib.h +++ /dev/null @@ -1,40 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_CGO_LIB_H_INCLUDED_ -#define _NXT_CGO_LIB_H_INCLUDED_ - - -#include <stdint.h> -#include <stdlib.h> -#include <sys/types.h> - -typedef struct { - int length; - char *start; -} nxt_cgo_str_t; - -int nxt_cgo_run(uintptr_t handler); - -int nxt_cgo_response_create(uintptr_t req, int code, int fields, - uint32_t fields_size); - -int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, - uintptr_t value, uint32_t value_len); - -int nxt_cgo_response_send(uintptr_t req); - -ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len); - -ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len); - -int nxt_cgo_request_close(uintptr_t req); - -void nxt_cgo_request_done(uintptr_t req, int res); - -void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); - -#endif /* _NXT_CGO_LIB_H_INCLUDED_ */ diff --git a/src/go/unit/port.go b/src/go/unit/port.go deleted file mode 100644 index a68cae74..00000000 --- a/src/go/unit/port.go +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "net" - "os" - "sync" - "unsafe" -) - -type port_key struct { - pid int - id int -} - -type port struct { - key port_key - rcv *net.UnixConn - snd *net.UnixConn -} - -type port_registry struct { - sync.RWMutex - m map[port_key]*port -} - -var port_registry_ port_registry - -func find_port(key port_key) *port { - port_registry_.RLock() - res := port_registry_.m[key] - port_registry_.RUnlock() - - return res -} - -func add_port(p *port) { - - port_registry_.Lock() - if port_registry_.m == nil { - port_registry_.m = make(map[port_key]*port) - } - - port_registry_.m[p.key] = p - - port_registry_.Unlock() -} - -func (p *port) Close() { - if p.rcv != nil { - p.rcv.Close() - } - - if p.snd != nil { - p.snd.Close() - } -} - -func getUnixConn(fd int) *net.UnixConn { - if fd < 0 { - return nil - } - - f := os.NewFile(uintptr(fd), "sock") - defer f.Close() - - c, err := net.FileConn(f) - if err != nil { - nxt_go_warn("FileConn error %s", err) - return nil - } - - uc, ok := c.(*net.UnixConn) - if !ok { - nxt_go_warn("Not a Unix-domain socket %d", fd) - return nil - } - - return uc -} - -//export nxt_go_add_port -func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { - p := &port{ - key: port_key{ - pid: int(pid), - id: int(id), - }, - rcv: getUnixConn(int(rcv)), - snd: getUnixConn(int(snd)), - } - - add_port(p) -} - -//export nxt_go_remove_port -func nxt_go_remove_port(pid C.int, id C.int) { - key := port_key{ - pid: int(pid), - id: int(id), - } - - port_registry_.Lock() - if port_registry_.m != nil { - delete(port_registry_.m, key) - } - - port_registry_.Unlock() -} - -//export nxt_go_port_send -func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, - oob unsafe.Pointer, oob_size C.int) C.ssize_t { - - key := port_key{ - pid: int(pid), - id: int(id), - } - - p := find_port(key) - - if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) - return 0 - } - - n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), - GoBytes(oob, oob_size), nil) - - if err != nil { - nxt_go_warn("write result %d (%d), %s", n, oobn, err) - } - - return C.ssize_t(n) -} - -//export nxt_go_port_recv -func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, - oob unsafe.Pointer, oob_size C.int) C.ssize_t { - - key := port_key{ - pid: int(pid), - id: int(id), - } - - p := find_port(key) - - if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) - return 0 - } - - n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), - GoBytes(oob, oob_size)) - - if err != nil { - nxt_go_warn("read result %d (%d), %s", n, oobn, err) - } - - return C.ssize_t(n) -} diff --git a/src/go/unit/request.go b/src/go/unit/request.go deleted file mode 100644 index 1d8c6702..00000000 --- a/src/go/unit/request.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "io" - "net/http" - "net/url" - "crypto/tls" - "unsafe" -) - -type request struct { - req http.Request - resp *response - c_req C.uintptr_t -} - -func (r *request) Read(p []byte) (n int, err error) { - res := C.nxt_cgo_request_read(r.c_req, buf_ref(p), C.uint32_t(len(p))) - - if res == 0 && len(p) > 0 { - return 0, io.EOF - } - - return int(res), nil -} - -func (r *request) Close() error { - C.nxt_cgo_request_close(r.c_req) - return nil -} - -func (r *request) response() *response { - if r.resp == nil { - r.resp = new_response(r.c_req, &r.req) - } - - return r.resp -} - -func (r *request) done() { - resp := r.response() - if !resp.headerSent { - resp.WriteHeader(http.StatusOK) - } - C.nxt_cgo_request_done(r.c_req, 0) -} - -func get_request(go_req uintptr) *request { - return (*request)(unsafe.Pointer(go_req)) -} - -//export nxt_go_request_create -func nxt_go_request_create(c_req C.uintptr_t, - c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr { - - uri := C.GoStringN(c_uri.start, c_uri.length) - - var URL *url.URL - var err error - if URL, err = url.ParseRequestURI(uri); err != nil { - return 0 - } - - r := &request{ - req: http.Request{ - Method: C.GoStringN(c_method.start, c_method.length), - URL: URL, - Header: http.Header{}, - Body: nil, - RequestURI: uri, - }, - c_req: c_req, - } - r.req.Body = r - - return uintptr(unsafe.Pointer(r)) -} - -//export nxt_go_request_set_proto -func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t, - maj C.int, min C.int) { - - r := get_request(go_req) - r.req.Proto = C.GoStringN(proto.start, proto.length) - r.req.ProtoMajor = int(maj) - r.req.ProtoMinor = int(min) -} - -//export nxt_go_request_add_header -func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t, - value *C.nxt_cgo_str_t) { - - r := get_request(go_req) - r.req.Header.Add(C.GoStringN(name.start, name.length), - C.GoStringN(value.start, value.length)) -} - -//export nxt_go_request_set_content_length -func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) { - get_request(go_req).req.ContentLength = int64(l) -} - -//export nxt_go_request_set_host -func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) { - get_request(go_req).req.Host = C.GoStringN(host.start, host.length) -} - -//export nxt_go_request_set_url -func nxt_go_request_set_url(go_req uintptr, scheme *C.char) { - get_request(go_req).req.URL.Scheme = C.GoString(scheme) -} - -//export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) { - - get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) -} - -//export nxt_go_request_set_tls -func nxt_go_request_set_tls(go_req uintptr) { - - get_request(go_req).req.TLS = &tls.ConnectionState{ } -} - -//export nxt_go_request_handler -func nxt_go_request_handler(go_req uintptr, h uintptr) { - r := get_request(go_req) - handler := get_handler(h) - - go func(r *request) { - handler.ServeHTTP(r.response(), &r.req) - r.done() - }(r) -} diff --git a/src/go/unit/response.go b/src/go/unit/response.go deleted file mode 100644 index 767d66b7..00000000 --- a/src/go/unit/response.go +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "net/http" -) - -type response struct { - header http.Header - headerSent bool - req *http.Request - c_req C.uintptr_t -} - -func new_response(c_req C.uintptr_t, req *http.Request) *response { - resp := &response{ - header: http.Header{}, - req: req, - c_req: c_req, - } - - return resp -} - -func (r *response) Header() http.Header { - return r.header -} - -func (r *response) Write(p []byte) (n int, err error) { - if !r.headerSent { - r.WriteHeader(http.StatusOK) - } - - res := C.nxt_cgo_response_write(r.c_req, buf_ref(p), C.uint32_t(len(p))) - return int(res), nil -} - -func (r *response) WriteHeader(code int) { - if r.headerSent { - // Note: explicitly using Stderr, as Stdout is our HTTP output. - nxt_go_warn("multiple response.WriteHeader calls") - return - } - r.headerSent = true - - // Set a default Content-Type - if _, hasType := r.header["Content-Type"]; !hasType { - r.header.Add("Content-Type", "text/html; charset=utf-8") - } - - fields := 0 - fields_size := 0 - - for k, vv := range r.header { - for _, v := range vv { - fields++ - fields_size += len(k) + len(v) - } - } - - C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields), - C.uint32_t(fields_size)) - - for k, vv := range r.header { - for _, v := range vv { - C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)), - str_ref(v), C.uint32_t(len(v))) - } - } - - C.nxt_cgo_response_send(r.c_req) -} - -func (r *response) Flush() { - if !r.headerSent { - r.WriteHeader(http.StatusOK) - } -} diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go deleted file mode 100644 index 1534479e..00000000 --- a/src/go/unit/unit.go +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "fmt" - "net/http" - "sync" - "unsafe" -) - -type cbuf struct { - b C.uintptr_t - s C.size_t -} - -func buf_ref(buf []byte) C.uintptr_t { - if len(buf) == 0 { - return 0 - } - - return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0]))) -} - -type StringHeader struct { - Data unsafe.Pointer - Len int -} - -func str_ref(s string) C.uintptr_t { - header := (*StringHeader)(unsafe.Pointer(&s)) - - return C.uintptr_t(uintptr(unsafe.Pointer(header.Data))) -} - -func (buf *cbuf) init_bytes(b []byte) { - buf.b = buf_ref(b) - buf.s = C.size_t(len(b)) -} - -func (buf *cbuf) init_string(s string) { - buf.b = str_ref(s) - buf.s = C.size_t(len(s)) -} - -type SliceHeader struct { - Data unsafe.Pointer - Len int - Cap int -} - -func (buf *cbuf) GoBytes() []byte { - if buf == nil { - var b [0]byte - return b[:0] - } - - bytesHeader := &SliceHeader{ - Data: unsafe.Pointer(uintptr(buf.b)), - Len: int(buf.s), - Cap: int(buf.s), - } - - return *(*[]byte)(unsafe.Pointer(bytesHeader)) -} - -func GoBytes(buf unsafe.Pointer, size C.int) []byte { - bytesHeader := &SliceHeader{ - Data: buf, - Len: int(size), - Cap: int(size), - } - - return *(*[]byte)(unsafe.Pointer(bytesHeader)) -} - -func nxt_go_warn(format string, args ...interface{}) { - str := fmt.Sprintf("[go] " + format, args...) - - C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str))) -} - -type handler_registry struct { - sync.RWMutex - next uintptr - m map[uintptr]*http.Handler -} - -var handler_registry_ handler_registry - -func set_handler(handler *http.Handler) uintptr { - - handler_registry_.Lock() - if handler_registry_.m == nil { - handler_registry_.m = make(map[uintptr]*http.Handler) - handler_registry_.next = 1 - } - - h := handler_registry_.next - handler_registry_.next += 1 - handler_registry_.m[h] = handler - - handler_registry_.Unlock() - - return h -} - -func get_handler(h uintptr) http.Handler { - handler_registry_.RLock() - defer handler_registry_.RUnlock() - - return *handler_registry_.m[h] -} - -func reset_handler(h uintptr) { - - handler_registry_.Lock() - if handler_registry_.m != nil { - delete(handler_registry_.m, h) - } - - handler_registry_.Unlock() -} - -func ListenAndServe(addr string, handler http.Handler) error { - if handler == nil { - handler = http.DefaultServeMux - } - - h := set_handler(&handler) - - rc := C.nxt_cgo_run(C.uintptr_t(h)) - - reset_handler(h) - - if rc != 0 { - return http.ListenAndServe(addr, handler) - } - - return nil -} |