From 8132e1f700934a32bc9e3fb0ab66f550a335a326 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 18 Nov 2020 22:33:53 +0300 Subject: Go: removing C proxy functions and re-using goroutines. --- go/port.go | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) (limited to 'go/port.go') diff --git a/go/port.go b/go/port.go index 64004d91..78351322 100644 --- a/go/port.go +++ b/go/port.go @@ -11,6 +11,7 @@ package unit import "C" import ( + "io" "net" "os" "sync" @@ -79,13 +80,13 @@ func getUnixConn(fd int) *net.UnixConn { c, err := net.FileConn(f) if err != nil { - nxt_go_warn("FileConn error %s", err) + nxt_go_alert("FileConn error %s", err) return nil } uc, ok := c.(*net.UnixConn) if !ok { - nxt_go_warn("Not a Unix-domain socket %d", fd) + nxt_go_alert("Not a Unix-domain socket %d", fd) return nil } @@ -93,30 +94,37 @@ func getUnixConn(fd int) *net.UnixConn { } //export nxt_go_add_port -func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) { - p := &port{ +func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int { + + new_port := &port{ key: port_key{ - pid: int(pid), - id: int(id), + pid: int(p.id.pid), + id: int(p.id.id), }, - rcv: getUnixConn(int(rcv)), - snd: getUnixConn(int(snd)), + rcv: getUnixConn(int(p.in_fd)), + snd: getUnixConn(int(p.out_fd)), } - add_port(p) + add_port(new_port) + + p.in_fd = -1 + p.out_fd = -1 - if id == 65535 { - go func(ctx C.uintptr_t) { - C.nxt_cgo_unit_run_shared(ctx); + if new_port.key.id == 65535 { + go func(ctx *C.nxt_unit_ctx_t) { + C.nxt_unit_run_shared(ctx); }(ctx) } + + return C.NXT_UNIT_OK } //export nxt_go_remove_port -func nxt_go_remove_port(pid C.int, id C.int) { +func nxt_go_remove_port(unit *C.nxt_unit_t, p *C.nxt_unit_port_t) { + key := port_key{ - pid: int(pid), - id: int(id), + pid: int(p.id.pid), + id: int(p.id.id), } port_registry_.Lock() @@ -139,7 +147,7 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, p := find_port(key) if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) + nxt_go_alert("port %d:%d not found", pid, id) return 0 } @@ -167,7 +175,7 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, p := find_port(key) if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) + nxt_go_alert("port %d:%d not found", pid, id) return 0 } @@ -175,6 +183,12 @@ func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, GoBytes(oob, oob_size)) if err != nil { + if nerr, ok := err.(*net.OpError); ok { + if nerr.Err == io.EOF { + return 0 + } + } + nxt_go_warn("read result %d (%d), %s", n, oobn, err) n = -1 -- cgit