summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_port_rpc.c28
-rw-r--r--src/nxt_port_rpc.h2
-rw-r--r--src/nxt_runtime.c4
3 files changed, 31 insertions, 3 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index 77e8af45..37f2d902 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -8,7 +8,7 @@
#include <nxt_port_rpc.h>
-static nxt_atomic_t nxt_stream_ident = 1;
+static volatile uint32_t *nxt_stream_ident;
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
@@ -30,6 +30,29 @@ nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
nxt_port_rpc_reg_t *reg);
+nxt_int_t
+nxt_port_rpc_init(void)
+{
+ void *p;
+
+ if (nxt_stream_ident != NULL) {
+ return NXT_OK;
+ }
+
+ p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE,
+ MAP_ANON | MAP_SHARED, -1, 0);
+
+ if (nxt_slow_path(p == MAP_FAILED)) {
+ return NXT_ERROR;
+ }
+
+ nxt_stream_ident = p;
+ *nxt_stream_ident = 1;
+
+ return NXT_OK;
+}
+
+
static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -105,8 +128,7 @@ nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
nxt_assert(port->pair[0] != -1);
- stream =
- (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3FFFFFFF;
+ stream = nxt_atomic_fetch_add(nxt_stream_ident, 1);
reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
diff --git a/src/nxt_port_rpc.h b/src/nxt_port_rpc.h
index 8011e474..c07683fb 100644
--- a/src/nxt_port_rpc.h
+++ b/src/nxt_port_rpc.h
@@ -11,6 +11,8 @@
typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+nxt_int_t nxt_port_rpc_init(void);
+
uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
nxt_pid_t peer, void *data);
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index bcd156ee..ea01f06f 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -118,6 +118,10 @@ nxt_runtime_create(nxt_task_t *task)
goto fail;
}
+ if (nxt_port_rpc_init() != NXT_OK) {
+ goto fail;
+ }
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_runtime_start, task, rt, NULL);