summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.h
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-12-27 17:46:17 +0300
committerMax Romanov <max.romanov@nginx.com>2017-12-27 17:46:17 +0300
commit89c0f7c5db5003b8fd8df3e1babb0c802004bf4c (patch)
tree7ac164d6fe41fd76beb3d328e6fde4020f744b63 /src/nxt_port.h
parent45d08d5145c63fd788f85d9e789314dcf093c99e (diff)
downloadunit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.gz
unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.bz2
Implementing the ability to cancel request before worker starts processing it.
Diffstat (limited to 'src/nxt_port.h')
-rw-r--r--src/nxt_port.h27
1 files changed, 23 insertions, 4 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 2c9a1a99..c54a1537 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -102,13 +102,21 @@ typedef struct {
nxt_port_id_t reply_port;
uint8_t type;
+
+ /* Last message for this stream. */
uint8_t last; /* 1 bit */
/* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
uint8_t mmap; /* 1 bit */
- uint8_t nf;
- uint8_t mf;
+ /* Non-First fragment in fragmented message sequence. */
+ uint8_t nf; /* 1 bit */
+
+ /* More Fragments followed. */
+ uint8_t mf; /* 1 bit */
+
+ /* Message delivery tracking enabled, next chunk is tracking msg. */
+ uint8_t tracking; /* 1 bit */
} nxt_port_msg_t;
@@ -119,6 +127,7 @@ typedef struct {
nxt_fd_t fd;
nxt_bool_t close_fd;
nxt_port_msg_t port_msg;
+ uint32_t tracking_msg[2];
nxt_work_t work;
} nxt_port_send_msg_t;
@@ -130,6 +139,7 @@ struct nxt_port_recv_msg_s {
nxt_port_t *port;
nxt_port_msg_t port_msg;
size_t size;
+ nxt_bool_t cancelled;
union {
nxt_port_t *new_port;
nxt_pid_t removed_pid;
@@ -221,9 +231,18 @@ void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_read_close(nxt_port_t *port);
-nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
+nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port,
+ nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
+ nxt_buf_t *b, void *tracking);
+
+nxt_inline nxt_int_t
+nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
- nxt_buf_t *b);
+ nxt_buf_t *b)
+{
+ return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b,
+ NULL);
+}
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handlers_t *handlers);