summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_work_queue.h
blob: 2b168b005224876d0a241ad8fc5fa39cd9343ccf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#ifndef _NXT_WORK_QUEUE_H_INCLUDED_
#define _NXT_WORK_QUEUE_H_INCLUDED_


/*
 * A work handler that takes just the obj and data arguments, instead
 * of a pointer to a possibly large work struct, can be called not only
 * via a work queue but also directly.  The obj argument alone is enough
 * in most cases except the source filters, so the data argument has
 * been introduced and is used where appropriate.
 */
typedef void (*nxt_work_handler_t)(nxt_thread_t *thr, void *obj, void *data);

/*
 * Work item type.  The handler, obj, data and log members have the same
 * types as the corresponding arguments of the *_work_queue_add()
 * functions declared in this header, and nxt_work_t pointers are what
 * nxt_work_queue_t and nxt_locked_work_queue_t keep as head and tail.
 * NOTE(review): presumably next chains items inside a queue or inside
 * the cache -- confirm against the implementation.
 */
typedef struct nxt_work_s  nxt_work_t;


struct nxt_work_s {
    nxt_work_t                  *next;
    nxt_work_handler_t          handler;
    void                        *obj;
    void                        *data;
    nxt_log_t                   *log;
};


/*
 * Chunk header: a link to another chunk followed by one embedded work
 * item.  nxt_work_queue_cache_t keeps a pointer to a chunk.
 * NOTE(review): the chunk_size member of the cache suggests a chunk is
 * allocated with room for more work items after "work" -- confirm
 * against the implementation.
 */
typedef struct nxt_work_queue_chunk_s  nxt_work_queue_chunk_t;

struct nxt_work_queue_chunk_s {
    nxt_work_queue_chunk_t      *next;
    nxt_work_t                  work;
};


/*
 * Work item cache state.  One instance is embedded in
 * nxt_thread_work_queue_t and one in nxt_locked_work_queue_t.
 * NOTE(review): presumably next and spare point at reusable work items,
 * chunk at the list of allocated chunks, and chunk_size holds the value
 * passed to the *_work_queue_create() functions -- confirm against the
 * implementation.
 */
typedef struct {
    nxt_work_t                  *next;
    nxt_work_t                  *spare;
    nxt_work_queue_chunk_t      *chunk;
    size_t                      chunk_size;
} nxt_work_queue_cache_t;


/*
 * A work queue: head and tail pointers to work items plus a link to
 * another work queue.
 *
 * The name member exists only when NXT_DEBUG is enabled, so the size
 * of this struct differs between debug and non-debug builds.  Set it
 * with the nxt_work_queue_name() macro, which compiles to nothing when
 * the member is absent.
 */
typedef struct nxt_work_queue_s  nxt_work_queue_t;

struct nxt_work_queue_s {
    nxt_work_t                  *head;
    nxt_work_t                  *tail;
    nxt_work_queue_t            *next;
#if (NXT_DEBUG)
    const char                  *name;
#endif
};


/*
 * Set of work queues with head and tail pointers to nxt_work_queue_t,
 * two embedded queues (main and last) and a work item cache.
 * NOTE(review): presumably this is the type of the thread's work_queue
 * member read by nxt_thread_current_work_queue_add(), and "last" is
 * the queue used by the nxt_thread_last_work_queue_*() functions --
 * confirm against nxt_thread_t and the implementation.
 */
typedef struct {
    nxt_work_queue_t            *head;
    nxt_work_queue_t            *tail;
    nxt_work_queue_t            main;
    nxt_work_queue_t            last;
    nxt_work_queue_cache_t      cache;
} nxt_thread_work_queue_t;


/*
 * Work queue that carries its own spinlock and its own work item cache.
 * It is operated on through the nxt_locked_work_queue_*() functions.
 * NOTE(review): presumably lock protects head, tail and cache --
 * confirm against the implementation.
 */
typedef struct {
    nxt_thread_spinlock_t       lock;
    nxt_work_t                  *head;
    nxt_work_t                  *tail;
    nxt_work_queue_cache_t      cache;
} nxt_locked_work_queue_t;


