/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
/*
 * Forward declarations of the file-local helpers.
 *
 * NOTE(review): these prototypes use the nxt_file_cache_ prefix, while
 * several of the definitions further down in this file are spelled with
 * the nxt_cache_ prefix (for example nxt_cache_node_alloc() and
 * nxt_cache_wait_handler()), and nxt_file_cache_node_alloc() is declared
 * here with an nxt_cache_t parameter.  Confirm which naming is intended
 * before relying on these declarations.
 */
static nxt_int_t nxt_file_cache_lvlhsh_test(nxt_lvlhsh_key_t *hkey, void *data);
static nxt_work_handler_t nxt_file_cache_query_locked(nxt_file_cache_t *cache,
nxt_file_cache_query_t *q, nxt_lvlhsh_key_t *hkey);
static nxt_work_handler_t nxt_file_cache_node_hold(nxt_file_cache_t *cache,
nxt_file_cache_query_t *q, nxt_lvlhsh_key_t *hkey);
static nxt_work_handler_t nxt_file_cache_node_test(nxt_file_cache_t *cache,
nxt_file_cache_query_t *q);
/* Wait-queue, timer and wake-up handlers. */
static void nxt_file_cache_wait_handler(void *data);
static void nxt_file_cache_timeout_handler(nxt_event_timer_t *ev);
static void nxt_file_cache_wake_handler(void *data);
/* Allocation and release of cache nodes and wait-queue entries. */
static nxt_file_cache_node_t *nxt_file_cache_node_alloc(nxt_cache_t *cache);
static void nxt_file_cache_node_free(nxt_file_cache_t *cache,
nxt_file_cache_node_t *node, nxt_bool_t fast);
static nxt_file_cache_query_wait_t *nxt_file_cache_query_wait_alloc(
nxt_file_cache_t *cache, nxt_bool_t *fast);
static void nxt_file_cache_query_wait_free(nxt_file_cache_t *cache,
nxt_file_cache_query_wait_t *qw);
/* Conditional cache locking. */
static void nxt_file_cache_lock(nxt_file_cache_t *cache);
static void nxt_file_cache_unlock(nxt_file_cache_t *cache);
/*
 * Prepares a cache object for use: records the start time against which
 * node times are measured and configures the level hash that indexes
 * the cache nodes.
 */
void
nxt_file_cache_init(nxt_cache_t *cache)
{
    static const nxt_lvlhsh_ctx_t  lvlhsh_ctx = {
        nxt_file_cache_lvlhsh_test,
        nxt_lvlhsh_alloc,
        nxt_lvlhsh_free,
        0,
    };

    cache->start_time = nxt_thread_time();

    cache->lvlhsh.ctx = &lvlhsh_ctx;

    /* lvlhsh with large first level. */
    cache->lvlhsh.shift[1] = 10;
}
/*
 * Level hash comparison callback.
 *
 * Returns NXT_OK when the key being looked up equals the key stored in
 * the cache node passed as "data", and NXT_DECLINED otherwise.
 */
static nxt_int_t
nxt_file_cache_lvlhsh_test(nxt_lvlhsh_key_t *hkey, void *data)
{
    nxt_file_cache_node_t  *entry;

    entry = data;

    return nxt_strmem_eq(&hkey->key, entry->key_data, entry->key_len)
           ? NXT_OK
           : NXT_DECLINED;
}
/*
 * Entry point of a cache query.
 *
 * Without a cache the query's "nocache" handler is run directly.
 * Otherwise the hash key is built from the query key, the next handler
 * is selected while the cache lock is held, and that handler is invoked
 * only after the lock has been released.
 */
