summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.h
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-10-04 14:58:47 +0300
committerMax Romanov <max.romanov@nginx.com>2017-10-04 14:58:47 +0300
commit6a64533fa3b96bb64bfde4615e40376d65a292cb (patch)
treea18ed8059158d833290519e1135209747e28af21 /src/nxt_port.h
parent414d508e04d26ebef0e3e1ba4ed518b11d3af1a0 (diff)
downloadunit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.gz
unit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.bz2
Introducing use counters for port and app. Thread safe port write.
Use counter helps to simplify logic around port and application free. Port 'post' function introduced to simplify post execution of particular function to original port engine's thread. Write message queue is protected by mutex which makes port write operation thread safe.
Diffstat (limited to 'src/nxt_port.h')
-rw-r--r--src/nxt_port.h17
1 files changed, 13 insertions, 4 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index c5b2b40e..f6679bb2 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -115,7 +115,6 @@ typedef struct {
size_t share;
nxt_fd_t fd;
nxt_bool_t close_fd;
- nxt_bool_t opened;
nxt_port_msg_t port_msg;
nxt_work_t work;
@@ -145,12 +144,15 @@ struct nxt_port_s {
nxt_app_t *app;
nxt_queue_t messages; /* of nxt_port_send_msg_t */
+ nxt_thread_mutex_t write_mutex;
/* Maximum size of message part. */
uint32_t max_size;
/* Maximum interleave of message parts. */
uint32_t max_share;
- uint32_t app_stream;
+
+ uint32_t app_requests;
+ uint32_t app_responses;
nxt_port_handler_t handler;
nxt_port_handler_t *data;
@@ -167,8 +169,9 @@ struct nxt_port_s {
nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
+ nxt_atomic_t use_count;
+
nxt_process_type_t type;
- nxt_work_t work;
struct iovec *iov;
void *mmsg_buf;
@@ -194,9 +197,11 @@ typedef union {
} nxt_port_data_t;
+typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
+ void *data);
+
nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_process_type_t type);
-nxt_bool_t nxt_port_release(nxt_port_t *port);
nxt_port_id_t nxt_port_get_next_id(void);
void nxt_port_reset_next_id(void);
@@ -204,6 +209,7 @@ void nxt_port_reset_next_id(void);
nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
size_t max_size);
void nxt_port_destroy(nxt_port_t *port);
+void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
@@ -231,5 +237,8 @@ void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_post_handler_t handler, void *data);
+void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
#endif /* _NXT_PORT_H_INCLUDED_ */