From ec3389b63bd7a9159d2be4a2863140f75095c7d3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:19:55 +0300 Subject: Libunit refactoring: port management. - Changed the port management callbacks to notifications, which e. g. avoids the need to call the libunit function - Added context and library instance reference counts for a safer resource release - Added the router main port initialization --- src/nxt_unit.h | 44 ++++---------------------------------------- 1 file changed, 4 insertions(+), 40 deletions(-) (limited to 'src/nxt_unit.h') diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 596dd8b6..fa1fa843 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -130,15 +130,15 @@ struct nxt_unit_callbacks_s { int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); /* Remove previously added port. Optional. */ - void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); + void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port); /* Remove all data associated with process pid including ports. Optional. */ - void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid); + void (*remove_pid)(nxt_unit_t *, pid_t pid); /* Gracefully quit the application. Optional. */ void (*quit)(nxt_unit_ctx_t *); - /* Shared memory release acknowledgement. */ + /* Shared memory release acknowledgement. Optional. */ void (*shm_ack_handler)(nxt_unit_ctx_t *); /* Send data and control to process pid using port id. Optional. */ @@ -149,7 +149,6 @@ struct nxt_unit_callbacks_s { /* Receive data on port id. Optional. */ ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size); - }; @@ -165,6 +164,7 @@ struct nxt_unit_init_s { nxt_unit_port_t ready_port; uint32_t ready_stream; + nxt_unit_port_t router_port; nxt_unit_port_t read_port; int log_fd; }; @@ -222,45 +222,9 @@ void nxt_unit_done(nxt_unit_ctx_t *); */ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); -/* Free unused context. It is not required to free main context. */ -void nxt_unit_ctx_free(nxt_unit_ctx_t *); - /* Initialize port_id, calculate hash. */ void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); -/* - * Create extra incoming port, perform all required actions to propogate - * the port to Unit server. Fills structure referenced by port_id with - * current pid and new port id. - */ -int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id); - -/* Default 'add_port' implementation. */ -int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); - -/* Find previously added port. */ -nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *, - nxt_unit_port_id_t *port_id); - -/* Find, fill output 'port' and remove port from storage. */ -void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *port); - -/* Default 'remove_port' implementation. */ -void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); - -/* Default 'remove_pid' implementation. */ -void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid); - -/* Default 'quit' implementation. */ -void nxt_unit_quit(nxt_unit_ctx_t *); - -/* Default 'port_send' implementation. */ -ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd, - const void *buf, size_t buf_size, - const void *oob, size_t oob_size); - /* Default 'port_recv' implementation. */ ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, void *oob, size_t oob_size); -- cgit From bf647588ff781e606651f001b53a4e83bb34c000 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:06 +0300 Subject: Adding a reference counter to the libunit port structure. The goal is to minimize the number of (pid, id) to port hash lookups which require a library mutex lock. The response port is found once per request, while the read port is initialized at startup. --- src/nxt_unit.h | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'src/nxt_unit.h') diff --git a/src/nxt_unit.h b/src/nxt_unit.h index fa1fa843..6723026f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -91,8 +91,7 @@ struct nxt_unit_request_info_s { nxt_unit_t *unit; nxt_unit_ctx_t *ctx; - nxt_unit_port_id_t request_port; - nxt_unit_port_id_t response_port; + nxt_unit_port_t *response_port; nxt_unit_request_t *request; nxt_unit_buf_t *request_buf; @@ -142,12 +141,12 @@ struct nxt_unit_callbacks_s { void (*shm_ack_handler)(nxt_unit_ctx_t *); /* Send data and control to process pid using port id. Optional. */ - ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); /* Receive data on port id. Optional. */ - ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); }; @@ -195,7 +194,7 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); * from port socket should be initially processed by unit. This function * may invoke other application-defined callback for message processing. */ -int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, +int nxt_unit_process_msg(nxt_unit_ctx_t *, void *buf, size_t buf_size, void *oob, size_t oob_size); /* @@ -225,10 +224,6 @@ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); /* Initialize port_id, calculate hash. */ void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); -/* Default 'port_recv' implementation. */ -ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, - void *oob, size_t oob_size); - /* Calculates hash for given field name. */ uint16_t nxt_unit_field_hash(const char* name, size_t name_length); -- cgit From 3cbc22a6dc45abdeade4deb364601230ddca02c1 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:10 +0300 Subject: Changing router to application port exchange protocol. The application process needs to request the port from the router instead of the latter pushing the port before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router to use the application shared port and then the queue. --- src/nxt_unit.h | 1 + 1 file changed, 1 insertion(+) (limited to 'src/nxt_unit.h') diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 6723026f..8fa64f4e 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -19,6 +19,7 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, + NXT_UNIT_AGAIN = 2, }; enum { -- cgit From 6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:13 +0300 Subject: Changing router to application shared memory exchange protocol. The application process needs to request the shared memory segment from the router instead of the latter pushing the segment before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router for using the application shared port and then the queue. --- src/nxt_unit.h | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) (limited to 'src/nxt_unit.h') diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 8fa64f4e..79157f5f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -20,6 +20,7 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, NXT_UNIT_AGAIN = 2, + NXT_UNIT_CANCELLED = 3, }; enum { @@ -188,16 +189,6 @@ struct nxt_unit_read_info_s { */ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); -/* - * Process received message, invoke configured callbacks. - * - * If application implements it's own event loop, each datagram received - * from port socket should be initially processed by unit. This function - * may invoke other application-defined callback for message processing. - */ -int nxt_unit_process_msg(nxt_unit_ctx_t *, - void *buf, size_t buf_size, void *oob, size_t oob_size); - /* * Main function useful in case when application does not have it's own * event loop. nxt_unit_run() starts infinite message wait and process loop. -- cgit From 83595606121a821f9e3cef0f0b7e7fe87eb1e50a Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:15 +0300 Subject: Introducing the shared application port. This is the port shared between all application processes which use it to pass requests for processing. Using it significantly simplifies the request processing code in the router. The drawback is 2 more file descriptors per each configured application and more complex libunit message wait/read code. --- src/nxt_unit.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'src/nxt_unit.h') diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 79157f5f..0f16773f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -202,8 +202,21 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); */ int nxt_unit_run(nxt_unit_ctx_t *); +int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx); + +int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); + +/* + * Receive and process one message, invoke configured callbacks. + * + * If application implements it's own event loop, each datagram received + * from port socket should be initially processed by unit. This function + * may invoke other application-defined callback for message processing. + */ int nxt_unit_run_once(nxt_unit_ctx_t *ctx); +int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); + /* Destroy application library object. */ void nxt_unit_done(nxt_unit_ctx_t *); -- cgit From e227fc9e6281c280c46139a81646ecd7b0510e2b Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 11 Aug 2020 19:20:34 +0300 Subject: Introducing application and port shared memory queues. The goal is to minimize the number of syscalls needed to deliver a message. --- src/nxt_unit.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/nxt_unit.h') diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 0f16773f..67244cf4 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -121,6 +121,8 @@ struct nxt_unit_callbacks_s { */ void (*request_handler)(nxt_unit_request_info_t *req); + void (*data_handler)(nxt_unit_request_info_t *req); + /* Process websocket frame. */ void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); -- cgit