diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:06:10 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:06:10 +0300 |
commit | 29db46c52ba0f05706d83ed75d88e4b57bac36e5 (patch) | |
tree | 81c75a66a09330b27664b07bf8856fbdb07995fa /src/nxt_java.c | |
parent | f27953af6103db390aa3eee012a254f130fdf5f4 (diff) | |
download | unit-29db46c52ba0f05706d83ed75d88e4b57bac36e5.tar.gz unit-29db46c52ba0f05706d83ed75d88e4b57bac36e5.tar.bz2 |
Java: request processing in multiple threads.
This closes #458 issue on GitHub.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_java.c | 199 |
1 files changed, 185 insertions, 14 deletions
diff --git a/src/nxt_java.c b/src/nxt_java.c index 1f8864bd..ac715c0b 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -36,6 +36,11 @@ static nxt_int_t nxt_java_start(nxt_task_t *task, static void nxt_java_request_handler(nxt_unit_request_info_t *req); static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws); static void nxt_java_close_handler(nxt_unit_request_info_t *req); +static int nxt_java_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_java_thread_func(void *main_ctx); +static int nxt_java_init_threads(nxt_java_app_conf_t *c); +static void nxt_java_join_threads(nxt_unit_ctx_t *ctx, + nxt_java_app_conf_t *c); static uint32_t compat[] = { NXT_VERNUM, NXT_DEBUG, @@ -43,6 +48,9 @@ static uint32_t compat[] = { char *nxt_java_modules; +static pthread_t *nxt_java_threads; +static pthread_attr_t *nxt_java_thread_attr; + #define NXT_STRING(x) _NXT_STRING(x) #define _NXT_STRING(x) #x @@ -59,8 +67,10 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { }; typedef struct { - JNIEnv *env; - jobject ctx; + JavaVM *jvm; + jobject cl; + jobject ctx; + nxt_java_app_conf_t *conf; } nxt_java_data_t; @@ -402,8 +412,10 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) goto env_failed; } - java_data.env = env; + java_data.jvm = jvm; + java_data.cl = cl; java_data.ctx = nxt_java_startContext(env, c->webapp, classpath); + java_data.conf = c; if ((*env)->ExceptionCheck(env)) { nxt_alert(task, "Unhandled exception in application start"); @@ -411,13 +423,20 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) return NXT_ERROR; } + rc = nxt_java_init_threads(c); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_ERROR; + } + nxt_unit_default_init(task, &java_init); java_init.callbacks.request_handler = nxt_java_request_handler; java_init.callbacks.websocket_handler = nxt_java_websocket_handler; java_init.callbacks.close_handler = nxt_java_close_handler; + java_init.callbacks.ready_handler = nxt_java_ready_handler; java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.data = &java_data; + java_init.ctx_data = env; java_init.shm_limit = app_conf->shm_limit; ctx = nxt_unit_init(&java_init); @@ -427,9 +446,8 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) } rc = nxt_unit_run(ctx); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - /* TODO report error */ - } + + nxt_java_join_threads(ctx, c); nxt_java_stopContext(env, java_data.ctx); @@ -441,7 +459,7 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) (*jvm)->DestroyJavaVM(jvm); - exit(0); + exit(rc); return NXT_OK; @@ -464,7 +482,7 @@ nxt_java_request_handler(nxt_unit_request_info_t *req) nxt_java_request_data_t *data; java_data = req->unit->data; - env = java_data->env; + env = req->ctx->data; data = req->data; jreq = nxt_java_newRequest(env, java_data->ctx, req); @@ -543,11 +561,9 @@ nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws) void *b; JNIEnv *env; jobject jbuf; - nxt_java_data_t *java_data; nxt_java_request_data_t *data; - java_data = ws->req->unit->data; - env = java_data->env; + env = ws->req->ctx->data; data = ws->req->data; b = malloc(ws->payload_len); @@ -578,11 +594,9 @@ static void nxt_java_close_handler(nxt_unit_request_info_t *req) { JNIEnv *env; - nxt_java_data_t *java_data; nxt_java_request_data_t *data; - java_data = req->unit->data; - env = java_data->env; + env = req->ctx->data; data = req->data; nxt_java_Request_close(env, data->jreq); @@ -593,3 +607,160 @@ nxt_java_close_handler(nxt_unit_request_info_t *req) nxt_unit_request_done(req, NXT_UNIT_OK); } + +static int +nxt_java_ready_handler(nxt_unit_ctx_t *ctx) +{ + int res; + uint32_t i; + nxt_java_data_t *java_data; + nxt_java_app_conf_t *c; + + /* Worker thread context. */ + if (!nxt_unit_is_main_ctx(ctx)) { + return NXT_UNIT_OK; + } + + java_data = ctx->unit->data; + c = java_data->conf; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + for (i = 0; i < c->threads - 1; i++) { + res = pthread_create(&nxt_java_threads[i], nxt_java_thread_attr, + nxt_java_thread_func, ctx); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)", + (int) (i + 1), strerror(res), res); + + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + +static void * +nxt_java_thread_func(void *data) +{ + int rc; + JavaVM *jvm; + JNIEnv *env; + nxt_unit_ctx_t *main_ctx, *ctx; + nxt_java_data_t *java_data; + + main_ctx = data; + + nxt_unit_debug(main_ctx, "worker thread start"); + + java_data = main_ctx->unit->data; + jvm = java_data->jvm; + + rc = (*jvm)->AttachCurrentThread(jvm, (void **) &env, NULL); + if (rc != JNI_OK) { + nxt_unit_alert(main_ctx, "failed to attach Java VM: %d", (int) rc); + return NULL; + } + + nxt_java_setContextClassLoader(env, java_data->cl); + + ctx = nxt_unit_ctx_alloc(main_ctx, env); + if (nxt_slow_path(ctx == NULL)) { + goto fail; + } + + (void) nxt_unit_run(ctx); + + nxt_unit_done(ctx); + +fail: + + (*jvm)->DetachCurrentThread(jvm); + + nxt_unit_debug(NULL, "worker thread end"); + + return NULL; +} + + +static int +nxt_java_init_threads(nxt_java_app_conf_t *c) +{ + int res; + static pthread_attr_t attr; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + if (c->thread_stack_size > 0) { + res = pthread_attr_init(&attr); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr init failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + res = pthread_attr_setstacksize(&attr, c->thread_stack_size); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + nxt_java_thread_attr = &attr; + } + + nxt_java_threads = nxt_unit_malloc(NULL, + sizeof(pthread_t) * (c->threads - 1)); + if (nxt_slow_path(nxt_java_threads == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate thread id array"); + + return NXT_UNIT_ERROR; + } + + memset(nxt_java_threads, 0, sizeof(pthread_t) * (c->threads - 1)); + + return NXT_UNIT_OK; +} + + +static void +nxt_java_join_threads(nxt_unit_ctx_t *ctx, nxt_java_app_conf_t *c) +{ + int res; + uint32_t i; + + if (nxt_java_threads == NULL) { + return; + } + + for (i = 0; i < c->threads - 1; i++) { + if ((uintptr_t) nxt_java_threads[i] == 0) { + continue; + } + + res = pthread_join(nxt_java_threads[i], NULL); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d joined", (int) i); + + } else { + nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)", + (int) i, strerror(res), res); + } + } + + nxt_unit_free(ctx, nxt_java_threads); +} + + |