diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:04:59 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:04:59 +0300 |
commit | 8dcb0b9987033d0349a6ecf528014a9daa574787 (patch) | |
tree | 34a79dc0f21f6b3c76378343cc94682f25c6b417 /src/python/nxt_python.c | |
parent | 4225361f0ea7d230c80209d76fbc67a932651380 (diff) | |
download | unit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.gz unit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.bz2 |
Python: request processing in multiple threads.
This closes #459 issue on GitHub.
Diffstat (limited to 'src/python/nxt_python.c')
-rw-r--r-- | src/python/nxt_python.c | 268 |
1 files changed, 251 insertions, 17 deletions
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index 01534a47..26a6f093 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -15,8 +15,20 @@ #include NXT_PYTHON_MOUNTS_H +typedef struct { + pthread_t thread; + nxt_unit_ctx_t *ctx; + void *ctx_data; +} nxt_py_thread_info_t; + + static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data); +static int nxt_python_init_threads(nxt_python_app_conf_t *c); +static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_python_thread_func(void *main_ctx); +static void nxt_python_join_threads(nxt_unit_ctx_t *ctx, + nxt_python_app_conf_t *c); static void nxt_python_atexit(void); static uint32_t compat[] = { @@ -44,11 +56,15 @@ static wchar_t *nxt_py_home; static char *nxt_py_home; #endif +static pthread_attr_t *nxt_py_thread_attr; +static nxt_py_thread_info_t *nxt_py_threads; +static nxt_python_proto_t nxt_py_proto; + static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) { - int rc, asgi; + int rc; char *nxt_py_module; size_t len; PyObject *obj, *pypath, *module; @@ -124,9 +140,17 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) Py_InitializeEx(0); +#if PY_VERSION_HEX < NXT_PYTHON_VER(3, 7) + if (c->threads > 1) { + PyEval_InitThreads(); + } +#endif + module = NULL; obj = NULL; + python_init.ctx_data = NULL; + obj = PySys_GetObject((char *) "stderr"); if (nxt_slow_path(obj == NULL)) { nxt_alert(task, "Python failed to get \"sys.stderr\" object"); @@ -216,35 +240,50 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) nxt_unit_default_init(task, &python_init); + python_init.data = c; python_init.shm_limit = data->app->shm_limit; + python_init.callbacks.ready_handler = nxt_python_ready_handler; - asgi = nxt_python_asgi_check(nxt_py_application); - - if (asgi) { - rc = nxt_python_asgi_init(task, &python_init); + if (nxt_python_asgi_check(nxt_py_application)) { + rc = nxt_python_asgi_init(&python_init, &nxt_py_proto); } else { - rc = nxt_python_wsgi_init(task, &python_init); + rc = nxt_python_wsgi_init(&python_init, &nxt_py_proto); } - if (nxt_slow_path(rc == NXT_ERROR)) { + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { goto fail; } + rc = nxt_py_proto.ctx_data_alloc(&python_init.ctx_data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + rc = nxt_python_init_threads(c); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + goto fail; + } + + if (nxt_py_proto.startup != NULL) { + if (nxt_py_proto.startup(python_init.ctx_data) != NXT_UNIT_OK) { + goto fail; + } + } + unit_ctx = nxt_unit_init(&python_init); if (nxt_slow_path(unit_ctx == NULL)) { goto fail; } - if (asgi) { - rc = nxt_python_asgi_run(unit_ctx); + rc = nxt_py_proto.run(unit_ctx); - } else { - rc = nxt_python_wsgi_run(unit_ctx); - } + nxt_python_join_threads(unit_ctx, c); nxt_unit_done(unit_ctx); + nxt_py_proto.ctx_data_free(python_init.ctx_data); + nxt_python_atexit(); exit(rc); @@ -253,6 +292,12 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) fail: + nxt_python_join_threads(NULL, c); + + if (python_init.ctx_data != NULL) { + nxt_py_proto.ctx_data_free(python_init.ctx_data); + } + Py_XDECREF(obj); Py_XDECREF(module); @@ -262,7 +307,195 @@ fail: } -nxt_int_t +static int +nxt_python_init_threads(nxt_python_app_conf_t *c) +{ + int res; + uint32_t i; + nxt_py_thread_info_t *ti; + static pthread_attr_t attr; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + if (c->thread_stack_size > 0) { + res = pthread_attr_init(&attr); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr init failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + res = pthread_attr_setstacksize(&attr, c->thread_stack_size); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + nxt_py_thread_attr = &attr; + } + + nxt_py_threads = nxt_unit_malloc(NULL, sizeof(nxt_py_thread_info_t) + * (c->threads - 1)); + if (nxt_slow_path(nxt_py_threads == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate thread info array"); + + return NXT_UNIT_ERROR; + } + + memset(nxt_py_threads, 0, sizeof(nxt_py_thread_info_t) * (c->threads - 1)); + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + res = nxt_py_proto.ctx_data_alloc(&ti->ctx_data); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + +static int +nxt_python_ready_handler(nxt_unit_ctx_t *ctx) +{ + int res; + uint32_t i; + nxt_py_thread_info_t *ti; + nxt_python_app_conf_t *c; + + if (nxt_py_proto.ready != NULL) { + res = nxt_py_proto.ready(ctx); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } + } + + /* Worker thread context. */ + if (!nxt_unit_is_main_ctx(ctx)) { + return NXT_UNIT_OK; + } + + c = ctx->unit->data; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + ti->ctx = ctx; + + res = pthread_create(&ti->thread, nxt_py_thread_attr, + nxt_python_thread_func, ti); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)", + (int) (i + 1), strerror(res), res); + } + } + + return NXT_UNIT_OK; +} + + +static void * +nxt_python_thread_func(void *data) +{ + nxt_unit_ctx_t *ctx; + PyGILState_STATE gstate; + nxt_py_thread_info_t *ti; + + ti = data; + + nxt_unit_debug(ti->ctx, "worker thread #%d start", + (int) (ti - nxt_py_threads + 1)); + + gstate = PyGILState_Ensure(); + + if (nxt_py_proto.startup != NULL) { + if (nxt_py_proto.startup(ti->ctx_data) != NXT_UNIT_OK) { + goto fail; + } + } + + ctx = nxt_unit_ctx_alloc(ti->ctx, ti->ctx_data); + if (nxt_slow_path(ctx == NULL)) { + goto fail; + } + + (void) nxt_py_proto.run(ctx); + + nxt_unit_done(ctx); + +fail: + + PyGILState_Release(gstate); + + nxt_unit_debug(NULL, "worker thread #%d end", + (int) (ti - nxt_py_threads + 1)); + + return NULL; +} + + +static void +nxt_python_join_threads(nxt_unit_ctx_t *ctx, nxt_python_app_conf_t *c) +{ + int res; + uint32_t i; + PyThreadState *thread_state; + nxt_py_thread_info_t *ti; + + if (nxt_py_threads == NULL) { + return; + } + + thread_state = PyEval_SaveThread(); + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + if ((uintptr_t) ti->thread == 0) { + continue; + } + + res = pthread_join(ti->thread, NULL); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)", + (int) (i + 1), strerror(res), res); + } + } + + PyEval_RestoreThread(thread_state); + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + if (ti->ctx_data != NULL) { + nxt_py_proto.ctx_data_free(ti->ctx_data); + } + } + + nxt_unit_free(NULL, nxt_py_threads); +} + + +int nxt_python_init_strings(nxt_python_string_t *pstr) { PyObject *obj; @@ -271,7 +504,7 @@ nxt_python_init_strings(nxt_python_string_t *pstr) obj = PyString_FromStringAndSize((char *) pstr->string.start, pstr->string.length); if (nxt_slow_path(obj == NULL)) { - return NXT_ERROR; + return NXT_UNIT_ERROR; } PyUnicode_InternInPlace(&obj); @@ -281,7 +514,7 @@ nxt_python_init_strings(nxt_python_string_t *pstr) pstr++; } - return NXT_OK; + return NXT_UNIT_OK; } @@ -304,8 +537,9 @@ nxt_python_done_strings(nxt_python_string_t *pstr) static void nxt_python_atexit(void) { - nxt_python_wsgi_done(); - nxt_python_asgi_done(); + if (nxt_py_proto.done != NULL) { + nxt_py_proto.done(); + } Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); |