summaryrefslogblamecommitdiffhomepage
path: root/go/port.go
blob: 6635e87c2b50566b8a8a7b0bb9b2324962ee3827 (plain) (tree)
1
2
3
4
5
6
7
8
9




                            
            

  
                        



          
            
































                                    
 




                                                           




                                           























                                            
                                                      




                                   
                                                               


                          


                 
                        


                                                                         
                              

                                           
                  

                                                

         



                          
 







                                                

                            


                           

                                                                  
 
                        

                                   







                                             


                         
                                                                              
                                                       
 






                              
                     
                                                             

                        
 

                                                                  
 
                       
                                                                     

                      

         
                           

 

                                                                              
                                                           




                              
 
                           
 
                     
                                                             

                        
 
                                                                       
                                               
 
                       





                                                       
                                                                    

                      


                                          

         
                           
 
/*
 * Copyright (C) Max Romanov
 * Copyright (C) NGINX, Inc.
 */

package unit

/*
#include "nxt_cgo_lib.h"
*/
import "C"

import (
	"io"
	"net"
	"os"
	"sync"
	"unsafe"
)

type port_key struct {
	pid int
	id  int
}

type port struct {
	key port_key
	rcv *net.UnixConn
	snd *net.UnixConn
}

type port_registry struct {
	sync.RWMutex
	m map[port_key]*port
}

var port_registry_ port_registry

func find_port(key port_key) *port {
	port_registry_.RLock()
	res := port_registry_.m[key]
	port_registry_.RUnlock()

	return res
}

func add_port(p *port) {

	port_registry_.Lock()
	if port_registry_.m == nil {
		port_registry_.m = make(map[port_key]*port)
	}

	old := port_registry_.m[p.key]

	if old == nil {
		port_registry_.m[p.key] = p
	}

	port_registry_.Unlock()
}

func (p *port) Close() {
	if p.rcv != nil {
		p.rcv.Close()
	}

	if p.snd != nil {
		p.snd.Close()
	}
}

func getUnixConn(fd int) *net.UnixConn {
	if fd < 0 {
		return nil
	}

	f := os.NewFile(uintptr(fd), "sock")
	defer f.Close()

	c, err := net.FileConn(f)
	if err != nil {
		nxt_go_alert("FileConn error %s", err)
		return nil
	}

	uc, ok := c.(*net.UnixConn)
	if !ok {
		nxt_go_alert("Not a Unix-domain socket %d", fd)
		return nil
	}

	return uc
}

//export nxt_go_add_port
func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int {

	new_port := &port{
		key: port_key{
			pid: int(p.id.pid),
			id:  int(p.id.id),
		},
		rcv: getUnixConn(int(p.in_fd)),
		snd: getUnixConn(int(p.out_fd)),
	}

	add_port(new_port)

	p.in_fd = -1
	p.out_fd = -1

	return C.NXT_UNIT_OK
}

//export nxt_go_ready
func nxt_go_ready(ctx *C.nxt_unit_ctx_t) C.int {
	go func(ctx *C.nxt_unit_ctx_t) {
		C.nxt_unit_run_shared(ctx)
	}(ctx)

	return C.NXT_UNIT_OK
}

//export nxt_go_remove_port
func nxt_go_remove_port(unit *C.nxt_unit_t, ctx *C.nxt_unit_ctx_t,
	p *C.nxt_unit_port_t) {

	key := port_key{
		pid: int(p.id.pid),
		id:  int(p.id.id),
	}

	port_registry_.Lock()
	if port_registry_.m != nil {
		delete(port_registry_.m, key)
	}

	port_registry_.Unlock()
}

//export nxt_go_port_send
func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
	oob unsafe.Pointer, oob_size C.int) C.ssize_t {

	key := port_key{
		pid: int(pid),
		id:  int(id),
	}

	p := find_port(key)

	if p == nil {
		nxt_go_alert("port %d:%d not found", pid, id)
		return 0
	}

	n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
		GoBytes(oob, oob_size), nil)

	if err != nil {
		nxt_go_warn("write result %d (%d), %s", n, oobn, err)

		n = -1
	}

	return C.ssize_t(n)
}

//export nxt_go_port_recv
func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
	oob unsafe.Pointer, oob_size *C.size_t) C.ssize_t {

	key := port_key{
		pid: int(pid),
		id:  int(id),
	}

	p := find_port(key)

	if p == nil {
		nxt_go_alert("port %d:%d not found", pid, id)
		return 0
	}

	n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
		GoBytes(oob, C.int(*oob_size)))

	if err != nil {
		if nerr, ok := err.(*net.OpError); ok {
			if nerr.Err == io.EOF {
				return 0
			}
		}

		nxt_go_warn("read result %d (%d), %s", n, oobn, err)

		n = -1

	} else {
		*oob_size = C.size_t(oobn)
	}

	return C.ssize_t(n)
}