diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.c | 325 | ||||
-rw-r--r-- | src/nxt_application.h | 67 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 28 | ||||
-rw-r--r-- | src/nxt_go.c | 95 | ||||
-rw-r--r-- | src/nxt_master_process.c | 177 | ||||
-rw-r--r-- | src/nxt_master_process.h | 2 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 115 | ||||
-rw-r--r-- | src/nxt_port.h | 2 | ||||
-rw-r--r-- | src/nxt_process.h | 1 | ||||
-rw-r--r-- | src/nxt_python_wsgi.c | 107 | ||||
-rw-r--r-- | src/nxt_router.c | 277 | ||||
-rw-r--r-- | src/nxt_router.h | 7 | ||||
-rw-r--r-- | src/nxt_runtime.c | 28 | ||||
-rw-r--r-- | src/nxt_runtime.h | 1 | ||||
-rw-r--r-- | src/nxt_string.c | 5 | ||||
-rw-r--r-- | src/nxt_string.h | 2 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 20 |
17 files changed, 858 insertions, 401 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 1b23eca3..4689f622 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -11,8 +11,22 @@ #include <nxt_application.h> #include <nxt_master_process.h> +#include <glob.h> + + +typedef struct { + nxt_str_t type; + nxt_str_t version; + nxt_str_t file; +} nxt_module_t; + + +static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path); +static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, + nxt_array_t *modules, const char *name); +static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, + const char *name); -nxt_application_module_t *nxt_app_modules[NXT_APP_MAX]; static nxt_thread_mutex_t nxt_app_mutex; static nxt_thread_cond_t nxt_app_cond; @@ -22,14 +36,243 @@ static nxt_http_fields_hash_t *nxt_app_request_fields_hash; static nxt_application_module_t *nxt_app; + +nxt_int_t +nxt_discovery_start(nxt_task_t *task, void *data) +{ + nxt_buf_t *b; + nxt_port_t *main_port; + nxt_runtime_t *rt; + + nxt_debug(task, "DISCOVERY"); + + b = nxt_discovery_modules(task, "build/nginext.*"); + + rt = task->thread->runtime; + main_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1, + 0, -1, b); + + return NXT_OK; +} + + +static void +nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp; + nxt_buf_t *b; + + b = obj; + mp = b->data; + + nxt_mp_destroy(mp); + + exit(0); +} + + +static nxt_buf_t * +nxt_discovery_modules(nxt_task_t *task, const char *path) +{ + char *name; + u_char *p, *end; + size_t size; + glob_t glb; + nxt_mp_t *mp; + nxt_buf_t *b; + nxt_int_t ret; + nxt_uint_t i, n; + nxt_array_t *modules; + nxt_module_t *module; + + b = NULL; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (mp == NULL) { + return b; + } + + ret = glob(path, 0, NULL, &glb); + + if (ret == 0) { + n = glb.gl_pathc; + + modules = nxt_array_create(mp, n, sizeof(nxt_module_t)); + if (modules == NULL) { + goto fail; + } + + for (i = 0; i < n; i++) { + name = glb.gl_pathv[i]; + + ret = nxt_discovery_module(task, mp, modules, name); + if (ret != NXT_OK) { + goto fail; + } + } + + size = sizeof("[]") - 1; + module = modules->elts; + n = modules->nelts; + + for (i = 0; i < n; i++) { + nxt_debug(task, "module: %V %V %V", + &module[i].type, &module[i].version, &module[i].file); + + size += sizeof("{\"type\": \"\",") - 1; + size += sizeof(" \"version\": \"\",") - 1; + size += sizeof(" \"file\": \"\"},") - 1; + + size += module[i].type.length + + module[i].version.length + + module[i].file.length; + } + + b = nxt_buf_mem_alloc(mp, size, 0); + if (b == NULL) { + goto fail; + } + + b->completion_handler = nxt_discovery_completion_handler; + + p = b->mem.free; + end = b->mem.end; + *p++ = '['; + + for (i = 0; i < n; i++) { + p = nxt_sprintf(p, end, + "{\"type\": \"%V\", \"version\": \"%V\", \"file\": \"%V\"},", + &module[i].type, &module[i].version, &module[i].file); + } + + *p++ = ']'; + b->mem.free = p; + } + +fail: + + globfree(&glb); + + return b; +} + + +static nxt_int_t +nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules, + const char *name) +{ + void *dl; + nxt_str_t *s; + nxt_int_t ret; + nxt_uint_t i, n; + nxt_module_t *module; + nxt_application_module_t *app; + + /* + * Only memory allocation failure should return NXT_ERROR. + * Any module processing errors are ignored. + */ + ret = NXT_ERROR; + + dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW); + + if (dl == NULL) { + nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"", + name, dlerror()); + return NXT_OK; + } + + app = dlsym(dl, "nxt_app_module"); + + if (app != NULL) { + nxt_log(task, NXT_LOG_NOTICE, "module: %V \"%s\"", + &app->version, name); + + module = modules->elts; + n = modules->nelts; + + for (i = 0; i < n; i++) { + if (nxt_strstr_eq(&app->version, &module[i].version)) { + nxt_log(task, NXT_LOG_NOTICE, + "ignoring %s module with the same " + "application language version %V as in %s", + name, &module[i].version, &module[i].file); + + goto done; + } + } + + module = nxt_array_add(modules); + if (module == NULL) { + goto fail; + } + + s = nxt_str_dup(mp, &module->type, &app->type); + if (s == NULL) { + goto fail; + } + + s = nxt_str_dup(mp, &module->version, &app->version); + if (s == NULL) { + goto fail; + } + + module->file.length = nxt_strlen(name); + + module->file.start = nxt_mp_alloc(mp, module->file.length); + if (module->file.start == NULL) { + goto fail; + } + + nxt_memcpy(module->file.start, name, module->file.length); + + } else { + nxt_log(task, NXT_LOG_CRIT, "dlsym(\"%s\"), failed: \"%s\"", + name, dlerror()); + } + +done: + + ret = NXT_OK; + +fail: + + if (dlclose(dl) != 0) { + nxt_log(task, NXT_LOG_CRIT, "dlclose(\"%s\"), failed: \"%s\"", + name, dlerror()); + } + + return ret; +} + + nxt_int_t nxt_app_start(nxt_task_t *task, void *data) { - nxt_int_t ret; - nxt_common_app_conf_t *app_conf; + nxt_int_t ret; + nxt_app_lang_module_t *lang; + nxt_common_app_conf_t *app_conf; app_conf = data; + lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); + if (nxt_slow_path(lang == NULL)) { + nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", + &app_conf->type); + return NXT_ERROR; + } + + nxt_app = lang->module; + + if (nxt_app == NULL) { + nxt_debug(task, "application language module: %V \"%s\"", + &lang->version, lang->file); + + nxt_app = nxt_app_module_load(task, lang->file); + } + if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { return NXT_ERROR; } @@ -38,8 +281,6 @@ nxt_app_start(nxt_task_t *task, void *data) return NXT_ERROR; } - nxt_app = nxt_app_modules[app_conf->type_id]; - ret = nxt_app->init(task, data); if (nxt_slow_path(ret != NXT_OK)) { @@ -53,6 +294,24 @@ nxt_app_start(nxt_task_t *task, void *data) } +static nxt_app_module_t * +nxt_app_module_load(nxt_task_t *task, const char *name) +{ + void *dl; + + dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY); + + if (dl != NULL) { + return dlsym(dl, "nxt_app_module"); + } + + nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"", + name, dlerror()); + + return NULL; +} + + nxt_int_t nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt) { @@ -681,30 +940,56 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, } +nxt_app_lang_module_t * +nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) +{ + u_char *p, *end, *version; + size_t type_length, version_length; + nxt_uint_t i, n; + nxt_app_lang_module_t *lang; + + end = name->start + name->length; + version = end; + + for (p = name->start; p < end; p++) { + if (*p == ' ') { + version = p + 1; + break; + } + + if (*p >= '0' && *p <= '9') { + version = p; + break; + } + } + + type_length = p - name->start; + version_length = end - version; + + lang = rt->languages->elts; + n = rt->languages->nelts; + + for (i = 0; i < n; i++) { + if (nxt_str_eq(&lang[i].type, name->start, type_length) + && nxt_str_start(&lang[i].version, version, version_length)) + { + return &lang[i]; + } + } + + return NULL; +} + + nxt_app_type_t nxt_app_parse_type(nxt_str_t *str) { if (nxt_str_eq(str, "python", 6)) { return NXT_APP_PYTHON; - } else if (nxt_str_eq(str, "python2", 7)) { - return NXT_APP_PYTHON2; - - } else if (nxt_str_eq(str, "python3", 7)) { - return NXT_APP_PYTHON3; - } else if (nxt_str_eq(str, "php", 3)) { return NXT_APP_PHP; - } else if (nxt_str_eq(str, "php5", 4)) { - return NXT_APP_PHP5; - - } else if (nxt_str_eq(str, "php7", 4)) { - return NXT_APP_PHP7; - - } else if (nxt_str_eq(str, "ruby", 4)) { - return NXT_APP_RUBY; - } else if (nxt_str_eq(str, "go", 2)) { return NXT_APP_GO; diff --git a/src/nxt_application.h b/src/nxt_application.h index 9efb5008..c8d9998d 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -10,20 +10,26 @@ typedef enum { - NXT_APP_UNKNOWN = 0, NXT_APP_PYTHON, - NXT_APP_PYTHON2, - NXT_APP_PYTHON3, NXT_APP_PHP, - NXT_APP_PHP5, - NXT_APP_PHP7, - NXT_APP_RUBY, NXT_APP_GO, - NXT_APP_MAX, + NXT_APP_UNKNOWN, } nxt_app_type_t; +typedef struct nxt_app_module_s nxt_application_module_t; +typedef struct nxt_app_module_s nxt_app_module_t; + + +typedef struct { + nxt_str_t type; + nxt_str_t version; + char *file; + nxt_application_module_t *module; +} nxt_app_lang_module_t; + + typedef struct nxt_common_app_conf_s nxt_common_app_conf_t; @@ -48,7 +54,6 @@ typedef struct { struct nxt_common_app_conf_s { nxt_str_t name; nxt_str_t type; - nxt_app_type_t type_id; nxt_str_t user; nxt_str_t group; @@ -148,13 +153,13 @@ nxt_inline u_char * nxt_app_msg_write_length(u_char *dst, size_t length); /* TODO asynchronous mmap buffer assignment */ -u_char *nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, - size_t size); +NXT_EXPORT u_char *nxt_app_msg_write_get_buf(nxt_task_t *task, + nxt_app_wmsg_t *msg, size_t size); -nxt_int_t nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, +NXT_EXPORT nxt_int_t nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size); -nxt_int_t nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, +NXT_EXPORT nxt_int_t nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, const nxt_str_t *prefix, const nxt_str_t *v); nxt_inline nxt_int_t @@ -178,44 +183,37 @@ nxt_app_msg_write_nvp_(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_inline nxt_int_t nxt_app_msg_write_size(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size); -nxt_int_t nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, +NXT_EXPORT nxt_int_t nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last); -nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, - const u_char *c, size_t size); +NXT_EXPORT nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, + nxt_app_wmsg_t *msg, const u_char *c, size_t size); -nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, +NXT_EXPORT nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str); -size_t nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *buf, - size_t size); +NXT_EXPORT size_t nxt_app_msg_read_raw(nxt_task_t *task, + nxt_app_rmsg_t *msg, void *buf, size_t size); -nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, - nxt_str_t *n, nxt_str_t *v); +NXT_EXPORT nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, + nxt_app_rmsg_t *rmsg, nxt_str_t *n, nxt_str_t *v); -nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *rmsg, - size_t *size); +NXT_EXPORT nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, + nxt_app_rmsg_t *rmsg, size_t *size); -typedef struct nxt_app_module_s nxt_application_module_t; -typedef struct nxt_app_module_s nxt_app_module_t; - struct nxt_app_module_s { + nxt_str_t type; + nxt_str_t version; + nxt_int_t (*init)(nxt_task_t *task, nxt_common_app_conf_t *conf); - nxt_int_t (*prepare_msg)(nxt_task_t *task, - nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg); nxt_int_t (*run)(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg); }; -extern nxt_application_module_t *nxt_app_modules[NXT_APP_MAX]; - - - nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len); nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len); @@ -290,6 +288,11 @@ nxt_app_msg_read_length(u_char *src, size_t *length) } +nxt_app_lang_module_t *nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name); nxt_app_type_t nxt_app_parse_type(nxt_str_t *str); + +extern nxt_application_module_t nxt_go_module; + + #endif /* _NXT_APPLICATION_H_INCLIDED_ */ diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 0608eaa9..5cc2569f 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -6,6 +6,7 @@ #include <nxt_main.h> #include <nxt_conf.h> +#include <nxt_application.h> typedef struct { @@ -213,11 +214,20 @@ static nxt_int_t nxt_conf_vldt_app(nxt_conf_value_t *conf, nxt_str_t *name, nxt_conf_value_t *value) { - nxt_str_t type; - nxt_conf_value_t *type_value; + nxt_str_t type; + nxt_uint_t n; + nxt_thread_t *thread; + nxt_conf_value_t *type_value; + nxt_app_lang_module_t *lang; static nxt_str_t type_str = nxt_string("type"); + static void *members[] = { + nxt_conf_vldt_python_members, + nxt_conf_vldt_php_members, + nxt_conf_vldt_go_members, + }; + type_value = nxt_conf_get_object_member(value, &type_str, NULL); if (nxt_slow_path(type_value == NULL)) { @@ -230,16 +240,16 @@ nxt_conf_vldt_app(nxt_conf_value_t *conf, nxt_str_t *name, nxt_conf_get_string(type_value, &type); - if (nxt_str_eq(&type, "python", 6)) { - return nxt_conf_vldt_object(conf, value, nxt_conf_vldt_python_members); - } + thread = nxt_thread(); - if (nxt_str_eq(&type, "php", 3)) { - return nxt_conf_vldt_object(conf, value, nxt_conf_vldt_php_members); + lang = nxt_app_lang_module(thread->runtime, &type); + if (lang == NULL) { + return NXT_ERROR; } - if (nxt_str_eq(&type, "go", 2)) { - return nxt_conf_vldt_object(conf, value, nxt_conf_vldt_go_members); + n = nxt_app_parse_type(&lang->type); + if (n != NXT_APP_UNKNOWN) { + return nxt_conf_vldt_object(conf, value, members[n]); } return NXT_ERROR; diff --git a/src/nxt_go.c b/src/nxt_go.c index b821cced..ee446921 100644 --- a/src/nxt_go.c +++ b/src/nxt_go.c @@ -10,30 +10,17 @@ static nxt_int_t nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf); -static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, - nxt_app_request_t *r, nxt_app_wmsg_t *msg); - static nxt_int_t nxt_go_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *msg); nxt_application_module_t nxt_go_module = { + nxt_string("go"), + nxt_string("go"), nxt_go_init, - nxt_go_prepare_msg, - nxt_go_run + nxt_go_run, }; -nxt_int_t -nxt_go_module_init(nxt_thread_t *thr, nxt_runtime_t *rt); - -nxt_int_t -nxt_go_module_init(nxt_thread_t *thr, nxt_runtime_t *rt) -{ - nxt_app_modules[NXT_APP_GO] = &nxt_go_module; - - return NXT_OK; -} - extern char **environ; nxt_inline int @@ -113,82 +100,6 @@ nxt_go_init(nxt_task_t *task, nxt_common_app_conf_t *conf) static nxt_int_t -nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) -{ - nxt_int_t rc; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; - - static const nxt_str_t eof = nxt_null_string; - - h = &r->header; - -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) - -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) - - /* TODO error handle, async mmap buffer assignment */ - - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); - if (h->path.start == h->target.start) { - NXT_WRITE(&eof); - } else { - NXT_WRITE(&h->path); - } - - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); - } - - NXT_WRITE(&h->version); - - NXT_WRITE(&h->host); - NXT_WRITE(&h->cookie); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); - - RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); - - nxt_list_each(field, h->fields) { - NXT_WRITE(&field->name); - NXT_WRITE(&field->value); - - } nxt_list_loop; - - /* end-of-headers mark */ - NXT_WRITE(&eof); - - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); - - for(b = r->body.buf; b != NULL; b = b->next) { - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); - } - -#undef NXT_WRITE -#undef RC - - return NXT_OK; - -fail: - - return NXT_ERROR; -} - - -static nxt_int_t nxt_go_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *msg) { diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 3e72ce85..8a4f8564 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -27,6 +27,8 @@ static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_int_t nxt_master_start_discovery_process(nxt_task_t *task, + nxt_runtime_t *rt); static nxt_int_t nxt_master_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, @@ -44,6 +46,9 @@ static void nxt_master_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_master_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls); +static void nxt_master_port_modules_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); const nxt_sig_event_t nxt_master_process_signals[] = { @@ -63,8 +68,6 @@ nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) { - nxt_int_t ret; - rt->types |= (1U << NXT_PROCESS_MASTER); if (nxt_master_process_port_create(task, rt) != NXT_OK) { @@ -73,12 +76,12 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_master_process_title(task); - ret = nxt_master_start_controller_process(task, rt); - if (ret != NXT_OK) { - return ret; - } - - return nxt_master_start_router_process(task, rt); + /* + * The dicsovery process will send a message processed by + * nxt_master_port_modules_handler() which starts the controller + * and router processes. + */ + return nxt_master_start_discovery_process(task, rt); } @@ -197,8 +200,6 @@ nxt_port_master_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - app_conf.type_id = nxt_app_parse_type(&app_conf.type); - ret = nxt_master_start_worker_process(task, task->thread->runtime, &app_conf, msg->port_msg.stream); @@ -216,6 +217,7 @@ static nxt_port_handler_t nxt_master_process_port_handlers[] = { nxt_port_ready_handler, nxt_port_master_start_worker_handler, nxt_master_port_socket_handler, + nxt_master_port_modules_handler, nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -289,7 +291,7 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t)); + init = nxt_malloc(sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } @@ -309,11 +311,35 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) static nxt_int_t +nxt_master_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_process_init_t *init; + + init = nxt_malloc(sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + return NXT_ERROR; + } + + init->start = nxt_discovery_start; + init->name = "discovery"; + init->user_cred = &rt->user_cred; + init->port_handlers = nxt_discovery_process_port_handlers; + init->signals = nxt_worker_process_signals; + init->type = NXT_PROCESS_DISCOVERY; + init->data = rt; + init->stream = 0; + init->restart = 0; + + return nxt_master_create_worker_process(task, rt, init); +} + + +static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t)); + init = nxt_malloc(sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } @@ -885,3 +911,130 @@ fail: return NXT_ERROR; } + + +static nxt_conf_map_t nxt_app_lang_module_map[] = { + { + nxt_string("type"), + NXT_CONF_MAP_STR_COPY, + offsetof(nxt_app_lang_module_t, type), + }, + + { + nxt_string("version"), + NXT_CONF_MAP_STR_COPY, + offsetof(nxt_app_lang_module_t, version), + }, + + { + nxt_string("file"), + NXT_CONF_MAP_CSTRZ, + offsetof(nxt_app_lang_module_t, file), + }, +}; + + +static void +nxt_master_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + uint32_t index; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_buf_t *b; + nxt_runtime_t *rt; + nxt_conf_value_t *conf, *root, *value; + nxt_app_lang_module_t *lang; + + static nxt_str_t root_path = nxt_string("/"); + + rt = task->thread->runtime; + + if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { + return; + } + + b = msg->buf; + + if (b == NULL) { + return; + } + + nxt_debug(task, "application languages: \"%*s\"", + b->mem.free - b->mem.pos, b->mem.pos); + + mp = nxt_mp_create(1024, 128, 256, 32); + if (mp == NULL) { + return; + } + + conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL); + if (conf == NULL) { + goto fail; + } + + root = nxt_conf_get_path(conf, &root_path); + if (root == NULL) { + goto fail; + } + + for (index = 0; /* void */ ; index++) { + value = nxt_conf_get_array_element(root, index); + if (value == NULL) { + break; + } + + lang = nxt_array_add(rt->languages); + if (lang == NULL) { + goto fail; + } + + lang->module = NULL; + + ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map, + nxt_nitems(nxt_app_lang_module_map), lang); + + if (ret != NXT_OK) { + goto fail; + } + + nxt_debug(task, "lang %V %V \"%s\"", + &lang->type, &lang->version, lang->file); + } + + qsort(rt->languages->elts, rt->languages->nelts, + sizeof(nxt_app_lang_module_t), nxt_app_lang_compare); + +fail: + + nxt_mp_destroy(mp); + + ret = nxt_master_start_controller_process(task, rt); + + if (ret == NXT_OK) { + (void) nxt_master_start_router_process(task, rt); + } +} + + +static int nxt_cdecl +nxt_app_lang_compare(const void *v1, const void *v2) +{ + int n; + size_t length; + const nxt_app_lang_module_t *lang1, *lang2; + + lang1 = v1; + lang2 = v2; + + length = nxt_min(lang1->version.length, lang2->version.length); + + n = nxt_strncmp(lang1->version.start, lang2->version.start, length); + + if (n == 0) { + n = lang1->version.length - lang2->version.length; + } + + /* Negate result to move higher versions to the beginning. */ + + return -n; +} diff --git a/src/nxt_master_process.h b/src/nxt_master_process.h index 3a7e4988..be591c76 100644 --- a/src/nxt_master_process.h +++ b/src/nxt_master_process.h @@ -25,10 +25,12 @@ void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *runtime); nxt_int_t nxt_controller_start(nxt_task_t *task, void *data); nxt_int_t nxt_router_start(nxt_task_t *task, void *data); +nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data); nxt_int_t nxt_app_start(nxt_task_t *task, void *data); extern nxt_port_handler_t nxt_controller_process_port_handlers[]; extern nxt_port_handler_t nxt_worker_process_port_handlers[]; +extern nxt_port_handler_t nxt_discovery_process_port_handlers[]; extern nxt_port_handler_t nxt_app_process_port_handlers[]; extern nxt_port_handler_t nxt_router_process_port_handlers[]; extern const nxt_sig_event_t nxt_master_process_signals[]; diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 4f8f4696..1e1f4087 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -16,9 +16,6 @@ static nxt_int_t nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf); -static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, - nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); - static nxt_int_t nxt_php_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg); @@ -58,8 +55,6 @@ static int nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC); static void nxt_php_flush(void *server_context); -extern nxt_int_t nxt_php_sapi_init(nxt_thread_t *thr, nxt_runtime_t *rt); - static sapi_module_struct nxt_php_sapi_module = { @@ -173,30 +168,14 @@ nxt_php_str_trim_lead(nxt_str_t *str, u_char t) } -nxt_application_module_t nxt_php_module = { +NXT_EXPORT nxt_application_module_t nxt_app_module = { + nxt_string("php"), + nxt_string(PHP_VERSION), nxt_php_init, - nxt_php_prepare_msg, - nxt_php_run + nxt_php_run, }; -nxt_int_t -nxt_php_sapi_init(nxt_thread_t *thr, nxt_runtime_t *rt) -{ - nxt_app_modules[NXT_APP_PHP] = &nxt_php_module; - -#if PHP_MAJOR_VERSION == 5 - nxt_app_modules[NXT_APP_PHP5] = &nxt_php_module; -#endif - -#if PHP_MAJOR_VERSION == 7 - nxt_app_modules[NXT_APP_PHP7] = &nxt_php_module; -#endif - - return NXT_OK; -} - - static nxt_int_t nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { @@ -357,92 +336,6 @@ fail: static nxt_int_t -nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg) -{ - nxt_int_t rc; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; - - static const nxt_str_t prefix = nxt_string("HTTP_"); - static const nxt_str_t eof = nxt_null_string; - - h = &r->header; - -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) - -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) - - /* TODO error handle, async mmap buffer assignment */ - - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); - if (h->path.start == h->target.start) { - NXT_WRITE(&eof); - } else { - NXT_WRITE(&h->path); - } - - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); - } - - NXT_WRITE(&h->version); - - // PHP_SELF - // SCRIPT_NAME - // SCRIPT_FILENAME - // DOCUMENT_ROOT - - NXT_WRITE(&r->remote); - - NXT_WRITE(&h->host); - NXT_WRITE(&h->cookie); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); - - RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); - - nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, - &prefix, &field->name)); - NXT_WRITE(&field->value); - - } nxt_list_loop; - - /* end-of-headers mark */ - NXT_WRITE(&eof); - - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); - - for(b = r->body.buf; b != NULL; b = b->next) { - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); - } - -#undef NXT_WRITE -#undef RC - - return NXT_OK; - -fail: - - return NXT_ERROR; -} - - -static nxt_int_t nxt_php_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg) { diff --git a/src/nxt_port.h b/src/nxt_port.h index 3f584e9f..78228ff7 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -24,6 +24,7 @@ typedef enum { _NXT_PORT_MSG_READY, _NXT_PORT_MSG_START_WORKER, _NXT_PORT_MSG_SOCKET, + _NXT_PORT_MSG_MODULES, _NXT_PORT_MSG_RPC_READY, _NXT_PORT_MSG_RPC_ERROR, @@ -41,6 +42,7 @@ typedef enum { NXT_PORT_MSG_START_WORKER = _NXT_PORT_MSG_START_WORKER | NXT_PORT_MSG_LAST, NXT_PORT_MSG_SOCKET = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_MODULES = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST, NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, diff --git a/src/nxt_process.h b/src/nxt_process.h index b76c4f1a..8f33b4af 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -14,6 +14,7 @@ typedef enum { NXT_PROCESS_CONTROLLER, NXT_PROCESS_ROUTER, NXT_PROCESS_WORKER, + NXT_PROCESS_DISCOVERY, NXT_PROCESS_MAX, } nxt_process_type_t; diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index f1bd170f..f5c3ebae 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -61,9 +61,6 @@ typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); -static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, - nxt_app_request_t *r, nxt_app_wmsg_t *msg); - static nxt_int_t nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *msg); @@ -93,13 +90,12 @@ nxt_inline nxt_int_t nxt_python_write(nxt_python_run_ctx_t *ctx, nxt_inline nxt_int_t nxt_python_write_py_str(nxt_python_run_ctx_t *ctx, PyObject *str, nxt_bool_t flush, nxt_bool_t last); -extern nxt_int_t nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt); - -nxt_application_module_t nxt_python_module = { +NXT_EXPORT nxt_application_module_t nxt_app_module = { + nxt_string("python"), + nxt_string(PY_VERSION), nxt_python_init, - nxt_python_prepare_msg, - nxt_python_run + nxt_python_run, }; @@ -177,23 +173,6 @@ static PyObject *nxt_py_environ_ptyp; static nxt_python_run_ctx_t *nxt_python_run_ctx; -nxt_int_t -nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt) -{ - nxt_app_modules[NXT_APP_PYTHON] = &nxt_python_module; - -#if PY_MAJOR_VERSION == 2 - nxt_app_modules[NXT_APP_PYTHON2] = &nxt_python_module; -#endif - -#if PY_MAJOR_VERSION == 3 - nxt_app_modules[NXT_APP_PYTHON3] = &nxt_python_module; -#endif - - return NXT_OK; -} - - static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { @@ -320,84 +299,6 @@ fail: static nxt_int_t -nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, - nxt_app_wmsg_t *wmsg) -{ - nxt_int_t rc; - nxt_buf_t *b; - nxt_http_field_t *field; - nxt_app_request_header_t *h; - - static const nxt_str_t prefix = nxt_string("HTTP_"); - static const nxt_str_t eof = nxt_null_string; - - h = &r->header; - -#define RC(S) \ - do { \ - rc = (S); \ - if (nxt_slow_path(rc != NXT_OK)) { \ - goto fail; \ - } \ - } while(0) - -#define NXT_WRITE(N) \ - RC(nxt_app_msg_write_str(task, wmsg, N)) - - /* TODO error handle, async mmap buffer assignment */ - - NXT_WRITE(&h->method); - NXT_WRITE(&h->target); - if (h->path.start == h->target.start) { - NXT_WRITE(&eof); - } else { - NXT_WRITE(&h->path); - } - - if (h->query.start != NULL) { - RC(nxt_app_msg_write_size(task, wmsg, - h->query.start - h->target.start + 1)); - } else { - RC(nxt_app_msg_write_size(task, wmsg, 0)); - } - - NXT_WRITE(&h->version); - - NXT_WRITE(&r->remote); - - NXT_WRITE(&h->host); - NXT_WRITE(&h->content_type); - NXT_WRITE(&h->content_length); - - nxt_list_each(field, h->fields) { - RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, - &prefix, &field->name)); - NXT_WRITE(&field->value); - - } nxt_list_loop; - - /* end-of-headers mark */ - NXT_WRITE(&eof); - - RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); - - for(b = r->body.buf; b != NULL; b = b->next) { - RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, - nxt_buf_mem_used_size(&b->mem))); - } - -#undef NXT_WRITE -#undef RC - - return NXT_OK; - -fail: - - return NXT_ERROR; -} - - -static nxt_int_t nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg) { u_char *buf; diff --git a/src/nxt_router.c b/src/nxt_router.c index 5a053278..8ddddb46 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -134,6 +134,12 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, nxt_port_t *port); +static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg); +static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg); +static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg); static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); @@ -146,6 +152,14 @@ static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, static nxt_router_t *nxt_router; + +static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { + nxt_python_prepare_msg, + nxt_php_prepare_msg, + nxt_go_prepare_msg, +}; + + nxt_int_t nxt_router_start(nxt_task_t *task, void *data) { @@ -654,6 +668,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; nxt_socket_conf_t *skcf; + nxt_app_lang_module_t *lang; nxt_router_app_conf_t apcf; nxt_router_listener_conf_t lscf; @@ -735,17 +750,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); - type = nxt_app_parse_type(&apcf.type); + lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); - if (type == NXT_APP_UNKNOWN) { + if (lang == NULL) { nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", &apcf.type); goto app_fail; } - if (nxt_app_modules[type] == NULL) { + nxt_debug(task, "application language module: \"%s\"", lang->file); + + type = nxt_app_parse_type(&lang->type); + + if (type == NXT_APP_UNKNOWN) { + nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", + &lang->type); + goto app_fail; + } + + if (nxt_app_prepare_msg[type] == NULL) { nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"", - &apcf.type); + &lang->type); goto app_fail; } @@ -763,7 +788,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->type = type; app->max_workers = apcf.workers; app->live = 1; - app->module = nxt_app_modules[type]; + app->prepare_msg = nxt_app_prepare_msg[type]; nxt_queue_insert_tail(&tmcf->apps, &app->link); } @@ -2687,7 +2712,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, wmsg.buf = &wmsg.write; wmsg.stream = ra->req_id; - res = port->app->module->prepare_msg(task, &ap->r, &wmsg); + res = port->app->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, @@ -2710,6 +2735,246 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, } +static nxt_int_t +nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg) +{ + nxt_int_t rc; + nxt_buf_t *b; + nxt_http_field_t *field; + nxt_app_request_header_t *h; + + static const nxt_str_t prefix = nxt_string("HTTP_"); + static const nxt_str_t eof = nxt_null_string; + + h = &r->header; + +#define RC(S) \ + do { \ + rc = (S); \ + if (nxt_slow_path(rc != NXT_OK)) { \ + goto fail; \ + } \ + } while(0) + +#define NXT_WRITE(N) \ + RC(nxt_app_msg_write_str(task, wmsg, N)) + + /* TODO error handle, async mmap buffer assignment */ + + NXT_WRITE(&h->method); + NXT_WRITE(&h->target); + if (h->path.start == h->target.start) { + NXT_WRITE(&eof); + } else { + NXT_WRITE(&h->path); + } + + if (h->query.start != NULL) { + RC(nxt_app_msg_write_size(task, wmsg, + h->query.start - h->target.start + 1)); + } else { + RC(nxt_app_msg_write_size(task, wmsg, 0)); + } + + NXT_WRITE(&h->version); + + NXT_WRITE(&r->remote); + + NXT_WRITE(&h->host); + NXT_WRITE(&h->content_type); + NXT_WRITE(&h->content_length); + + nxt_list_each(field, h->fields) { + RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, + &prefix, &field->name)); + NXT_WRITE(&field->value); + + } nxt_list_loop; + + /* end-of-headers mark */ + NXT_WRITE(&eof); + + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for(b = r->body.buf; b != NULL; b = b->next) { + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } + +#undef NXT_WRITE +#undef RC + + return NXT_OK; + +fail: + + return NXT_ERROR; +} + + +static nxt_int_t +nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg) +{ + nxt_int_t rc; + nxt_buf_t *b; + nxt_http_field_t *field; + nxt_app_request_header_t *h; + + static const nxt_str_t prefix = nxt_string("HTTP_"); + static const nxt_str_t eof = nxt_null_string; + + h = &r->header; + +#define RC(S) \ + do { \ + rc = (S); \ + if (nxt_slow_path(rc != NXT_OK)) { \ + goto fail; \ + } \ + } while(0) + +#define NXT_WRITE(N) \ + RC(nxt_app_msg_write_str(task, wmsg, N)) + + /* TODO error handle, async mmap buffer assignment */ + + NXT_WRITE(&h->method); + NXT_WRITE(&h->target); + if (h->path.start == h->target.start) { + NXT_WRITE(&eof); + } else { + NXT_WRITE(&h->path); + } + + if (h->query.start != NULL) { + RC(nxt_app_msg_write_size(task, wmsg, + h->query.start - h->target.start + 1)); + } else { + RC(nxt_app_msg_write_size(task, wmsg, 0)); + } + + NXT_WRITE(&h->version); + + // PHP_SELF + // SCRIPT_NAME + // SCRIPT_FILENAME + // DOCUMENT_ROOT + + NXT_WRITE(&r->remote); + + NXT_WRITE(&h->host); + NXT_WRITE(&h->cookie); + NXT_WRITE(&h->content_type); + NXT_WRITE(&h->content_length); + + RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); + + nxt_list_each(field, h->fields) { + RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, + &prefix, &field->name)); + NXT_WRITE(&field->value); + + } nxt_list_loop; + + /* end-of-headers mark */ + NXT_WRITE(&eof); + + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for(b = r->body.buf; b != NULL; b = b->next) { + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } + +#undef NXT_WRITE +#undef RC + + return NXT_OK; + +fail: + + return NXT_ERROR; +} + + +static nxt_int_t +nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) +{ + nxt_int_t rc; + nxt_buf_t *b; + nxt_http_field_t *field; + nxt_app_request_header_t *h; + + static const nxt_str_t eof = nxt_null_string; + + h = &r->header; + +#define RC(S) \ + do { \ + rc = (S); \ + if (nxt_slow_path(rc != NXT_OK)) { \ + goto fail; \ + } \ + } while(0) + +#define NXT_WRITE(N) \ + RC(nxt_app_msg_write_str(task, wmsg, N)) + + /* TODO error handle, async mmap buffer assignment */ + + NXT_WRITE(&h->method); + NXT_WRITE(&h->target); + if (h->path.start == h->target.start) { + NXT_WRITE(&eof); + } else { + NXT_WRITE(&h->path); + } + + if (h->query.start != NULL) { + RC(nxt_app_msg_write_size(task, wmsg, + h->query.start - h->target.start + 1)); + } else { + RC(nxt_app_msg_write_size(task, wmsg, 0)); + } + + NXT_WRITE(&h->version); + + NXT_WRITE(&h->host); + NXT_WRITE(&h->cookie); + NXT_WRITE(&h->content_type); + NXT_WRITE(&h->content_length); + + RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); + + nxt_list_each(field, h->fields) { + NXT_WRITE(&field->name); + NXT_WRITE(&field->value); + + } nxt_list_loop; + + /* end-of-headers mark */ + NXT_WRITE(&eof); + + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for(b = r->body.buf; b != NULL; b = b->next) { + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } + +#undef NXT_WRITE +#undef RC + + return NXT_OK; + +fail: + + return NXT_ERROR; +} + + static const nxt_conn_state_t nxt_router_conn_close_state nxt_aligned(64) = { diff --git a/src/nxt_router.h b/src/nxt_router.h index 2a8a30e1..6e44f1d8 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -66,6 +66,11 @@ typedef struct { } nxt_joint_job_t; + +typedef nxt_int_t (*nxt_app_prepare_msg_t)(nxt_task_t *task, + nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); + + struct nxt_app_s { nxt_thread_mutex_t mutex; /* Protects ports queue. */ nxt_queue_t ports; /* of nxt_port_t.app_link */ @@ -83,7 +88,7 @@ struct nxt_app_s { nxt_queue_link_t link; nxt_str_t conf; - nxt_app_module_t *module; + nxt_app_prepare_msg_t prepare_msg; }; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 30474ce8..5e67ecfd 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -57,10 +57,11 @@ static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_int_t nxt_runtime_create(nxt_task_t *task) { - nxt_mp_t *mp; - nxt_int_t ret; - nxt_array_t *listen_sockets; - nxt_runtime_t *rt; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_array_t *listen_sockets; + nxt_runtime_t *rt; + nxt_app_lang_module_t *lang; mp = nxt_mp_create(1024, 128, 256, 32); @@ -90,6 +91,18 @@ nxt_runtime_create(nxt_task_t *task) goto fail; } + rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t)); + if (nxt_slow_path(rt->languages == NULL)) { + goto fail; + } + + /* Should not fail. */ + lang = nxt_array_add(rt->languages); + lang->type = (nxt_str_t) nxt_string("go"); + lang->version = (nxt_str_t) nxt_null_string; + lang->file = NULL; + lang->module = &nxt_go_module; + listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); if (nxt_slow_path(listen_sockets == NULL)) { goto fail; @@ -324,7 +337,6 @@ nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data) { - nxt_uint_t i; nxt_runtime_t *rt; rt = obj; @@ -338,12 +350,6 @@ nxt_runtime_start(nxt_task_t *task, void *obj, void *data) goto fail; } - for (i = 0; i < nxt_init_modules_n; i++) { - if (nxt_init_modules[i](task->thread, rt) != NXT_OK) { - goto fail; - } - } - if (nxt_runtime_log_files_create(task, rt) != NXT_OK) { goto fail; } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 7922c2ef..4961bfb8 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -19,6 +19,7 @@ struct nxt_runtime_s { nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */ nxt_array_t *services; /* of nxt_service_t */ + nxt_array_t *languages; /* of nxt_app_lang_module_t */ void *data; nxt_runtime_cont_t start; diff --git a/src/nxt_string.c b/src/nxt_string.c index 90e5734d..4690851c 100644 --- a/src/nxt_string.c +++ b/src/nxt_string.c @@ -64,8 +64,7 @@ nxt_str_dup(nxt_mp_t *mp, nxt_str_t *dst, const nxt_str_t *src) * nxt_str_cstrz() creates a C style zero-terminated copy of a source * nxt_str_t. The function is intended to create strings suitable * for libc and kernel interfaces so result is pointer to char instead - * of u_char to minimize casts. The copy is aligned to 2 bytes thus - * the lowest bit may be used as marker. + * of u_char to minimize casts. */ char * @@ -73,7 +72,7 @@ nxt_str_cstrz(nxt_mp_t *mp, const nxt_str_t *src) { char *p, *dst; - dst = nxt_mp_align(mp, 2, src->length + 1); + dst = nxt_mp_alloc(mp, src->length + 1); if (nxt_fast_path(dst != NULL)) { p = nxt_cpymem(dst, src->start, src->length); diff --git a/src/nxt_string.h b/src/nxt_string.h index e5611e35..56dc18d7 100644 --- a/src/nxt_string.h +++ b/src/nxt_string.h @@ -158,7 +158,7 @@ nxt_str_eq(s, p, _length) \ #define \ nxt_str_start(s, p, _length) \ - (((s)->length > _length) && (nxt_memcmp((s)->start, p, _length) == 0)) + (((s)->length >= _length) && (nxt_memcmp((s)->start, p, _length) == 0)) #define \ diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index bd6674d5..59290d0b 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -32,6 +32,7 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = { NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ NULL, /* NXT_PORT_MSG_SOCKET */ + NULL, /* NXT_PORT_MSG_MODULES */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -47,6 +48,7 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = { NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ NULL, /* NXT_PORT_MSG_SOCKET */ + NULL, /* NXT_PORT_MSG_MODULES */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -62,6 +64,7 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = { NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ NULL, /* NXT_PORT_MSG_SOCKET */ + NULL, /* NXT_PORT_MSG_MODULES */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; @@ -77,6 +80,23 @@ nxt_port_handler_t nxt_router_process_port_handlers[] = { NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ NULL, /* NXT_PORT_MSG_SOCKET */ + NULL, /* NXT_PORT_MSG_MODULES */ + nxt_port_rpc_handler, + nxt_port_rpc_handler, +}; + + +nxt_port_handler_t nxt_discovery_process_port_handlers[] = { + nxt_worker_process_quit_handler, + nxt_port_new_port_handler, + nxt_port_change_log_file_handler, + nxt_port_mmap_handler, + nxt_port_data_handler, + nxt_port_remove_pid_handler, + NULL, /* NXT_PORT_MSG_READY */ + NULL, /* NXT_PORT_MSG_START_WORKER */ + NULL, /* NXT_PORT_MSG_SOCKET */ + NULL, /* NXT_PORT_MSG_MODULES */ nxt_port_rpc_handler, nxt_port_rpc_handler, }; |