void
nxt_file_cache_query(nxt_file_cache_t *cache, nxt_file_cache_query_t *q)
{
    nxt_lvlhsh_key_t    lookup;
    nxt_work_handler_t  next;

    if (cache == NULL) {
        next = q->state->nocache_handler;
        next(q);
        return;
    }

    lookup.key.len = q->key_len;
    lookup.key.data = q->key_data;
    lookup.key_hash = nxt_murmur_hash2(q->key_data, q->key_len);
    lookup.replace = 0;

    nxt_file_cache_lock(cache);
    next = nxt_file_cache_query_locked(cache, q, &lookup);
    nxt_file_cache_unlock(cache);

    next(q);
}
/*
 * Selects the handler a query should run next.  It is called by
 * nxt_file_cache_query() between nxt_file_cache_lock() and
 * nxt_file_cache_unlock(); the returned handler is invoked by the caller
 * after the unlock.
 *
 * A sentinel node carrying the query key is inserted first.  If the
 * insertion succeeds, the query becomes the updater of that key.
 * If a node for the key already exists, the sentinel is returned to the
 * free list and the existing node is examined with nxt_cache_node_test();
 * when that yields no handler the query is linked into the node's wait
 * list and the wait handler is returned.
 *
 * The error handler of the query state is returned on allocation or
 * insertion failure.
 */
static nxt_work_handler_t
nxt_file_cache_query_locked(nxt_file_cache_t *cache, nxt_file_cache_query_t *q,
nxt_lvlhsh_key_t *hkey)
{
nxt_int_t ret;
nxt_bool_t fast;
nxt_work_handler_t handler;
nxt_file_cache_node_t *node, *sentinel;
nxt_file_cache_query_wait_t *qw;
nxt_file_cache_query_state_t *state;
state = q->state;
sentinel = nxt_file_cache_node_alloc(cache);
if (nxt_slow_path(sentinel == NULL)) {
return state->error_handler;
}
/* The sentinel borrows the query's key; the key data is not copied. */
sentinel->key_data = q->key_data;
sentinel->key_len = q->key_len;
hkey->value = sentinel;
/*
 * Try to insert an empty sentinel node to hold the updating
 * process if there is no existing cache node in the cache.
 */
ret = nxt_lvlhsh_insert(&cache->lvlhsh, hkey);
if (ret == NXT_OK) {
/* The sentinel node was successfully added. */
q->node = sentinel;
sentinel->updating = 1;
return state->update_handler;
}
/* The sentinel is not needed: put it back with the "fast" flag set. */
nxt_cache_node_free(cache, sentinel, 1);
if (ret == NXT_ERROR) {
return state->error_handler;
}
/* NXT_DECLINED: a cache node exists. */
node = hkey->value;
/* Take a reference on the existing node on behalf of this query. */
node->count++;
q->node = node;
handler = nxt_cache_node_test(cache, q);
if (handler == NULL) {
/* Add the node to a wait queue. */
qw = nxt_cache_query_wait_alloc(cache, &fast);
if (nxt_slow_path(qw == NULL)) {
/*
 * NOTE(review): node->count was incremented above and is not
 * decremented on this path; confirm that the error handler
 * releases q->node.
 */
return state->error_handler;
}
if (!fast) {
/* The node state may be changed during slow allocation. */
handler = nxt_cache_node_test(cache, q);
if (handler != NULL) {
nxt_cache_query_wait_free(cache, qw);
return handler;
}
}
/* Fill in the wait entry and push it onto the node's wait list. */
qw->query = q;
qw->next = node->waiting;
qw->busy = 0;
qw->deleted = 0;
qw->pid = nxt_pid;
qw->engine = nxt_thread_event_engine();
qw->handler = nxt_cache_wake_handler;
qw->cache = cache;
node->waiting = qw;
return nxt_cache_wait_handler;
}
return handler;
}
/*
 * Decides what a query may do with the node stored in q->node.
 *
 * Returns:
 *   - the ready handler if the node has not expired yet;
 *   - the stale handler if the node is stale and the query accepts
 *     stale data (q->stale is set to 1);
 *   - the update or update-stale handler if the node is stale and nobody
 *     is updating it, in which case the node is marked as updating;
 *   - NULL if the node is being updated and cannot be served, i.e. the
 *     query has to wait.
 *
 * q->stale is reset to 0 on entry and set to 1 whenever a stale node
 * (one with a non-zero expiry) is seen.
 */