/*
 * Thread work queue interface.
 *
 * nxt_thread_work_queue_add() and nxt_thread_work_queue_push() both take
 * a target queue and a work description (handler, obj, data, log).
 * nxt_thread_work_queue_pop() returns a handler and takes pointers
 * through which obj, data and log can be returned.
 *
 * NOTE(review): this header does not show how add and push differ, what
 * pop returns when nothing is queued, or which work items
 * nxt_thread_work_queue_drop() selects with its data argument -- see
 * the implementation.
 */
NXT_EXPORT void nxt_thread_work_queue_create(nxt_thread_t *thr,
    size_t chunk_size);
NXT_EXPORT void nxt_thread_work_queue_destroy(nxt_thread_t *thr);
NXT_EXPORT void nxt_thread_work_queue_add(nxt_thread_t *thr,
    nxt_work_queue_t *wq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log);
NXT_EXPORT void nxt_thread_work_queue_push(nxt_thread_t *thr,
    nxt_work_queue_t *wq, nxt_work_handler_t handler, void *obj, void *data,
    nxt_log_t *log);
NXT_EXPORT void nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq);
NXT_EXPORT nxt_work_handler_t nxt_thread_work_queue_pop(nxt_thread_t *thr,
    void **obj, void **data, nxt_log_t **log);
NXT_EXPORT void nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data);


/*
 * Calls nxt_thread_work_queue_add() with thr->work_queue.head as the
 * target queue.  The thr argument is copied into a local first, so it
 * is evaluated exactly once; the remaining arguments are forwarded as
 * given.
 */
#define nxt_thread_current_work_queue_add(thr, handler, obj, data, log)       \
    do {                                                                      \
        nxt_thread_t  *_cur_thr = (thr);                                      \
                                                                              \
        nxt_thread_work_queue_add(_cur_thr, _cur_thr->work_queue.head,        \
                                  (handler), (obj), (data), (log));           \
    } while (0)


/*
 * Destroys a single work queue.
 * NOTE(review): what happens to work items still queued on wq is not
 * visible in this header -- see the implementation.
 */
NXT_EXPORT void nxt_work_queue_destroy(nxt_work_queue_t *wq);


/*
 * nxt_work_queue_name() stores a name pointer in the name member of a
 * work queue.  That member exists only when NXT_DEBUG is enabled; in
 * other builds the macro expands to nothing and its arguments are not
 * evaluated.
 */
#if (NXT_DEBUG)

#define nxt_work_queue_name(_wq, _name)                                       \
    (_wq)->name = (_name)

#else

#define nxt_work_queue_name(_wq, _name)

#endif


/*
 * Variants of the thread work queue add and pop calls that take no
 * explicit queue argument.
 * NOTE(review): presumably they operate on the "last" queue embedded in
 * nxt_thread_work_queue_t -- confirm against the implementation.
 */
NXT_EXPORT void nxt_thread_last_work_queue_add(nxt_thread_t *thr,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log);
NXT_EXPORT nxt_work_handler_t nxt_thread_last_work_queue_pop(nxt_thread_t *thr,
    void **obj, void **data, nxt_log_t **log);


/*
 * Locked work queue interface.  Unlike the thread work queue calls,
 * create, destroy, add and pop take no nxt_thread_t argument; only
 * nxt_locked_work_queue_move() does, together with a locked queue and
 * a plain work queue.
 * NOTE(review): presumably move transfers the items queued on lwq to
 * wq while holding lwq->lock -- confirm against the implementation.
 */
NXT_EXPORT void nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq,
    size_t chunk_size);
NXT_EXPORT void nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq);
NXT_EXPORT void nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log);
NXT_EXPORT nxt_work_handler_t nxt_locked_work_queue_pop(
    nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log);
NXT_EXPORT void nxt_locked_work_queue_move(nxt_thread_t *thr,
    nxt_locked_work_queue_t *lwq, nxt_work_queue_t *wq);


#endif /* _NXT_WORK_QUEUE_H_INCLUDED_ */