summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-12-13 18:12:13 +0300
committerMax Romanov <max.romanov@nginx.com>2017-12-13 18:12:13 +0300
commit799cff5f3e226d8a82a767714c59815939a989dd (patch)
treea844660aba1423de68726f55a83cdbe6a7d49d82
parentb680e944d65d56e92919287247bd47e13ad030e9 (diff)
downloadunit-799cff5f3e226d8a82a767714c59815939a989dd.tar.gz
unit-799cff5f3e226d8a82a767714c59815939a989dd.tar.bz2
Introducing Unit version check in Go package.
To communicate with the Go program, Unit setup environment variable named NXT_GO_PORTS with value contains Unit version, stream id to confirm application is started, and Unit ports information. Go Unit package parses this string and compares runtime version with compile time version. In case of parse error or version mismatch, ListenAndServe() returns with the error.
-rw-r--r--src/go/unit/nxt_go_lib.c20
-rw-r--r--src/go/unit/nxt_go_lib.h4
-rw-r--r--src/go/unit/unit.go61
-rw-r--r--src/nxt_go.c102
4 files changed, 122 insertions, 65 deletions
diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c
index 84d64c0e..4c6b9ac2 100644
--- a/src/go/unit/nxt_go_lib.c
+++ b/src/go/unit/nxt_go_lib.c
@@ -112,18 +112,11 @@ nxt_go_request_done(nxt_go_request_t r)
void
-nxt_go_ready()
+nxt_go_ready(uint32_t stream)
{
- char *go_stream;
- nxt_port_msg_t port_msg;
+ nxt_port_msg_t port_msg;
- go_stream = getenv("NXT_GO_STREAM");
-
- if (go_stream == NULL) {
- return;
- }
-
- port_msg.stream = atol(go_stream);
+ port_msg.stream = stream;
port_msg.pid = getpid();
port_msg.reply_port = 0;
port_msg.type = _NXT_PORT_MSG_PROCESS_READY;
@@ -141,3 +134,10 @@ nxt_go_process_port_msg(uintptr_t buf, size_t buf_len, uintptr_t oob, size_t oob
{
return nxt_go_port_on_read((void *) buf, buf_len, (void *) oob, oob_len);
}
+
+
+const char *
+nxt_go_version()
+{
+ return NXT_VERSION;
+}
diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h
index 33acf334..aecdd4af 100644
--- a/src/go/unit/nxt_go_lib.h
+++ b/src/go/unit/nxt_go_lib.h
@@ -30,10 +30,12 @@ int nxt_go_request_close(nxt_go_request_t r);
int nxt_go_request_done(nxt_go_request_t r);
-void nxt_go_ready();
+void nxt_go_ready(uint32_t stream);
nxt_go_request_t nxt_go_process_port_msg(uintptr_t buf, size_t buf_len,
uintptr_t oob, size_t oob_len);
+const char *nxt_go_version();
+
#endif /* _NXT_GO_LIB_H_INCLUDED_ */
diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go
index 82a54ecc..77be7612 100644
--- a/src/go/unit/unit.go
+++ b/src/go/unit/unit.go
@@ -11,6 +11,7 @@ package unit
import "C"
import (
+ "errors"
"fmt"
"net/http"
"os"
@@ -65,31 +66,53 @@ func ListenAndServe(addr string, handler http.Handler) error {
var read_port *port
go_ports_env := os.Getenv("NXT_GO_PORTS")
+ if go_ports_env == "" {
+ return http.ListenAndServe(addr, handler)
+ }
+
+ nxt_go_debug("NXT_GO_PORTS=%s", go_ports_env)
ports := strings.Split(go_ports_env, ";")
pid := os.Getpid()
- for _, port_str := range ports {
- if len(port_str) <= 0 {
- continue
- }
+ if len(ports) != 4 {
+ return errors.New("Invalid NXT_GO_PORTS format")
+ }
+
+ nxt_go_debug("version=%s", ports[0])
+
+ builtin_version := C.GoString(C.nxt_go_version())
+
+ if ports[0] != builtin_version {
+ return fmt.Errorf("Versions mismatch: Unit %s, while application is built with %s",
+ ports[0], builtin_version)
+ }
+
+ stream, stream_err := strconv.Atoi(ports[1])
+ if stream_err != nil {
+ return stream_err
+ }
+
+ read_port = nil
+ for _, port_str := range ports[2:] {
attrs := strings.Split(port_str, ",")
+ if len(attrs) != 5 {
+ return fmt.Errorf("Invalid port format: unexpected port attributes number %d, while 5 expected",
+ len(attrs))
+ }
+
var attrsN [5]int
var err error
for i, attr := range attrs {
attrsN[i], err = strconv.Atoi(attr)
if err != nil {
- fmt.Printf("err %s\n", err)
- break
+ return fmt.Errorf("Invalid port format: number attribute expected at %d position instead of '%s'",
+ i, attr);
}
}
- if err != nil {
- continue
- }
-
p := new_port(attrsN[0], attrsN[1], attrsN[2], attrsN[3], attrsN[4])
if attrsN[0] == pid {
@@ -97,17 +120,17 @@ func ListenAndServe(addr string, handler http.Handler) error {
}
}
- if read_port != nil {
- C.nxt_go_ready()
+ if read_port == nil {
+ return errors.New("Application read port not found");
+ }
- for !nxt_go_quit {
- err := read_port.read(handler)
- if err != nil {
- return err
- }
+ C.nxt_go_ready(C.uint32_t(stream))
+
+ for !nxt_go_quit {
+ err := read_port.read(handler)
+ if err != nil {
+ return err
}
- } else {
- return http.ListenAndServe(addr, handler)
}
return nil
diff --git a/src/nxt_go.c b/src/nxt_go.c
index 6ca73957..207b0deb 100644
--- a/src/nxt_go.c
+++ b/src/nxt_go.c
@@ -25,13 +25,34 @@ nxt_application_module_t nxt_go_module = {
extern char **environ;
-nxt_inline int
-nxt_sock_no_cloexec(nxt_socket_t fd)
+nxt_inline nxt_int_t
+nxt_go_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd)
{
+ int res, flags;
+
if (fd == -1) {
- return 0;
+ return NXT_OK;
+ }
+
+ flags = fcntl(fd, F_GETFD);
+
+ if (nxt_slow_path(flags == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_GETFD) failed %E",
+ fd, nxt_errno);
+ return NXT_ERROR;
+ }
+
+ flags &= ~FD_CLOEXEC;
+
+ res = fcntl(fd, F_SETFD, flags);
+
+ if (nxt_slow_path(res == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_SETFD) failed %E",
+ fd, nxt_errno);
+ return NXT_ERROR;
}
- return fcntl(fd, F_SETFD, 0);
+
+ return NXT_OK;
}
@@ -40,58 +61,69 @@ nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
{
char *argv[2];
u_char buf[256];
- u_char *p;
- u_char stream_buf[32];
- nxt_port_t *port;
+ u_char *p, *end;
+ nxt_int_t rc;
+ nxt_port_t *my_port, *main_port;
nxt_runtime_t *rt;
nxt_go_app_conf_t *c;
- c = &conf->u.go;
rt = task->thread->runtime;
- p = buf;
- nxt_runtime_port_each(rt, port) {
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
- if (port->pid != nxt_pid && port->type != NXT_PROCESS_MAIN) {
- continue;
- }
-
- if (port->pid == nxt_pid) {
- nxt_sprintf(stream_buf, stream_buf + sizeof(stream_buf),
- "%uD", port->process->init->stream);
+ if (nxt_slow_path(main_port == NULL || my_port == NULL)) {
+ return NXT_ERROR;
+ }
- setenv("NXT_GO_STREAM", (char *) stream_buf, 1);
- }
+ rc = nxt_go_fd_no_cloexec(task, main_port->pair[1]);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return NXT_ERROR;
+ }
- nxt_debug(task, "port %PI, %ud, (%d, %d)", port->pid, port->id,
- port->pair[0], port->pair[1]);
+ rc = nxt_go_fd_no_cloexec(task, my_port->pair[0]);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return NXT_ERROR;
+ }
- p = nxt_sprintf(p, buf + sizeof(buf), "%PI,%ud,%d,%d,%d;",
- port->pid, port->id, (int) port->type,
- port->pair[0], port->pair[1]);
+ end = buf + sizeof(buf);
- if (nxt_slow_path(nxt_sock_no_cloexec(port->pair[0]))) {
- nxt_log(task, NXT_LOG_WARN, "fcntl() failed %E", nxt_errno);
- }
+ p = nxt_sprintf(buf, end,
+ "%s;%uD;"
+ "%PI,%ud,%d,%d,%d;"
+ "%PI,%ud,%d,%d,%d%Z",
+ NXT_VERSION, my_port->process->init->stream,
+ main_port->pid, main_port->id, (int) main_port->type,
+ -1, main_port->pair[1],
+ my_port->pid, my_port->id, (int) my_port->type,
+ my_port->pair[0], -1);
- if (nxt_slow_path(nxt_sock_no_cloexec(port->pair[1]))) {
- nxt_log(task, NXT_LOG_WARN, "fcntl() failed %E", nxt_errno);
- }
+ if (nxt_slow_path(p == end)) {
+ nxt_log(task, NXT_LOG_ALERT,
+ "internal error: buffer too small for NXT_GO_PORTS");
- } nxt_runtime_port_loop;
+ return NXT_ERROR;
+ }
- *p = '\0';
nxt_debug(task, "update NXT_GO_PORTS=%s", buf);
- setenv("NXT_GO_PORTS", (char *) buf, 1);
+ rc = setenv("NXT_GO_PORTS", (char *) buf, 1);
+ if (nxt_slow_path(rc == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "setenv(NXT_GO_PORTS, %s) failed %E",
+ buf, nxt_errno);
+
+ return NXT_ERROR;
+ }
+
+ c = &conf->u.go;
argv[0] = c->executable;
argv[1] = NULL;
(void) execve(c->executable, argv, environ);
- nxt_log(task, NXT_LOG_WARN, "execve(%s) failed %E", c->executable,
- nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "execve(%s) failed %E",
+ c->executable, nxt_errno);
return NXT_ERROR;
}