summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_cache.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
commit16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch)
treee6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_cache.c
downloadunit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz
unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2
Initial version.
Diffstat (limited to 'src/nxt_cache.c')
-rw-r--r--src/nxt_cache.c643
1 files changed, 643 insertions, 0 deletions
diff --git a/src/nxt_cache.c b/src/nxt_cache.c
new file mode 100644
index 00000000..409ba301
--- /dev/null
+++ b/src/nxt_cache.c
@@ -0,0 +1,643 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+/* A cache time resolution is 10ms. */
+#define \
+nxt_cache_time(thr) \
+ (uint64_t) (nxt_thread_time(thr) * 100)
+
+
+static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data);
+static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache,
+ nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
+static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache,
+ nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq);
+static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache,
+ nxt_cache_query_t *q);
+
+static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data);
+static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data);
+static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data);
+static ssize_t nxt_cache_release_locked(nxt_cache_t *cache,
+ nxt_cache_query_t *q, u_char *buf, size_t size);
+
+static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache);
+static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node,
+ nxt_bool_t fast);
+static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache,
+ nxt_bool_t *slow);
+static void nxt_cache_query_wait_free(nxt_cache_t *cache,
+ nxt_cache_query_wait_t *qw);
+
+
+/* STUB */
+nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool);
+static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc);
+/**/
+
+
+nxt_int_t
+nxt_cache_shm_create(nxt_mem_zone_t *mz)
+{
+ nxt_cache_t *cache;
+
+ static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = {
+ NXT_LVLHSH_LARGE_SLAB,
+ 0,
+ nxt_cache_lvlhsh_test,
+ (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc,
+ (nxt_lvlhsh_free_t) nxt_mem_zone_free,
+ };
+
+ cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t));
+
+ if (cache == NULL) {
+ return NXT_ERROR;
+ }
+
+ cache->proto = &proto;
+ cache->pool = mz;
+
+ cache->start_time = nxt_cache_time(nxt_thread());
+
+ return NXT_OK;
+}
+
+
+static void *
+nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc)
+{
+ return nxt_mem_zone_align(data, size, size);
+}
+
+
+void
+nxt_cache_init(nxt_cache_t *cache)
+{
+ static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = {
+ NXT_LVLHSH_LARGE_MEMALIGN,
+ 0,
+ nxt_cache_lvlhsh_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+ };
+
+ cache->proto = &proto;
+
+ cache->start_time = nxt_cache_time(nxt_thread());
+}
+
+
+static nxt_int_t
+nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_cache_node_t *node;
+
+ node = data;
+
+ if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+
+nxt_inline void
+nxt_cache_lock(nxt_cache_t *cache)
+{
+ if (cache->shared) {
+ nxt_thread_spin_lock(&cache->lock);
+ }
+}
+
+
+nxt_inline void
+nxt_cache_unlock(nxt_cache_t *cache)
+{
+ if (cache->shared) {
+ nxt_thread_spin_unlock(&cache->lock);
+ }
+}
+
+
+void
+nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q)
+{
+ nxt_thread_t *thr;
+ nxt_lvlhsh_query_t lhq;
+ nxt_work_handler_t handler;
+
+ thr = nxt_thread();
+
+ if (cache != NULL) {
+ lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len);
+ lhq.replace = 0;
+ lhq.key.len = q->key_len;
+ lhq.key.data = q->key_data;
+ lhq.proto = cache->proto;
+ lhq.pool = cache->pool;
+
+ q->now = nxt_cache_time(thr);
+
+ nxt_cache_lock(cache);
+
+ handler = nxt_cache_query_locked(cache, q, &lhq);
+
+ nxt_cache_unlock(cache);
+
+ } else {
+ handler = q->state->nocache_handler;
+ }
+
+ handler(thr, q, NULL);
+}
+
+
+static nxt_work_handler_t
+nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
+ nxt_lvlhsh_query_t *lhq)
+{
+ nxt_int_t ret;
+ nxt_time_t expiry;
+ nxt_cache_node_t *node;
+ nxt_cache_query_state_t *state;
+
+ if (q->hold) {
+ return nxt_cache_node_hold(cache, q, lhq);
+ }
+
+ ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq);
+
+ state = q->state;
+
+ if (ret != NXT_OK) {
+ /* NXT_DECLINED */
+ return state->nocache_handler;
+ }
+
+ node = lhq->value;
+ node->count++;
+ q->node = node;
+
+ expiry = cache->start_time + node->expiry;
+
+ if (q->now < expiry) {
+ return state->ready_handler;
+ }
+
+ q->stale = 1;
+
+ return state->stale_handler;
+}
+
+
+static nxt_work_handler_t
+nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q,
+ nxt_lvlhsh_query_t *lhq)
+{
+ nxt_int_t ret;
+ nxt_bool_t slow;
+ nxt_cache_node_t *node, *sentinel;
+ nxt_work_handler_t handler;
+ nxt_cache_query_wait_t *qw;
+ nxt_cache_query_state_t *state;
+
+ state = q->state;
+ sentinel = nxt_cache_node_alloc(cache);
+
+ if (nxt_slow_path(sentinel == NULL)) {
+ return state->error_handler;
+ }
+
+ sentinel->key_data = q->key_data;
+ sentinel->key_len = q->key_len;
+ lhq->value = sentinel;
+
+ /*
+ * Try to insert an empty sentinel node to hold updating
+ * process if there is no existent cache node in cache.
+ */
+ ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq);
+
+ if (ret == NXT_OK) {
+ /* The sentinel node was successully added. */
+
+ q->node = sentinel;
+ sentinel->updating = 1;
+ return state->update_handler;
+ }
+
+ nxt_cache_node_free(cache, sentinel, 1);
+
+ if (ret == NXT_ERROR) {
+ return state->error_handler;
+ }
+
+ /* NXT_DECLINED: a cache node exists. */
+
+ node = lhq->value;
+ node->count++;
+ q->node = node;
+
+ handler = nxt_cache_node_test(cache, q);
+ if (handler != NULL) {
+ return handler;
+ }
+
+ /* Add the node to a wait queue. */
+
+ qw = nxt_cache_query_wait_alloc(cache, &slow);
+ if (nxt_slow_path(qw == NULL)) {
+ return state->error_handler;
+ }
+
+ if (slow) {
+ /* The node state may have been changed during slow allocation. */
+
+ handler = nxt_cache_node_test(cache, q);
+ if (handler != NULL) {
+ nxt_cache_query_wait_free(cache, qw);
+ return handler;
+ }
+ }
+
+ qw->query = q;
+ qw->next = node->waiting;
+ qw->busy = 0;
+ qw->deleted = 0;
+ qw->pid = nxt_pid;
+ qw->engine = nxt_thread_event_engine();
+ qw->handler = nxt_cache_wake_handler;
+ qw->cache = cache;
+
+ node->waiting = qw;
+
+ return nxt_cache_wait_handler;
+}
+
+
+static nxt_work_handler_t
+nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q)
+{
+ nxt_time_t expiry;
+ nxt_cache_node_t *node;
+ nxt_cache_query_state_t *state;
+
+ q->stale = 0;
+ state = q->state;
+ node = q->node;
+
+ expiry = cache->start_time + node->expiry;
+
+ if (q->now < expiry) {
+ return state->ready_handler;
+ }
+
+ /*
+ * A valid stale or empty sentinel cache node.
+ * The sentinel node can be only in updating state.
+ */
+
+ if (node->updating) {
+
+ if (node->expiry != 0) {
+ /* A valid stale cache node. */
+
+ q->stale = 1;
+
+ if (q->use_stale) {
+ return state->stale_handler;
+ }
+ }
+
+ /* A sentinel node. */
+ return NULL;
+ }
+
+ /* A valid stale cache node is not being updated now. */
+
+ q->stale = 1;
+
+ if (q->use_stale) {
+
+ if (q->update_stale) {
+ node->updating = 1;
+ return state->update_stale_handler;
+ }
+
+ return state->stale_handler;
+ }
+
+ node->updating = 1;
+ return state->update_handler;
+}
+
+
+static void
+nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_event_timer_t *ev;
+ nxt_cache_query_t *cq;
+
+ cq = obj;
+
+ if (cq->timeout != 0) {
+
+ ev = &cq->timer;
+
+ if (ev->state == NXT_EVENT_TIMER_DISABLED) {
+ ev->handler = nxt_cache_timeout_handler;
+ nxt_event_timer_ident(ev, -1);
+
+ nxt_event_timer_add(thr->engine, ev, cq->timeout);
+ }
+ }
+}
+
+
+static void
+nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_cache_query_t *cq;
+ nxt_event_timer_t *ev;
+
+ ev = obj;
+
+ cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer);
+
+ cq->state->timeout_handler(thr, cq, NULL);
+}
+
+
+static void
+nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_cache_t *cache;
+ nxt_work_handler_t handler;
+ nxt_cache_query_t *q;
+ nxt_cache_query_wait_t *qw;
+
+ qw = obj;
+ q = qw->query;
+ cache = qw->cache;
+
+ nxt_cache_lock(cache);
+
+ handler = nxt_cache_node_test(cache, q);
+
+ if (handler != NULL) {
+ nxt_cache_query_wait_free(cache, qw);
+
+ } else {
+ /* Wait again. */
+ qw->next = q->node->waiting;
+ q->node->waiting = qw;
+ }
+
+ nxt_cache_unlock(cache);
+
+ handler(thr, q, NULL);
+}
+
+
+nxt_int_t
+nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q)
+{
+ nxt_int_t ret;
+ nxt_cache_node_t *node;
+ nxt_lvlhsh_query_t lhq;
+
+ node = q->node;
+
+ node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time;
+
+ node->updating = 0;
+ node->count = 1;
+
+ lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len);
+ lhq.replace = 1;
+ lhq.key.len = node->key_len;
+ lhq.key.data = node->key_data;
+ lhq.value = node;
+ lhq.proto = cache->proto;
+ lhq.pool = cache->pool;
+
+ nxt_cache_lock(cache);
+
+ ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq);
+
+ if (nxt_fast_path(ret != NXT_OK)) {
+
+ nxt_queue_insert_head(&cache->expiry_queue, &node->link);
+
+ node = lhq.value;
+
+ if (node != NULL) {
+ /* A replaced node. */
+
+ nxt_queue_remove(&node->link);
+
+ if (node->count != 0) {
+ node->deleted = 1;
+
+ } else {
+ // delete cache node
+ }
+ }
+ }
+
+ nxt_cache_unlock(cache);
+
+ return ret;
+}
+
+
+void
+nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q)
+{
+ u_char *p, *data;
+ size_t size;
+ ssize_t ret;
+ nxt_thread_t *thr;
+ u_char buf[1024];
+
+ thr = nxt_thread();
+ q->now = nxt_cache_time(thr);
+
+ p = buf;
+ size = sizeof(buf);
+
+ for ( ;; ) {
+ nxt_cache_lock(cache);
+
+ ret = nxt_cache_release_locked(cache, q, p, size);
+
+ nxt_cache_unlock(cache);
+
+ if (ret == 0) {
+ return;
+ }
+
+ size = nxt_abs(ret);
+
+ data = nxt_malloc(size);
+
+ if (data == NULL) {
+ /* TODO: retry */
+ return;
+ }
+
+ if (ret < 0) {
+ p = data;
+ continue;
+ }
+
+ if (p != data) {
+ nxt_memcpy(data, p, size);
+ }
+
+ nxt_thread_work_queue_add(thr, &thr->work_queue.main,
+ cache->delete_handler, data, NULL, thr->log);
+ }
+}
+
+
+static ssize_t
+nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q,
+ u_char *buf, size_t size)
+{
+ ssize_t ret;
+ nxt_cache_node_t *node;
+
+ node = q->node;
+ node->count--;
+
+ if (node->count != 0) {
+ return 0;
+ }
+
+ if (!node->deleted) {
+ /*
+ * A cache node is locked whilst its count is non zero.
+ * To minimize number of operations the node's place in expiry
+ * queue can be updated only if the node is not currently used.
+ */
+ node->accessed = q->now - cache->start_time;
+
+ nxt_queue_remove(&node->link);
+ nxt_queue_insert_head(&cache->expiry_queue, &node->link);
+
+ return 0;
+ }
+
+ ret = 0;
+#if 0
+
+ ret = cache->delete_copy(cache, node, buf, size);
+
+ if (ret < 0) {
+ return ret;
+ }
+
+#endif
+
+ nxt_cache_node_free(cache, node, 0);
+
+ return ret;
+}
+
+
+static nxt_cache_node_t *
+nxt_cache_node_alloc(nxt_cache_t *cache)
+{
+ nxt_queue_link_t *link;
+ nxt_cache_node_t *node;
+
+ link = nxt_queue_first(&cache->free_nodes);
+
+ if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) {
+ cache->nfree_nodes--;
+ nxt_queue_remove(link);
+
+ node = nxt_queue_link_data(link, nxt_cache_node_t, link);
+ nxt_memzero(node, sizeof(nxt_cache_node_t));
+
+ return node;
+ }
+
+ nxt_cache_unlock(cache);
+
+ node = cache->alloc(cache->data, sizeof(nxt_cache_node_t));
+
+ nxt_cache_lock(cache);
+
+ return node;
+}
+
+
+static void
+nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast)
+{
+ if (fast || cache->nfree_nodes < 32) {
+ nxt_queue_insert_head(&cache->free_nodes, &node->link);
+ cache->nfree_nodes++;
+ return;
+ }
+
+ nxt_cache_unlock(cache);
+
+ cache->free(cache->data, node);
+
+ nxt_cache_lock(cache);
+}
+
+
+static nxt_cache_query_wait_t *
+nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow)
+{
+ nxt_cache_query_wait_t *qw;
+
+ qw = cache->free_query_wait;
+
+ if (nxt_fast_path(qw != NULL)) {
+ cache->free_query_wait = qw->next;
+ cache->nfree_query_wait--;
+
+ *slow = 0;
+ return qw;
+ }
+
+ nxt_cache_unlock(cache);
+
+ qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t));
+ *slow = 1;
+
+ nxt_cache_lock(cache);
+
+ return qw;
+}
+
+
+static void
+nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw)
+{
+ if (cache->nfree_query_wait < 32) {
+ qw->next = cache->free_query_wait;
+ cache->free_query_wait = qw;
+ cache->nfree_query_wait++;
+ return;
+ }
+
+ nxt_cache_unlock(cache);
+
+ cache->free(cache->data, qw);
+
+ nxt_cache_lock(cache);
+}