static nxt_work_handler_t
nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q)
{
    nxt_time_t               valid_until;
    nxt_cache_node_t         *entry;
    nxt_cache_query_state_t  *handlers;

    q->stale = 0;

    handlers = q->state;
    entry = q->node;

    valid_until = cache->start_time + entry->expiry;

    if (nxt_thread_time() < valid_until) {
        return handlers->ready_handler;
    }

    if (!entry->updating) {
        /* A valid stale cache node is not being updated now. */
        q->stale = 1;

        if (!q->use_stale) {
            entry->updating = 1;
            return handlers->update_handler;
        }

        if (!q->update_stale) {
            return handlers->stale_handler;
        }

        entry->updating = 1;
        return handlers->update_stale_handler;
    }

    /*
     * The node is being updated: it is either a valid stale node
     * or an empty sentinel, which has a zero expiry.
     */
    if (entry->expiry != 0) {
        /* A valid stale cache node. */
        q->stale = 1;

        if (q->use_stale) {
            return handlers->stale_handler;
        }
    }

    return NULL;
}
/*
 * Handler run for a query that has been put on a node's wait list.
 *
 * If the query has a non-zero timeout and its timer is not armed yet,
 * the timer is set up so that nxt_cache_timeout_handler() fires after
 * q->timeout.  A query with a zero timeout gets no timer.
 *
 * "data" is the waiting query.
 */
static void
nxt_cache_wait_handler(void *data)
{
    nxt_thread_t       *thr;
    nxt_event_timer_t  *ev;
    nxt_cache_query_t  *q;

    q = data;

    /*
     * Test the timeout value itself.  The address of the member,
     * "&q->timeout", never compares equal to zero, so testing it
     * made this early return unreachable.
     */
    if (q->timeout == 0) {
        return;
    }

    ev = &q->timer;

    if (!nxt_event_timer_is_set(ev)) {
        thr = nxt_thread();

        ev->log = thr->log;
        ev->handler = nxt_cache_timeout_handler;
        ev->data = q;

        nxt_event_timer_ident(ev, -1);
        nxt_event_timer_add(thr->engine, ev, q->timeout);
    }
}
/*
 * Timer callback of a waiting query: forwards the expiration to the
 * timeout handler of the query state.  The query is taken from the
 * timer's data pointer.
 */
static void
nxt_cache_timeout_handler(nxt_event_timer_t *ev)
{
    nxt_cache_query_t  *query;

    query = ev->data;

    query->state->timeout_handler(query);
}
/*
 * Handler run when a waiting query is woken up.
 *
 * The node is tested again under the cache lock.  If the query still
 * has to wait, its wait entry is pushed back onto the node's wait list
 * and nothing else happens.  Otherwise the wait entry is released and
 * the selected handler is invoked after the lock has been dropped.
 *
 * "data" is the wait entry (nxt_cache_query_wait_t) of the query.
 */
static void
nxt_cache_wake_handler(void *data)
{
    nxt_cache_t             *cache;
    nxt_work_handler_t      handler;
    nxt_cache_query_t       *q;
    nxt_cache_query_wait_t  *qw;

    qw = data;
    q = qw->query;
    cache = qw->cache;

    nxt_cache_lock(cache);

    handler = nxt_cache_node_test(cache, q);

    if (handler == NULL) {
        /* Wait again. */
        qw->next = q->node->waiting;
        q->node->waiting = qw;

    } else {
        /*
         * Release the wait entry while the lock is still held:
         * nxt_cache_query_wait_free() updates the cache free list and
         * its slow path unlocks and relocks the cache, so it must be
         * entered with the lock taken.
         */
        nxt_cache_query_wait_free(cache, qw);
    }

    nxt_cache_unlock(cache);

    /* There is no handler to run if the query keeps waiting. */
    if (handler != NULL) {
        handler(q);
    }
}
/*
 * Returns a zeroed cache node, or NULL if the cache allocator fails.
 *
 * A node is taken from the cache's free node queue when one is
 * available.  Otherwise the cache lock is released around the call to
 * the cache's allocator and taken again before returning, so the cache
 * state may change while this function runs.
 */
