diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_engine.h | 455 |
1 files changed, 439 insertions, 16 deletions
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 76382e34..c34f8cd7 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -7,31 +7,445 @@ #ifndef _NXT_EVENT_ENGINE_H_INCLUDED_ #define _NXT_EVENT_ENGINE_H_INCLUDED_ +/* + * An event interface is kernel interface such as kqueue, epoll, etc. + * intended to get event notifications about file descriptor state, + * signals, etc. + */ + +#define NXT_FILE_EVENTS 1 +#define NXT_NO_FILE_EVENTS 0 + +#define NXT_SIGNAL_EVENTS 1 +#define NXT_NO_SIGNAL_EVENTS 0 + + +typedef struct { + + /* The canonical event set name. */ + const char *name; + + /* + * Create an event set. The mchanges argument is a maximum number of + * changes to send to the kernel. The mevents argument is a maximum + * number of events to retrieve from the kernel at once, if underlying + * event facility supports batch operations. + */ + nxt_int_t (*create)(nxt_event_engine_t *engine, + nxt_uint_t mchanges, nxt_uint_t mevents); + + /* Close and free an event set. */ + void (*free)(nxt_event_engine_t *engine); + + /* + * Add a file descriptor to an event set and enable the most + * effective read and write event notification method provided + * by underlying event facility. + */ + void (*enable)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* Disable file descriptor event notifications. */ + void (*disable)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Delete a file descriptor from an event set. A possible usage + * is a moving of the file descriptor from one event set to another. + */ + void (*delete)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Delete a file descriptor from an event set before closing the + * file descriptor. The most event facilities such as Linux epoll, + * BSD kqueue, Solaris event ports, AIX pollset, and HP-UX /dev/poll + * delete a file descriptor automatically on the file descriptor close. + * Some facilities such as Solaris /dev/poll require to delete a file + * descriptor explicitly. + */ + nxt_bool_t (*close)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Add a file descriptor to an event set and enable the most effective + * read event notification method provided by underlying event facility. + */ + void (*enable_read)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Add a file descriptor to an event set and enable the most effective + * write event notification method provided by underlying event facility. + */ + void (*enable_write)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* Disable file descriptor read event notifications. */ + void (*disable_read)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* Disable file descriptor write event notifications. */ + void (*disable_write)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* Block file descriptor read event notifications. */ + void (*block_read)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* Block file descriptor write event notifications. */ + void (*block_write)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Add a file descriptor to an event set and enable an oneshot + * read event notification method. + */ + void (*oneshot_read)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Add a file descriptor to an event set and enable an oneshot + * write event notification method. + */ + void (*oneshot_write)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Add a listening socket descriptor to an event set and enable + * a level-triggered read event notification method. + */ + void (*enable_accept)(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); + + /* + * Add a file to an event set and enable a file change notification + * events. + */ + void (*enable_file)(nxt_event_engine_t *engine, + nxt_event_file_t *fev); + + /* + * Delete a file from an event set before closing the file descriptor. + */ + void (*close_file)(nxt_event_engine_t *engine, + nxt_event_file_t *fev); + + /* + * Enable post event notifications and set a post handler to handle + * the zero signal. + */ + nxt_int_t (*enable_post)(nxt_event_engine_t *engine, + nxt_work_handler_t handler); + + /* + * Signal an event set. If a signal number is non-zero then + * a signal handler added to the event set is called. This is + * a way to route Unix signals to an event engine if underlying + * event facility does not support signal events. + * + * If a signal number is zero, then the post_handler of the event + * set is called. This has no relation to Unix signals but is + * a way to wake up the event set to process works posted to + * the event engine locked work queue. + */ + void (*signal)(nxt_event_engine_t *engine, + nxt_uint_t signo); + + /* Poll an event set for new event notifications. */ + void (*poll)(nxt_event_engine_t *engine, + nxt_msec_t timeout); + + /* I/O operations suitable to underlying event facility. */ + nxt_event_conn_io_t *io; + + /* True if an event facility supports file change event notifications. */ + uint8_t file_support; /* 1 bit */ + + /* True if an event facility supports signal event notifications. */ + uint8_t signal_support; /* 1 bit */ +} nxt_event_interface_t; + + +#if (NXT_HAVE_KQUEUE) + +typedef struct { + int fd; + int nchanges; + int mchanges; + int mevents; + nxt_pid_t pid; + + nxt_work_handler_t post_handler; + + struct kevent *changes; + struct kevent *events; +} nxt_kqueue_engine_t; + +extern const nxt_event_interface_t nxt_kqueue_engine; + +#endif + + +#if (NXT_HAVE_EPOLL) + +typedef struct { + int op; + struct epoll_event event; +} nxt_epoll_change_t; + + +typedef struct { + int fd; + uint32_t mode; + nxt_uint_t nchanges; + nxt_uint_t mchanges; + int mevents; + + nxt_epoll_change_t *changes; + struct epoll_event *events; + +#if (NXT_HAVE_EVENTFD) + nxt_work_handler_t post_handler; + nxt_fd_event_t eventfd; + uint32_t neventfd; +#endif + +#if (NXT_HAVE_SIGNALFD) + nxt_fd_event_t signalfd; +#endif +} nxt_epoll_engine_t; + + +extern const nxt_event_interface_t nxt_epoll_edge_engine; +extern const nxt_event_interface_t nxt_epoll_level_engine; + +#endif + + +#if (NXT_HAVE_EVENTPORT) + +typedef struct { + int events; + nxt_fd_event_t *event; +} nxt_eventport_change_t; + + +typedef struct { + int fd; + nxt_uint_t nchanges; + nxt_uint_t mchanges; + u_int mevents; + + nxt_eventport_change_t *changes; + port_event_t *events; + + nxt_work_handler_t post_handler; + nxt_work_handler_t signal_handler; +} nxt_eventport_engine_t; + +extern const nxt_event_interface_t nxt_eventport_engine; + +#endif + + +#if (NXT_HAVE_DEVPOLL) + +typedef struct { + uint8_t op; + short events; + nxt_fd_event_t *event; +} nxt_devpoll_change_t; + + +typedef struct { + int fd; + int nchanges; + int mchanges; + int mevents; + + nxt_devpoll_change_t *changes; + struct pollfd *write_changes; + struct pollfd *events; + nxt_lvlhsh_t fd_hash; +} nxt_devpoll_engine_t; + +extern const nxt_event_interface_t nxt_devpoll_engine; + +#endif + + +#if (NXT_HAVE_POLLSET) + +typedef struct { + uint8_t op; + uint8_t cmd; + short events; + nxt_fd_event_t *event; +} nxt_pollset_change_t; + + +typedef struct { + pollset_t ps; + int nchanges; + int mchanges; + int mevents; + + nxt_pollset_change_t *changes; + struct poll_ctl *write_changes; + struct pollfd *events; + nxt_lvlhsh_t fd_hash; +} nxt_pollset_engine_t; + +extern const nxt_event_interface_t nxt_pollset_engine; + +#endif + + +typedef struct { + uint8_t op; + short events; + nxt_fd_event_t *event; +} nxt_poll_change_t; + + +typedef struct { + nxt_uint_t max_nfds; + nxt_uint_t nfds; + + nxt_uint_t nchanges; + nxt_uint_t mchanges; + + nxt_poll_change_t *changes; + struct pollfd *set; + + nxt_lvlhsh_t fd_hash; +} nxt_poll_engine_t; + +extern const nxt_event_interface_t nxt_poll_engine; + + +typedef struct { + int nfds; + uint32_t update_nfds; /* 1 bit */ + + nxt_fd_event_t **events; + + fd_set main_read_fd_set; + fd_set main_write_fd_set; + fd_set work_read_fd_set; + fd_set work_write_fd_set; +} nxt_select_engine_t; + +extern const nxt_event_interface_t nxt_select_engine; + + +nxt_int_t nxt_fd_event_hash_add(nxt_lvlhsh_t *lvlhsh, nxt_fd_t fd, + nxt_fd_event_t *ev); +void *nxt_fd_event_hash_get(nxt_task_t *task, nxt_lvlhsh_t *lvlhsh, + nxt_fd_t fd); +void nxt_fd_event_hash_delete(nxt_task_t *task, nxt_lvlhsh_t *lvlhsh, + nxt_fd_t fd, nxt_bool_t ignore); +void nxt_fd_event_hash_destroy(nxt_lvlhsh_t *lvlhsh); + + +#define \ +nxt_fd_event_disable(engine, ev) \ + (engine)->event.disable(engine, ev) + + +#define \ +nxt_fd_event_close(engine, ev) \ + (engine)->event.close(engine, ev) + + +#define \ +nxt_fd_event_enable_read(engine, ev) \ + (engine)->event.enable_read(engine, ev) + + +#define \ +nxt_fd_event_enable_write(engine, ev) \ + (engine)->event.enable_write(engine, ev) + + +#define \ +nxt_fd_event_disable_read(engine, ev) \ + (engine)->event.disable_read(engine, ev) + + +#define \ +nxt_fd_event_disable_write(engine, ev) \ + (engine)->event.disable_write(engine, ev) + + +#define \ +nxt_fd_event_block_read(engine, ev) \ + do { \ + if (nxt_fd_event_is_active((ev)->read)) { \ + (engine)->event.block_read(engine, ev); \ + } \ + } while (0) + + +#define \ +nxt_fd_event_block_write(engine, ev) \ + do { \ + if (nxt_fd_event_is_active((ev)->write)) { \ + (engine)->event.block_write(engine, ev); \ + } \ + } while (0) + + +#define \ +nxt_fd_event_oneshot_read(engine, ev) \ + (engine)->event.oneshot_read(engine, ev) + + +#define \ +nxt_fd_event_oneshot_write(engine, ev) \ + (engine)->event.oneshot_write(engine, ev) + + +#define \ +nxt_fd_event_enable_accept(engine, ev) \ + (engine)->event.enable_accept(engine, ev) + #define NXT_ENGINE_FIBERS 1 typedef struct { nxt_fd_t fds[2]; - nxt_event_fd_t event; + nxt_fd_event_t event; } nxt_event_engine_pipe_t; struct nxt_event_engine_s { - const nxt_event_set_ops_t *event; - nxt_event_set_t *event_set; + nxt_task_t task; - nxt_timers_t timers; + union { + nxt_poll_engine_t poll; + nxt_select_engine_t select; - nxt_task_t task; - /* The engine ID, the main engine has ID 0. */ - uint32_t id; +#if (NXT_HAVE_KQUEUE) + nxt_kqueue_engine_t kqueue; +#endif +#if (NXT_HAVE_EPOLL) + nxt_epoll_engine_t epoll; +#endif +#if (NXT_HAVE_EVENTPORT) + nxt_eventport_engine_t eventport; +#endif +#if (NXT_HAVE_DEVPOLL) + nxt_devpoll_engine_t devpoll; +#endif +#if (NXT_HAVE_POLLSET) + nxt_pollset_engine_t pollset; +#endif + } u; - /* - * A pipe to pass event signals to the engine, if the engine's - * underlying event facility does not support user events. - */ - nxt_event_engine_pipe_t *pipe; + nxt_timers_t timers; nxt_work_queue_cache_t work_queue_cache; nxt_work_queue_t *current_work_queue; @@ -43,15 +457,24 @@ struct nxt_event_engine_s { nxt_work_queue_t write_work_queue; nxt_work_queue_t shutdown_work_queue; nxt_work_queue_t close_work_queue; - nxt_work_queue_t final_work_queue; nxt_locked_work_queue_t locked_work_queue; + nxt_event_interface_t event; + + /* + * A pipe to pass event signals to the engine, if the engine's + * underlying event facility does not support user events. + */ + nxt_event_engine_pipe_t *pipe; + nxt_event_signals_t *signals; - nxt_thread_t *thread; nxt_fiber_main_t *fibers; + /* The engine ID, the main engine has ID 0. */ + uint32_t id; + uint8_t shutdown; /* 1 bit */ uint32_t batch; @@ -64,10 +487,10 @@ struct nxt_event_engine_s { NXT_EXPORT nxt_event_engine_t *nxt_event_engine_create(nxt_thread_t *thr, - const nxt_event_set_ops_t *event_set, const nxt_event_sig_t *signals, + const nxt_event_interface_t *interface, const nxt_sig_event_t *signals, nxt_uint_t flags, nxt_uint_t batch); NXT_EXPORT nxt_int_t nxt_event_engine_change(nxt_thread_t *thr, - nxt_task_t *task, const nxt_event_set_ops_t *event_set, nxt_uint_t batch); + nxt_task_t *task, const nxt_event_interface_t *interface, nxt_uint_t batch); NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine); NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine); |