/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include <nxt_main.h> /* A cache time resolution is 10ms. */ #define \ nxt_cache_time(thr) \ (uint64_t) (nxt_thread_time(thr) * 100) static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data); static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq); static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq); static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q); static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data); static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data); static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data); static ssize_t nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q, u_char *buf, size_t size); static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache); static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast); static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow); static void nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw); /* STUB */ nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool); static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc); /**/ nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *mz) { nxt_cache_t *cache; static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = { NXT_LVLHSH_LARGE_SLAB, 0, nxt_cache_lvlhsh_test, (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc, (nxt_lvlhsh_free_t) nxt_mem_zone_free, }; cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t)); if (cache == NULL) { return NXT_ERROR; } cache->proto = &proto; cache->pool = mz; cache->start_time = nxt_cache_time(nxt_thread()); return NXT_OK; } static void * nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc) { return nxt_mem_zone_align(data, size, size); } void nxt_cache_init(nxt_cache_t *cache) { static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = { NXT_LVLHSH_LARGE_MEMALIGN, 0, nxt_cache_lvlhsh_test, nxt_lvlhsh_alloc, nxt_lvlhsh_free, }; cache->proto = &proto; cache->start_time = nxt_cache_time(nxt_thread()); } static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data) { nxt_cache_node_t *node; node = data; if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) { return NXT_OK; } return NXT_DECLINED; } nxt_inline void nxt_cache_lock(nxt_cache_t *cache) { if (cache->shared) { nxt_thread_spin_lock(&cache->lock); } } nxt_inline void nxt_cache_unlock(nxt_cache_t *cache) { if (cache->shared) { nxt_thread_spin_unlock(&cache->lock); } } void nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q) { nxt_thread_t *thr; nxt_lvlhsh_query_t lhq; nxt_work_handler_t handler; thr = nxt_thread(); if (cache != NULL) { lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len); lhq.replace = 0; lhq.key.len = q->key_len; lhq.key.data = q->key_data; lhq.proto = cache->proto; lhq.pool = cache->pool; q->now = nxt_cache_time(thr); nxt_cache_lock(cache); handler = nxt_cache_query_locked(cache, q, &lhq); nxt_cache_unlock(cache); } else { handler = q->state->nocache_handler; } handler(thr, q, NULL); } static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq) { nxt_int_t ret; nxt_time_t expiry; nxt_cache_node_t *node; nxt_cache_query_state_t *state; if (q->hold) { return nxt_cache_node_hold(cache, q, lhq); } ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq); state = q->state; if (ret != NXT_OK) { /* NXT_DECLINED */ return state->nocache_handler; } node = lhq->value; node->count++; q->node = node; expiry = cache->start_time + node->expiry; if (q->now < expiry) { return state->ready_handler; } q->stale = 1; return state->stale_handler; } static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq) { nxt_int_t ret; nxt_bool_t slow; nxt_cache_node_t *node, *sentinel; nxt_work_handler_t handler; nxt_cache_query_wait_t *qw; nxt_cache_query_state_t *state; state = q->state; sentinel = nxt_cache_node_alloc(cache); if (nxt_slow_path(sentinel == NULL)) { return state->error_handler; } sentinel->key_data = q->key_data; sentinel->key_len = q->key_len; lhq->value = sentinel; /* * Try to insert an empty sentinel node to hold updating * process if there is no existent cache node in cache. */ ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq); if (ret == NXT_OK) { /* The sentinel node was successully added. */ q->node = sentinel; sentinel->updating = 1; return state->update_handler; } nxt_cache_node_free(cache, sentinel, 1); if (ret == NXT_ERROR) { return state->error_handler; } /* NXT_DECLINED: a cache node exists. */ node = lhq->value; node->count++; q->node = node; handler = nxt_cache_node_test(cache, q); if (handler != NULL) { return handler; } /* Add the node to a wait queue. */ qw = nxt_cache_query_wait_alloc(cache, &slow); if (nxt_slow_path(qw == NULL)) { return state->error_handler; } if (slow) { /* The node state may have been changed during slow allocation. */ handler = nxt_cache_node_test(cache, q); if (handler != NULL) { nxt_cache_query_wait_free(cache, qw); return handler; } } qw->query = q; qw->next = node->waiting; qw->busy = 0; qw->deleted = 0; qw->pid = nxt_pid; qw->engine = nxt_thread_event_engine(); qw->handler = nxt_cache_wake_handler; qw->cache = cache; node->waiting = qw; return nxt_cache_wait_handler; } static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q) { nxt_time_t expiry; nxt_cache_node_t *node; nxt_cache_query_state_t *state; q->stale = 0; state = q->state; node = q->node; expiry = cache->start_time + node->expiry; if (q->now < expiry) { return state->ready_handler; } /* * A valid stale or empty sentinel cache node. * The sentinel node can be only in updating state. */ if (node->updating) { if (node->expiry != 0) { /* A valid stale cache node. */ q->stale = 1; if (q->use_stale) { return state->stale_handler; } } /* A sentinel node. */ return NULL; } /* A valid stale cache node is not being updated now. */ q->stale = 1; if (q->use_stale) { if (q->update_stale) { node->updating = 1; return state->update_stale_handler; } return state->stale_handler; } node->updating = 1; return state->update_handler; } static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data) { nxt_event_timer_t *ev; nxt_cache_query_t *cq; cq = obj; if (cq->timeout != 0) { ev = &cq->timer; if (ev->state == NXT_EVENT_TIMER_DISABLED) { ev->handler = nxt_cache_timeout_handler; nxt_event_timer_ident(ev, -1); nxt_event_timer_add(thr->engine, ev, cq->timeout); } } } static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data) { nxt_cache_query_t *cq; nxt_event_timer_t *ev; ev = obj; cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer); cq->state->timeout_handler(thr, cq, NULL); } static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data) { nxt_cache_t *cache; nxt_work_handler_t handler; nxt_cache_query_t *q; nxt_cache_query_wait_t *qw; qw = obj; q = qw->query; cache = qw->cache; nxt_cache_lock(cache); handler = nxt_cache_node_test(cache, q); if (handler != NULL) { nxt_cache_query_wait_free(cache, qw); } else { /* Wait again. */ qw->next = q->node->waiting; q->node->waiting = qw; } nxt_cache_unlock(cache); handler(thr, q, NULL); } nxt_int_t nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q) { nxt_int_t ret; nxt_cache_node_t *node; nxt_lvlhsh_query_t lhq; node = q->node; node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time; node->updating = 0; node->count = 1; lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len); lhq.replace = 1; lhq.key.len = node->key_len; lhq.key.data = node->key_data; lhq.value = node; lhq.proto = cache->proto; lhq.pool = cache->pool; nxt_cache_lock(cache); ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq); if (nxt_fast_path(ret != NXT_OK)) { nxt_queue_insert_head(&cache->expiry_queue, &node->link); node = lhq.value; if (node != NULL) { /* A replaced node. */ nxt_queue_remove(&node->link); if (node->count != 0) { node->deleted = 1; } else { // delete cache node } } } nxt_cache_unlock(cache); return ret; } void nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q) { u_char *p, *data; size_t size; ssize_t ret; nxt_thread_t *thr; u_char buf[1024]; thr = nxt_thread(); q->now = nxt_cache_time(thr); p = buf; size = sizeof(buf); for ( ;; ) { nxt_cache_lock(cache); ret = nxt_cache_release_locked(cache, q, p, size); nxt_cache_unlock(cache); if (ret == 0) { return; } size = nxt_abs(ret); data = nxt_malloc(size); if (data == NULL) { /* TODO: retry */ return; } if (ret < 0) { p = data; continue; } if (p != data) { nxt_memcpy(data, p, size); } nxt_thread_work_queue_add(thr, &thr->work_queue.main, cache->delete_handler, data, NULL, thr->log); } } static ssize_t nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q, u_char *buf, size_t size) { ssize_t ret; nxt_cache_node_t *node; node = q->node; node->count--; if (node->count != 0) { return 0; } if (!node->deleted) { /* * A cache node is locked whilst its count is non zero. * To minimize number of operations the node's place in expiry * queue can be updated only if the node is not currently used. */ node->accessed = q->now - cache->start_time; nxt_queue_remove(&node->link); nxt_queue_insert_head(&cache->expiry_queue, &node->link); return 0; } ret = 0; #if 0 ret = cache->delete_copy(cache, node, buf, size); if (ret < 0) { return ret; } #endif nxt_cache_node_free(cache, node, 0); return ret; } static nxt_cache_node_t * nxt_cache_node_alloc(nxt_cache_t *cache) { nxt_queue_link_t *link; nxt_cache_node_t *node; link = nxt_queue_first(&cache->free_nodes); if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) { cache->nfree_nodes--; nxt_queue_remove(link); node = nxt_queue_link_data(link, nxt_cache_node_t, link); nxt_memzero(node, sizeof(nxt_cache_node_t)); return node; } nxt_cache_unlock(cache); node = cache->alloc(cache->data, sizeof(nxt_cache_node_t)); nxt_cache_lock(cache); return node; } static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast) { if (fast || cache->nfree_nodes < 32) { nxt_queue_insert_head(&cache->free_nodes, &node->link); cache->nfree_nodes++; return; } nxt_cache_unlock(cache); cache->free(cache->data, node); nxt_cache_lock(cache); } static nxt_cache_query_wait_t * nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow) { nxt_cache_query_wait_t *qw; qw = cache->free_query_wait; if (nxt_fast_path(qw != NULL)) { cache->free_query_wait = qw->next; cache->nfree_query_wait--; *slow = 0; return qw; } nxt_cache_unlock(cache); qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t)); *slow = 1; nxt_cache_lock(cache); return qw; } static void nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw) { if (cache->nfree_query_wait < 32) { qw->next = cache->free_query_wait; cache->free_query_wait = qw; cache->nfree_query_wait++; return; } nxt_cache_unlock(cache); cache->free(cache->data, qw); nxt_cache_lock(cache); }