diff options
Diffstat (limited to 'src/nxt_unit.h')
-rw-r--r-- | src/nxt_unit.h | 82 |
1 files changed, 24 insertions, 58 deletions
diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 596dd8b6..67244cf4 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -19,6 +19,8 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, + NXT_UNIT_AGAIN = 2, + NXT_UNIT_CANCELLED = 3, }; enum { @@ -91,8 +93,7 @@ struct nxt_unit_request_info_s { nxt_unit_t *unit; nxt_unit_ctx_t *ctx; - nxt_unit_port_id_t request_port; - nxt_unit_port_id_t response_port; + nxt_unit_port_t *response_port; nxt_unit_request_t *request; nxt_unit_buf_t *request_buf; @@ -120,6 +121,8 @@ struct nxt_unit_callbacks_s { */ void (*request_handler)(nxt_unit_request_info_t *req); + void (*data_handler)(nxt_unit_request_info_t *req); + /* Process websocket frame. */ void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); @@ -130,26 +133,25 @@ struct nxt_unit_callbacks_s { int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); /* Remove previously added port. Optional. */ - void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); + void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port); /* Remove all data associated with process pid including ports. Optional. */ - void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid); + void (*remove_pid)(nxt_unit_t *, pid_t pid); /* Gracefully quit the application. Optional. */ void (*quit)(nxt_unit_ctx_t *); - /* Shared memory release acknowledgement. */ + /* Shared memory release acknowledgement. Optional. */ void (*shm_ack_handler)(nxt_unit_ctx_t *); /* Send data and control to process pid using port id. Optional. */ - ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); /* Receive data on port id. Optional. */ - ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, + ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port, void *buf, size_t buf_size, void *oob, size_t oob_size); - }; @@ -165,6 +167,7 @@ struct nxt_unit_init_s { nxt_unit_port_t ready_port; uint32_t ready_stream; + nxt_unit_port_t router_port; nxt_unit_port_t read_port; int log_fd; }; @@ -189,16 +192,6 @@ struct nxt_unit_read_info_s { nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); /* - * Process received message, invoke configured callbacks. - * - * If application implements it's own event loop, each datagram received - * from port socket should be initially processed by unit. This function - * may invoke other application-defined callback for message processing. - */ -int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size); - -/* * Main function useful in case when application does not have it's own * event loop. nxt_unit_run() starts infinite message wait and process loop. * @@ -211,8 +204,21 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, */ int nxt_unit_run(nxt_unit_ctx_t *); +int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx); + +int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); + +/* + * Receive and process one message, invoke configured callbacks. + * + * If application implements it's own event loop, each datagram received + * from port socket should be initially processed by unit. This function + * may invoke other application-defined callback for message processing. + */ int nxt_unit_run_once(nxt_unit_ctx_t *ctx); +int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); + /* Destroy application library object. */ void nxt_unit_done(nxt_unit_ctx_t *); @@ -222,49 +228,9 @@ void nxt_unit_done(nxt_unit_ctx_t *); */ nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); -/* Free unused context. It is not required to free main context. */ -void nxt_unit_ctx_free(nxt_unit_ctx_t *); - /* Initialize port_id, calculate hash. */ void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); -/* - * Create extra incoming port, perform all required actions to propogate - * the port to Unit server. Fills structure referenced by port_id with - * current pid and new port id. - */ -int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id); - -/* Default 'add_port' implementation. */ -int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); - -/* Find previously added port. */ -nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *, - nxt_unit_port_id_t *port_id); - -/* Find, fill output 'port' and remove port from storage. */ -void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *port); - -/* Default 'remove_port' implementation. */ -void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); - -/* Default 'remove_pid' implementation. */ -void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid); - -/* Default 'quit' implementation. */ -void nxt_unit_quit(nxt_unit_ctx_t *); - -/* Default 'port_send' implementation. */ -ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd, - const void *buf, size_t buf_size, - const void *oob, size_t oob_size); - -/* Default 'port_recv' implementation. */ -ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, - void *oob, size_t oob_size); - /* Calculates hash for given field name. */ uint16_t nxt_unit_field_hash(const char* name, size_t name_length); |