static nxt_cache_node_t *
nxt_cache_node_alloc(nxt_cache_t *cache)
{
    nxt_queue_node_t  *qn;
    nxt_cache_node_t  *node;

    qn = nxt_queue_first(&cache->free_nodes);

    if (nxt_fast_path(qn != nxt_queue_tail(&cache->free_nodes))) {
        cache->nfree_nodes--;
        nxt_queue_remove(qn);

        node = nxt_queue_node_data(qn, nxt_cache_node_t, queue);
        nxt_memzero(node, sizeof(nxt_cache_node_t));

        return node;
    }

    nxt_cache_unlock(cache);

    node = cache->alloc(cache->data, sizeof(nxt_cache_node_t));

    /*
     * Recycled nodes are returned zeroed, so zero a fresh node as well:
     * callers fill in only some of the fields.  This is redundant if
     * cache->alloc already returns zeroed memory.
     */
    if (nxt_fast_path(node != NULL)) {
        nxt_memzero(node, sizeof(nxt_cache_node_t));
    }

    nxt_cache_lock(cache);

    return node;
}
/*
 * Disposes of a cache node.
 *
 * The node is kept on the cache's free node queue if "fast" is set or
 * if fewer than 32 free nodes are cached.  Otherwise it is handed to
 * the cache's free function with the cache lock released around the
 * call; the lock is taken again before returning.
 */
static void
nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast)
{
    if (!fast && cache->nfree_nodes >= 32) {
        nxt_cache_unlock(cache);

        cache->free(cache->data, node);

        nxt_cache_lock(cache);

        return;
    }

    nxt_queue_insert_head(&cache->free_nodes, &node->queue);
    cache->nfree_nodes++;
}
/*
 * Returns a wait-queue entry, or NULL if the cache allocator fails.
 *
 * "*fast" is set to 1 when the entry was popped from the cache's list
 * of free entries, and to 0 when it came from the cache's allocator.
 * In the latter case the cache lock is released around the allocation
 * and taken again before returning.
 */
static nxt_cache_query_wait_t *
nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *fast)
{
    nxt_cache_query_wait_t  *entry;

    entry = cache->free_query_wait;

    if (nxt_fast_path(entry != NULL)) {
        /* Pop the head of the free list. */
        cache->free_query_wait = entry->next;
        cache->nfree_query_wait--;

        *fast = 1;

        return entry;
    }

    nxt_cache_unlock(cache);

    entry = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t));
    *fast = 0;

    nxt_cache_lock(cache);

    return entry;
}
/*
 * Disposes of a wait-queue entry.
 *
 * The entry is pushed onto the cache's list of free entries while fewer
 * than 32 entries are cached.  Otherwise it is handed to the cache's
 * free function with the cache lock released around the call; the lock
 * is taken again before returning.
 */
static void
nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw)
{
    if (cache->nfree_query_wait >= 32) {
        nxt_cache_unlock(cache);

        cache->free(cache->data, qw);

        nxt_cache_lock(cache);

        return;
    }

    qw->next = cache->free_query_wait;
    cache->free_query_wait = qw;
    cache->nfree_query_wait++;
}
/*
 * Disabled code: everything up to the matching #endif is excluded from
 * compilation.
 *
 * The function replaces the node stored in the level hash under the
 * node's key with the given node.  If a previous node is returned by
 * the insertion and is still referenced (count != 0), it is marked for
 * deletion.
 *
 * NOTE(review): before enabling this code, the bare "return;" in the
 * sentinel branch must return a value (the function returns nxt_int_t),
 * the node deletion branch is unimplemented, and nxt_cache_release() is
 * not defined in the visible part of this file.
 */
