diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/unit/nxt_go_port.c | 8 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.c | 8 | ||||
-rw-r--r-- | src/go/unit/nxt_go_process.c | 15 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.c | 30 | ||||
-rw-r--r-- | src/go/unit/port.go | 16 | ||||
-rw-r--r-- | src/go/unit/request.go | 34 |
6 files changed, 68 insertions, 43 deletions
diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c index a46a33d1..e866a242 100644 --- a/src/go/unit/nxt_go_port.c +++ b/src/go/unit/nxt_go_port.c @@ -123,7 +123,7 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) nxt_port_msg_new_port_t *new_port_msg; fd = -1; - nxt_go_debug("on read: %d (%d)", (int)buf_size, (int)oob_size); + nxt_go_debug("on read: %d (%d)", (int) buf_size, (int) oob_size); cm = oob; if (oob_size >= CMSG_SPACE(sizeof(int)) @@ -137,11 +137,11 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) port_msg = buf; if (buf_size < sizeof(nxt_port_msg_t)) { - nxt_go_warn("message too small (%d bytes)", (int)buf_size); + nxt_go_warn("message too small (%d bytes)", (int) buf_size); goto fail; } - buf_end = ((char *)buf) + buf_size; + buf_end = ((char *) buf) + buf_size; payload = port_msg + 1; payload_size = buf_size - sizeof(nxt_port_msg_t); @@ -151,7 +151,7 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) } if (port_msg->type >= NXT_PORT_MSG_MAX) { - nxt_go_warn("unknown message type (%d)", (int)port_msg->type); + nxt_go_warn("unknown message type (%d)", (int) port_msg->type); goto fail; } diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c index e63ca16c..c2a63a42 100644 --- a/src/go/unit/nxt_go_port_memory.c +++ b/src/go/unit/nxt_go_port_memory.c @@ -52,6 +52,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) name_len = snprintf(name, sizeof(name) - 1, "/unit.go.%p", name); #if (NXT_HAVE_MEMFD_CREATE) + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); if (nxt_slow_path(fd == -1)) { @@ -63,7 +64,9 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) nxt_go_debug("memfd_create(%s): %d", name, fd); #elif (NXT_HAVE_SHM_OPEN) - shm_unlink((char *) name); // just in case + + /* Just in case. */ + shm_unlink((char *) name); fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); @@ -78,6 +81,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) if (nxt_slow_path(shm_unlink((char *) name) == -1)) { nxt_go_warn("shm_unlink(%s) failed %d", name, errno); } + #endif if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { @@ -135,7 +139,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) &cmsg, sizeof(cmsg)); nxt_go_debug("new mmap #%d created for %d -> %d", - (int)hdr->id, (int)getpid(), (int)process->pid); + (int) hdr->id, (int) getpid(), (int) process->pid); close(fd); diff --git a/src/go/unit/nxt_go_process.c b/src/go/unit/nxt_go_process.c index c7ce052d..c2b2c01c 100644 --- a/src/go/unit/nxt_go_process.c +++ b/src/go/unit/nxt_go_process.c @@ -35,10 +35,10 @@ nxt_go_find_process(nxt_pid_t pid, uint32_t *pos) process = nxt_go_array_at(&processes, i); nxt_go_debug("compare process #%d (%p) at %d", - (int)process->pid, process, (int)i); + (int) process->pid, process, (int) i); if (pid == process->pid) { - nxt_go_debug("found process %d at %d", (int)pid, (int)i); + nxt_go_debug("found process %d at %d", (int) pid, (int) i); if (pos != NULL) { *pos = i; @@ -49,6 +49,7 @@ nxt_go_find_process(nxt_pid_t pid, uint32_t *pos) if (pid < process->pid) { r = i; + } else { l = i + 1; } @@ -60,7 +61,7 @@ nxt_go_find_process(nxt_pid_t pid, uint32_t *pos) *pos = i; } - nxt_go_debug("process %d not found, best pos %d", (int)pid, (int)i); + nxt_go_debug("process %d not found, best pos %d", (int) pid, (int) i); return NULL; } @@ -78,8 +79,8 @@ nxt_go_get_process(nxt_pid_t pid) nxt_go_array_add(&processes); process = nxt_go_array_at(&processes, pos); - nxt_go_debug("init process #%d (%p) at %d", (int)pid, process, - (int)pos); + nxt_go_debug("init process #%d (%p) at %d", + (int) pid, process, (int) pos); if (pos < processes.nelts - 1) { memmove(process + 1, process, @@ -108,10 +109,10 @@ nxt_go_new_incoming_mmap(nxt_pid_t pid, nxt_fd_t fd) process = nxt_go_get_process(pid); nxt_go_debug("got new mmap fd #%d from process %d", - (int)fd, (int)pid); + (int) fd, (int) pid); if (fstat(fd, &mmap_stat) == -1) { - nxt_go_warn("fstat(%d) failed %d", (int)fd, errno); + nxt_go_warn("fstat(%d) failed %d", (int) fd, errno); return; } diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c index cca8273e..0dd72f8b 100644 --- a/src/go/unit/nxt_go_run_ctx.c +++ b/src/go/unit/nxt_go_run_ctx.c @@ -29,7 +29,7 @@ nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf, if (nxt_slow_path(msg->mmap_msg == NULL)) { if (n > 0) { - nxt_go_warn("failed to get plain buf #%d", (int)n); + nxt_go_warn("failed to get plain buf #%d", (int) n); return NXT_ERROR; } @@ -44,15 +44,15 @@ nxt_go_ctx_msg_rbuf(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg, nxt_buf_t *buf, mmap_msg = msg->mmap_msg + n; if (nxt_slow_path(mmap_msg >= msg->end)) { - nxt_go_warn("no more data in shm #%d", (int)n); + nxt_go_warn("no more data in shm #%d", (int) n); return NXT_ERROR; } if (nxt_slow_path(mmap_msg->mmap_id >= ctx->process->incoming.nelts)) { nxt_go_warn("incoming shared memory segment #%d not found " - "for process %d", (int)mmap_msg->mmap_id, - (int)msg->port_msg->pid); + "for process %d", (int) mmap_msg->mmap_id, + (int) msg->port_msg->pid); return NXT_ERROR; } @@ -103,6 +103,7 @@ nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, msg->data_size += mmap_msg->size; mmap_msg += 1; } + } else { msg->mmap_msg = NULL; msg->end = NULL; @@ -187,6 +188,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) if (ctx->msg_last == &ctx->msg) { msg->start_offset += ctx->r.body.preread_size; + } else { msg->start_offset += ctx->msg_last->data_size; } @@ -241,8 +243,7 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) buf = &ctx->wbuf; - hdr = nxt_go_port_mmap_get(ctx->process, - ctx->msg.port_msg->reply_port, &c); + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); @@ -324,9 +325,9 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) nchunks--; } - if (nchunks != 0 && - min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { - + if (nchunks != 0 + && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) + { c--; while (c >= start) { nxt_port_mmap_set_chunk_free(hdr, c); @@ -334,6 +335,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) } return NXT_ERROR; + } else { b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); @@ -378,6 +380,7 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) return NXT_OK; } } + } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); if (ctx->nwbuf >= 8) { @@ -464,9 +467,9 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) buf = &ctx->rbuf; - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length)) { + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) { nxt_go_warn("read str: used size too small %d < %d", - (int)nxt_buf_mem_used_size(&buf->mem), (int)length); + (int) nxt_buf_mem_used_size(&buf->mem), (int) length); return NXT_ERROR; } @@ -476,8 +479,9 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) buf->mem.pos += length; - nxt_go_debug("read_str: %d %.*s", (int)length - 1, (int)length - 1, - str->start); + nxt_go_debug("read_str: %d %.*s", + (int) length - 1, (int) length - 1, str->start); + } else { str->start = NULL; str->length = 0; diff --git a/src/go/unit/port.go b/src/go/unit/port.go index a8faa2a0..74b5e8fe 100644 --- a/src/go/unit/port.go +++ b/src/go/unit/port.go @@ -124,7 +124,9 @@ func nxt_go_new_port(pid C.int, id C.int, t C.int, rcv C.int, snd C.int) { } //export nxt_go_port_send -func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int { +func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, + oob unsafe.Pointer, oob_size C.int) C.int { + key := port_key{ pid: int(pid), id: int(id), @@ -133,7 +135,8 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, o p := find_port(key) if p != nil { - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil) + n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), + C.GoBytes(oob, oob_size), nil) if err != nil { fmt.Printf("write result %d (%d), %s\n", n, oobn, err) @@ -146,11 +149,14 @@ func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, o } //export nxt_go_main_send -func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, oob_size C.int) C.int { +func nxt_go_main_send(buf unsafe.Pointer, buf_size C.int, oob unsafe.Pointer, + oob_size C.int) C.int { + p := main_port() if p != nil { - n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), C.GoBytes(oob, oob_size), nil) + n, oobn, err := p.snd.WriteMsgUnix(C.GoBytes(buf, buf_size), + C.GoBytes(oob, oob_size), nil) if err != nil { fmt.Printf("write result %d (%d), %s\n", n, oobn, err) @@ -168,7 +174,7 @@ func new_port(pid int, id int, t int, rcv int, snd int) *port { pid: pid, id: id, }, - t: t, + t: t, rcv: getUnixConn(rcv), snd: getUnixConn(snd), } diff --git a/src/go/unit/request.go b/src/go/unit/request.go index aa6d1145..75c06ef5 100644 --- a/src/go/unit/request.go +++ b/src/go/unit/request.go @@ -17,12 +17,12 @@ import ( ) type request struct { - req http.Request - resp *response - c_req C.nxt_go_request_t - id C.uint32_t - msgs []*cmsg - ch chan *cmsg + req http.Request + resp *response + c_req C.nxt_go_request_t + id C.uint32_t + msgs []*cmsg + ch chan *cmsg } func (r *request) Read(p []byte) (n int, err error) { @@ -33,7 +33,8 @@ func (r *request) Read(p []byte) (n int, err error) { if res == -2 /* NXT_AGAIN */ { m := <-r.ch - res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s) + res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, + m.buf.s) r.push(m) } @@ -124,7 +125,9 @@ func (r *request) push(m *cmsg) { } //export nxt_go_new_request -func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) { +func nxt_go_new_request(c_req C.nxt_go_request_t, id C.uint32_t, + c_method *C.nxt_go_str_t, c_uri *C.nxt_go_str_t) { + uri := C.GoStringN(c_uri.start, c_uri.length) var URL *url.URL @@ -162,7 +165,9 @@ func nxt_go_find_request(id C.uint32_t) C.nxt_go_request_t { } //export nxt_go_request_set_proto -func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, maj C.int, min C.int) { +func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, + maj C.int, min C.int) { + r := find_request(c_req) r.req.Proto = C.GoStringN(proto.start, proto.length) r.req.ProtoMajor = int(maj) @@ -170,9 +175,12 @@ func nxt_go_request_set_proto(c_req C.nxt_go_request_t, proto *C.nxt_go_str_t, m } //export nxt_go_request_add_header -func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, value *C.nxt_go_str_t) { +func nxt_go_request_add_header(c_req C.nxt_go_request_t, name *C.nxt_go_str_t, + value *C.nxt_go_str_t) { + r := find_request(c_req) - r.req.Header.Add(C.GoStringN(name.start, name.length), C.GoStringN(value.start, value.length)) + r.req.Header.Add(C.GoStringN(name.start, name.length), + C.GoStringN(value.start, value.length)) } //export nxt_go_request_set_content_length @@ -196,7 +204,9 @@ func nxt_go_request_set_url(c_req C.nxt_go_request_t, scheme *C.char) { } //export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, addr *C.nxt_go_str_t) { +func nxt_go_request_set_remote_addr(c_req C.nxt_go_request_t, + addr *C.nxt_go_str_t) { + find_request(c_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) } |