diff options
author | Andrei Belov <defan@nginx.com> | 2019-12-26 17:52:09 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2019-12-26 17:52:09 +0300 |
commit | 35ff5ee1e82a03e57d625230173a84c829c13257 (patch) | |
tree | c3dce5e8d50c8da9739f23b41a636931ad562e25 /go | |
parent | 0ec222bbb202194327c2e76d48f0b2608b37c162 (diff) | |
parent | 55f8e31ed70910ef07db31d7f3c53b12774180f9 (diff) | |
download | unit-35ff5ee1e82a03e57d625230173a84c829c13257.tar.gz unit-35ff5ee1e82a03e57d625230173a84c829c13257.tar.bz2 |
Merged with the default branch.1.14.0-1
Diffstat (limited to 'go')
-rw-r--r-- | go/ldflags-lrt.go | 13 | ||||
-rw-r--r-- | go/ldflags.go | 10 | ||||
-rw-r--r-- | go/nxt_cgo_lib.c | 209 | ||||
-rw-r--r-- | go/nxt_cgo_lib.h | 40 | ||||
-rw-r--r-- | go/observable.go | 32 | ||||
-rw-r--r-- | go/port.go | 170 | ||||
-rw-r--r-- | go/request.go | 144 | ||||
-rw-r--r-- | go/response.go | 119 | ||||
-rw-r--r-- | go/unit.go | 149 |
9 files changed, 886 insertions, 0 deletions
diff --git a/go/ldflags-lrt.go b/go/ldflags-lrt.go new file mode 100644 index 00000000..f5a63508 --- /dev/null +++ b/go/ldflags-lrt.go @@ -0,0 +1,13 @@ +// +build linux netbsd + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#cgo LDFLAGS: -lrt +*/ +import "C" diff --git a/go/ldflags.go b/go/ldflags.go new file mode 100644 index 00000000..68f2ab78 --- /dev/null +++ b/go/ldflags.go @@ -0,0 +1,10 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#cgo LDFLAGS: -lunit +*/ +import "C" diff --git a/go/nxt_cgo_lib.c b/go/nxt_cgo_lib.c new file mode 100644 index 00000000..a4fef9ea --- /dev/null +++ b/go/nxt_cgo_lib.c @@ -0,0 +1,209 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#include "_cgo_export.h" + +#include <nxt_unit.h> +#include <nxt_unit_request.h> + + +static void nxt_cgo_request_handler(nxt_unit_request_info_t *req); +static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, + nxt_unit_sptr_t *sptr, uint32_t length); +static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); +static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); +static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + void *buf, size_t buf_size, void *oob, size_t oob_size); +static void nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx); + +int +nxt_cgo_run(uintptr_t handler) +{ + int rc; + nxt_unit_ctx_t *ctx; + nxt_unit_init_t init; + + memset(&init, 0, sizeof(init)); + + init.callbacks.request_handler = nxt_cgo_request_handler; + init.callbacks.add_port = nxt_cgo_add_port; + init.callbacks.remove_port = nxt_cgo_remove_port; + init.callbacks.port_send = nxt_cgo_port_send; + init.callbacks.port_recv = nxt_cgo_port_recv; + init.callbacks.shm_ack_handler = nxt_cgo_shm_ack_handler; + + init.data = (void *) handler; + + ctx = nxt_unit_init(&init); + if (ctx == NULL) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_run(ctx); + + nxt_unit_done(ctx); + + return rc; +} + + +static void +nxt_cgo_request_handler(nxt_unit_request_info_t *req) +{ + uint32_t i; + uintptr_t go_req; + nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + + r = req->request; + + go_req = nxt_go_request_create((uintptr_t) req, + nxt_cgo_str_init(&method, &r->method, r->method_length), + nxt_cgo_str_init(&uri, &r->target, r->target_length)); + + nxt_go_request_set_proto(go_req, + nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1); + + for (i = 0; i < r->fields_count; i++) { + f = &r->fields[i]; + + nxt_go_request_add_header(go_req, + nxt_cgo_str_init(&name, &f->name, f->name_length), + nxt_cgo_str_init(&value, &f->value, f->value_length)); + } + + nxt_go_request_set_content_length(go_req, r->content_length); + nxt_go_request_set_host(go_req, + nxt_cgo_str_init(&host, &r->server_name, r->server_name_length)); + nxt_go_request_set_remote_addr(go_req, + nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length)); + + if (r->tls) { + nxt_go_request_set_tls(go_req); + } + + nxt_go_request_handler(go_req, (uintptr_t) req->unit->data); +} + + +static nxt_cgo_str_t * +nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length) +{ + dst->length = length; + dst->start = nxt_unit_sptr_get(sptr); + + return dst; +} + + +static int +nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + nxt_go_add_port(port->id.pid, port->id.id, + port->in_fd, port->out_fd); + + return nxt_unit_add_port(ctx, port); +} + + +static void +nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + nxt_go_remove_port(port_id->pid, port_id->id); + + nxt_unit_remove_port(ctx, port_id); +} + + +static ssize_t +nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + const void *buf, size_t buf_size, const void *oob, size_t oob_size) +{ + return nxt_go_port_send(port_id->pid, port_id->id, + (void *) buf, buf_size, (void *) oob, oob_size); +} + + +static ssize_t +nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + void *buf, size_t buf_size, void *oob, size_t oob_size) +{ + return nxt_go_port_recv(port_id->pid, port_id->id, + buf, buf_size, oob, oob_size); +} + + +static void +nxt_cgo_shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + return nxt_go_shm_ack_handler(); +} + + +int +nxt_cgo_response_create(uintptr_t req, int status, int fields, + uint32_t fields_size) +{ + return nxt_unit_response_init((nxt_unit_request_info_t *) req, + status, fields, fields_size); +} + + +int +nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, + uintptr_t value, uint32_t value_len) +{ + return nxt_unit_response_add_field((nxt_unit_request_info_t *) req, + (char *) name, name_len, + (char *) value, value_len); +} + + +int +nxt_cgo_response_send(uintptr_t req) +{ + return nxt_unit_response_send((nxt_unit_request_info_t *) req); +} + + +ssize_t +nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len) +{ + return nxt_unit_response_write_nb((nxt_unit_request_info_t *) req, + (void *) start, len, 0); +} + + +ssize_t +nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len) +{ + return nxt_unit_request_read((nxt_unit_request_info_t *) req, + (void *) dst, dst_len); +} + + +int +nxt_cgo_request_close(uintptr_t req) +{ + return 0; +} + + +void +nxt_cgo_request_done(uintptr_t req, int res) +{ + nxt_unit_request_done((nxt_unit_request_info_t *) req, res); +} + + +void +nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) +{ + nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); +} diff --git a/go/nxt_cgo_lib.h b/go/nxt_cgo_lib.h new file mode 100644 index 00000000..5317380b --- /dev/null +++ b/go/nxt_cgo_lib.h @@ -0,0 +1,40 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_CGO_LIB_H_INCLUDED_ +#define _NXT_CGO_LIB_H_INCLUDED_ + + +#include <stdint.h> +#include <stdlib.h> +#include <sys/types.h> + +typedef struct { + int length; + char *start; +} nxt_cgo_str_t; + +int nxt_cgo_run(uintptr_t handler); + +int nxt_cgo_response_create(uintptr_t req, int code, int fields, + uint32_t fields_size); + +int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, + uintptr_t value, uint32_t value_len); + +int nxt_cgo_response_send(uintptr_t req); + +ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len); + +ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len); + +int nxt_cgo_request_close(uintptr_t req); + +void nxt_cgo_request_done(uintptr_t req, int res); + +void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); + +#endif /* _NXT_CGO_LIB_H_INCLUDED_ */ diff --git a/go/observable.go b/go/observable.go new file mode 100644 index 00000000..9a38802c --- /dev/null +++ b/go/observable.go @@ -0,0 +1,32 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +package unit + +import ( + "sync" +) + +type observable struct { + sync.Mutex + observers []chan int +} + +func (o *observable) attach(c chan int) { + o.Lock() + defer o.Unlock() + + o.observers = append(o.observers, c) +} + +func (o *observable) notify(e int) { + o.Lock() + defer o.Unlock() + + for _, v := range o.observers { + v <- e + } + + o.observers = nil +} diff --git a/go/port.go b/go/port.go new file mode 100644 index 00000000..a68cae74 --- /dev/null +++ b/go/port.go @@ -0,0 +1,170 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_cgo_lib.h" +*/ +import "C" + +import ( + "net" + "os" + "sync" + "unsafe" +) + +type port_key struct { + pid int + id int +} + +type port struct { + key port_key + rcv *net.UnixConn + snd *net.UnixConn +} + +type port_registry struct { + sync.RWMutex + m map[port_key]*port +} + +var port_registry_ port_registry + +func find_port(key port_key) *port { + port_registry_.RLock() + res := port_registry_.m[key] + port_registry_.RUnlock() + + return res +} + +func add_port(p *port) { + + port_registry_.Lock() + if port_registry_.m == nil { + port_registry_.m = make(map[port_key]*port) + } + + port_registry_.m[p.key] = p + + port_registry_.Unlock() +} + +func (p *port) Close() { + if p.rcv != nil { + p.rcv.Close() + } + + if p.snd != nil { + p.snd.Close() + } +} + +func getUnixConn(fd int) *net.UnixConn { + if fd < 0 { + return nil + } + + f := os.NewFile(uintptr(fd), "sock") + defer f.Close() + + c, err := net.FileConn(f) + if err != nil { + nxt_go_warn("FileConn error %s", err) + return nil + } + + uc, ok := c.(*net.UnixConn) + if !ok { + nxt_go_warn("Not a Unix-domain socket %d", fd) + return nil + } + + return uc +} + +//export nxt_go_add_port +func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { + p := &port{ + key: port_key{ + pid: int(pid), + id: int(id), + }, + rcv: getUnixConn(int(rcv)), + snd: getUnixConn(int(snd)), + } + + add_port(p) +} + +//export nxt_go_remove_port +func nxt_go_remove_port(pid C.int, id C.int) { + key := port_key{ + pid: int(pid), + id: int(id), + } + + port_registry_.Lock() + if port_registry_.m != nil { + delete(port_registry_.m, key) + } + + port_registry_.Unlock() +} + +//export nxt_go_port_send +func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, + oob unsafe.Pointer, oob_size C.int) C.ssize_t { + + key := port_key{ + pid: int(pid), + id: int(id), + } + + p := find_port(key) + + if p == nil { + nxt_go_warn("port %d:%d not found", pid, id) + return 0 + } + + n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), + GoBytes(oob, oob_size), nil) + + if err != nil { + nxt_go_warn("write result %d (%d), %s", n, oobn, err) + } + + return C.ssize_t(n) +} + +//export nxt_go_port_recv +func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, + oob unsafe.Pointer, oob_size C.int) C.ssize_t { + + key := port_key{ + pid: int(pid), + id: int(id), + } + + p := find_port(key) + + if p == nil { + nxt_go_warn("port %d:%d not found", pid, id) + return 0 + } + + n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), + GoBytes(oob, oob_size)) + + if err != nil { + nxt_go_warn("read result %d (%d), %s", n, oobn, err) + } + + return C.ssize_t(n) +} diff --git a/go/request.go b/go/request.go new file mode 100644 index 00000000..1d8c6702 --- /dev/null +++ b/go/request.go @@ -0,0 +1,144 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_cgo_lib.h" +*/ +import "C" + +import ( + "io" + "net/http" + "net/url" + "crypto/tls" + "unsafe" +) + +type request struct { + req http.Request + resp *response + c_req C.uintptr_t +} + +func (r *request) Read(p []byte) (n int, err error) { + res := C.nxt_cgo_request_read(r.c_req, buf_ref(p), C.uint32_t(len(p))) + + if res == 0 && len(p) > 0 { + return 0, io.EOF + } + + return int(res), nil +} + +func (r *request) Close() error { + C.nxt_cgo_request_close(r.c_req) + return nil +} + +func (r *request) response() *response { + if r.resp == nil { + r.resp = new_response(r.c_req, &r.req) + } + + return r.resp +} + +func (r *request) done() { + resp := r.response() + if !resp.headerSent { + resp.WriteHeader(http.StatusOK) + } + C.nxt_cgo_request_done(r.c_req, 0) +} + +func get_request(go_req uintptr) *request { + return (*request)(unsafe.Pointer(go_req)) +} + +//export nxt_go_request_create +func nxt_go_request_create(c_req C.uintptr_t, + c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr { + + uri := C.GoStringN(c_uri.start, c_uri.length) + + var URL *url.URL + var err error + if URL, err = url.ParseRequestURI(uri); err != nil { + return 0 + } + + r := &request{ + req: http.Request{ + Method: C.GoStringN(c_method.start, c_method.length), + URL: URL, + Header: http.Header{}, + Body: nil, + RequestURI: uri, + }, + c_req: c_req, + } + r.req.Body = r + + return uintptr(unsafe.Pointer(r)) +} + +//export nxt_go_request_set_proto +func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t, + maj C.int, min C.int) { + + r := get_request(go_req) + r.req.Proto = C.GoStringN(proto.start, proto.length) + r.req.ProtoMajor = int(maj) + r.req.ProtoMinor = int(min) +} + +//export nxt_go_request_add_header +func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t, + value *C.nxt_cgo_str_t) { + + r := get_request(go_req) + r.req.Header.Add(C.GoStringN(name.start, name.length), + C.GoStringN(value.start, value.length)) +} + +//export nxt_go_request_set_content_length +func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) { + get_request(go_req).req.ContentLength = int64(l) +} + +//export nxt_go_request_set_host +func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) { + get_request(go_req).req.Host = C.GoStringN(host.start, host.length) +} + +//export nxt_go_request_set_url +func nxt_go_request_set_url(go_req uintptr, scheme *C.char) { + get_request(go_req).req.URL.Scheme = C.GoString(scheme) +} + +//export nxt_go_request_set_remote_addr +func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) { + + get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) +} + +//export nxt_go_request_set_tls +func nxt_go_request_set_tls(go_req uintptr) { + + get_request(go_req).req.TLS = &tls.ConnectionState{ } +} + +//export nxt_go_request_handler +func nxt_go_request_handler(go_req uintptr, h uintptr) { + r := get_request(go_req) + handler := get_handler(h) + + go func(r *request) { + handler.ServeHTTP(r.response(), &r.req) + r.done() + }(r) +} diff --git a/go/response.go b/go/response.go new file mode 100644 index 00000000..bfa79656 --- /dev/null +++ b/go/response.go @@ -0,0 +1,119 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_cgo_lib.h" +*/ +import "C" + +import ( + "net/http" +) + +type response struct { + header http.Header + headerSent bool + req *http.Request + c_req C.uintptr_t + ch chan int +} + +func new_response(c_req C.uintptr_t, req *http.Request) *response { + resp := &response{ + header: http.Header{}, + req: req, + c_req: c_req, + } + + return resp +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) Write(p []byte) (n int, err error) { + if !r.headerSent { + r.WriteHeader(http.StatusOK) + } + + l := len(p) + written := int(0) + br := buf_ref(p) + + for written < l { + res := C.nxt_cgo_response_write(r.c_req, br, C.uint32_t(l - written)) + + written += int(res) + br += C.uintptr_t(res) + + if (written < l) { + if r.ch == nil { + r.ch = make(chan int, 2) + } + + wait_shm_ack(r.ch) + } + } + + return written, nil +} + +func (r *response) WriteHeader(code int) { + if r.headerSent { + // Note: explicitly using Stderr, as Stdout is our HTTP output. + nxt_go_warn("multiple response.WriteHeader calls") + return + } + r.headerSent = true + + // Set a default Content-Type + if _, hasType := r.header["Content-Type"]; !hasType { + r.header.Add("Content-Type", "text/html; charset=utf-8") + } + + fields := 0 + fields_size := 0 + + for k, vv := range r.header { + for _, v := range vv { + fields++ + fields_size += len(k) + len(v) + } + } + + C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields), + C.uint32_t(fields_size)) + + for k, vv := range r.header { + for _, v := range vv { + C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)), + str_ref(v), C.uint32_t(len(v))) + } + } + + C.nxt_cgo_response_send(r.c_req) +} + +func (r *response) Flush() { + if !r.headerSent { + r.WriteHeader(http.StatusOK) + } +} + +var observer_registry_ observable + +func wait_shm_ack(c chan int) { + observer_registry_.attach(c) + + _ = <-c +} + +//export nxt_go_shm_ack_handler +func nxt_go_shm_ack_handler() { + observer_registry_.notify(1) +} diff --git a/go/unit.go b/go/unit.go new file mode 100644 index 00000000..1534479e --- /dev/null +++ b/go/unit.go @@ -0,0 +1,149 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#include "nxt_cgo_lib.h" +*/ +import "C" + +import ( + "fmt" + "net/http" + "sync" + "unsafe" +) + +type cbuf struct { + b C.uintptr_t + s C.size_t +} + +func buf_ref(buf []byte) C.uintptr_t { + if len(buf) == 0 { + return 0 + } + + return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0]))) +} + +type StringHeader struct { + Data unsafe.Pointer + Len int +} + +func str_ref(s string) C.uintptr_t { + header := (*StringHeader)(unsafe.Pointer(&s)) + + return C.uintptr_t(uintptr(unsafe.Pointer(header.Data))) +} + +func (buf *cbuf) init_bytes(b []byte) { + buf.b = buf_ref(b) + buf.s = C.size_t(len(b)) +} + +func (buf *cbuf) init_string(s string) { + buf.b = str_ref(s) + buf.s = C.size_t(len(s)) +} + +type SliceHeader struct { + Data unsafe.Pointer + Len int + Cap int +} + +func (buf *cbuf) GoBytes() []byte { + if buf == nil { + var b [0]byte + return b[:0] + } + + bytesHeader := &SliceHeader{ + Data: unsafe.Pointer(uintptr(buf.b)), + Len: int(buf.s), + Cap: int(buf.s), + } + + return *(*[]byte)(unsafe.Pointer(bytesHeader)) +} + +func GoBytes(buf unsafe.Pointer, size C.int) []byte { + bytesHeader := &SliceHeader{ + Data: buf, + Len: int(size), + Cap: int(size), + } + + return *(*[]byte)(unsafe.Pointer(bytesHeader)) +} + +func nxt_go_warn(format string, args ...interface{}) { + str := fmt.Sprintf("[go] " + format, args...) + + C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str))) +} + +type handler_registry struct { + sync.RWMutex + next uintptr + m map[uintptr]*http.Handler +} + +var handler_registry_ handler_registry + +func set_handler(handler *http.Handler) uintptr { + + handler_registry_.Lock() + if handler_registry_.m == nil { + handler_registry_.m = make(map[uintptr]*http.Handler) + handler_registry_.next = 1 + } + + h := handler_registry_.next + handler_registry_.next += 1 + handler_registry_.m[h] = handler + + handler_registry_.Unlock() + + return h +} + +func get_handler(h uintptr) http.Handler { + handler_registry_.RLock() + defer handler_registry_.RUnlock() + + return *handler_registry_.m[h] +} + +func reset_handler(h uintptr) { + + handler_registry_.Lock() + if handler_registry_.m != nil { + delete(handler_registry_.m, h) + } + + handler_registry_.Unlock() +} + +func ListenAndServe(addr string, handler http.Handler) error { + if handler == nil { + handler = http.DefaultServeMux + } + + h := set_handler(&handler) + + rc := C.nxt_cgo_run(C.uintptr_t(h)) + + reset_handler(h) + + if rc != 0 { + return http.ListenAndServe(addr, handler) + } + + return nil +} |