summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.h82
1 files changed, 24 insertions, 58 deletions
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 596dd8b6..67244cf4 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -19,6 +19,8 @@
enum {
NXT_UNIT_OK = 0,
NXT_UNIT_ERROR = 1,
+ NXT_UNIT_AGAIN = 2,
+ NXT_UNIT_CANCELLED = 3,
};
enum {
@@ -91,8 +93,7 @@ struct nxt_unit_request_info_s {
nxt_unit_t *unit;
nxt_unit_ctx_t *ctx;
- nxt_unit_port_id_t request_port;
- nxt_unit_port_id_t response_port;
+ nxt_unit_port_t *response_port;
nxt_unit_request_t *request;
nxt_unit_buf_t *request_buf;
@@ -120,6 +121,8 @@ struct nxt_unit_callbacks_s {
*/
void (*request_handler)(nxt_unit_request_info_t *req);
+ void (*data_handler)(nxt_unit_request_info_t *req);
+
/* Process websocket frame. */
void (*websocket_handler)(nxt_unit_websocket_frame_t *ws);
@@ -130,26 +133,25 @@ struct nxt_unit_callbacks_s {
int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
/* Remove previously added port. Optional. */
- void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
+ void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port);
/* Remove all data associated with process pid including ports. Optional. */
- void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid);
+ void (*remove_pid)(nxt_unit_t *, pid_t pid);
/* Gracefully quit the application. Optional. */
void (*quit)(nxt_unit_ctx_t *);
- /* Shared memory release acknowledgement. */
+ /* Shared memory release acknowledgement. Optional. */
void (*shm_ack_handler)(nxt_unit_ctx_t *);
/* Send data and control to process pid using port id. Optional. */
- ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
/* Receive data on port id. Optional. */
- ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
+ ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size);
-
};
@@ -165,6 +167,7 @@ struct nxt_unit_init_s {
nxt_unit_port_t ready_port;
uint32_t ready_stream;
+ nxt_unit_port_t router_port;
nxt_unit_port_t read_port;
int log_fd;
};
@@ -189,16 +192,6 @@ struct nxt_unit_read_info_s {
nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
/*
- * Process received message, invoke configured callbacks.
- *
- * If application implements it's own event loop, each datagram received
- * from port socket should be initially processed by unit. This function
- * may invoke other application-defined callback for message processing.
- */
-int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
- void *buf, size_t buf_size, void *oob, size_t oob_size);
-
-/*
* Main function useful in case when application does not have it's own
* event loop. nxt_unit_run() starts infinite message wait and process loop.
*
@@ -211,8 +204,21 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
*/
int nxt_unit_run(nxt_unit_ctx_t *);
+int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
+
+int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
+
+/*
+ * Receive and process one message, invoke configured callbacks.
+ *
+ * If application implements it's own event loop, each datagram received
+ * from port socket should be initially processed by unit. This function
+ * may invoke other application-defined callback for message processing.
+ */
int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
+int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+
/* Destroy application library object. */
void nxt_unit_done(nxt_unit_ctx_t *);
@@ -222,49 +228,9 @@ void nxt_unit_done(nxt_unit_ctx_t *);
*/
nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
-/* Free unused context. It is not required to free main context. */
-void nxt_unit_ctx_free(nxt_unit_ctx_t *);
-
/* Initialize port_id, calculate hash. */
void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
-/*
- * Create extra incoming port, perform all required actions to propogate
- * the port to Unit server. Fills structure referenced by port_id with
- * current pid and new port id.
- */
-int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst,
- nxt_unit_port_id_t *port_id);
-
-/* Default 'add_port' implementation. */
-int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
-
-/* Find previously added port. */
-nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *,
- nxt_unit_port_id_t *port_id);
-
-/* Find, fill output 'port' and remove port from storage. */
-void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
- nxt_unit_port_t *port);
-
-/* Default 'remove_port' implementation. */
-void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
-
-/* Default 'remove_pid' implementation. */
-void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid);
-
-/* Default 'quit' implementation. */
-void nxt_unit_quit(nxt_unit_ctx_t *);
-
-/* Default 'port_send' implementation. */
-ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd,
- const void *buf, size_t buf_size,
- const void *oob, size_t oob_size);
-
-/* Default 'port_recv' implementation. */
-ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
- void *oob, size_t oob_size);
-
/* Calculates hash for given field name. */
uint16_t nxt_unit_field_hash(const char* name, size_t name_length);