diff options
author | Max Romanov <max.romanov@nginx.com> | 2018-08-06 17:27:33 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2018-08-06 17:27:33 +0300 |
commit | 1bb22d1e922c87d3c86c67bdce626767ee48fb5c (patch) | |
tree | 6e067a82b309c3a0d0f592f037f26d886a7f8c13 /src/nxt_application.c | |
parent | b6ce2da65c9c5229d744b2d964623b2d0f731ee9 (diff) | |
download | unit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.gz unit-1bb22d1e922c87d3c86c67bdce626767ee48fb5c.tar.bz2 |
Unit application library.
Library now used in all language modules.
Old 'nxt_app_*' code removed.
See src/test/nxt_unit_app_test.c for usage sample.
Diffstat (limited to 'src/nxt_application.c')
-rw-r--r-- | src/nxt_application.c | 476 |
1 files changed, 49 insertions, 427 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index b2b7f9c0..3c62f7d4 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -12,6 +12,7 @@ #include <nxt_router.h> #include <nxt_http.h> #include <nxt_application.h> +#include <nxt_unit.h> #include <nxt_port_memory_int.h> #include <glob.h> @@ -46,10 +47,7 @@ static uint32_t compat[] = { nxt_str_t nxt_server = nxt_string(NXT_SERVER); -static nxt_thread_mutex_t nxt_app_mutex; -static nxt_thread_cond_t nxt_app_cond; - -static nxt_application_module_t *nxt_app; +static nxt_app_module_t *nxt_app; nxt_int_t @@ -188,13 +186,13 @@ static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules, const char *name) { - void *dl; - nxt_str_t version; - nxt_int_t ret; - nxt_uint_t i, n; - nxt_module_t *module; - nxt_app_type_t type; - nxt_application_module_t *app; + void *dl; + nxt_str_t version; + nxt_int_t ret; + nxt_uint_t i, n; + nxt_module_t *module; + nxt_app_type_t type; + nxt_app_module_t *app; /* * Only memory allocation failure should return NXT_ERROR. @@ -353,14 +351,6 @@ nxt_app_start(nxt_task_t *task, void *data) return NXT_ERROR; } - if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { - return NXT_ERROR; - } - - if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) { - return NXT_ERROR; - } - ret = nxt_app->init(task, data); if (nxt_slow_path(ret != NXT_OK)) { @@ -430,328 +420,6 @@ nxt_app_set_environment(nxt_conf_value_t *environment) } -void -nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - if (nxt_app->atexit != NULL) { - nxt_app->atexit(task); - } - - nxt_worker_process_quit_handler(task, msg); -} - - -void -nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - size_t dump_size; - nxt_int_t res; - nxt_buf_t *b; - nxt_port_t *port; - nxt_app_rmsg_t rmsg = { msg->buf }; - nxt_app_wmsg_t wmsg; - - b = msg->buf; - dump_size = b->mem.free - b->mem.pos; - - if (dump_size > 300) { - dump_size = 300; - } - - nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos); - - port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, - msg->port_msg.reply_port); - if (nxt_slow_path(port == NULL)) { - nxt_debug(task, "stream #%uD: reply port %d not found", - msg->port_msg.stream, msg->port_msg.reply_port); - return; - } - - wmsg.port = port; - wmsg.write = NULL; - wmsg.buf = &wmsg.write; - wmsg.stream = msg->port_msg.stream; - - res = nxt_app->run(task, &rmsg, &wmsg); - - if (nxt_slow_path(res != NXT_OK)) { - nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1, - msg->port_msg.stream, 0, NULL); - } -} - - -u_char * -nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) -{ - size_t free_size; - u_char *res; - nxt_buf_t *b; - - res = NULL; - - do { - b = *msg->buf; - - if (b == NULL) { - b = nxt_port_mmap_get_buf(task, msg->port, size); - if (nxt_slow_path(b == NULL)) { - return NULL; - } - - *msg->buf = b; - } - - free_size = nxt_buf_mem_free_size(&b->mem); - - if (free_size >= size) { - res = b->mem.free; - b->mem.free += size; - - return res; - } - - if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) { - res = b->mem.free; - b->mem.free += size; - - return res; - } - - msg->buf = &b->next; - } while(1); -} - - -nxt_int_t -nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size) -{ - u_char *dst; - size_t dst_length; - - if (c != NULL) { - dst_length = size + (size < 128 ? 1 : 4) + 1; - - dst = nxt_app_msg_write_get_buf(task, msg, dst_length); - if (nxt_slow_path(dst == NULL)) { - nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", - dst_length); - return NXT_ERROR; - } - - dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */ - - nxt_memcpy(dst, c, size); - dst[size] = 0; - - nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, size, c); - - } else { - dst_length = 1; - - dst = nxt_app_msg_write_get_buf(task, msg, dst_length); - if (nxt_slow_path(dst == NULL)) { - nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed", - dst_length); - return NXT_ERROR; - } - - dst = nxt_app_msg_write_length(dst, 0); - - nxt_debug(task, "nxt_app_msg_write: NULL"); - } - - return NXT_OK; -} - - -nxt_int_t -nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, - const nxt_str_t *prefix, u_char *c, size_t size) -{ - u_char *dst, *src; - size_t i, length, dst_length; - - length = prefix->length + size; - - dst_length = length + (length < 128 ? 1 : 4) + 1; - - dst = nxt_app_msg_write_get_buf(task, msg, dst_length); - if (nxt_slow_path(dst == NULL)) { - return NXT_ERROR; - } - - dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */ - - nxt_memcpy(dst, prefix->start, prefix->length); - dst += prefix->length; - - src = c; - for (i = 0; i < size; i++, dst++, src++) { - - if (*src >= 'a' && *src <= 'z') { - *dst = *src & ~0x20; - continue; - } - - if (*src == '-') { - *dst = '_'; - continue; - } - - *dst = *src; - } - - *dst = 0; - - return NXT_OK; -} - - -nxt_inline nxt_int_t -nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) -{ - nxt_buf_t *buf; - - do { - buf = msg->buf; - - if (nxt_slow_path(buf == NULL)) { - return NXT_DONE; - } - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { - if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - msg->buf = buf->next; - continue; - } - return NXT_ERROR; - } - - if (buf->mem.pos[0] >= 128) { - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { - return NXT_ERROR; - } - } - - break; - } while (1); - - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); - - return NXT_OK; -} - - -nxt_int_t -nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) -{ - size_t length; - nxt_int_t ret; - nxt_buf_t *buf; - - ret = nxt_app_msg_read_size_(task, msg, &length); - if (ret != NXT_OK) { - return ret; - } - - buf = msg->buf; - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) { - return NXT_ERROR; - } - - if (length > 0) { - str->start = buf->mem.pos; - str->length = length - 1; - - buf->mem.pos += length; - - nxt_debug(task, "nxt_read_str: %uz %*s", length - 1, - length - 1, str->start); - - } else { - str->start = NULL; - str->length = 0; - - nxt_debug(task, "nxt_read_str: NULL"); - } - - return NXT_OK; -} - - -size_t -nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst, - size_t size) -{ - size_t res, read_size; - nxt_buf_t *buf; - - res = 0; - - while (size > 0) { - buf = msg->buf; - - if (nxt_slow_path(buf == NULL)) { - break; - } - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - msg->buf = buf->next; - continue; - } - - read_size = nxt_buf_mem_used_size(&buf->mem); - read_size = nxt_min(read_size, size); - - dst = nxt_cpymem(dst, buf->mem.pos, read_size); - - size -= read_size; - buf->mem.pos += read_size; - res += read_size; - } - - nxt_debug(task, "nxt_read_raw: %uz", res); - - return res; -} - - -nxt_int_t -nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, - nxt_str_t *v) -{ - nxt_int_t rc; - - rc = nxt_app_msg_read_str(task, rmsg, n); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } - - rc = nxt_app_msg_read_str(task, rmsg, v); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } - - return rc; -} - - -nxt_int_t -nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) -{ - nxt_int_t ret; - - ret = nxt_app_msg_read_size_(task, msg, size); - - nxt_debug(task, "nxt_read_size: %d", (int) *size); - - return ret; -} - - nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar) { @@ -778,92 +446,6 @@ nxt_app_http_release(nxt_task_t *task, void *obj, void *data) } -nxt_int_t -nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) -{ - nxt_int_t rc; - nxt_buf_t *b; - - rc = NXT_OK; - - if (nxt_slow_path(last == 1)) { - do { - b = *msg->buf; - - if (b == NULL) { - b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST); - *msg->buf = b; - break; - } - - msg->buf = &b->next; - } while(1); - } - - if (nxt_slow_path(msg->write != NULL)) { - rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA, - -1, msg->stream, 0, msg->write); - - msg->write = NULL; - msg->buf = &msg->write; - } - - return rc; -} - - -nxt_int_t -nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, - size_t size) -{ - size_t free_size, copy_size; - nxt_buf_t *b; - - nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); - - while (size > 0) { - b = *msg->buf; - - if (b == NULL) { - free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - - b = nxt_port_mmap_get_buf(task, msg->port, free_size); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - *msg->buf = b; - - } else { - free_size = nxt_buf_mem_free_size(&b->mem); - - if (free_size < size - && nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK) - { - free_size = nxt_buf_mem_free_size(&b->mem); - } - } - - if (free_size > 0) { - copy_size = nxt_min(free_size, size); - - b->mem.free = nxt_cpymem(b->mem.free, c, copy_size); - - size -= copy_size; - c += copy_size; - - if (size == 0) { - return NXT_OK; - } - } - - msg->buf = &b->next; - } - - return NXT_OK; -} - - nxt_app_lang_module_t * nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) { @@ -943,3 +525,43 @@ nxt_app_parse_type(u_char *p, size_t length) return NXT_APP_UNKNOWN; } + + +nxt_int_t +nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + nxt_port_t *my_port, *main_port; + nxt_runtime_t *rt; + + nxt_memzero(init, sizeof(nxt_unit_init_t)); + + rt = task->thread->runtime; + + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + if (nxt_slow_path(main_port == NULL)) { + return NXT_ERROR; + } + + my_port = nxt_runtime_port_find(rt, nxt_pid, 0); + if (nxt_slow_path(my_port == NULL)) { + return NXT_ERROR; + } + + init->ready_port.id.pid = main_port->pid; + init->ready_port.id.id = main_port->id; + init->ready_port.out_fd = main_port->pair[1]; + + nxt_fd_blocking(task, main_port->pair[1]); + + init->ready_stream = my_port->process->init->stream; + + init->read_port.id.pid = my_port->pid; + init->read_port.id.id = my_port->id; + init->read_port.in_fd = my_port->pair[0]; + + nxt_fd_blocking(task, my_port->pair[0]); + + init->log_fd = 2; + + return NXT_OK; +} |