summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_engine.h
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_event_engine.h
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to '')
-rw-r--r--src/nxt_event_engine.h455
1 files changed, 439 insertions, 16 deletions
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index 76382e34..c34f8cd7 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -7,31 +7,445 @@
#ifndef _NXT_EVENT_ENGINE_H_INCLUDED_
#define _NXT_EVENT_ENGINE_H_INCLUDED_
+/*
+ * An event interface is kernel interface such as kqueue, epoll, etc.
+ * intended to get event notifications about file descriptor state,
+ * signals, etc.
+ */
+
+#define NXT_FILE_EVENTS 1
+#define NXT_NO_FILE_EVENTS 0
+
+#define NXT_SIGNAL_EVENTS 1
+#define NXT_NO_SIGNAL_EVENTS 0
+
+
+typedef struct {
+
+ /* The canonical event set name. */
+ const char *name;
+
+ /*
+ * Create an event set. The mchanges argument is a maximum number of
+ * changes to send to the kernel. The mevents argument is a maximum
+ * number of events to retrieve from the kernel at once, if underlying
+ * event facility supports batch operations.
+ */
+ nxt_int_t (*create)(nxt_event_engine_t *engine,
+ nxt_uint_t mchanges, nxt_uint_t mevents);
+
+ /* Close and free an event set. */
+ void (*free)(nxt_event_engine_t *engine);
+
+ /*
+ * Add a file descriptor to an event set and enable the most
+ * effective read and write event notification method provided
+ * by underlying event facility.
+ */
+ void (*enable)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /* Disable file descriptor event notifications. */
+ void (*disable)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Delete a file descriptor from an event set. A possible usage
+ * is a moving of the file descriptor from one event set to another.
+ */
+ void (*delete)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Delete a file descriptor from an event set before closing the
+ * file descriptor. The most event facilities such as Linux epoll,
+ * BSD kqueue, Solaris event ports, AIX pollset, and HP-UX /dev/poll
+ * delete a file descriptor automatically on the file descriptor close.
+ * Some facilities such as Solaris /dev/poll require to delete a file
+ * descriptor explicitly.
+ */
+ nxt_bool_t (*close)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Add a file descriptor to an event set and enable the most effective
+ * read event notification method provided by underlying event facility.
+ */
+ void (*enable_read)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Add a file descriptor to an event set and enable the most effective
+ * write event notification method provided by underlying event facility.
+ */
+ void (*enable_write)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /* Disable file descriptor read event notifications. */
+ void (*disable_read)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /* Disable file descriptor write event notifications. */
+ void (*disable_write)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /* Block file descriptor read event notifications. */
+ void (*block_read)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /* Block file descriptor write event notifications. */
+ void (*block_write)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Add a file descriptor to an event set and enable an oneshot
+ * read event notification method.
+ */
+ void (*oneshot_read)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Add a file descriptor to an event set and enable an oneshot
+ * write event notification method.
+ */
+ void (*oneshot_write)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Add a listening socket descriptor to an event set and enable
+ * a level-triggered read event notification method.
+ */
+ void (*enable_accept)(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+
+ /*
+ * Add a file to an event set and enable a file change notification
+ * events.
+ */
+ void (*enable_file)(nxt_event_engine_t *engine,
+ nxt_event_file_t *fev);
+
+ /*
+ * Delete a file from an event set before closing the file descriptor.
+ */
+ void (*close_file)(nxt_event_engine_t *engine,
+ nxt_event_file_t *fev);
+
+ /*
+ * Enable post event notifications and set a post handler to handle
+ * the zero signal.
+ */
+ nxt_int_t (*enable_post)(nxt_event_engine_t *engine,
+ nxt_work_handler_t handler);
+
+ /*
+ * Signal an event set. If a signal number is non-zero then
+ * a signal handler added to the event set is called. This is
+ * a way to route Unix signals to an event engine if underlying
+ * event facility does not support signal events.
+ *
+ * If a signal number is zero, then the post_handler of the event
+ * set is called. This has no relation to Unix signals but is
+ * a way to wake up the event set to process works posted to
+ * the event engine locked work queue.
+ */
+ void (*signal)(nxt_event_engine_t *engine,
+ nxt_uint_t signo);
+
+ /* Poll an event set for new event notifications. */
+ void (*poll)(nxt_event_engine_t *engine,
+ nxt_msec_t timeout);
+
+ /* I/O operations suitable to underlying event facility. */
+ nxt_event_conn_io_t *io;
+
+ /* True if an event facility supports file change event notifications. */
+ uint8_t file_support; /* 1 bit */
+
+ /* True if an event facility supports signal event notifications. */
+ uint8_t signal_support; /* 1 bit */
+} nxt_event_interface_t;
+
+
+#if (NXT_HAVE_KQUEUE)
+
+typedef struct {
+ int fd;
+ int nchanges;
+ int mchanges;
+ int mevents;
+ nxt_pid_t pid;
+
+ nxt_work_handler_t post_handler;
+
+ struct kevent *changes;
+ struct kevent *events;
+} nxt_kqueue_engine_t;
+
+extern const nxt_event_interface_t nxt_kqueue_engine;
+
+#endif
+
+
+#if (NXT_HAVE_EPOLL)
+
+typedef struct {
+ int op;
+ struct epoll_event event;
+} nxt_epoll_change_t;
+
+
+typedef struct {
+ int fd;
+ uint32_t mode;
+ nxt_uint_t nchanges;
+ nxt_uint_t mchanges;
+ int mevents;
+
+ nxt_epoll_change_t *changes;
+ struct epoll_event *events;
+
+#if (NXT_HAVE_EVENTFD)
+ nxt_work_handler_t post_handler;
+ nxt_fd_event_t eventfd;
+ uint32_t neventfd;
+#endif
+
+#if (NXT_HAVE_SIGNALFD)
+ nxt_fd_event_t signalfd;
+#endif
+} nxt_epoll_engine_t;
+
+
+extern const nxt_event_interface_t nxt_epoll_edge_engine;
+extern const nxt_event_interface_t nxt_epoll_level_engine;
+
+#endif
+
+
+#if (NXT_HAVE_EVENTPORT)
+
+typedef struct {
+ int events;
+ nxt_fd_event_t *event;
+} nxt_eventport_change_t;
+
+
+typedef struct {
+ int fd;
+ nxt_uint_t nchanges;
+ nxt_uint_t mchanges;
+ u_int mevents;
+
+ nxt_eventport_change_t *changes;
+ port_event_t *events;
+
+ nxt_work_handler_t post_handler;
+ nxt_work_handler_t signal_handler;
+} nxt_eventport_engine_t;
+
+extern const nxt_event_interface_t nxt_eventport_engine;
+
+#endif
+
+
+#if (NXT_HAVE_DEVPOLL)
+
+typedef struct {
+ uint8_t op;
+ short events;
+ nxt_fd_event_t *event;
+} nxt_devpoll_change_t;
+
+
+typedef struct {
+ int fd;
+ int nchanges;
+ int mchanges;
+ int mevents;
+
+ nxt_devpoll_change_t *changes;
+ struct pollfd *write_changes;
+ struct pollfd *events;
+ nxt_lvlhsh_t fd_hash;
+} nxt_devpoll_engine_t;
+
+extern const nxt_event_interface_t nxt_devpoll_engine;
+
+#endif
+
+
+#if (NXT_HAVE_POLLSET)
+
+typedef struct {
+ uint8_t op;
+ uint8_t cmd;
+ short events;
+ nxt_fd_event_t *event;
+} nxt_pollset_change_t;
+
+
+typedef struct {
+ pollset_t ps;
+ int nchanges;
+ int mchanges;
+ int mevents;
+
+ nxt_pollset_change_t *changes;
+ struct poll_ctl *write_changes;
+ struct pollfd *events;
+ nxt_lvlhsh_t fd_hash;
+} nxt_pollset_engine_t;
+
+extern const nxt_event_interface_t nxt_pollset_engine;
+
+#endif
+
+
+typedef struct {
+ uint8_t op;
+ short events;
+ nxt_fd_event_t *event;
+} nxt_poll_change_t;
+
+
+typedef struct {
+ nxt_uint_t max_nfds;
+ nxt_uint_t nfds;
+
+ nxt_uint_t nchanges;
+ nxt_uint_t mchanges;
+
+ nxt_poll_change_t *changes;
+ struct pollfd *set;
+
+ nxt_lvlhsh_t fd_hash;
+} nxt_poll_engine_t;
+
+extern const nxt_event_interface_t nxt_poll_engine;
+
+
+typedef struct {
+ int nfds;
+ uint32_t update_nfds; /* 1 bit */
+
+ nxt_fd_event_t **events;
+
+ fd_set main_read_fd_set;
+ fd_set main_write_fd_set;
+ fd_set work_read_fd_set;
+ fd_set work_write_fd_set;
+} nxt_select_engine_t;
+
+extern const nxt_event_interface_t nxt_select_engine;
+
+
+nxt_int_t nxt_fd_event_hash_add(nxt_lvlhsh_t *lvlhsh, nxt_fd_t fd,
+ nxt_fd_event_t *ev);
+void *nxt_fd_event_hash_get(nxt_task_t *task, nxt_lvlhsh_t *lvlhsh,
+ nxt_fd_t fd);
+void nxt_fd_event_hash_delete(nxt_task_t *task, nxt_lvlhsh_t *lvlhsh,
+ nxt_fd_t fd, nxt_bool_t ignore);
+void nxt_fd_event_hash_destroy(nxt_lvlhsh_t *lvlhsh);
+
+
+#define \
+nxt_fd_event_disable(engine, ev) \
+ (engine)->event.disable(engine, ev)
+
+
+#define \
+nxt_fd_event_close(engine, ev) \
+ (engine)->event.close(engine, ev)
+
+
+#define \
+nxt_fd_event_enable_read(engine, ev) \
+ (engine)->event.enable_read(engine, ev)
+
+
+#define \
+nxt_fd_event_enable_write(engine, ev) \
+ (engine)->event.enable_write(engine, ev)
+
+
+#define \
+nxt_fd_event_disable_read(engine, ev) \
+ (engine)->event.disable_read(engine, ev)
+
+
+#define \
+nxt_fd_event_disable_write(engine, ev) \
+ (engine)->event.disable_write(engine, ev)
+
+
+#define \
+nxt_fd_event_block_read(engine, ev) \
+ do { \
+ if (nxt_fd_event_is_active((ev)->read)) { \
+ (engine)->event.block_read(engine, ev); \
+ } \
+ } while (0)
+
+
+#define \
+nxt_fd_event_block_write(engine, ev) \
+ do { \
+ if (nxt_fd_event_is_active((ev)->write)) { \
+ (engine)->event.block_write(engine, ev); \
+ } \
+ } while (0)
+
+
+#define \
+nxt_fd_event_oneshot_read(engine, ev) \
+ (engine)->event.oneshot_read(engine, ev)
+
+
+#define \
+nxt_fd_event_oneshot_write(engine, ev) \
+ (engine)->event.oneshot_write(engine, ev)
+
+
+#define \
+nxt_fd_event_enable_accept(engine, ev) \
+ (engine)->event.enable_accept(engine, ev)
+
#define NXT_ENGINE_FIBERS 1
typedef struct {
nxt_fd_t fds[2];
- nxt_event_fd_t event;
+ nxt_fd_event_t event;
} nxt_event_engine_pipe_t;
struct nxt_event_engine_s {
- const nxt_event_set_ops_t *event;
- nxt_event_set_t *event_set;
+ nxt_task_t task;
- nxt_timers_t timers;
+ union {
+ nxt_poll_engine_t poll;
+ nxt_select_engine_t select;
- nxt_task_t task;
- /* The engine ID, the main engine has ID 0. */
- uint32_t id;
+#if (NXT_HAVE_KQUEUE)
+ nxt_kqueue_engine_t kqueue;
+#endif
+#if (NXT_HAVE_EPOLL)
+ nxt_epoll_engine_t epoll;
+#endif
+#if (NXT_HAVE_EVENTPORT)
+ nxt_eventport_engine_t eventport;
+#endif
+#if (NXT_HAVE_DEVPOLL)
+ nxt_devpoll_engine_t devpoll;
+#endif
+#if (NXT_HAVE_POLLSET)
+ nxt_pollset_engine_t pollset;
+#endif
+ } u;
- /*
- * A pipe to pass event signals to the engine, if the engine's
- * underlying event facility does not support user events.
- */
- nxt_event_engine_pipe_t *pipe;
+ nxt_timers_t timers;
nxt_work_queue_cache_t work_queue_cache;
nxt_work_queue_t *current_work_queue;
@@ -43,15 +457,24 @@ struct nxt_event_engine_s {
nxt_work_queue_t write_work_queue;
nxt_work_queue_t shutdown_work_queue;
nxt_work_queue_t close_work_queue;
- nxt_work_queue_t final_work_queue;
nxt_locked_work_queue_t locked_work_queue;
+ nxt_event_interface_t event;
+
+ /*
+ * A pipe to pass event signals to the engine, if the engine's
+ * underlying event facility does not support user events.
+ */
+ nxt_event_engine_pipe_t *pipe;
+
nxt_event_signals_t *signals;
- nxt_thread_t *thread;
nxt_fiber_main_t *fibers;
+ /* The engine ID, the main engine has ID 0. */
+ uint32_t id;
+
uint8_t shutdown; /* 1 bit */
uint32_t batch;
@@ -64,10 +487,10 @@ struct nxt_event_engine_s {
NXT_EXPORT nxt_event_engine_t *nxt_event_engine_create(nxt_thread_t *thr,
- const nxt_event_set_ops_t *event_set, const nxt_event_sig_t *signals,
+ const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
nxt_uint_t flags, nxt_uint_t batch);
NXT_EXPORT nxt_int_t nxt_event_engine_change(nxt_thread_t *thr,
- nxt_task_t *task, const nxt_event_set_ops_t *event_set, nxt_uint_t batch);
+ nxt_task_t *task, const nxt_event_interface_t *interface, nxt_uint_t batch);
NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine);
NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine);