diff options
author | Igor Sysoev <igor@sysoev.ru> | 2018-04-03 16:28:26 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2018-04-03 16:28:26 +0300 |
commit | fa04c05aa0e67e6964ab006935f939c6b27754a3 (patch) | |
tree | 969d3dac0bda996a487dc6e58e3b1e2ef7f91946 | |
parent | 151160c1103166810daddf1e9cc7135cb6ccc321 (diff) | |
download | unit-fa04c05aa0e67e6964ab006935f939c6b27754a3.tar.gz unit-fa04c05aa0e67e6964ab006935f939c6b27754a3.tar.bz2 |
HTTP: using r->mem_pool retention counter for response buffers.
-rw-r--r-- | src/nxt_buf.c | 2 | ||||
-rw-r--r-- | src/nxt_buf.h | 2 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 56 | ||||
-rw-r--r-- | src/nxt_http.h | 13 | ||||
-rw-r--r-- | src/nxt_http_error.c | 29 | ||||
-rw-r--r-- | src/nxt_http_request.c | 69 | ||||
-rw-r--r-- | src/nxt_router.c | 17 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 15 | ||||
-rw-r--r-- | src/nxt_sendbuf.h | 1 |
9 files changed, 154 insertions, 50 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c index c764fecd..e2f1f047 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -144,7 +144,7 @@ nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags) { nxt_buf_t *b; - b = nxt_mp_zalloc(mp, NXT_BUF_SYNC_SIZE); + b = nxt_mp_zalloc(mp, NXT_BUF_MEM_SIZE); if (nxt_fast_path(b != NULL)) { b->data = mp; diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 7a894d0f..d9d4ee1b 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -101,8 +101,8 @@ struct nxt_buf_s { }; +#define NXT_BUF_SYNC_SIZE offsetof(nxt_buf_t, mem.free) #define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file) -#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE #define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t) #define NXT_BUF_MMAP_SIZE NXT_BUF_FILE_SIZE #define NXT_BUF_PORT_MMAP_SIZE NXT_BUF_MEM_SIZE diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index a216dafb..e4993f79 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -21,9 +21,12 @@ static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); +nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r); static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); static void nxt_h1p_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *last); static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto); static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c); @@ -68,6 +71,13 @@ const nxt_http_proto_send_t nxt_http_proto_send[3] = { }; +const nxt_http_proto_discard_t nxt_http_proto_discard[3] = { + nxt_h1p_request_discard, + NULL, + NULL, +}; + + const nxt_http_proto_close_t nxt_http_proto_close[3] = { nxt_h1p_request_close, NULL, @@ -687,11 +697,9 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) } nxt_list_loop; - header = nxt_buf_mem_alloc(r->mem_pool, size, 0); + header = nxt_http_buf_mem(task, r, size); if (nxt_slow_path(header == NULL)) { - /* The internal server error is set just for logging. */ - r->status = NXT_HTTP_INTERNAL_SERVER_ERROR; - nxt_h1p_conn_close(task, h1p->conn, h1p); + nxt_h1p_request_error(task, r); return; } @@ -738,6 +746,13 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r) } +nxt_inline void +nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r) +{ + r->state->error_handler(task, r, r->proto.h1); +} + + static const nxt_conn_state_t nxt_h1p_send_state nxt_aligned(64) = { @@ -764,7 +779,7 @@ nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) if (r->proto.h1->chunked) { out = nxt_h1p_chunk_create(task, r, out); if (nxt_slow_path(out == NULL)) { - nxt_h1p_conn_error(task, c, c->socket.data); + nxt_h1p_request_error(task, r); return; } } @@ -796,7 +811,7 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) for (b = out; b != NULL; b = b->next) { if (nxt_buf_is_last(b)) { - tail = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0); + tail = nxt_http_buf_mem(task, r, chunk_size); if (nxt_slow_path(tail == NULL)) { return NULL; } @@ -821,7 +836,7 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) return out; } - header = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0); + header = nxt_http_buf_mem(task, r, chunk_size); if (nxt_slow_path(header == NULL)) { return NULL; } @@ -854,6 +869,31 @@ nxt_h1p_sent(nxt_task_t *task, void *obj, void *data) static void +nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *last) +{ + nxt_buf_t *b; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_work_queue_t *wq; + + nxt_debug(task, "h1p request discard"); + + h1p = r->proto.h1; + h1p->keepalive = 0; + + c = h1p->conn; + b = c->write; + c->write = NULL; + + wq = &task->thread->engine->fast_work_queue; + + nxt_sendbuf_drain(task, wq, b); + nxt_sendbuf_drain(task, wq, last); +} + + +static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto) { nxt_conn_t *c; @@ -967,7 +1007,7 @@ nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data) r = h1p->request; if (r != NULL) { - r->state->error_handler(task, r, r->proto.h1); + nxt_h1p_request_error(task, r); return; } } diff --git a/src/nxt_http.h b/src/nxt_http.h index 604c2883..be751994 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -122,6 +122,8 @@ struct nxt_http_request_s { nxt_sockaddr_t *remote; nxt_sockaddr_t *local; + nxt_buf_t *last; + nxt_http_response_t resp; nxt_http_status_t status:16; @@ -129,6 +131,7 @@ struct nxt_http_request_s { uint8_t protocol; /* 2 bits */ uint8_t logged; /* 1 bit */ uint8_t header_sent; /* 1 bit */ + uint8_t error; /* 1 bit */ }; @@ -140,6 +143,8 @@ typedef void (*nxt_http_proto_header_send_t)(nxt_task_t *task, nxt_http_request_t *r); typedef void (*nxt_http_proto_send_t)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); +typedef void (*nxt_http_proto_discard_t)(nxt_task_t *task, + nxt_http_request_t *r, nxt_buf_t *last); typedef void (*nxt_http_proto_close_t)(nxt_task_t *task, nxt_http_proto_t proto); @@ -157,9 +162,10 @@ void nxt_http_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r); void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); -void nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r); -nxt_buf_t *nxt_http_request_last_buffer(nxt_task_t *task, - nxt_http_request_t *r); +nxt_buf_t *nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, + size_t size); +nxt_buf_t *nxt_http_buf_last(nxt_http_request_t *r); +void nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data); void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_http_request_host(void *ctx, nxt_http_field_t *field, @@ -177,6 +183,7 @@ extern const nxt_http_proto_body_read_t nxt_http_proto_body_read[]; extern const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[]; extern const nxt_http_proto_header_send_t nxt_http_proto_header_send[]; extern const nxt_http_proto_send_t nxt_http_proto_send[]; +extern const nxt_http_proto_discard_t nxt_http_proto_discard[]; extern const nxt_http_proto_close_t nxt_http_proto_close[]; diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c index 5030264b..8b10f508 100644 --- a/src/nxt_http_error.c +++ b/src/nxt_http_error.c @@ -12,7 +12,7 @@ static void nxt_http_request_send_error_body(nxt_task_t *task, void *r, void *data); -static const nxt_http_request_state_t nxt_http_request_send_state; +static const nxt_http_request_state_t nxt_http_request_send_error_body_state; static const char error[] = @@ -28,10 +28,12 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, nxt_debug(task, "http request error: %d", status); - if (r->header_sent) { + if (r->header_sent || r->error) { goto fail; } + r->error = (status == NXT_HTTP_INTERNAL_SERVER_ERROR); + r->status = status; r->resp.fields = nxt_list_create(r->mem_pool, 8, sizeof(nxt_http_field_t)); @@ -49,36 +51,36 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, r->resp.content_length = NULL; r->resp.content_length_n = sizeof(error) - 1; - r->state = &nxt_http_request_send_state; + r->state = &nxt_http_request_send_error_body_state; nxt_http_request_header_send(task, r); return; fail: - nxt_http_request_release(task, r); + nxt_http_request_error_handler(task, r, r->proto.any); } -static const nxt_http_request_state_t nxt_http_request_send_state +static const nxt_http_request_state_t nxt_http_request_send_error_body_state nxt_aligned(64) = { .ready_handler = nxt_http_request_send_error_body, - .error_handler = nxt_http_request_close_handler, + .error_handler = nxt_http_request_error_handler, }; static void nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *out, *last; + nxt_buf_t *out; nxt_http_request_t *r; r = obj; nxt_debug(task, "http request send error body"); - out = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + out = nxt_http_buf_mem(task, r, 0); if (nxt_slow_path(out == NULL)) { goto fail; } @@ -88,18 +90,13 @@ nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data) out->mem.free = out->mem.start + sizeof(error) - 1; out->mem.end = out->mem.free; - last = nxt_http_request_last_buffer(task, r); - if (nxt_slow_path(last == NULL)) { - goto fail; - } - - out->next = last; + out->next = nxt_http_buf_last(r); nxt_http_request_send(task, r, out); return; fail: - // TODO - nxt_http_request_release(task, r); + + nxt_http_request_error_handler(task, r, r->proto.any); } diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 40cbaf40..b1d52dd5 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -10,6 +10,8 @@ static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data); static void nxt_http_app_request(nxt_task_t *task, void *obj, void *data); +static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, + void *data); static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data); static u_char *nxt_http_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, @@ -82,6 +84,7 @@ nxt_http_request_t * nxt_http_request_create(nxt_task_t *task) { nxt_mp_t *mp; + nxt_buf_t *last; nxt_http_request_t *r; mp = nxt_mp_create(1024, 128, 256, 32); @@ -99,6 +102,17 @@ nxt_http_request_create(nxt_task_t *task) goto fail; } + last = nxt_mp_zget(mp, NXT_BUF_SYNC_SIZE); + if (nxt_slow_path(last == NULL)) { + goto fail; + } + + nxt_buf_set_sync(last); + nxt_buf_set_last(last); + last->completion_handler = nxt_http_request_done; + last->parent = r; + r->last = last; + r->mem_pool = mp; r->content_length_n = -1; r->resp.content_length_n = -1; @@ -109,6 +123,7 @@ nxt_http_request_create(nxt_task_t *task) fail: nxt_mp_release(mp); + return NULL; } @@ -349,20 +364,18 @@ nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) nxt_buf_t * -nxt_http_request_last_buffer(nxt_task_t *task, nxt_http_request_t *r) +nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size) { nxt_buf_t *b; - b = nxt_buf_mem_alloc(r->mem_pool, 0, 0); - + b = nxt_buf_mem_alloc(r->mem_pool, size, 0); if (nxt_fast_path(b != NULL)) { - nxt_buf_set_sync(b); - nxt_buf_set_last(b); - b->completion_handler = nxt_http_request_done; + b->completion_handler = nxt_http_request_mem_buf_completion; b->parent = r; + nxt_mp_retain(r->mem_pool); } else { - nxt_http_request_release(task, r); + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); } return b; @@ -370,6 +383,33 @@ nxt_http_request_last_buffer(nxt_task_t *task, nxt_http_request_t *r) static void +nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_http_request_t *r; + + b = obj; + r = data; + + nxt_mp_free(r->mem_pool, b); + + nxt_mp_release(r->mem_pool); +} + + +nxt_buf_t * +nxt_http_buf_last(nxt_http_request_t *r) +{ + nxt_buf_t *last; + + last = r->last; + r->last = NULL; + + return last; +} + + +static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; @@ -383,12 +423,19 @@ nxt_http_request_done(nxt_task_t *task, void *obj, void *data) void -nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r) +nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data) { - nxt_debug(task, "http request release"); + nxt_http_proto_t proto; + nxt_http_request_t *r; + + r = obj; + proto.any = data; + + nxt_debug(task, "http request error handler"); - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_http_request_close_handler, task, r, r->proto.any); + if (proto.any != NULL) { + nxt_http_proto_discard[r->protocol](task, r, nxt_http_buf_last(r)); + } } diff --git a/src/nxt_router.c b/src/nxt_router.c index 57e3cff1..994ba4af 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -2636,7 +2636,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, { size_t dump_size; nxt_int_t ret; - nxt_buf_t *b, *last; + nxt_buf_t *b; nxt_http_request_t *r; nxt_req_conn_link_t *rc; nxt_app_parse_ctx_t *ar; @@ -2663,17 +2663,16 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } + if (ar->request->error) { + nxt_app_http_req_done(task, ar); + nxt_router_rc_unlink(task, rc); + return; + } + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); - last = nxt_http_request_last_buffer(task, ar->request); - if (nxt_slow_path(last == NULL)) { - nxt_app_http_req_done(task, ar); - nxt_router_rc_unlink(task, rc); - return; - } - - nxt_buf_chain_add(&b, last); + nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request)); nxt_router_rc_unlink(task, rc); diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index ed6fca36..16fe4724 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -382,7 +382,7 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) nxt_prefetch(b->next); - if (nxt_buf_used_size(b) != 0) { + if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { break; } @@ -393,3 +393,16 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) return b; } + + +void +nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) +{ + while (b != NULL) { + nxt_prefetch(b->next); + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + + b = b->next; + } +} diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 46d2a0fe..b76ee06a 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -124,6 +124,7 @@ ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent); nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b); +void nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b); #endif /* _NXT_SENDBUF_H_INCLUDED_ */ |