diff options
Diffstat (limited to 'src/go/unit/port.go')
-rw-r--r-- | src/go/unit/port.go | 143 |
1 files changed, 45 insertions, 98 deletions
diff --git a/src/go/unit/port.go b/src/go/unit/port.go index 51733176..f716c9ec 100644 --- a/src/go/unit/port.go +++ b/src/go/unit/port.go @@ -6,14 +6,12 @@ package unit /* -#include "nxt_go_lib.h" -#include "nxt_process_type.h" +#include "nxt_cgo_lib.h" */ import "C" import ( "net" - "net/http" "os" "sync" "unsafe" @@ -26,7 +24,6 @@ type port_key struct { type port struct { key port_key - t int rcv *net.UnixConn snd *net.UnixConn } @@ -34,7 +31,6 @@ type port struct { type port_registry struct { sync.RWMutex m map[port_key]*port - t [C.NXT_PROCESS_MAX]*port } var port_registry_ port_registry @@ -47,42 +43,14 @@ func find_port(key port_key) *port { return res } -func remove_by_pid(pid int) { - port_registry_.Lock() - if port_registry_.m != nil { - for k, p := range port_registry_.m { - if k.pid == pid { - if port_registry_.t[p.t] == p { - port_registry_.t[p.t] = nil - } - - delete(port_registry_.m, k) - } - } - } - - port_registry_.Unlock() -} - -func main_port() *port { - port_registry_.RLock() - res := port_registry_.t[C.NXT_PROCESS_MAIN] - port_registry_.RUnlock() - - return res -} - func add_port(p *port) { - nxt_go_debug("add_port: %d:%d", p.key.pid, p.key.id); - port_registry_.Lock() if port_registry_.m == nil { port_registry_.m = make(map[port_key]*port) } port_registry_.m[p.key] = p - port_registry_.t[p.t] = p port_registry_.Unlock() } @@ -120,14 +88,38 @@ func getUnixConn(fd int) *net.UnixConn { return uc } -//export nxt_go_new_port -func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) { - new_port(int(pid), int(id), int(t), int(rcv), int(snd)) +//export nxt_go_add_port +func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { + p := &port{ + key: port_key{ + pid: int(pid), + id: int(id), + }, + rcv: getUnixConn(int(rcv)), + snd: getUnixConn(int(snd)), + } + + add_port(p) +} + +//export nxt_go_remove_port +func nxt_go_remove_port(pid C.int, id C.int) { + key := port_key{ + pid: int(pid), + id: int(id), + } + + port_registry_.Lock() + if port_registry_.m != nil { + delete(port_registry_.m, key) + } + + port_registry_.Unlock() } //export nxt_go_port_send func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, - oob unsafe.Pointer, oob_size C.int) C.int { + oob unsafe.Pointer, oob_size C.int) C.ssize_t { key := port_key{ pid: int(pid), @@ -141,83 +133,38 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, return 0 } - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), - C.GoBytes(oob, oob_size), nil) + n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), + GoBytes(oob, oob_size), nil) if err != nil { nxt_go_warn("write result %d (%d), %s", n, oobn, err) } - return C.int(n) - + return C.ssize_t(n) } -//export nxt_go_main_send -func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, - oob_size C.int) C.int { +//export nxt_go_port_recv +func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, + oob unsafe.Pointer, oob_size C.int) C.ssize_t { + + key := port_key{ + pid: int(pid), + id: int(id), + } - p := main_port() + p := find_port(key) if p == nil { + nxt_go_warn("port %d:%d not found", pid, id) return 0 } - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), - C.GoBytes(oob, oob_size), nil) + n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), + GoBytes(oob, oob_size)) if err != nil { nxt_go_warn("write result %d (%d), %s", n, oobn, err) } - return C.int(n) -} - -func new_port(pid int, id int, t int, rcv int, snd int) *port { - p := &port{ - key: port_key{ - pid: pid, - id: id, - }, - t: t, - rcv: getUnixConn(rcv), - snd: getUnixConn(snd), - } - - add_port(p) - - return p -} - -func (p *port) read(handler http.Handler) error { - var buf [16384]byte - var oob [1024]byte - var c_buf, c_oob cbuf - - n, oobn, _, _, err := p.rcv.ReadMsgUnix(buf[:], oob[:]) - - if err != nil { - return err - } - - c_buf.init(buf[:n]) - c_oob.init(oob[:oobn]) - - go_req := C.nxt_go_process_port_msg(c_buf.b, c_buf.s, c_oob.b, c_oob.s) - - if go_req == 0 { - return nil - } - - r := get_request(go_req) - - go func(r *request) { - if handler == nil { - handler = http.DefaultServeMux - } - - handler.ServeHTTP(r.response(), &r.req) - r.done() - }(r) - - return nil + return C.ssize_t(n) } |