diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:04:59 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-05 00:04:59 +0300 |
commit | 8dcb0b9987033d0349a6ecf528014a9daa574787 (patch) | |
tree | 34a79dc0f21f6b3c76378343cc94682f25c6b417 /src | |
parent | 4225361f0ea7d230c80209d76fbc67a932651380 (diff) | |
download | unit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.gz unit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.bz2 |
Python: request processing in multiple threads.
This closes #459 issue on GitHub.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.h | 2 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 58 | ||||
-rw-r--r-- | src/nxt_main_process.c | 12 | ||||
-rw-r--r-- | src/python/nxt_python.c | 268 | ||||
-rw-r--r-- | src/python/nxt_python.h | 19 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.c | 490 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.h | 33 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_http.c | 36 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_lifespan.c | 102 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_str.c | 2 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_str.h | 2 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_websocket.c | 21 | ||||
-rw-r--r-- | src/python/nxt_python_wsgi.c | 444 |
13 files changed, 992 insertions, 497 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index cb49a033..7f8ec1ba 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -51,6 +51,8 @@ typedef struct { nxt_str_t path; nxt_str_t module; char *callable; + uint32_t threads; + uint32_t thread_stack_size; } nxt_python_app_conf_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index b44509e3..97a13e38 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -95,6 +95,10 @@ static nxt_int_t nxt_conf_vldt_return(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_threads(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_thread_stack_size(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes_member(nxt_conf_validation_t *vldt, @@ -489,6 +493,14 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { }, { .name = nxt_string("callable"), .type = NXT_CONF_VLDT_STRING, + }, { + .name = nxt_string("threads"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_threads, + }, { + .name = nxt_string("thread_stack_size"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_thread_stack_size, }, NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members) @@ -1329,6 +1341,52 @@ nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, static nxt_int_t +nxt_conf_vldt_threads(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + int64_t threads; + + threads = nxt_conf_get_number(value); + + if (threads < 1) { + return nxt_conf_vldt_error(vldt, "The \"threads\" number must be " + "equal to or greater than 1."); + } + + if (threads > NXT_INT32_T_MAX) { + return nxt_conf_vldt_error(vldt, "The \"threads\" number must " + "not exceed %d.", NXT_INT32_T_MAX); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_conf_vldt_thread_stack_size(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + int64_t size; + + size = nxt_conf_get_number(value); + + if (size < PTHREAD_STACK_MIN) { + return nxt_conf_vldt_error(vldt, "The \"thread_stack_size\" number " + "must be equal to or greater than %d.", + PTHREAD_STACK_MIN); + } + + if ((size % nxt_pagesize) != 0) { + return nxt_conf_vldt_error(vldt, "The \"thread_stack_size\" number " + "must be a multiple of the system page size (%d).", + nxt_pagesize); + } + + return NXT_OK; +} + + +static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index e4ec8b57..16c405ac 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -197,6 +197,18 @@ static nxt_conf_map_t nxt_python_app_conf[] = { NXT_CONF_MAP_CSTRZ, offsetof(nxt_common_app_conf_t, u.python.callable), }, + + { + nxt_string("threads"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.python.threads), + }, + + { + nxt_string("thread_stack_size"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.python.thread_stack_size), + }, }; diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index 01534a47..26a6f093 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -15,8 +15,20 @@ #include NXT_PYTHON_MOUNTS_H +typedef struct { + pthread_t thread; + nxt_unit_ctx_t *ctx; + void *ctx_data; +} nxt_py_thread_info_t; + + static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data); +static int nxt_python_init_threads(nxt_python_app_conf_t *c); +static int nxt_python_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_python_thread_func(void *main_ctx); +static void nxt_python_join_threads(nxt_unit_ctx_t *ctx, + nxt_python_app_conf_t *c); static void nxt_python_atexit(void); static uint32_t compat[] = { @@ -44,11 +56,15 @@ static wchar_t *nxt_py_home; static char *nxt_py_home; #endif +static pthread_attr_t *nxt_py_thread_attr; +static nxt_py_thread_info_t *nxt_py_threads; +static nxt_python_proto_t nxt_py_proto; + static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) { - int rc, asgi; + int rc; char *nxt_py_module; size_t len; PyObject *obj, *pypath, *module; @@ -124,9 +140,17 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) Py_InitializeEx(0); +#if PY_VERSION_HEX < NXT_PYTHON_VER(3, 7) + if (c->threads > 1) { + PyEval_InitThreads(); + } +#endif + module = NULL; obj = NULL; + python_init.ctx_data = NULL; + obj = PySys_GetObject((char *) "stderr"); if (nxt_slow_path(obj == NULL)) { nxt_alert(task, "Python failed to get \"sys.stderr\" object"); @@ -216,35 +240,50 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) nxt_unit_default_init(task, &python_init); + python_init.data = c; python_init.shm_limit = data->app->shm_limit; + python_init.callbacks.ready_handler = nxt_python_ready_handler; - asgi = nxt_python_asgi_check(nxt_py_application); - - if (asgi) { - rc = nxt_python_asgi_init(task, &python_init); + if (nxt_python_asgi_check(nxt_py_application)) { + rc = nxt_python_asgi_init(&python_init, &nxt_py_proto); } else { - rc = nxt_python_wsgi_init(task, &python_init); + rc = nxt_python_wsgi_init(&python_init, &nxt_py_proto); } - if (nxt_slow_path(rc == NXT_ERROR)) { + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { goto fail; } + rc = nxt_py_proto.ctx_data_alloc(&python_init.ctx_data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + rc = nxt_python_init_threads(c); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + goto fail; + } + + if (nxt_py_proto.startup != NULL) { + if (nxt_py_proto.startup(python_init.ctx_data) != NXT_UNIT_OK) { + goto fail; + } + } + unit_ctx = nxt_unit_init(&python_init); if (nxt_slow_path(unit_ctx == NULL)) { goto fail; } - if (asgi) { - rc = nxt_python_asgi_run(unit_ctx); + rc = nxt_py_proto.run(unit_ctx); - } else { - rc = nxt_python_wsgi_run(unit_ctx); - } + nxt_python_join_threads(unit_ctx, c); nxt_unit_done(unit_ctx); + nxt_py_proto.ctx_data_free(python_init.ctx_data); + nxt_python_atexit(); exit(rc); @@ -253,6 +292,12 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) fail: + nxt_python_join_threads(NULL, c); + + if (python_init.ctx_data != NULL) { + nxt_py_proto.ctx_data_free(python_init.ctx_data); + } + Py_XDECREF(obj); Py_XDECREF(module); @@ -262,7 +307,195 @@ fail: } -nxt_int_t +static int +nxt_python_init_threads(nxt_python_app_conf_t *c) +{ + int res; + uint32_t i; + nxt_py_thread_info_t *ti; + static pthread_attr_t attr; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + if (c->thread_stack_size > 0) { + res = pthread_attr_init(&attr); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr init failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + res = pthread_attr_setstacksize(&attr, c->thread_stack_size); + if (nxt_slow_path(res != 0)) { + nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)", + strerror(res), res); + + return NXT_UNIT_ERROR; + } + + nxt_py_thread_attr = &attr; + } + + nxt_py_threads = nxt_unit_malloc(NULL, sizeof(nxt_py_thread_info_t) + * (c->threads - 1)); + if (nxt_slow_path(nxt_py_threads == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate thread info array"); + + return NXT_UNIT_ERROR; + } + + memset(nxt_py_threads, 0, sizeof(nxt_py_thread_info_t) * (c->threads - 1)); + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + res = nxt_py_proto.ctx_data_alloc(&ti->ctx_data); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + +static int +nxt_python_ready_handler(nxt_unit_ctx_t *ctx) +{ + int res; + uint32_t i; + nxt_py_thread_info_t *ti; + nxt_python_app_conf_t *c; + + if (nxt_py_proto.ready != NULL) { + res = nxt_py_proto.ready(ctx); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } + } + + /* Worker thread context. */ + if (!nxt_unit_is_main_ctx(ctx)) { + return NXT_UNIT_OK; + } + + c = ctx->unit->data; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + ti->ctx = ctx; + + res = pthread_create(&ti->thread, nxt_py_thread_attr, + nxt_python_thread_func, ti); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)", + (int) (i + 1), strerror(res), res); + } + } + + return NXT_UNIT_OK; +} + + +static void * +nxt_python_thread_func(void *data) +{ + nxt_unit_ctx_t *ctx; + PyGILState_STATE gstate; + nxt_py_thread_info_t *ti; + + ti = data; + + nxt_unit_debug(ti->ctx, "worker thread #%d start", + (int) (ti - nxt_py_threads + 1)); + + gstate = PyGILState_Ensure(); + + if (nxt_py_proto.startup != NULL) { + if (nxt_py_proto.startup(ti->ctx_data) != NXT_UNIT_OK) { + goto fail; + } + } + + ctx = nxt_unit_ctx_alloc(ti->ctx, ti->ctx_data); + if (nxt_slow_path(ctx == NULL)) { + goto fail; + } + + (void) nxt_py_proto.run(ctx); + + nxt_unit_done(ctx); + +fail: + + PyGILState_Release(gstate); + + nxt_unit_debug(NULL, "worker thread #%d end", + (int) (ti - nxt_py_threads + 1)); + + return NULL; +} + + +static void +nxt_python_join_threads(nxt_unit_ctx_t *ctx, nxt_python_app_conf_t *c) +{ + int res; + uint32_t i; + PyThreadState *thread_state; + nxt_py_thread_info_t *ti; + + if (nxt_py_threads == NULL) { + return; + } + + thread_state = PyEval_SaveThread(); + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + if ((uintptr_t) ti->thread == 0) { + continue; + } + + res = pthread_join(ti->thread, NULL); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)", + (int) (i + 1), strerror(res), res); + } + } + + PyEval_RestoreThread(thread_state); + + for (i = 0; i < c->threads - 1; i++) { + ti = &nxt_py_threads[i]; + + if (ti->ctx_data != NULL) { + nxt_py_proto.ctx_data_free(ti->ctx_data); + } + } + + nxt_unit_free(NULL, nxt_py_threads); +} + + +int nxt_python_init_strings(nxt_python_string_t *pstr) { PyObject *obj; @@ -271,7 +504,7 @@ nxt_python_init_strings(nxt_python_string_t *pstr) obj = PyString_FromStringAndSize((char *) pstr->string.start, pstr->string.length); if (nxt_slow_path(obj == NULL)) { - return NXT_ERROR; + return NXT_UNIT_ERROR; } PyUnicode_InternInPlace(&obj); @@ -281,7 +514,7 @@ nxt_python_init_strings(nxt_python_string_t *pstr) pstr++; } - return NXT_OK; + return NXT_UNIT_OK; } @@ -304,8 +537,9 @@ nxt_python_done_strings(nxt_python_string_t *pstr) static void nxt_python_atexit(void) { - nxt_python_wsgi_done(); - nxt_python_asgi_done(); + if (nxt_py_proto.done != NULL) { + nxt_py_proto.done(); + } Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h index 6d7eba8e..b581dd46 100644 --- a/src/python/nxt_python.h +++ b/src/python/nxt_python.h @@ -44,20 +44,25 @@ typedef struct { PyObject **object_p; } nxt_python_string_t; +typedef struct { + int (*ctx_data_alloc)(void **pdata); + void (*ctx_data_free)(void *data); + int (*startup)(void *data); + int (*run)(nxt_unit_ctx_t *ctx); + int (*ready)(nxt_unit_ctx_t *ctx); + void (*done)(void); +} nxt_python_proto_t; + -nxt_int_t nxt_python_init_strings(nxt_python_string_t *pstr); +int nxt_python_init_strings(nxt_python_string_t *pstr); void nxt_python_done_strings(nxt_python_string_t *pstr); void nxt_python_print_exception(void); -nxt_int_t nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init); -int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx); -void nxt_python_wsgi_done(void); +int nxt_python_wsgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto); int nxt_python_asgi_check(PyObject *obj); -nxt_int_t nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init); -nxt_int_t nxt_python_asgi_run(nxt_unit_ctx_t *ctx); -void nxt_python_asgi_done(void); +int nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto); #endif /* _NXT_PYTHON_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c index 72408ea1..a188ff56 100644 --- a/src/python/nxt_python_asgi.c +++ b/src/python/nxt_python_asgi.c @@ -16,6 +16,13 @@ #include <python/nxt_python_asgi_str.h> +static int nxt_python_asgi_ctx_data_alloc(void **pdata); +static void nxt_python_asgi_ctx_data_free(void *data); +static int nxt_python_asgi_startup(void *data); +static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); + +static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); @@ -24,30 +31,32 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); +static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); + static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); +static void nxt_python_asgi_done(void); -PyObject *nxt_py_loop_run_until_complete; -PyObject *nxt_py_loop_create_future; -PyObject *nxt_py_loop_create_task; - -nxt_queue_t nxt_py_asgi_drain_queue; - -static PyObject *nxt_py_loop_call_soon; -static PyObject *nxt_py_quit_future; -static PyObject *nxt_py_quit_future_set_result; -static PyObject *nxt_py_loop_add_reader; -static PyObject *nxt_py_loop_remove_reader; -static PyObject *nxt_py_port_read; +static PyObject *nxt_py_port_read; +static nxt_unit_port_t *nxt_py_shared_port; static PyMethodDef nxt_py_port_read_method = {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; +static nxt_python_proto_t nxt_py_asgi_proto = { + .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, + .ctx_data_free = nxt_python_asgi_ctx_data_free, + .startup = nxt_python_asgi_startup, + .run = nxt_python_asgi_run, + .ready = nxt_python_asgi_ready, + .done = nxt_python_asgi_done, +}; + #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A @@ -102,202 +111,254 @@ nxt_python_asgi_check(PyObject *obj) } -nxt_int_t -nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +int +nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) { - PyObject *asyncio, *loop, *get_event_loop; - nxt_int_t rc; - - nxt_debug(task, "asgi_init"); + nxt_unit_debug(NULL, "asgi_init"); - if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) { - nxt_alert(task, "Python failed to init string objects"); - return NXT_ERROR; + if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { + nxt_unit_alert(NULL, "Python failed to init string objects"); + return NXT_UNIT_ERROR; } - asyncio = PyImport_ImportModule("asyncio"); - if (nxt_slow_path(asyncio == NULL)) { - nxt_alert(task, "Python failed to import module 'asyncio'"); - nxt_python_print_exception(); - return NXT_ERROR; + nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); + if (nxt_slow_path(nxt_py_port_read == NULL)) { + nxt_unit_alert(NULL, + "Python failed to initialize the 'port_read' function"); + return NXT_UNIT_ERROR; } - loop = NULL; - get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), - "get_event_loop"); - if (nxt_slow_path(get_event_loop == NULL)) { - nxt_alert(task, - "Python failed to get 'get_event_loop' from module 'asyncio'"); - goto fail; + if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) { - nxt_alert(task, "'asyncio.get_event_loop' is not a callable object"); - goto fail; + if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; } - loop = PyObject_CallObject(get_event_loop, NULL); - if (nxt_slow_path(loop == NULL)) { - nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'"); - goto fail; - } + init->callbacks.request_handler = nxt_py_asgi_request_handler; + init->callbacks.data_handler = nxt_py_asgi_http_data_handler; + init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; + init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; + init->callbacks.quit = nxt_py_asgi_quit; + init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; + init->callbacks.add_port = nxt_py_asgi_add_port; + init->callbacks.remove_port = nxt_py_asgi_remove_port; - nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task"); - if (nxt_slow_path(nxt_py_loop_create_task == NULL)) { - nxt_alert(task, "Python failed to get 'loop.create_task'"); - goto fail; - } + *proto = nxt_py_asgi_proto; - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) { - nxt_alert(task, "'loop.create_task' is not a callable object"); - goto fail; - } + return NXT_UNIT_OK; +} - nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader"); - if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) { - nxt_alert(task, "Python failed to get 'loop.add_reader'"); - goto fail; - } - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) { - nxt_alert(task, "'loop.add_reader' is not a callable object"); - goto fail; - } +static int +nxt_python_asgi_ctx_data_alloc(void **pdata) +{ + uint32_t i; + PyObject *asyncio, *loop, *new_event_loop, *obj; + nxt_py_asgi_ctx_data_t *ctx_data; - nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader"); - if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) { - nxt_alert(task, "Python failed to get 'loop.remove_reader'"); - goto fail; + ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); + if (nxt_slow_path(ctx_data == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate context data"); + return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) { - nxt_alert(task, "'loop.remove_reader' is not a callable object"); - goto fail; - } + memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); - nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon"); - if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) { - nxt_alert(task, "Python failed to get 'loop.call_soon'"); - goto fail; - } + nxt_queue_init(&ctx_data->drain_queue); - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) { - nxt_alert(task, "'loop.call_soon' is not a callable object"); - goto fail; - } + struct { + const char *key; + PyObject **handler; - nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop, - "run_until_complete"); - if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) { - nxt_alert(task, "Python failed to get 'loop.run_until_complete'"); - goto fail; - } + } handlers[] = { + { "create_task", &ctx_data->loop_create_task }, + { "add_reader", &ctx_data->loop_add_reader }, + { "remove_reader", &ctx_data->loop_remove_reader }, + { "call_soon", &ctx_data->loop_call_soon }, + { "run_until_complete", &ctx_data->loop_run_until_complete }, + { "create_future", &ctx_data->loop_create_future }, + }; - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) { - nxt_alert(task, "'loop.run_until_complete' is not a callable object"); - goto fail; - } + loop = NULL; - nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future"); - if (nxt_slow_path(nxt_py_loop_create_future == NULL)) { - nxt_alert(task, "Python failed to get 'loop.create_future'"); + asyncio = PyImport_ImportModule("asyncio"); + if (nxt_slow_path(asyncio == NULL)) { + nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); + nxt_python_print_exception(); goto fail; } - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) { - nxt_alert(task, "'loop.create_future' is not a callable object"); + new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), + "new_event_loop"); + if (nxt_slow_path(new_event_loop == NULL)) { + nxt_unit_alert(NULL, + "Python failed to get 'new_event_loop' from module 'asyncio'"); goto fail; } - nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL); - if (nxt_slow_path(nxt_py_quit_future == NULL)) { - nxt_alert(task, "Python failed to create Future "); - nxt_python_print_exception(); + if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { + nxt_unit_alert(NULL, + "'asyncio.new_event_loop' is not a callable object"); goto fail; } - nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future, - "set_result"); - if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) { - nxt_alert(task, "Python failed to get 'future.set_result'"); + loop = PyObject_CallObject(new_event_loop, NULL); + if (nxt_slow_path(loop == NULL)) { + nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); goto fail; } - if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) { - nxt_alert(task, "'future.set_result' is not a callable object"); - goto fail; + for (i = 0; i < nxt_nitems(handlers); i++) { + obj = PyObject_GetAttrString(loop, handlers[i].key); + if (nxt_slow_path(obj == NULL)) { + nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", + handlers[i].key); + goto fail; + } + + *handlers[i].handler = obj; + + if (nxt_slow_path(PyCallable_Check(obj) == 0)) { + nxt_unit_alert(NULL, "'loop.%s' is not a callable object", + handlers[i].key); + goto fail; + } } - nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); - if (nxt_slow_path(nxt_py_port_read == NULL)) { - nxt_alert(task, "Python failed to initialize the 'port_read' function"); + obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); + if (nxt_slow_path(obj == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future "); + nxt_python_print_exception(); goto fail; } - nxt_queue_init(&nxt_py_asgi_drain_queue); + ctx_data->quit_future = obj; - if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) { + obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); + if (nxt_slow_path(obj == NULL)) { + nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); goto fail; } - if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) { - goto fail; - } + ctx_data->quit_future_set_result = obj; - rc = nxt_py_asgi_lifespan_startup(task); - if (nxt_slow_path(rc == NXT_ERROR)) { + if (nxt_slow_path(PyCallable_Check(obj) == 0)) { + nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); goto fail; } - init->callbacks.request_handler = nxt_py_asgi_request_handler; - init->callbacks.data_handler = nxt_py_asgi_http_data_handler; - init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; - init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; - init->callbacks.quit = nxt_py_asgi_quit; - init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; - init->callbacks.add_port = nxt_py_asgi_add_port; - init->callbacks.remove_port = nxt_py_asgi_remove_port; - Py_DECREF(loop); Py_DECREF(asyncio); - return NXT_OK; + *pdata = ctx_data; + + return NXT_UNIT_OK; fail: + nxt_python_asgi_ctx_data_free(ctx_data); + Py_XDECREF(loop); - Py_DECREF(asyncio); + Py_XDECREF(asyncio); + + return NXT_UNIT_ERROR; +} + - return NXT_ERROR; +static void +nxt_python_asgi_ctx_data_free(void *data) +{ + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = data; + + Py_XDECREF(ctx_data->loop_run_until_complete); + Py_XDECREF(ctx_data->loop_create_future); + Py_XDECREF(ctx_data->loop_create_task); + Py_XDECREF(ctx_data->loop_call_soon); + Py_XDECREF(ctx_data->loop_add_reader); + Py_XDECREF(ctx_data->loop_remove_reader); + Py_XDECREF(ctx_data->quit_future); + Py_XDECREF(ctx_data->quit_future_set_result); + + nxt_unit_free(NULL, ctx_data); } -nxt_int_t +static int +nxt_python_asgi_startup(void *data) +{ + return nxt_py_asgi_lifespan_startup(data); +} + + +static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx) { - PyObject *res; + PyObject *res; + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = ctx->data; - res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, - nxt_py_quit_future, NULL); + res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, + ctx_data->quit_future, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); nxt_python_print_exception(); - return NXT_ERROR; + return NXT_UNIT_ERROR; } Py_DECREF(res); - nxt_py_asgi_lifespan_shutdown(); + nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); + nxt_py_asgi_remove_reader(ctx, ctx_data->port); - return NXT_OK; + if (ctx_data->port != NULL) { + ctx_data->port->data = NULL; + ctx_data->port = NULL; + } + + nxt_py_asgi_lifespan_shutdown(ctx); + + return NXT_UNIT_OK; +} + + +static void +nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + PyObject *res; + nxt_py_asgi_ctx_data_t *ctx_data; + + if (port == NULL || port->in_fd == -1) { + return; + } + + ctx_data = ctx->data; + + nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, + PyLong_FromLong(port->in_fd), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to remove_reader"); + nxt_python_print_exception(); + + return; + } + + Py_DECREF(res); } static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) { - PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + nxt_py_asgi_ctx_data_t *ctx_data; if (req->request->websocket_handshake) { asgi = nxt_py_asgi_websocket_create(req); @@ -365,7 +426,9 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) goto release_scope; } - task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + ctx_data = req->ctx->data; + + task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); if (nxt_slow_path(task == NULL)) { nxt_unit_req_error(req, "Python failed to call the create_task"); nxt_python_print_exception(); @@ -724,10 +787,46 @@ fail: static int +nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + nxt_unit_port_t *port; + nxt_py_asgi_ctx_data_t *ctx_data; + + if (nxt_slow_path(nxt_py_shared_port == NULL)) { + return NXT_UNIT_ERROR; + } + + port = nxt_py_shared_port; + + nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); + + ctx_data = ctx->data; + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, + PyLong_FromLong(port->in_fd), + nxt_py_port_read, + PyLong_FromVoidPtr(ctx), + PyLong_FromVoidPtr(port), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to add_reader"); + nxt_python_print_exception(); + + return NXT_UNIT_ERROR; + } + + Py_DECREF(res); + + return NXT_UNIT_OK; +} + + +static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int nb; - PyObject *res; + int nb; + PyObject *res; + nxt_py_asgi_ctx_data_t *ctx_data; if (port->in_fd == -1) { return NXT_UNIT_OK; @@ -744,13 +843,25 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); - res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader, + if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { + nxt_py_shared_port = port; + + return NXT_UNIT_OK; + } + + ctx_data = ctx->data; + + ctx_data->port = port; + port->data = ctx_data; + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, PyLong_FromLong(port->in_fd), nxt_py_port_read, PyLong_FromVoidPtr(ctx), PyLong_FromVoidPtr(port), NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(ctx, "Python failed to add_reader"); + nxt_python_print_exception(); return NXT_UNIT_ERROR; } @@ -764,53 +875,67 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) { - PyObject *res; - - nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); - if (port->in_fd == -1) { return; } - res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader, - PyLong_FromLong(port->in_fd), NULL); - if (nxt_slow_path(res == NULL)) { - nxt_unit_alert(NULL, "Python failed to remove_reader"); - } + nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); - Py_DECREF(res); + if (nxt_py_shared_port == port) { + nxt_py_shared_port = NULL; + } } static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) { - PyObject *res; + PyObject *res; + nxt_py_asgi_ctx_data_t *ctx_data; nxt_unit_debug(ctx, "asgi_quit %p", ctx); - res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result, + ctx_data = ctx->data; + + if (nxt_py_shared_port != NULL) { + res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, + PyLong_FromLong(nxt_py_shared_port->in_fd), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to remove_reader"); + nxt_python_print_exception(); + + } else { + Py_DECREF(res); + } + } + + res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, PyLong_FromLong(0), NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(ctx, "Python failed to set_result"); - } + nxt_python_print_exception(); - Py_DECREF(res); + } else { + Py_DECREF(res); + } } static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) { - int rc; - nxt_queue_link_t *lnk; + int rc; + nxt_queue_link_t *lnk; + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = ctx->data; - while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) { - lnk = nxt_queue_first(&nxt_py_asgi_drain_queue); + while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { + lnk = nxt_queue_first(&ctx_data->drain_queue); rc = nxt_py_asgi_http_drain(lnk); if (rc == NXT_UNIT_AGAIN) { - break; + return; } nxt_queue_remove(lnk); @@ -859,7 +984,7 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args) if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return PyErr_Format(PyExc_RuntimeError, - "error processing port message"); + "error processing port %d message", port->id.id); } Py_RETURN_NONE; @@ -996,8 +1121,8 @@ nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) PyObject * -nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, - PyObject *result) +nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, + nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) { PyObject *set_result, *res; @@ -1013,7 +1138,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, Py_CLEAR(future); - goto cleanup; + goto cleanup_result; } if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { @@ -1024,7 +1149,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, goto cleanup; } - res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result, + res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, result, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); @@ -1038,6 +1163,9 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, cleanup: Py_DECREF(set_result); + +cleanup_result: + Py_DECREF(result); return future; @@ -1148,6 +1276,17 @@ nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, void +nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) +{ + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = req->ctx->data; + + nxt_queue_insert_tail(&ctx_data->drain_queue, link); +} + + +void nxt_py_asgi_dealloc(PyObject *self) { PyObject_Del(self); @@ -1177,19 +1316,11 @@ nxt_py_asgi_next(PyObject *self) } -void +static void nxt_python_asgi_done(void) { nxt_py_asgi_str_done(); - Py_XDECREF(nxt_py_quit_future); - Py_XDECREF(nxt_py_quit_future_set_result); - Py_XDECREF(nxt_py_loop_run_until_complete); - Py_XDECREF(nxt_py_loop_create_future); - Py_XDECREF(nxt_py_loop_create_task); - Py_XDECREF(nxt_py_loop_call_soon); - Py_XDECREF(nxt_py_loop_add_reader); - Py_XDECREF(nxt_py_loop_remove_reader); Py_XDECREF(nxt_py_port_read); } @@ -1203,25 +1334,12 @@ nxt_python_asgi_check(PyObject *obj) } -nxt_int_t -nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) -{ - nxt_alert(task, "ASGI not implemented"); - return NXT_ERROR; -} - - -nxt_int_t -nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +int +nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) { - nxt_unit_alert(ctx, "ASGI not implemented"); - return NXT_ERROR; + nxt_unit_alert(NULL, "ASGI not implemented"); + return NXT_UNIT_ERROR; } -void -nxt_python_asgi_done(void) -{ -} - #endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h index 24337c37..69d58477 100644 --- a/src/python/nxt_python_asgi.h +++ b/src/python/nxt_python_asgi.h @@ -10,6 +10,9 @@ typedef PyObject * (*nxt_py_asgi_enum_header_cb)(void *ctx, int i, PyObject *name, PyObject *val); +void nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, + nxt_queue_link_t *link); + typedef struct { uint32_t fields_count; uint32_t fields_size; @@ -20,6 +23,20 @@ typedef struct { uint64_t content_length; } nxt_py_asgi_add_field_ctx_t; +typedef struct { + nxt_queue_t drain_queue; + PyObject *loop_run_until_complete; + PyObject *loop_create_future; + PyObject *loop_create_task; + PyObject *loop_call_soon; + PyObject *loop_add_reader; + PyObject *loop_remove_reader; + PyObject *quit_future; + PyObject *quit_future_set_result; + PyObject *lifespan; + nxt_unit_port_t *port; +} nxt_py_asgi_ctx_data_t; + PyObject *nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, void *data); @@ -27,7 +44,7 @@ PyObject *nxt_py_asgi_calc_size(void *data, int i, PyObject *n, PyObject *v); PyObject *nxt_py_asgi_add_field(void *data, int i, PyObject *n, PyObject *v); PyObject *nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, - PyObject *future, PyObject *result); + nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result); PyObject *nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type); PyObject *nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, PyObject *spec_version); @@ -37,24 +54,18 @@ PyObject *nxt_py_asgi_await(PyObject *self); PyObject *nxt_py_asgi_iter(PyObject *self); PyObject *nxt_py_asgi_next(PyObject *self); -nxt_int_t nxt_py_asgi_http_init(nxt_task_t *task); +int nxt_py_asgi_http_init(void); PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req); void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req); int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk); -nxt_int_t nxt_py_asgi_websocket_init(nxt_task_t *task); +int nxt_py_asgi_websocket_init(void); PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req); void nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *ws); void nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req); -nxt_int_t nxt_py_asgi_lifespan_startup(nxt_task_t *task); -nxt_int_t nxt_py_asgi_lifespan_shutdown(void); - -extern PyObject *nxt_py_loop_run_until_complete; -extern PyObject *nxt_py_loop_create_future; -extern PyObject *nxt_py_loop_create_task; - -extern nxt_queue_t nxt_py_asgi_drain_queue; +int nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data); +int nxt_py_asgi_lifespan_shutdown(nxt_unit_ctx_t *ctx); #endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c index b07d61d6..a5034ea6 100644 --- a/src/python/nxt_python_asgi_http.c +++ b/src/python/nxt_python_asgi_http.c @@ -67,15 +67,16 @@ static PyTypeObject nxt_py_asgi_http_type = { static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; -nxt_int_t -nxt_py_asgi_http_init(nxt_task_t *task) +int +nxt_py_asgi_http_init(void) { if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { - nxt_alert(task, "Python failed to initialize the 'http' type object"); - return NXT_ERROR; + nxt_unit_alert(NULL, + "Python failed to initialize the 'http' type object"); + return NXT_UNIT_ERROR; } - return NXT_OK; + return NXT_UNIT_OK; } @@ -106,6 +107,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none) { PyObject *msg, *future; nxt_py_asgi_http_t *http; + nxt_py_asgi_ctx_data_t *ctx_data; nxt_unit_request_info_t *req; http = (nxt_py_asgi_http_t *) self; @@ -118,7 +120,9 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none) return NULL; } - future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + ctx_data = req->ctx->data; + + future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(req, "Python failed to create Future object"); nxt_python_print_exception(); @@ -130,7 +134,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none) } if (msg != Py_None) { - return nxt_py_asgi_set_result_soon(req, future, msg); + return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); } http->receive_future = future; @@ -329,11 +333,12 @@ nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) static PyObject * nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) { - int rc; - char *body_str; - ssize_t sent; - PyObject *body, *more_body, *future; - Py_ssize_t body_len, body_off; + int rc; + char *body_str; + ssize_t sent; + PyObject *body, *more_body, *future; + Py_ssize_t body_len, body_off; + nxt_py_asgi_ctx_data_t *ctx_data; body = PyDict_GetItem(dict, nxt_py_body_str); if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { @@ -371,6 +376,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) body_off = 0; + ctx_data = http->req->ctx->data; + while (body_len > 0) { sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); if (nxt_slow_path(sent < 0)) { @@ -382,7 +389,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) "out of shared memory, %d", (int) body_len); - future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + future = PyObject_CallObject(ctx_data->loop_create_future, + NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(http->req, "Python failed to create Future object"); @@ -396,7 +404,7 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) Py_INCREF(http->send_body); http->send_body_off = body_off; - nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link); + nxt_py_asgi_drain_wait(http->req, &http->link); http->send_future = future; Py_INCREF(http->send_future); diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c index 14d0ee97..6910d2e8 100644 --- a/src/python/nxt_python_asgi_lifespan.c +++ b/src/python/nxt_python_asgi_lifespan.c @@ -15,15 +15,16 @@ typedef struct { PyObject_HEAD - int disabled; - int startup_received; - int startup_sent; - int shutdown_received; - int shutdown_sent; - int shutdown_called; - PyObject *startup_future; - PyObject *shutdown_future; - PyObject *receive_future; + nxt_py_asgi_ctx_data_t *ctx_data; + int disabled; + int startup_received; + int startup_sent; + int shutdown_received; + int shutdown_sent; + int shutdown_called; + PyObject *startup_future; + PyObject *shutdown_future; + PyObject *receive_future; } nxt_py_asgi_lifespan_t; @@ -39,8 +40,6 @@ static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan); static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future); -static nxt_py_asgi_lifespan_t *nxt_py_lifespan; - static PyMethodDef nxt_py_asgi_lifespan_methods[] = { { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 }, { "send", nxt_py_asgi_lifespan_send, METH_O, 0 }, @@ -67,46 +66,46 @@ static PyTypeObject nxt_py_asgi_lifespan_type = { }; -nxt_int_t -nxt_py_asgi_lifespan_startup(nxt_task_t *task) +int +nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data) { + int rc; PyObject *scope, *res, *py_task, *receive, *send, *done; - nxt_int_t rc; nxt_py_asgi_lifespan_t *lifespan; if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to initialize the 'asgi_lifespan' type object"); - return NXT_ERROR; + return NXT_UNIT_ERROR; } lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type); if (nxt_slow_path(lifespan == NULL)) { - nxt_alert(task, "Python failed to create lifespan object"); - return NXT_ERROR; + nxt_unit_alert(NULL, "Python failed to create lifespan object"); + return NXT_UNIT_ERROR; } - rc = NXT_ERROR; + rc = NXT_UNIT_ERROR; receive = PyObject_GetAttrString((PyObject *) lifespan, "receive"); if (nxt_slow_path(receive == NULL)) { - nxt_alert(task, "Python failed to get 'receive' method"); + nxt_unit_alert(NULL, "Python failed to get 'receive' method"); goto release_lifespan; } send = PyObject_GetAttrString((PyObject *) lifespan, "send"); if (nxt_slow_path(receive == NULL)) { - nxt_alert(task, "Python failed to get 'send' method"); + nxt_unit_alert(NULL, "Python failed to get 'send' method"); goto release_receive; } done = PyObject_GetAttrString((PyObject *) lifespan, "_done"); if (nxt_slow_path(receive == NULL)) { - nxt_alert(task, "Python failed to get '_done' method"); + nxt_unit_alert(NULL, "Python failed to get '_done' method"); goto release_send; } - lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future, + lifespan->startup_future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(lifespan->startup_future == NULL)) { nxt_unit_alert(NULL, "Python failed to create Future object"); @@ -115,6 +114,7 @@ nxt_py_asgi_lifespan_startup(nxt_task_t *task) goto release_done; } + lifespan->ctx_data = ctx_data; lifespan->disabled = 0; lifespan->startup_received = 0; lifespan->startup_sent = 0; @@ -132,21 +132,20 @@ nxt_py_asgi_lifespan_startup(nxt_task_t *task) res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, receive, send, NULL); if (nxt_slow_path(res == NULL)) { - nxt_log(task, NXT_LOG_ERR, "Python failed to call the application"); + nxt_unit_error(NULL, "Python failed to call the application"); nxt_python_print_exception(); goto release_scope; } if (nxt_slow_path(!PyCoro_CheckExact(res))) { - nxt_log(task, NXT_LOG_ERR, - "Application result type is not a coroutine"); + nxt_unit_error(NULL, "Application result type is not a coroutine"); Py_DECREF(res); goto release_scope; } - py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + py_task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); if (nxt_slow_path(py_task == NULL)) { - nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task"); + nxt_unit_alert(NULL, "Python failed to call the create_task"); nxt_python_print_exception(); Py_DECREF(res); goto release_scope; @@ -157,18 +156,17 @@ nxt_py_asgi_lifespan_startup(nxt_task_t *task) res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str, done, NULL); if (nxt_slow_path(res == NULL)) { - nxt_log(task, NXT_LOG_ERR, - "Python failed to call 'task.add_done_callback'"); + nxt_unit_alert(NULL, "Python failed to call 'task.add_done_callback'"); nxt_python_print_exception(); goto release_task; } Py_DECREF(res); - res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, lifespan->startup_future, NULL); if (nxt_slow_path(res == NULL)) { - nxt_alert(task, "Python failed to call loop.run_until_complete"); + nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); nxt_python_print_exception(); goto release_task; } @@ -176,10 +174,10 @@ nxt_py_asgi_lifespan_startup(nxt_task_t *task) Py_DECREF(res); if (lifespan->startup_sent == 1 || lifespan->disabled) { - nxt_py_lifespan = lifespan; - Py_INCREF(nxt_py_lifespan); + ctx_data->lifespan = (PyObject *) lifespan; + Py_INCREF(ctx_data->lifespan); - rc = NXT_OK; + rc = NXT_UNIT_OK; } release_task: @@ -201,17 +199,21 @@ release_lifespan: } -nxt_int_t -nxt_py_asgi_lifespan_shutdown(void) +int +nxt_py_asgi_lifespan_shutdown(nxt_unit_ctx_t *ctx) { PyObject *msg, *future, *res; nxt_py_asgi_lifespan_t *lifespan; + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = ctx->data; + + lifespan = (nxt_py_asgi_lifespan_t *) ctx_data->lifespan; - if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) { - return NXT_OK; + if (nxt_slow_path(lifespan == NULL || lifespan->disabled)) { + return NXT_UNIT_OK; } - lifespan = nxt_py_lifespan; lifespan->shutdown_called = 1; if (lifespan->receive_future != NULL) { @@ -231,29 +233,29 @@ nxt_py_asgi_lifespan_shutdown(void) } if (lifespan->shutdown_sent) { - return NXT_OK; + return NXT_UNIT_OK; } - lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future, + lifespan->shutdown_future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(lifespan->shutdown_future == NULL)) { nxt_unit_alert(NULL, "Python failed to create Future object"); nxt_python_print_exception(); - return NXT_ERROR; + return NXT_UNIT_ERROR; } - res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, lifespan->shutdown_future, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); nxt_python_print_exception(); - return NXT_ERROR; + return NXT_UNIT_ERROR; } Py_DECREF(res); Py_CLEAR(lifespan->shutdown_future); - return NXT_OK; + return NXT_UNIT_OK; } @@ -262,12 +264,14 @@ nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) { PyObject *msg, *future; nxt_py_asgi_lifespan_t *lifespan; + nxt_py_asgi_ctx_data_t *ctx_data; lifespan = (nxt_py_asgi_lifespan_t *) self; + ctx_data = lifespan->ctx_data; nxt_unit_debug(NULL, "asgi_lifespan_receive"); - future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_alert(NULL, "Python failed to create Future object"); nxt_python_print_exception(); @@ -281,7 +285,7 @@ nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str); - return nxt_py_asgi_set_result_soon(NULL, future, msg); + return nxt_py_asgi_set_result_soon(NULL, ctx_data, future, msg); } if (lifespan->shutdown_called && !lifespan->shutdown_received) { @@ -289,7 +293,7 @@ nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); - return nxt_py_asgi_set_result_soon(NULL, future, msg); + return nxt_py_asgi_set_result_soon(NULL, ctx_data, future, msg); } Py_INCREF(future); diff --git a/src/python/nxt_python_asgi_str.c b/src/python/nxt_python_asgi_str.c index 37fa7f04..34422973 100644 --- a/src/python/nxt_python_asgi_str.c +++ b/src/python/nxt_python_asgi_str.c @@ -124,7 +124,7 @@ static nxt_python_string_t nxt_py_asgi_strings[] = { }; -nxt_int_t +int nxt_py_asgi_str_init(void) { return nxt_python_init_strings(nxt_py_asgi_strings); diff --git a/src/python/nxt_python_asgi_str.h b/src/python/nxt_python_asgi_str.h index 3f389c62..92969fd2 100644 --- a/src/python/nxt_python_asgi_str.h +++ b/src/python/nxt_python_asgi_str.h @@ -62,7 +62,7 @@ extern PyObject *nxt_py_ws_str; extern PyObject *nxt_py_wss_str; -nxt_int_t nxt_py_asgi_str_init(void); +int nxt_py_asgi_str_init(void); void nxt_py_asgi_str_done(void); diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c index 5a27b588..fc7d9fa4 100644 --- a/src/python/nxt_python_asgi_websocket.c +++ b/src/python/nxt_python_asgi_websocket.c @@ -98,16 +98,16 @@ static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024; static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024; -nxt_int_t -nxt_py_asgi_websocket_init(nxt_task_t *task) +int +nxt_py_asgi_websocket_init(void) { if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to initialize the \"asgi_websocket\" type object"); - return NXT_ERROR; + return NXT_UNIT_ERROR; } - return NXT_OK; + return NXT_UNIT_OK; } @@ -137,6 +137,7 @@ static PyObject * nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none) { PyObject *future, *msg; + nxt_py_asgi_ctx_data_t *ctx_data; nxt_py_asgi_websocket_t *ws; ws = (nxt_py_asgi_websocket_t *) self; @@ -160,7 +161,9 @@ nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none) "WebSocket already closed"); } - future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + ctx_data = ws->req->ctx->data; + + future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(ws->req, "Python failed to create Future object"); nxt_python_print_exception(); @@ -174,19 +177,19 @@ nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none) msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str); - return nxt_py_asgi_set_result_soon(ws->req, future, msg); + return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg); } if (ws->pending_fins > 0) { msg = nxt_py_asgi_websocket_pop_msg(ws, NULL); - return nxt_py_asgi_set_result_soon(ws->req, future, msg); + return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg); } if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { msg = nxt_py_asgi_websocket_disconnect_msg(ws); - return nxt_py_asgi_set_result_soon(ws->req, future, msg); + return nxt_py_asgi_set_result_soon(ws->req, ctx_data, future, msg); } ws->receive_future = future; diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c index 8f692941..da7b183c 100644 --- a/src/python/nxt_python_wsgi.c +++ b/src/python/nxt_python_wsgi.c @@ -38,55 +38,57 @@ */ -typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; - typedef struct { PyObject_HEAD -} nxt_py_input_t; + uint64_t content_length; + uint64_t bytes_sent; + PyObject *environ; + PyObject *start_resp; + PyObject *write; + nxt_unit_request_info_t *req; + PyThreadState *thread_state; +} nxt_python_ctx_t; -typedef struct { - PyObject_HEAD -} nxt_py_error_t; + +static int nxt_python_wsgi_ctx_data_alloc(void **pdata); +static void nxt_python_wsgi_ctx_data_free(void *data); +static int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx); +static void nxt_python_wsgi_done(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); -static PyObject *nxt_python_create_environ(nxt_task_t *task); -static PyObject *nxt_python_get_environ(nxt_python_run_ctx_t *ctx); -static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, +static PyObject *nxt_python_create_environ(nxt_python_app_conf_t *c); +static PyObject *nxt_python_get_environ(nxt_python_ctx_t *pctx); +static int nxt_python_add_sptr(nxt_python_ctx_t *pctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size); -static int nxt_python_add_field(nxt_python_run_ctx_t *ctx, +static int nxt_python_add_field(nxt_python_ctx_t *pctx, nxt_unit_field_t *field, int n, uint32_t vl); static PyObject *nxt_python_field_name(const char *name, uint8_t len); static PyObject *nxt_python_field_value(nxt_unit_field_t *f, int n, uint32_t vl); -static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, +static int nxt_python_add_obj(nxt_python_ctx_t *pctx, PyObject *name, PyObject *value); static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args); -static int nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, +static int nxt_python_response_add_field(nxt_python_ctx_t *pctx, PyObject *name, PyObject *value, int i); static int nxt_python_str_buf(PyObject *str, char **buf, uint32_t *len, PyObject **bytes); static PyObject *nxt_py_write(PyObject *self, PyObject *args); -static void nxt_py_input_dealloc(nxt_py_input_t *self); -static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args); -static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); -static PyObject *nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size); -static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); +static void nxt_py_input_dealloc(nxt_python_ctx_t *pctx); +static PyObject *nxt_py_input_read(nxt_python_ctx_t *pctx, PyObject *args); +static PyObject *nxt_py_input_readline(nxt_python_ctx_t *pctx, + PyObject *args); +static PyObject *nxt_py_input_getline(nxt_python_ctx_t *pctx, size_t size); +static PyObject *nxt_py_input_readlines(nxt_python_ctx_t *self, + PyObject *args); -static PyObject *nxt_py_input_iter(PyObject *self); -static PyObject *nxt_py_input_next(PyObject *self); +static PyObject *nxt_py_input_iter(PyObject *pctx); +static PyObject *nxt_py_input_next(PyObject *pctx); -static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes); - -struct nxt_python_run_ctx_s { - uint64_t content_length; - uint64_t bytes_sent; - PyObject *environ; - nxt_unit_request_info_t *req; -}; +static int nxt_python_write(nxt_python_ctx_t *pctx, PyObject *bytes); static PyMethodDef nxt_py_start_resp_method[] = { @@ -111,7 +113,7 @@ static PyTypeObject nxt_py_input_type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "unit._input", - .tp_basicsize = sizeof(nxt_py_input_t), + .tp_basicsize = sizeof(nxt_python_ctx_t), .tp_dealloc = (destructor) nxt_py_input_dealloc, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_doc = "unit input object.", @@ -121,12 +123,7 @@ static PyTypeObject nxt_py_input_type = { }; -static PyObject *nxt_py_start_resp_obj; -static PyObject *nxt_py_write_obj; -static PyObject *nxt_py_environ_ptyp; - -static PyThreadState *nxt_python_thread_state; -static nxt_python_run_ctx_t *nxt_python_run_ctx; +static PyObject *nxt_py_environ_ptyp; static PyObject *nxt_py_80_str; static PyObject *nxt_py_close_str; @@ -143,6 +140,7 @@ static PyObject *nxt_py_server_addr_str; static PyObject *nxt_py_server_name_str; static PyObject *nxt_py_server_port_str; static PyObject *nxt_py_server_protocol_str; +static PyObject *nxt_py_wsgi_input_str; static PyObject *nxt_py_wsgi_uri_scheme_str; static nxt_python_string_t nxt_python_strings[] = { @@ -161,82 +159,132 @@ static nxt_python_string_t nxt_python_strings[] = { { nxt_string("SERVER_NAME"), &nxt_py_server_name_str }, { nxt_string("SERVER_PORT"), &nxt_py_server_port_str }, { nxt_string("SERVER_PROTOCOL"), &nxt_py_server_protocol_str }, + { nxt_string("wsgi.input"), &nxt_py_wsgi_input_str }, { nxt_string("wsgi.url_scheme"), &nxt_py_wsgi_uri_scheme_str }, { nxt_null_string, NULL }, }; +static nxt_python_proto_t nxt_py_wsgi_proto = { + .ctx_data_alloc = nxt_python_wsgi_ctx_data_alloc, + .ctx_data_free = nxt_python_wsgi_ctx_data_free, + .run = nxt_python_wsgi_run, + .done = nxt_python_wsgi_done, +}; + -nxt_int_t -nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init) +int +nxt_python_wsgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) { PyObject *obj; obj = NULL; - if (nxt_slow_path(nxt_python_init_strings(nxt_python_strings) != NXT_OK)) { - nxt_alert(task, "Python failed to init string objects"); + if (nxt_slow_path(nxt_python_init_strings(nxt_python_strings) + != NXT_UNIT_OK)) + { + nxt_unit_alert(NULL, "Python failed to init string objects"); goto fail; } - obj = PyCFunction_New(nxt_py_start_resp_method, NULL); + obj = nxt_python_create_environ(init->data); if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, - "Python failed to initialize the \"start_response\" function"); goto fail; } - nxt_py_start_resp_obj = obj; + nxt_py_environ_ptyp = obj; + obj = NULL; + + init->callbacks.request_handler = nxt_python_request_handler; + + *proto = nxt_py_wsgi_proto; - obj = PyCFunction_New(nxt_py_write_method, NULL); - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to initialize the \"write\" function"); - goto fail; + return NXT_UNIT_OK; + +fail: + + Py_XDECREF(obj); + + return NXT_UNIT_ERROR; +} + + +static int +nxt_python_wsgi_ctx_data_alloc(void **pdata) +{ + nxt_python_ctx_t *pctx; + + pctx = PyObject_New(nxt_python_ctx_t, &nxt_py_input_type); + if (nxt_slow_path(pctx == NULL)) { + nxt_unit_alert(NULL, + "Python failed to create the \"wsgi.input\" object"); + return NXT_UNIT_ERROR; } - nxt_py_write_obj = obj; + pctx->write = NULL; - obj = nxt_python_create_environ(task); - if (nxt_slow_path(obj == NULL)) { + pctx->start_resp = PyCFunction_New(nxt_py_start_resp_method, + (PyObject *) pctx); + if (nxt_slow_path(pctx->start_resp == NULL)) { + nxt_unit_alert(NULL, + "Python failed to initialize the \"start_response\" function"); goto fail; } - nxt_py_environ_ptyp = obj; - obj = NULL; + pctx->write = PyCFunction_New(nxt_py_write_method, (PyObject *) pctx); + if (nxt_slow_path(pctx->write == NULL)) { + nxt_unit_alert(NULL, + "Python failed to initialize the \"write\" function"); + goto fail; + } - init->callbacks.request_handler = nxt_python_request_handler; + *pdata = pctx; - return NXT_OK; + return NXT_UNIT_OK; fail: - Py_XDECREF(obj); + nxt_python_wsgi_ctx_data_free(pctx); - return NXT_ERROR; + return NXT_UNIT_ERROR; } -int +static void +nxt_python_wsgi_ctx_data_free(void *data) +{ + nxt_python_ctx_t *pctx; + + pctx = data; + + Py_XDECREF(pctx->start_resp); + Py_XDECREF(pctx->write); + Py_XDECREF(pctx); +} + + +static int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx) { - int rc; + int rc; + nxt_python_ctx_t *pctx; - nxt_python_thread_state = PyEval_SaveThread(); + pctx = ctx->data; + + pctx->thread_state = PyEval_SaveThread(); rc = nxt_unit_run(ctx); - PyEval_RestoreThread(nxt_python_thread_state); + PyEval_RestoreThread(pctx->thread_state); return rc; } -void +static void nxt_python_wsgi_done(void) { nxt_python_done_strings(nxt_python_strings); - Py_XDECREF(nxt_py_start_resp_obj); - Py_XDECREF(nxt_py_write_obj); Py_XDECREF(nxt_py_environ_ptyp); } @@ -244,14 +292,20 @@ nxt_python_wsgi_done(void) static void nxt_python_request_handler(nxt_unit_request_info_t *req) { - int rc; - PyObject *environ, *args, *response, *iterator, *item; - PyObject *close, *result; - nxt_python_run_ctx_t run_ctx = {-1, 0, NULL, req}; + int rc; + PyObject *environ, *args, *response, *iterator, *item; + PyObject *close, *result; + nxt_python_ctx_t *pctx; - PyEval_RestoreThread(nxt_python_thread_state); + pctx = req->ctx->data; - environ = nxt_python_get_environ(&run_ctx); + pctx->content_length = -1; + pctx->bytes_sent = 0; + pctx->req = req; + + PyEval_RestoreThread(pctx->thread_state); + + environ = nxt_python_get_environ(pctx); if (nxt_slow_path(environ == NULL)) { rc = NXT_UNIT_ERROR; goto done; @@ -269,10 +323,8 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) PyTuple_SET_ITEM(args, 0, environ); - Py_INCREF(nxt_py_start_resp_obj); - PyTuple_SET_ITEM(args, 1, nxt_py_start_resp_obj); - - nxt_python_run_ctx = &run_ctx; + Py_INCREF(pctx->start_resp); + PyTuple_SET_ITEM(args, 1, pctx->start_resp); response = PyObject_CallObject(nxt_py_application, args); @@ -288,7 +340,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) /* Shortcut: avoid iterate over response string symbols. */ if (PyBytes_Check(response)) { - rc = nxt_python_write(&run_ctx, response); + rc = nxt_python_write(pctx, response); } else { iterator = PyObject_GetIter(response); @@ -296,7 +348,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) if (nxt_fast_path(iterator != NULL)) { rc = NXT_UNIT_OK; - while (run_ctx.bytes_sent < run_ctx.content_length) { + while (pctx->bytes_sent < pctx->content_length) { item = PyIter_Next(iterator); if (item == NULL) { @@ -312,7 +364,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) } if (nxt_fast_path(PyBytes_Check(item))) { - rc = nxt_python_write(&run_ctx, item); + rc = nxt_python_write(pctx, item); } else { nxt_unit_req_error(req, "the application returned " @@ -361,29 +413,31 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) done: - nxt_python_thread_state = PyEval_SaveThread(); + pctx->thread_state = PyEval_SaveThread(); + + pctx->req = NULL; - nxt_python_run_ctx = NULL; nxt_unit_request_done(req, rc); } static PyObject * -nxt_python_create_environ(nxt_task_t *task) +nxt_python_create_environ(nxt_python_app_conf_t *c) { PyObject *obj, *err, *environ; environ = PyDict_New(); if (nxt_slow_path(environ == NULL)) { - nxt_alert(task, "Python failed to create the \"environ\" dictionary"); + nxt_unit_alert(NULL, + "Python failed to create the \"environ\" dictionary"); return NULL; } obj = PyString_FromStringAndSize((char *) nxt_server.start, nxt_server.length); if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to create the \"SERVER_SOFTWARE\" environ value"); goto fail; } @@ -391,7 +445,7 @@ nxt_python_create_environ(nxt_task_t *task) if (nxt_slow_path(PyDict_SetItemString(environ, "SERVER_SOFTWARE", obj) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to set the \"SERVER_SOFTWARE\" environ value"); goto fail; } @@ -401,15 +455,15 @@ nxt_python_create_environ(nxt_task_t *task) obj = Py_BuildValue("(ii)", 1, 0); if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to build the \"wsgi.version\" environ value"); goto fail; } if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.version", obj) != 0)) { - nxt_alert(task, - "Python failed to set the \"wsgi.version\" environ value"); + nxt_unit_alert(NULL, + "Python failed to set the \"wsgi.version\" environ value"); goto fail; } @@ -418,10 +472,10 @@ nxt_python_create_environ(nxt_task_t *task) if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.multithread", - Py_False) + c->threads > 1 ? Py_True : Py_False) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to set the \"wsgi.multithread\" environ value"); goto fail; } @@ -430,7 +484,7 @@ nxt_python_create_environ(nxt_task_t *task) Py_True) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to set the \"wsgi.multiprocess\" environ value"); goto fail; } @@ -439,46 +493,30 @@ nxt_python_create_environ(nxt_task_t *task) Py_False) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to set the \"wsgi.run_once\" environ value"); goto fail; } if (nxt_slow_path(PyType_Ready(&nxt_py_input_type) != 0)) { - nxt_alert(task, + nxt_unit_alert(NULL, "Python failed to initialize the \"wsgi.input\" type object"); goto fail; } - obj = (PyObject *) PyObject_New(nxt_py_input_t, &nxt_py_input_type); - - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to create the \"wsgi.input\" object"); - goto fail; - } - - if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.input", obj) != 0)) { - nxt_alert(task, - "Python failed to set the \"wsgi.input\" environ value"); - goto fail; - } - - Py_DECREF(obj); - obj = NULL; - err = PySys_GetObject((char *) "stderr"); if (nxt_slow_path(err == NULL)) { - nxt_alert(task, "Python failed to get \"sys.stderr\" object"); + nxt_unit_alert(NULL, "Python failed to get \"sys.stderr\" object"); goto fail; } if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.errors", err) != 0)) { - nxt_alert(task, - "Python failed to set the \"wsgi.errors\" environ value"); + nxt_unit_alert(NULL, + "Python failed to set the \"wsgi.errors\" environ value"); goto fail; } @@ -494,7 +532,7 @@ fail: static PyObject * -nxt_python_get_environ(nxt_python_run_ctx_t *ctx) +nxt_python_get_environ(nxt_python_ctx_t *pctx) { int rc; uint32_t i, j, vl; @@ -504,15 +542,15 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) environ = PyDict_Copy(nxt_py_environ_ptyp); if (nxt_slow_path(environ == NULL)) { - nxt_unit_req_error(ctx->req, + nxt_unit_req_error(pctx->req, "Python failed to copy the \"environ\" dictionary"); return NULL; } - ctx->environ = environ; + pctx->environ = environ; - r = ctx->req->request; + r = pctx->req->request; #define RC(S) \ do { \ @@ -522,36 +560,36 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) } \ } while(0) - RC(nxt_python_add_sptr(ctx, nxt_py_request_method_str, &r->method, + RC(nxt_python_add_sptr(pctx, nxt_py_request_method_str, &r->method, r->method_length)); - RC(nxt_python_add_sptr(ctx, nxt_py_request_uri_str, &r->target, + RC(nxt_python_add_sptr(pctx, nxt_py_request_uri_str, &r->target, r->target_length)); - RC(nxt_python_add_sptr(ctx, nxt_py_query_string_str, &r->query, + RC(nxt_python_add_sptr(pctx, nxt_py_query_string_str, &r->query, r->query_length)); - RC(nxt_python_add_sptr(ctx, nxt_py_path_info_str, &r->path, + RC(nxt_python_add_sptr(pctx, nxt_py_path_info_str, &r->path, r->path_length)); - RC(nxt_python_add_sptr(ctx, nxt_py_remote_addr_str, &r->remote, + RC(nxt_python_add_sptr(pctx, nxt_py_remote_addr_str, &r->remote, r->remote_length)); - RC(nxt_python_add_sptr(ctx, nxt_py_server_addr_str, &r->local, + RC(nxt_python_add_sptr(pctx, nxt_py_server_addr_str, &r->local, r->local_length)); if (r->tls) { - RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + RC(nxt_python_add_obj(pctx, nxt_py_wsgi_uri_scheme_str, nxt_py_https_str)); } else { - RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + RC(nxt_python_add_obj(pctx, nxt_py_wsgi_uri_scheme_str, nxt_py_http_str)); } - RC(nxt_python_add_sptr(ctx, nxt_py_server_protocol_str, &r->version, + RC(nxt_python_add_sptr(pctx, nxt_py_server_protocol_str, &r->version, r->version_length)); - RC(nxt_python_add_sptr(ctx, nxt_py_server_name_str, &r->server_name, + RC(nxt_python_add_sptr(pctx, nxt_py_server_name_str, &r->server_name, r->server_name_length)); - RC(nxt_python_add_obj(ctx, nxt_py_server_port_str, nxt_py_80_str)); + RC(nxt_python_add_obj(pctx, nxt_py_server_port_str, nxt_py_80_str)); - nxt_unit_request_group_dup_fields(ctx->req); + nxt_unit_request_group_dup_fields(pctx->req); for (i = 0; i < r->fields_count;) { f = r->fields + i; @@ -569,7 +607,7 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) vl += 2 + f2->value_length; } - RC(nxt_python_add_field(ctx, f, j - i, vl)); + RC(nxt_python_add_field(pctx, f, j - i, vl)); i = j; } @@ -577,19 +615,27 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) if (r->content_length_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_length_field; - RC(nxt_python_add_sptr(ctx, nxt_py_content_length_str, &f->value, + RC(nxt_python_add_sptr(pctx, nxt_py_content_length_str, &f->value, f->value_length)); } if (r->content_type_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_type_field; - RC(nxt_python_add_sptr(ctx, nxt_py_content_type_str, &f->value, + RC(nxt_python_add_sptr(pctx, nxt_py_content_type_str, &f->value, f->value_length)); } #undef RC + if (nxt_slow_path(PyDict_SetItem(environ, nxt_py_wsgi_input_str, + (PyObject *) pctx) != 0)) + { + nxt_unit_req_error(pctx->req, + "Python failed to set the \"wsgi.input\" environ value"); + goto fail; + } + return environ; fail: @@ -601,7 +647,7 @@ fail: static int -nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, +nxt_python_add_sptr(nxt_python_ctx_t *pctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size) { char *src; @@ -611,7 +657,7 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, value = PyString_FromStringAndSize(src, size); if (nxt_slow_path(value == NULL)) { - nxt_unit_req_error(ctx->req, + nxt_unit_req_error(pctx->req, "Python failed to create value string \"%.*s\"", (int) size, src); nxt_python_print_exception(); @@ -619,8 +665,8 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { - nxt_unit_req_error(ctx->req, + if (nxt_slow_path(PyDict_SetItem(pctx->environ, name, value) != 0)) { + nxt_unit_req_error(pctx->req, "Python failed to set the \"%s\" environ value", PyUnicode_AsUTF8(name)); Py_DECREF(value); @@ -635,7 +681,7 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, static int -nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n, +nxt_python_add_field(nxt_python_ctx_t *pctx, nxt_unit_field_t *field, int n, uint32_t vl) { char *src; @@ -645,7 +691,7 @@ nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n, name = nxt_python_field_name(src, field->name_length); if (nxt_slow_path(name == NULL)) { - nxt_unit_req_error(ctx->req, + nxt_unit_req_error(pctx->req, "Python failed to create name string \"%.*s\"", (int) field->name_length, src); nxt_python_print_exception(); @@ -656,7 +702,7 @@ nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n, value = nxt_python_field_value(field, n, vl); if (nxt_slow_path(value == NULL)) { - nxt_unit_req_error(ctx->req, + nxt_unit_req_error(pctx->req, "Python failed to create value string \"%.*s\"", (int) field->value_length, (char *) nxt_unit_sptr_get(&field->value)); @@ -665,8 +711,8 @@ nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n, goto fail; } - if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { - nxt_unit_req_error(ctx->req, + if (nxt_slow_path(PyDict_SetItem(pctx->environ, name, value) != 0)) { + nxt_unit_req_error(pctx->req, "Python failed to set the \"%s\" environ value", PyUnicode_AsUTF8(name)); goto fail; @@ -761,10 +807,10 @@ nxt_python_field_value(nxt_unit_field_t *f, int n, uint32_t vl) static int -nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value) +nxt_python_add_obj(nxt_python_ctx_t *pctx, PyObject *name, PyObject *value) { - if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { - nxt_unit_req_error(ctx->req, + if (nxt_slow_path(PyDict_SetItem(pctx->environ, name, value) != 0)) { + nxt_unit_req_error(pctx->req, "Python failed to set the \"%s\" environ value", PyUnicode_AsUTF8(name)); @@ -778,15 +824,15 @@ nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value) static PyObject * nxt_py_start_resp(PyObject *self, PyObject *args) { - int rc, status; - char *status_str, *space_ptr; - uint32_t status_len; - PyObject *headers, *tuple, *string, *status_bytes; - Py_ssize_t i, n, fields_size, fields_count; - nxt_python_run_ctx_t *ctx; - - ctx = nxt_python_run_ctx; - if (nxt_slow_path(ctx == NULL)) { + int rc, status; + char *status_str, *space_ptr; + uint32_t status_len; + PyObject *headers, *tuple, *string, *status_bytes; + Py_ssize_t i, n, fields_size, fields_count; + nxt_python_ctx_t *pctx; + + pctx = (nxt_python_ctx_t *) self; + if (nxt_slow_path(pctx->req == NULL)) { return PyErr_Format(PyExc_RuntimeError, "start_response() is called " "outside of WSGI request processing"); @@ -851,7 +897,7 @@ nxt_py_start_resp(PyObject *self, PyObject *args) } } - ctx->content_length = -1; + pctx->content_length = -1; string = PyTuple_GET_ITEM(args, 0); rc = nxt_python_str_buf(string, &status_str, &status_len, &status_bytes); @@ -877,7 +923,7 @@ nxt_py_start_resp(PyObject *self, PyObject *args) * ... applications can replace their originally intended output with error * output, up until the last possible moment. */ - rc = nxt_unit_response_init(ctx->req, status, fields_count, fields_size); + rc = nxt_unit_response_init(pctx->req, status, fields_count, fields_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return PyErr_Format(PyExc_RuntimeError, "failed to allocate response object"); @@ -886,7 +932,7 @@ nxt_py_start_resp(PyObject *self, PyObject *args) for (i = 0; i < fields_count; i++) { tuple = PyList_GET_ITEM(headers, i); - rc = nxt_python_response_add_field(ctx, PyTuple_GET_ITEM(tuple, 0), + rc = nxt_python_response_add_field(pctx, PyTuple_GET_ITEM(tuple, 0), PyTuple_GET_ITEM(tuple, 1), i); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return PyErr_Format(PyExc_RuntimeError, @@ -907,21 +953,21 @@ nxt_py_start_resp(PyObject *self, PyObject *args) * possible exception to this rule is if the response headers explicitly * include a Content-Length of zero.) */ - if (ctx->content_length == 0) { - rc = nxt_unit_response_send(ctx->req); + if (pctx->content_length == 0) { + rc = nxt_unit_response_send(pctx->req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return PyErr_Format(PyExc_RuntimeError, "failed to send response headers"); } } - Py_INCREF(nxt_py_write_obj); - return nxt_py_write_obj; + Py_INCREF(pctx->write); + return pctx->write; } static int -nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, PyObject *name, +nxt_python_response_add_field(nxt_python_ctx_t *pctx, PyObject *name, PyObject *value, int i) { int rc; @@ -943,20 +989,20 @@ nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, PyObject *name, goto fail; } - rc = nxt_unit_response_add_field(ctx->req, name_str, name_length, + rc = nxt_unit_response_add_field(pctx->req, name_str, name_length, value_str, value_length); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - if (ctx->req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { + if (pctx->req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { content_length = nxt_off_t_parse((u_char *) value_str, value_length); if (nxt_slow_path(content_length < 0)) { - nxt_unit_req_error(ctx->req, "failed to parse Content-Length " + nxt_unit_req_error(pctx->req, "failed to parse Content-Length " "value %.*s", (int) value_length, value_str); } else { - ctx->content_length = content_length; + pctx->content_length = content_length; } } @@ -1001,7 +1047,7 @@ nxt_py_write(PyObject *self, PyObject *str) NXT_PYTHON_BYTES_TYPE); } - rc = nxt_python_write(nxt_python_run_ctx, str); + rc = nxt_python_write((nxt_python_ctx_t *) self, str); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return PyErr_Format(PyExc_RuntimeError, "failed to write response value"); @@ -1012,28 +1058,26 @@ nxt_py_write(PyObject *self, PyObject *str) static void -nxt_py_input_dealloc(nxt_py_input_t *self) +nxt_py_input_dealloc(nxt_python_ctx_t *self) { PyObject_Del(self); } static PyObject * -nxt_py_input_read(nxt_py_input_t *self, PyObject *args) +nxt_py_input_read(nxt_python_ctx_t *pctx, PyObject *args) { - char *buf; - PyObject *content, *obj; - Py_ssize_t size, n; - nxt_python_run_ctx_t *ctx; + char *buf; + PyObject *content, *obj; + Py_ssize_t size, n; - ctx = nxt_python_run_ctx; - if (nxt_slow_path(ctx == NULL)) { + if (nxt_slow_path(pctx->req == NULL)) { return PyErr_Format(PyExc_RuntimeError, "wsgi.input.read() is called " "outside of WSGI request processing"); } - size = ctx->req->content_length; + size = pctx->req->content_length; n = PyTuple_GET_SIZE(args); @@ -1057,8 +1101,8 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args) } } - if (size == -1 || size > (Py_ssize_t) ctx->req->content_length) { - size = ctx->req->content_length; + if (size == -1 || size > (Py_ssize_t) pctx->req->content_length) { + size = pctx->req->content_length; } } @@ -1069,22 +1113,20 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args) buf = PyBytes_AS_STRING(content); - size = nxt_unit_request_read(ctx->req, buf, size); + size = nxt_unit_request_read(pctx->req, buf, size); return content; } static PyObject * -nxt_py_input_readline(nxt_py_input_t *self, PyObject *args) +nxt_py_input_readline(nxt_python_ctx_t *pctx, PyObject *args) { - ssize_t ssize; - PyObject *obj; - Py_ssize_t n; - nxt_python_run_ctx_t *ctx; + ssize_t ssize; + PyObject *obj; + Py_ssize_t n; - ctx = nxt_python_run_ctx; - if (nxt_slow_path(ctx == NULL)) { + if (nxt_slow_path(pctx->req == NULL)) { return PyErr_Format(PyExc_RuntimeError, "wsgi.input.readline() is called " "outside of WSGI request processing"); @@ -1102,7 +1144,7 @@ nxt_py_input_readline(nxt_py_input_t *self, PyObject *args) ssize = PyNumber_AsSsize_t(obj, PyExc_OverflowError); if (nxt_fast_path(ssize > 0)) { - return nxt_py_input_getline(ctx, ssize); + return nxt_py_input_getline(pctx, ssize); } if (ssize == 0) { @@ -1119,18 +1161,18 @@ nxt_py_input_readline(nxt_py_input_t *self, PyObject *args) } } - return nxt_py_input_getline(ctx, SSIZE_MAX); + return nxt_py_input_getline(pctx, SSIZE_MAX); } static PyObject * -nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size) +nxt_py_input_getline(nxt_python_ctx_t *pctx, size_t size) { void *buf; ssize_t res; PyObject *content; - res = nxt_unit_request_readline_size(ctx->req, size); + res = nxt_unit_request_readline_size(pctx->req, size); if (nxt_slow_path(res < 0)) { return NULL; } @@ -1146,20 +1188,18 @@ nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size) buf = PyBytes_AS_STRING(content); - res = nxt_unit_request_read(ctx->req, buf, res); + res = nxt_unit_request_read(pctx->req, buf, res); return content; } static PyObject * -nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args) +nxt_py_input_readlines(nxt_python_ctx_t *pctx, PyObject *args) { - PyObject *res; - nxt_python_run_ctx_t *ctx; + PyObject *res; - ctx = nxt_python_run_ctx; - if (nxt_slow_path(ctx == NULL)) { + if (nxt_slow_path(pctx->req == NULL)) { return PyErr_Format(PyExc_RuntimeError, "wsgi.input.readlines() is called " "outside of WSGI request processing"); @@ -1171,7 +1211,7 @@ nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args) } for ( ;; ) { - PyObject *line = nxt_py_input_getline(ctx, SSIZE_MAX); + PyObject *line = nxt_py_input_getline(pctx, SSIZE_MAX); if (nxt_slow_path(line == NULL)) { Py_DECREF(res); return NULL; @@ -1201,17 +1241,17 @@ nxt_py_input_iter(PyObject *self) static PyObject * nxt_py_input_next(PyObject *self) { - PyObject *line; - nxt_python_run_ctx_t *ctx; + PyObject *line; + nxt_python_ctx_t *pctx; - ctx = nxt_python_run_ctx; - if (nxt_slow_path(ctx == NULL)) { + pctx = (nxt_python_ctx_t *) self; + if (nxt_slow_path(pctx->req == NULL)) { return PyErr_Format(PyExc_RuntimeError, "wsgi.input.next() is called " "outside of WSGI request processing"); } - line = nxt_py_input_getline(ctx, SSIZE_MAX); + line = nxt_py_input_getline(pctx, SSIZE_MAX); if (nxt_slow_path(line == NULL)) { return NULL; } @@ -1227,7 +1267,7 @@ nxt_py_input_next(PyObject *self) static int -nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes) +nxt_python_write(nxt_python_ctx_t *pctx, PyObject *bytes) { int rc; char *str_buf; @@ -1248,16 +1288,16 @@ nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes) * stop iterating over the response when enough data has been sent, or raise * an error if the application tries to write() past that point. */ - if (nxt_slow_path(str_length > ctx->content_length - ctx->bytes_sent)) { - nxt_unit_req_error(ctx->req, "content length %"PRIu64" exceeded", - ctx->content_length); + if (nxt_slow_path(str_length > pctx->content_length - pctx->bytes_sent)) { + nxt_unit_req_error(pctx->req, "content length %"PRIu64" exceeded", + pctx->content_length); return NXT_UNIT_ERROR; } - rc = nxt_unit_response_write(ctx->req, str_buf, str_length); + rc = nxt_unit_response_write(pctx->req, str_buf, str_length); if (nxt_fast_path(rc == NXT_UNIT_OK)) { - ctx->bytes_sent += str_length; + pctx->bytes_sent += str_length; } return rc; |