diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:06:10 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:06:10 +0300 |
commit | 29db46c52ba0f05706d83ed75d88e4b57bac36e5 (patch) | |
tree | 81c75a66a09330b27664b07bf8856fbdb07995fa /src | |
parent | f27953af6103db390aa3eee012a254f130fdf5f4 (diff) | |
download | unit-29db46c52ba0f05706d83ed75d88e4b57bac36e5.tar.gz unit-29db46c52ba0f05706d83ed75d88e4b57bac36e5.tar.bz2 |
Java: request processing in multiple threads.
This closes #458 issue on GitHub.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_application.h | 2 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 8 | ||||
-rw-r--r-- | src/nxt_java.c | 199 | ||||
-rw-r--r-- | src/nxt_main_process.c | 10 |
4 files changed, 205 insertions, 14 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index 7f8ec1ba..08de0fce 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -77,6 +77,8 @@ typedef struct { char *webapp; nxt_conf_value_t *options; char *unit_jars; + uint32_t threads; + uint32_t thread_stack_size; } nxt_java_app_conf_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 97a13e38..1ce2d9ea 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -639,6 +639,14 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { }, { .name = nxt_string("unit_jars"), .type = NXT_CONF_VLDT_STRING, + }, { + .name = nxt_string("threads"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_threads, + }, { + .name = nxt_string("thread_stack_size"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_thread_stack_size, }, NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members) diff --git a/src/nxt_java.c b/src/nxt_java.c index 1f8864bd..ac715c0b 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -36,6 +36,11 @@ static nxt_int_t nxt_java_start(nxt_task_t *task, static void nxt_java_request_handler(nxt_unit_request_info_t *req); static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws); static void nxt_java_close_handler(nxt_unit_request_info_t *req); +static int nxt_java_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_java_thread_func(void *main_ctx); +static int nxt_java_init_threads(nxt_java_app_conf_t *c); +static void nxt_java_join_threads(nxt_unit_ctx_t *ctx, + nxt_java_app_conf_t *c); static uint32_t compat[] = { NXT_VERNUM, NXT_DEBUG, @@ -43,6 +48,9 @@ static uint32_t compat[] = { char *nxt_java_modules; +static pthread_t *nxt_java_threads; +static pthread_attr_t *nxt_java_thread_attr; + #define NXT_STRING(x) _NXT_STRING(x) #define _NXT_STRING(x) #x @@ -59,8 +67,10 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { }; typedef struct { - JNIEnv *env; - jobject ctx; + JavaVM *jvm; + jobject cl; + jobject ctx; + nxt_java_app_conf_t *conf; } nxt_java_data_t; @@ -402,8 +412,10 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) goto env_failed; } - java_data.env = env; + java_data.jvm = jvm; + java_data.cl = cl; java_data.ctx = nxt_java_startContext(env, c->webapp, classpath); + java_data.conf = c; if ((*env)->ExceptionCheck(env)) { nxt_alert(task, "Unhandled exception in application start"); @@ -411,13 +423,20 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) return NXT_ERROR; } + rc = nxt_java_init_threads(c); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_ERROR; + } + nxt_unit_default_init(task, &java_init); java_init.callbacks.request_handler = nxt_java_request_handler; java_init.callbacks.websocket_handler = nxt_java_websocket_handler; java_init.callbacks.close_handler = nxt_java_close_handler; + java_init.callbacks.ready_handler = nxt_java_ready_handler; java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.data = &java_data; + java_init.ctx_data = env; java_init.shm_limit = app_conf->shm_limit; ctx = nxt_unit_init(&java_init); @@ -427,9 +446,8 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) } rc = nxt_unit_run(ctx); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - /* TODO report error */ - } + + nxt_java_join_threads(ctx, c); nxt_java_stopContext(env, java_data.ctx); @@ -441,7 +459,7 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) (*jvm)->DestroyJavaVM(jvm); - exit(0); + exit(rc); return NXT_OK; @@ -464,7 +482,7 @@ nxt_java_request_handler(nxt_unit_request_info_t *req) nxt_java_request_data_t *data; java_data = req->unit->data; - env = java_data->env; + env = req->ctx->data; data = req->data; jreq = nxt_java_newRequest(env, java_data->ctx, req); @@ -543,11 +561,9 @@ nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws) void *b; JNIEnv *env; jobject jbuf; - nxt_java_data_t *java_data; nxt_java_request_data_t *data; - java_data = ws->req->unit->data; - env = java_data->env; + env = ws->req->ctx->data; data = ws->req->data; b = malloc(ws->payload_len); @@ -578,11 +594,9 @@ static void nxt_java_close_handler(nxt_unit_request_info_t *req) { JNIEnv *env; - nxt_java_data_t *java_data; nxt_java_request_data_t *data; - java_data = req->unit->data; - env = java_data->env; + env = req->ctx->data; data = req->data; nxt_java_Request_close(env, data->jreq); @@ -593,3 +607,160 @@ nxt_java_close_handler(nxt_unit_request_info_t *req) nxt_unit_request_done(req, NXT_UNIT_OK); } + +static int +nxt_java_ready_handler(nxt_unit_ctx_t *ctx) +{ + int res; + uint32_t i; + nxt_java_data_t *java_data; + nxt_java_app_conf_t *c; + + /* Worker thread context. */ + if (!nxt_unit_is_main_ctx(ctx)) { + return NXT_UNIT_OK; + } + + java_data = ctx->unit->data; + c = java_data->conf; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + for (i = 0; i < c->threads - 1; i++) { + res = pthread_create(&nxt_java_threads[i], nxt_java_thread_attr, + nxt_java_thread_func, ctx); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)", + (int) (i + 1), strerror(res), res); + + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + +static void * +nxt_java_thread_func(void *data) +{ + int rc; + JavaVM *jvm; + JNIEnv *env; + nxt_unit_ctx_t *main_ctx, *ctx; + nxt_java_data_t *java_data; + + main_ctx = data; + + nxt_unit_debug(main_ctx, "worker thread start"); + + java_data = main_ctx->unit->data; + jvm = java_data->jvm; + + rc = (*jvm)->AttachCurrentThread(jvm, (void **) &env, NULL); + if (rc != JNI_OK) { + nxt_unit_alert(main_ctx, "failed to attach Java VM: %d", (int) rc); + return NULL; + } + + nxt_java_setContextClassLoader(env, java_data->cl); + + ctx = nxt_unit_ctx_alloc(main_ctx, env); + if (nxt_slow_path(ctx == NULL)) { + goto fail; + } + + (void) nxt_unit_run(ctx); + + nxt_unit_done(ctx); + +fail: + + (*jvm)->DetachCurrentThread(jvm); + + nxt_unit_debug(NULL, "worker thread end"); + + return NULL; +} + + +static int +nxt_java_init_threads(nxt_java_app_conf_t *c) +{ + int res; + static pthread_attr_t attr; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + if (c->thread_stack_size > 0) { + res = pthread_attr_init(&attr); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr init failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + res = pthread_attr_setstacksize(&attr, c->thread_stack_size); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + nxt_java_thread_attr = &attr; + } + + nxt_java_threads = nxt_unit_malloc(NULL, + sizeof(pthread_t) * (c->threads - 1)); + if (nxt_slow_path(nxt_java_threads == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate thread id array"); + + return NXT_UNIT_ERROR; + } + + memset(nxt_java_threads, 0, sizeof(pthread_t) * (c->threads - 1)); + + return NXT_UNIT_OK; +} + + +static void +nxt_java_join_threads(nxt_unit_ctx_t *ctx, nxt_java_app_conf_t *c) +{ + int res; + uint32_t i; + + if (nxt_java_threads == NULL) { + return; + } + + for (i = 0; i < c->threads - 1; i++) { + if ((uintptr_t) nxt_java_threads[i] == 0) { + continue; + } + + res = pthread_join(nxt_java_threads[i], NULL); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d joined", (int) i); + + } else { + nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)", + (int) i, strerror(res), res); + } + } + + nxt_unit_free(ctx, nxt_java_threads); +} + + diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 16c405ac..ce8d6916 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -266,6 +266,16 @@ static nxt_conf_map_t nxt_java_app_conf[] = { NXT_CONF_MAP_CSTRZ, offsetof(nxt_common_app_conf_t, u.java.unit_jars), }, + { + nxt_string("threads"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.java.threads), + }, + { + nxt_string("thread_stack_size"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.java.thread_stack_size), + }, }; |