summaryrefslogtreecommitdiffhomepage
path: root/src/python/nxt_python.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-11-05 00:04:59 +0300
committerMax Romanov <max.romanov@nginx.com>2020-11-05 00:04:59 +0300
commit8dcb0b9987033d0349a6ecf528014a9daa574787 (patch)
tree34a79dc0f21f6b3c76378343cc94682f25c6b417 /src/python/nxt_python.c
parent4225361f0ea7d230c80209d76fbc67a932651380 (diff)
downloadunit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.gz
unit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.bz2
Python: request processing in multiple threads.
This closes #459 issue on GitHub.
Diffstat (limited to 'src/python/nxt_python.c')
-rw-r--r--src/python/nxt_python.c268
1 files changed, 251 insertions, 17 deletions
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c
index 01534a47..26a6f093 100644
--- a/src/python/nxt_python.c
+++ b/src/python/nxt_python.c
@@ -15,8 +15,20 @@
#include NXT_PYTHON_MOUNTS_H
+typedef struct {
+ pthread_t thread;
+ nxt_unit_ctx_t *ctx;
+ void *ctx_data;
+} nxt_py_thread_info_t;
+
+
static nxt_int_t nxt_python_start(nxt_task_t *task,
nxt_process_data_t *data);
+static int nxt_python_init_threads(nxt_python_app_conf_t *c);
+static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx);
+static void *nxt_python_thread_func(void *main_ctx);
+static void nxt_python_join_threads(nxt_unit_ctx_t *ctx,
+ nxt_python_app_conf_t *c);
static void nxt_python_atexit(void);
static uint32_t compat[] = {
@@ -44,11 +56,15 @@ static wchar_t *nxt_py_home;
static char *nxt_py_home;
#endif
+static pthread_attr_t *nxt_py_thread_attr;
+static nxt_py_thread_info_t *nxt_py_threads;
+static nxt_python_proto_t nxt_py_proto;
+
static nxt_int_t
nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
{
- int rc, asgi;
+ int rc;
char *nxt_py_module;
size_t len;
PyObject *obj, *pypath, *module;
@@ -124,9 +140,17 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
Py_InitializeEx(0);
+#if PY_VERSION_HEX < NXT_PYTHON_VER(3, 7)
+ if (c->threads > 1) {
+ PyEval_InitThreads();
+ }
+#endif
+
module = NULL;
obj = NULL;
+ python_init.ctx_data = NULL;
+
obj = PySys_GetObject((char *) "stderr");
if (nxt_slow_path(obj == NULL)) {
nxt_alert(task, "Python failed to get \"sys.stderr\" object");
@@ -216,35 +240,50 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
nxt_unit_default_init(task, &python_init);
+ python_init.data = c;
python_init.shm_limit = data->app->shm_limit;
+ python_init.callbacks.ready_handler = nxt_python_ready_handler;
- asgi = nxt_python_asgi_check(nxt_py_application);
-
- if (asgi) {
- rc = nxt_python_asgi_init(task, &python_init);
+ if (nxt_python_asgi_check(nxt_py_application)) {
+ rc = nxt_python_asgi_init(&python_init, &nxt_py_proto);
} else {
- rc = nxt_python_wsgi_init(task, &python_init);
+ rc = nxt_python_wsgi_init(&python_init, &nxt_py_proto);
}
- if (nxt_slow_path(rc == NXT_ERROR)) {
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
goto fail;
}
+ rc = nxt_py_proto.ctx_data_alloc(&python_init.ctx_data);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
+
+ rc = nxt_python_init_threads(c);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ goto fail;
+ }
+
+ if (nxt_py_proto.startup != NULL) {
+ if (nxt_py_proto.startup(python_init.ctx_data) != NXT_UNIT_OK) {
+ goto fail;
+ }
+ }
+
unit_ctx = nxt_unit_init(&python_init);
if (nxt_slow_path(unit_ctx == NULL)) {
goto fail;
}
- if (asgi) {
- rc = nxt_python_asgi_run(unit_ctx);
+ rc = nxt_py_proto.run(unit_ctx);
- } else {
- rc = nxt_python_wsgi_run(unit_ctx);
- }
+ nxt_python_join_threads(unit_ctx, c);
nxt_unit_done(unit_ctx);
+ nxt_py_proto.ctx_data_free(python_init.ctx_data);
+
nxt_python_atexit();
exit(rc);
@@ -253,6 +292,12 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
fail:
+ nxt_python_join_threads(NULL, c);
+
+ if (python_init.ctx_data != NULL) {
+ nxt_py_proto.ctx_data_free(python_init.ctx_data);
+ }
+
Py_XDECREF(obj);
Py_XDECREF(module);
@@ -262,7 +307,195 @@ fail:
}
-nxt_int_t
+static int
+nxt_python_init_threads(nxt_python_app_conf_t *c)
+{
+ int res;
+ uint32_t i;
+ nxt_py_thread_info_t *ti;
+ static pthread_attr_t attr;
+
+ if (c->threads <= 1) {
+ return NXT_UNIT_OK;
+ }
+
+ if (c->thread_stack_size > 0) {
+ res = pthread_attr_init(&attr);
+ if (nxt_slow_path(res != 0)) {
+ nxt_unit_alert(NULL, "thread attr init failed: %s (%d)",
+ strerror(res), res);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ res = pthread_attr_setstacksize(&attr, c->thread_stack_size);
+ if (nxt_slow_path(res != 0)) {
+ nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)",
+ strerror(res), res);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_py_thread_attr = &attr;
+ }
+
+ nxt_py_threads = nxt_unit_malloc(NULL, sizeof(nxt_py_thread_info_t)
+ * (c->threads - 1));
+ if (nxt_slow_path(nxt_py_threads == NULL)) {
+ nxt_unit_alert(NULL, "Failed to allocate thread info array");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ memset(nxt_py_threads, 0, sizeof(nxt_py_thread_info_t) * (c->threads - 1));
+
+ for (i = 0; i < c->threads - 1; i++) {
+ ti = &nxt_py_threads[i];
+
+ res = nxt_py_proto.ctx_data_alloc(&ti->ctx_data);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_python_ready_handler(nxt_unit_ctx_t *ctx)
+{
+ int res;
+ uint32_t i;
+ nxt_py_thread_info_t *ti;
+ nxt_python_app_conf_t *c;
+
+ if (nxt_py_proto.ready != NULL) {
+ res = nxt_py_proto.ready(ctx);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ /* Worker thread context. */
+ if (!nxt_unit_is_main_ctx(ctx)) {
+ return NXT_UNIT_OK;
+ }
+
+ c = ctx->unit->data;
+
+ if (c->threads <= 1) {
+ return NXT_UNIT_OK;
+ }
+
+ for (i = 0; i < c->threads - 1; i++) {
+ ti = &nxt_py_threads[i];
+
+ ti->ctx = ctx;
+
+ res = pthread_create(&ti->thread, nxt_py_thread_attr,
+ nxt_python_thread_func, ti);
+
+ if (nxt_fast_path(res == 0)) {
+ nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
+
+ } else {
+ nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)",
+ (int) (i + 1), strerror(res), res);
+ }
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static void *
+nxt_python_thread_func(void *data)
+{
+ nxt_unit_ctx_t *ctx;
+ PyGILState_STATE gstate;
+ nxt_py_thread_info_t *ti;
+
+ ti = data;
+
+ nxt_unit_debug(ti->ctx, "worker thread #%d start",
+ (int) (ti - nxt_py_threads + 1));
+
+ gstate = PyGILState_Ensure();
+
+ if (nxt_py_proto.startup != NULL) {
+ if (nxt_py_proto.startup(ti->ctx_data) != NXT_UNIT_OK) {
+ goto fail;
+ }
+ }
+
+ ctx = nxt_unit_ctx_alloc(ti->ctx, ti->ctx_data);
+ if (nxt_slow_path(ctx == NULL)) {
+ goto fail;
+ }
+
+ (void) nxt_py_proto.run(ctx);
+
+ nxt_unit_done(ctx);
+
+fail:
+
+ PyGILState_Release(gstate);
+
+ nxt_unit_debug(NULL, "worker thread #%d end",
+ (int) (ti - nxt_py_threads + 1));
+
+ return NULL;
+}
+
+
+static void
+nxt_python_join_threads(nxt_unit_ctx_t *ctx, nxt_python_app_conf_t *c)
+{
+ int res;
+ uint32_t i;
+ PyThreadState *thread_state;
+ nxt_py_thread_info_t *ti;
+
+ if (nxt_py_threads == NULL) {
+ return;
+ }
+
+ thread_state = PyEval_SaveThread();
+
+ for (i = 0; i < c->threads - 1; i++) {
+ ti = &nxt_py_threads[i];
+
+ if ((uintptr_t) ti->thread == 0) {
+ continue;
+ }
+
+ res = pthread_join(ti->thread, NULL);
+
+ if (nxt_fast_path(res == 0)) {
+ nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
+
+ } else {
+ nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)",
+ (int) (i + 1), strerror(res), res);
+ }
+ }
+
+ PyEval_RestoreThread(thread_state);
+
+ for (i = 0; i < c->threads - 1; i++) {
+ ti = &nxt_py_threads[i];
+
+ if (ti->ctx_data != NULL) {
+ nxt_py_proto.ctx_data_free(ti->ctx_data);
+ }
+ }
+
+ nxt_unit_free(NULL, nxt_py_threads);
+}
+
+
+int
nxt_python_init_strings(nxt_python_string_t *pstr)
{
PyObject *obj;
@@ -271,7 +504,7 @@ nxt_python_init_strings(nxt_python_string_t *pstr)
obj = PyString_FromStringAndSize((char *) pstr->string.start,
pstr->string.length);
if (nxt_slow_path(obj == NULL)) {
- return NXT_ERROR;
+ return NXT_UNIT_ERROR;
}
PyUnicode_InternInPlace(&obj);
@@ -281,7 +514,7 @@ nxt_python_init_strings(nxt_python_string_t *pstr)
pstr++;
}
- return NXT_OK;
+ return NXT_UNIT_OK;
}
@@ -304,8 +537,9 @@ nxt_python_done_strings(nxt_python_string_t *pstr)
static void
nxt_python_atexit(void)
{
- nxt_python_wsgi_done();
- nxt_python_asgi_done();
+ if (nxt_py_proto.done != NULL) {
+ nxt_py_proto.done();
+ }
Py_XDECREF(nxt_py_stderr_flush);
Py_XDECREF(nxt_py_application);