#if 0
nxt_int_t
nxt_cache_update(nxt_cache_t *cache, nxt_cache_node_t *node)
{
nxt_lvlhsh_key_t hkey;
if (node->expiry == 0) {
/* An empty sentinel node. */
nxt_cache_release(cache, node);
return;
}
hkey.key.len = node->key_len;
hkey.key.data = node->key_data;
hkey.key_hash = nxt_murmur_hash2(node->key_data, node->key_len);
/* Replace an existing node with the same key. */
hkey.replace = 1;
hkey.value = node;
node->count = 1;
if (nxt_lvlhsh_insert(&cache->lvlhsh, &hkey) != NXT_OK) {
return NXT_ERROR;
}
/* After the insertion hkey.value is read back as the previous node. */
node = hkey.value;
if (node != NULL) {
if (node->count != 0) {
node->delete = 1;
} else {
// delete cache node
}
}
return NXT_OK;
}
#endif
/*
 * Drops one reference to a cache node.
 *
 * The reference is released under the cache lock.  If the locked
 * release reports that the node has to be deleted, the node is passed
 * to the cache's delete handler through the thread work queue after the
 * lock has been dropped.
 */
void
nxt_cache_node_release(nxt_cache_t *cache, nxt_cache_node_t *node)
{
    nxt_bool_t  remove;

    nxt_cache_lock(cache);
    remove = nxt_cache_node_release_locked(cache, node);
    nxt_cache_unlock(cache);

    if (!remove) {
        return;
    }

    nxt_thread_work_queue_add(cache->delete_handler, node);
}
/*
 * Drops one reference to a cache node; nxt_cache_node_release() calls
 * this function with the cache lock held.
 *
 * Returns 1 if the last reference was dropped and the node is marked
 * as deleted, so the caller has to dispose of it.  Returns 0 otherwise;
 * if the last reference of a live node was dropped, its access time is
 * refreshed and the node is moved to the head of the expiry queue.
 */
nxt_bool_t
nxt_cache_node_release_locked(nxt_cache_t *cache, nxt_cache_node_t *node)
{
#if 0
    nxt_lvlhsh_key_t  hkey;
#endif

    if (--node->count != 0) {
        /* The node is still in use. */
        return 0;
    }

    if (node->deleted) {
#if 0
        hkey.key.len = node->key_len;
        hkey.key.data = node->key_data;
        hkey.key_hash = nxt_murmur_hash2(node->key_data, node->key_len);

        nxt_lvlhsh_delete(&cache->lvlhsh, &hkey);
#endif
        return 1;
    }

    /*
     * A cache node is locked whilst its count is non zero.
     * To minimize number of operations the node's place in expiry
     * queue can be updated only if the node is not currently used.
     */
    node->accessed = nxt_thread_time() - cache->start_time;

    nxt_queue_remove(&node->queue);
    nxt_queue_insert_head(&cache->expiry_queue, &node->queue);

    return 0;
}
/*
 * Takes the cache spin lock.  Nothing is done for a cache that is not
 * marked as shared.
 */
static void
nxt_file_cache_lock(nxt_file_cache_t *cache)
{
    if (!cache->shared) {
        return;
    }

    nxt_thread_spin_lock(&cache->lock);
}
/*
 * Releases the cache spin lock.  Nothing is done for a cache that is
 * not marked as shared.
 */
static void
nxt_file_cache_unlock(nxt_file_cache_t *cache)
{
    if (!cache->shared) {
        return;
    }

    nxt_thread_spin_unlock(&cache->lock);
}