1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_WORK_QUEUE_H_INCLUDED_
#define _NXT_WORK_QUEUE_H_INCLUDED_
typedef struct nxt_work_s nxt_work_t;
typedef struct {
nxt_thread_t *thread;
nxt_log_t *log;
uint32_t ident;
nxt_work_t *next_work;
/* TODO: exception_handler, prev/next task, subtasks. */
} nxt_task_t;
#define nxt_task_next_ident() \
((uint32_t) nxt_atomic_fetch_add(&nxt_task_ident, 1) & 0x3fffffff)
/*
* A work handler with just the obj and data arguments instead
* of pointer to a possibly large a work struct allows to call
* the handler not only via a work queue but also directly.
* The only obj argument is enough for the most cases expect the
* source filters, so the data argument has been introduced and
* is used where appropriate.
*/
typedef void (*nxt_work_handler_t)(nxt_task_t *task, void *obj, void *data);
struct nxt_work_s {
nxt_work_t *next;
nxt_work_handler_t handler;
nxt_task_t *task;
void *obj;
void *data;
};
typedef struct nxt_work_queue_chunk_s nxt_work_queue_chunk_t;
struct nxt_work_queue_chunk_s {
nxt_work_queue_chunk_t *next;
nxt_work_t work;
};
typedef struct {
nxt_work_t *next;
nxt_work_t *spare;
nxt_work_queue_chunk_t *chunk;
size_t chunk_size;
} nxt_work_queue_cache_t;
typedef struct nxt_work_queue_s nxt_work_queue_t;
struct nxt_work_queue_s {
nxt_work_t *head;
nxt_work_t *tail;
nxt_work_queue_t *next;
#if (NXT_DEBUG)
const char *name;
#endif
};
typedef struct {
nxt_work_queue_t *head;
nxt_work_queue_t *tail;
nxt_work_queue_t main;
nxt_work_queue_t last;
nxt_work_queue_cache_t cache;
} nxt_thread_work_queue_t;
typedef struct {
nxt_thread_spinlock_t lock;
nxt_work_t *head;
nxt_work_t *tail;
nxt_work_queue_cache_t cache;
} nxt_locked_work_queue_t;
NXT_EXPORT void nxt_thread_work_queue_create(nxt_thread_t *thr,
size_t chunk_size);
NXT_EXPORT void nxt_thread_work_queue_destroy(nxt_thread_t *thr);
NXT_EXPORT void nxt_thread_work_queue_add(nxt_thread_t *thr,
nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task,
void *obj, void *data);
NXT_EXPORT void nxt_thread_work_queue_push(nxt_thread_t *thr,
nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task,
void *obj, void *data);
NXT_EXPORT void nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq);
NXT_EXPORT nxt_work_handler_t nxt_thread_work_queue_pop(nxt_thread_t *thr,
nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data);
#define \
nxt_thread_current_work_queue_add(thr, handler, task, obj, data) \
do { \
nxt_thread_t *_thr = thr; \
\
nxt_thread_work_queue_add(_thr, _thr->work_queue.head, \
handler, task, obj, data); \
} while (0)
NXT_EXPORT void nxt_work_queue_destroy(nxt_work_queue_t *wq);
#if (NXT_DEBUG)
#define \
nxt_work_queue_name(_wq, _name) \
(_wq)->name = _name
#else
#define \
nxt_work_queue_name(_wq, _name)
#endif
NXT_EXPORT void nxt_thread_last_work_queue_add(nxt_thread_t *thr,
nxt_work_handler_t handler, void *obj, void *data);
NXT_EXPORT nxt_work_handler_t nxt_thread_last_work_queue_pop(nxt_thread_t *thr,
nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq,
size_t chunk_size);
NXT_EXPORT void nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq);
NXT_EXPORT void nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
NXT_EXPORT nxt_work_handler_t nxt_locked_work_queue_pop(
nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_locked_work_queue_move(nxt_thread_t *thr,
nxt_locked_work_queue_t *lwq, nxt_work_queue_t *wq);
#endif /* _NXT_WORK_QUEUE_H_INCLUDED_ */
|