diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_cache.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to 'src/nxt_cache.c')
-rw-r--r-- | src/nxt_cache.c | 643 |
1 files changed, 643 insertions, 0 deletions
diff --git a/src/nxt_cache.c b/src/nxt_cache.c new file mode 100644 index 00000000..409ba301 --- /dev/null +++ b/src/nxt_cache.c @@ -0,0 +1,643 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* A cache time resolution is 10ms. */ +#define \ +nxt_cache_time(thr) \ + (uint64_t) (nxt_thread_time(thr) * 100) + + +static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data); +static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache, + nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq); +static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache, + nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq); +static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache, + nxt_cache_query_t *q); + +static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data); +static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data); +static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data); +static ssize_t nxt_cache_release_locked(nxt_cache_t *cache, + nxt_cache_query_t *q, u_char *buf, size_t size); + +static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache); +static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, + nxt_bool_t fast); +static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache, + nxt_bool_t *slow); +static void nxt_cache_query_wait_free(nxt_cache_t *cache, + nxt_cache_query_wait_t *qw); + + +/* STUB */ +nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool); +static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc); +/**/ + + +nxt_int_t +nxt_cache_shm_create(nxt_mem_zone_t *mz) +{ + nxt_cache_t *cache; + + static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = { + NXT_LVLHSH_LARGE_SLAB, + 0, + nxt_cache_lvlhsh_test, + (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc, + (nxt_lvlhsh_free_t) nxt_mem_zone_free, + }; + + cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t)); + + if (cache == NULL) { + return NXT_ERROR; + } + + cache->proto = &proto; + cache->pool = mz; + + cache->start_time = nxt_cache_time(nxt_thread()); + + return NXT_OK; +} + + +static void * +nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc) +{ + return nxt_mem_zone_align(data, size, size); +} + + +void +nxt_cache_init(nxt_cache_t *cache) +{ + static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = { + NXT_LVLHSH_LARGE_MEMALIGN, + 0, + nxt_cache_lvlhsh_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, + }; + + cache->proto = &proto; + + cache->start_time = nxt_cache_time(nxt_thread()); +} + + +static nxt_int_t +nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_cache_node_t *node; + + node = data; + + if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) { + return NXT_OK; + } + + return NXT_DECLINED; +} + + +nxt_inline void +nxt_cache_lock(nxt_cache_t *cache) +{ + if (cache->shared) { + nxt_thread_spin_lock(&cache->lock); + } +} + + +nxt_inline void +nxt_cache_unlock(nxt_cache_t *cache) +{ + if (cache->shared) { + nxt_thread_spin_unlock(&cache->lock); + } +} + + +void +nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q) +{ + nxt_thread_t *thr; + nxt_lvlhsh_query_t lhq; + nxt_work_handler_t handler; + + thr = nxt_thread(); + + if (cache != NULL) { + lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len); + lhq.replace = 0; + lhq.key.len = q->key_len; + lhq.key.data = q->key_data; + lhq.proto = cache->proto; + lhq.pool = cache->pool; + + q->now = nxt_cache_time(thr); + + nxt_cache_lock(cache); + + handler = nxt_cache_query_locked(cache, q, &lhq); + + nxt_cache_unlock(cache); + + } else { + handler = q->state->nocache_handler; + } + + handler(thr, q, NULL); +} + + +static nxt_work_handler_t +nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q, + nxt_lvlhsh_query_t *lhq) +{ + nxt_int_t ret; + nxt_time_t expiry; + nxt_cache_node_t *node; + nxt_cache_query_state_t *state; + + if (q->hold) { + return nxt_cache_node_hold(cache, q, lhq); + } + + ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq); + + state = q->state; + + if (ret != NXT_OK) { + /* NXT_DECLINED */ + return state->nocache_handler; + } + + node = lhq->value; + node->count++; + q->node = node; + + expiry = cache->start_time + node->expiry; + + if (q->now < expiry) { + return state->ready_handler; + } + + q->stale = 1; + + return state->stale_handler; +} + + +static nxt_work_handler_t +nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q, + nxt_lvlhsh_query_t *lhq) +{ + nxt_int_t ret; + nxt_bool_t slow; + nxt_cache_node_t *node, *sentinel; + nxt_work_handler_t handler; + nxt_cache_query_wait_t *qw; + nxt_cache_query_state_t *state; + + state = q->state; + sentinel = nxt_cache_node_alloc(cache); + + if (nxt_slow_path(sentinel == NULL)) { + return state->error_handler; + } + + sentinel->key_data = q->key_data; + sentinel->key_len = q->key_len; + lhq->value = sentinel; + + /* + * Try to insert an empty sentinel node to hold updating + * process if there is no existent cache node in cache. + */ + ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq); + + if (ret == NXT_OK) { + /* The sentinel node was successully added. */ + + q->node = sentinel; + sentinel->updating = 1; + return state->update_handler; + } + + nxt_cache_node_free(cache, sentinel, 1); + + if (ret == NXT_ERROR) { + return state->error_handler; + } + + /* NXT_DECLINED: a cache node exists. */ + + node = lhq->value; + node->count++; + q->node = node; + + handler = nxt_cache_node_test(cache, q); + if (handler != NULL) { + return handler; + } + + /* Add the node to a wait queue. */ + + qw = nxt_cache_query_wait_alloc(cache, &slow); + if (nxt_slow_path(qw == NULL)) { + return state->error_handler; + } + + if (slow) { + /* The node state may have been changed during slow allocation. */ + + handler = nxt_cache_node_test(cache, q); + if (handler != NULL) { + nxt_cache_query_wait_free(cache, qw); + return handler; + } + } + + qw->query = q; + qw->next = node->waiting; + qw->busy = 0; + qw->deleted = 0; + qw->pid = nxt_pid; + qw->engine = nxt_thread_event_engine(); + qw->handler = nxt_cache_wake_handler; + qw->cache = cache; + + node->waiting = qw; + + return nxt_cache_wait_handler; +} + + +static nxt_work_handler_t +nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q) +{ + nxt_time_t expiry; + nxt_cache_node_t *node; + nxt_cache_query_state_t *state; + + q->stale = 0; + state = q->state; + node = q->node; + + expiry = cache->start_time + node->expiry; + + if (q->now < expiry) { + return state->ready_handler; + } + + /* + * A valid stale or empty sentinel cache node. + * The sentinel node can be only in updating state. + */ + + if (node->updating) { + + if (node->expiry != 0) { + /* A valid stale cache node. */ + + q->stale = 1; + + if (q->use_stale) { + return state->stale_handler; + } + } + + /* A sentinel node. */ + return NULL; + } + + /* A valid stale cache node is not being updated now. */ + + q->stale = 1; + + if (q->use_stale) { + + if (q->update_stale) { + node->updating = 1; + return state->update_stale_handler; + } + + return state->stale_handler; + } + + node->updating = 1; + return state->update_handler; +} + + +static void +nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_timer_t *ev; + nxt_cache_query_t *cq; + + cq = obj; + + if (cq->timeout != 0) { + + ev = &cq->timer; + + if (ev->state == NXT_EVENT_TIMER_DISABLED) { + ev->handler = nxt_cache_timeout_handler; + nxt_event_timer_ident(ev, -1); + + nxt_event_timer_add(thr->engine, ev, cq->timeout); + } + } +} + + +static void +nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_cache_query_t *cq; + nxt_event_timer_t *ev; + + ev = obj; + + cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer); + + cq->state->timeout_handler(thr, cq, NULL); +} + + +static void +nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_cache_t *cache; + nxt_work_handler_t handler; + nxt_cache_query_t *q; + nxt_cache_query_wait_t *qw; + + qw = obj; + q = qw->query; + cache = qw->cache; + + nxt_cache_lock(cache); + + handler = nxt_cache_node_test(cache, q); + + if (handler != NULL) { + nxt_cache_query_wait_free(cache, qw); + + } else { + /* Wait again. */ + qw->next = q->node->waiting; + q->node->waiting = qw; + } + + nxt_cache_unlock(cache); + + handler(thr, q, NULL); +} + + +nxt_int_t +nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q) +{ + nxt_int_t ret; + nxt_cache_node_t *node; + nxt_lvlhsh_query_t lhq; + + node = q->node; + + node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time; + + node->updating = 0; + node->count = 1; + + lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len); + lhq.replace = 1; + lhq.key.len = node->key_len; + lhq.key.data = node->key_data; + lhq.value = node; + lhq.proto = cache->proto; + lhq.pool = cache->pool; + + nxt_cache_lock(cache); + + ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq); + + if (nxt_fast_path(ret != NXT_OK)) { + + nxt_queue_insert_head(&cache->expiry_queue, &node->link); + + node = lhq.value; + + if (node != NULL) { + /* A replaced node. */ + + nxt_queue_remove(&node->link); + + if (node->count != 0) { + node->deleted = 1; + + } else { + // delete cache node + } + } + } + + nxt_cache_unlock(cache); + + return ret; +} + + +void +nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q) +{ + u_char *p, *data; + size_t size; + ssize_t ret; + nxt_thread_t *thr; + u_char buf[1024]; + + thr = nxt_thread(); + q->now = nxt_cache_time(thr); + + p = buf; + size = sizeof(buf); + + for ( ;; ) { + nxt_cache_lock(cache); + + ret = nxt_cache_release_locked(cache, q, p, size); + + nxt_cache_unlock(cache); + + if (ret == 0) { + return; + } + + size = nxt_abs(ret); + + data = nxt_malloc(size); + + if (data == NULL) { + /* TODO: retry */ + return; + } + + if (ret < 0) { + p = data; + continue; + } + + if (p != data) { + nxt_memcpy(data, p, size); + } + + nxt_thread_work_queue_add(thr, &thr->work_queue.main, + cache->delete_handler, data, NULL, thr->log); + } +} + + +static ssize_t +nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q, + u_char *buf, size_t size) +{ + ssize_t ret; + nxt_cache_node_t *node; + + node = q->node; + node->count--; + + if (node->count != 0) { + return 0; + } + + if (!node->deleted) { + /* + * A cache node is locked whilst its count is non zero. + * To minimize number of operations the node's place in expiry + * queue can be updated only if the node is not currently used. + */ + node->accessed = q->now - cache->start_time; + + nxt_queue_remove(&node->link); + nxt_queue_insert_head(&cache->expiry_queue, &node->link); + + return 0; + } + + ret = 0; +#if 0 + + ret = cache->delete_copy(cache, node, buf, size); + + if (ret < 0) { + return ret; + } + +#endif + + nxt_cache_node_free(cache, node, 0); + + return ret; +} + + +static nxt_cache_node_t * +nxt_cache_node_alloc(nxt_cache_t *cache) +{ + nxt_queue_link_t *link; + nxt_cache_node_t *node; + + link = nxt_queue_first(&cache->free_nodes); + + if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) { + cache->nfree_nodes--; + nxt_queue_remove(link); + + node = nxt_queue_link_data(link, nxt_cache_node_t, link); + nxt_memzero(node, sizeof(nxt_cache_node_t)); + + return node; + } + + nxt_cache_unlock(cache); + + node = cache->alloc(cache->data, sizeof(nxt_cache_node_t)); + + nxt_cache_lock(cache); + + return node; +} + + +static void +nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast) +{ + if (fast || cache->nfree_nodes < 32) { + nxt_queue_insert_head(&cache->free_nodes, &node->link); + cache->nfree_nodes++; + return; + } + + nxt_cache_unlock(cache); + + cache->free(cache->data, node); + + nxt_cache_lock(cache); +} + + +static nxt_cache_query_wait_t * +nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow) +{ + nxt_cache_query_wait_t *qw; + + qw = cache->free_query_wait; + + if (nxt_fast_path(qw != NULL)) { + cache->free_query_wait = qw->next; + cache->nfree_query_wait--; + + *slow = 0; + return qw; + } + + nxt_cache_unlock(cache); + + qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t)); + *slow = 1; + + nxt_cache_lock(cache); + + return qw; +} + + +static void +nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw) +{ + if (cache->nfree_query_wait < 32) { + qw->next = cache->free_query_wait; + cache->free_query_wait = qw; + cache->nfree_query_wait++; + return; + } + + nxt_cache_unlock(cache); + + cache->free(cache->data, qw); + + nxt_cache_lock(cache); +} |