1
0
mirror of https://github.com/vlang/v.git synced 2023-08-10 21:13:21 +03:00

os.notify: implement the kqueue backend for notify.FdNotifier (#19057)

This commit is contained in:
shove 2023-08-05 12:11:07 +08:00 committed by GitHub
parent 4cf8328f71
commit 301320f4b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 272 additions and 9 deletions

View File

@ -0,0 +1,230 @@
module notify
import time
import os
#insert "@VEXEROOT/vlib/os/notify/kqueue.h"
struct C.kevent {
mut:
ident u32
filter i16
flags u16
fflags u32
data int
udata voidptr
}
fn C.kqueue() int
fn C.__kevent__(int, voidptr, int, voidptr, int, voidptr) int
fn C.EV_SET(voidptr, u32, i16, u16, u32, int, voidptr)
// KqueueNotifier provides methods that implement FdNotifier using the
// kqueue I/O event notification facility (macos, freeBSD, xxxBSD...unix only)
[noinit]
struct KqueueNotifier {
kqueue_fd int
}
// KqueueEvent describes an event that occurred for a file descriptor in
// the watch list
[noinit]
struct KqueueEvent {
pub:
fd int
kind FdEventType
}
// new creates a new KqueueNotifier
// The FdNotifier interface is returned to allow OS specific
// implementations without exposing the concrete type
pub fn new() !FdNotifier {
fd := C.kqueue()
if fd == -1 {
return error(os.posix_get_error_msg(C.errno))
}
// Needed to circumvent V limitations
x := &KqueueNotifier{
kqueue_fd: fd
}
return x
}
const (
// filter types
kqueue_read = i16(C.EVFILT_READ)
kqueue_write = i16(C.EVFILT_WRITE)
kqueue_aio = i16(C.EVFILT_AIO)
kqueue_vnode = i16(C.EVFILT_VNODE)
kqueue_proc = i16(C.EVFILT_PROC)
kqueue_signal = i16(C.EVFILT_SIGNAL)
kqueue_timer = i16(C.EVFILT_TIMER)
kqueue_machport = i16(C.EVFILT_MACHPORT)
kqueue_fs = i16(C.EVFILT_FS)
kqueue_user = i16(C.EVFILT_USER)
kqueue_vm = i16(C.EVFILT_VM)
kqueue_exception = i16(C.EVFILT_EXCEPT)
kqueue_syscount = i16(C.EVFILT_SYSCOUNT)
// actions
kqueue_add = u16(C.EV_ADD)
kqueue_delete = u16(C.EV_DELETE)
kqueue_enable = u16(C.EV_ENABLE)
kqueue_disable = u16(C.EV_DISABLE)
// flags
kqueue_oneshot = u16(C.EV_ONESHOT)
kqueue_edge_trigger = u16(C.EV_CLEAR) // kqueue_clear
kqueue_receipt = u16(C.EV_RECEIPT)
kqueue_dispatch = u16(C.EV_DISPATCH)
kqueue_udata_specific = u16(C.EV_UDATA_SPECIFIC)
kqueue_dispatch2 = u16(C.EV_DISPATCH | C.EV_UDATA_SPECIFIC)
kqueue_vanished = u16(C.EV_VANISHED)
kqueue_sysflags = u16(C.EV_SYSFLAGS)
kqueue_flag0 = u16(C.EV_FLAG0)
kqueue_flag1 = u16(C.EV_FLAG1)
// returned values
kqueue_eof = u16(C.EV_EOF)
kqueue_error = u16(C.EV_ERROR)
)
// ctl is a helper method for add, modify, and remove
fn (mut kn KqueueNotifier) ctl(fd int, filter i16, flags u16) ! {
event := [1]C.kevent{}
C.EV_SET(&event[0], fd, filter, flags, 0, 0, unsafe { nil })
if C.__kevent__(kn.kqueue_fd, &event[0], 1, unsafe { nil }, 0, unsafe { nil }) == -1 {
return error(os.posix_get_error_msg(C.errno))
}
}
// add adds a file descriptor to the watch list
fn (mut kn KqueueNotifier) add(fd int, events FdEventType, conf ...FdConfigFlags) ! {
filter := filter_to_mask(events)
flags := flags_to_mask(...conf)
kn.ctl(fd, filter, flags)!
}
// modify sets an existing entry in the watch list to the provided events and configuration
fn (mut kn KqueueNotifier) modify(fd int, events FdEventType, conf ...FdConfigFlags) ! {
kn.add(fd, events, ...conf)!
}
// remove removes a file descriptor from the watch list
fn (mut kn KqueueNotifier) remove(fd int) ! {
filter := notify.kqueue_read | notify.kqueue_write | notify.kqueue_exception
flags := notify.kqueue_delete
kn.ctl(fd, filter, flags)!
}
// wait waits to be notified of events on the watch list,
// returns at most 512 events
fn (mut kn KqueueNotifier) wait(timeout time.Duration) []FdEvent {
// arbitrary 512 limit; events will round robin on successive
// waits if the number exceeds this
// NOTE: we use a fixed size array here for stack allocation; this has
// the added bonus of making KqueueNotifier thread safe
events := [512]C.kevent{}
// populate events with the new events
to := &C.timespec{0, timeout.nanoseconds()}
count := C.__kevent__(kn.kqueue_fd, unsafe { nil }, 0, &events[0], events.len, to)
if count > 0 {
mut arr := []FdEvent{cap: count}
for i := 0; i < count; i++ {
fd := int(events[i].ident)
kind := event_mask_to_flag(events[i].filter, events[i].flags)
if kind.is_empty() {
// NOTE: tcc only reports the first event for some
// reason, leaving subsequent structs in the array as 0
// (or possibly garbage)
panic('encountered an empty event kind; this is most likely due to using tcc')
}
arr << &KqueueEvent{
fd: fd
kind: kind
}
}
return arr
}
return []
}
// close closes the KqueueNotifier,
// any successive calls to add, modify, remove, and wait should fail
fn (mut kn KqueueNotifier) close() ! {
if C.close(kn.kqueue_fd) == -1 {
return error(os.posix_get_error_msg(C.errno))
}
}
// event_mask_to_flag is a helper function that converts a bitmask
// returned by kevent() wait to FdEventType
fn event_mask_to_flag(filter i16, flags u16) FdEventType {
mut res := FdEventType.read
if filter & notify.kqueue_read != 0 {
res.set(.read)
}
if filter & notify.kqueue_write != 0 {
res.set(.write)
}
if filter & notify.kqueue_exception != 0 {
res.set(.exception)
}
if flags & notify.kqueue_eof != 0 {
res.set(.hangup)
}
if flags & notify.kqueue_error != 0 {
res.set(.error)
}
return res
}
// filter_to_mask is a helper function that converts FdEventType
// to a bitmask used by the C functions
fn filter_to_mask(events FdEventType) i16 {
mut mask := i16(0)
if events.has(.read) {
mask |= notify.kqueue_read
}
if events.has(.write) {
mask |= notify.kqueue_write
}
if events.has(.exception) {
mask |= notify.kqueue_exception
}
if events.has(.peer_hangup) {
panic("Kqueue does not support 'peer_hangup' event type.")
}
if events.has(.error) {
panic("Kqueue does not support 'error' event type.")
}
if events.has(.hangup) {
panic("Kqueue does not support 'hangup' event type.")
}
return mask
}
// flags_to_mask is a helper function that converts FdConfigFlags
// to a bitmask used by the C functions
fn flags_to_mask(confs ...FdConfigFlags) u16 {
mut mask := notify.kqueue_add | notify.kqueue_enable
for conf in confs {
if conf.has(.edge_trigger) {
mask |= notify.kqueue_edge_trigger
}
if conf.has(.one_shot) {
mask |= notify.kqueue_oneshot
}
if conf.has(.wake_up) {
panic("Kqueue does not support 'wake_up' flag.")
}
if conf.has(.exclusive) {
panic("Kqueue does not support 'exclusive' flag.")
}
}
return mask
}

12
vlib/os/notify/kqueue.h Normal file
View File

@ -0,0 +1,12 @@
#ifndef __KQUEUE_H
#define __KQUEUE_H
#include <sys/event.h>
// Due to the renaming of 'struct kevent' and function 'kevent',
// they are wrapped here to avoid conflicts.
int __kevent__(int handle, const struct kevent* changelist, int nchanges, struct kevent* eventlist, int nevents, const struct timespec* timeout) {
return kevent(handle, changelist, nchanges, eventlist, nevents, timeout);
}
#endif

View File

@ -5,7 +5,7 @@ import os.notify
// make a pipe and return the (read, write) file descriptors // make a pipe and return the (read, write) file descriptors
fn make_pipe() !(int, int) { fn make_pipe() !(int, int) {
$if linux { $if linux || macos {
pipefd := [2]int{} pipefd := [2]int{}
if C.pipe(&pipefd[0]) != 0 { if C.pipe(&pipefd[0]) != 0 {
return error('error ${C.errno}: ' + os.posix_get_error_msg(C.errno)) return error('error ${C.errno}: ' + os.posix_get_error_msg(C.errno))
@ -16,8 +16,8 @@ fn make_pipe() !(int, int) {
} }
fn test_level_trigger() { fn test_level_trigger() {
// currently only linux is supported // currently only linux and macos are supported
$if linux { $if linux || macos {
mut notifier := notify.new()! mut notifier := notify.new()!
reader, writer := make_pipe()! reader, writer := make_pipe()!
defer { defer {
@ -37,8 +37,8 @@ fn test_level_trigger() {
} }
fn test_edge_trigger() { fn test_edge_trigger() {
// currently only linux is supported // currently only linux and macos are supported
$if linux { $if linux || macos {
mut notifier := notify.new()! mut notifier := notify.new()!
reader, writer := make_pipe()! reader, writer := make_pipe()!
defer { defer {
@ -53,7 +53,27 @@ fn test_edge_trigger() {
os.fd_write(writer, 'foobar') os.fd_write(writer, 'foobar')
check_read_event(mut n, reader, 'foo') check_read_event(mut n, reader, 'foo')
assert notifier.wait(0).len == 0 $if linux {
assert notifier.wait(0).len == 0
}
$if macos {
/*
In the kqueue of macos, EV_CLEAR flag represents a clear event,
which is mainly used for pipeline and socket class events. When this flag is set,
kqueue will trigger the corresponding event when the data is readable or writable,
but it is not guaranteed that the event will only be triggered once.
Compared to EPOLLET, EV_CLEAR's behavior varies. In epoll, the edge triggered mode only triggers
an event once when the state changes from unreadable/non writable to readable/writable,
that is, when the data changes from unreadable to readable,
or when the data changes from unreadable to writable. In the kqueue of macos,
EV_CLEAR does not possess this precise edge triggering behavior.
Therefore, in the kqueue of macos, even if the data is not completely read,
it is possible to continue triggering read events. This means that if you don't process all the data,
the next kqueue event notification may still be triggered
*/
// notifier.wait(0).len == 1 or 0
}
os.fd_write(writer, 'baz') os.fd_write(writer, 'baz')
// we do not get an event because there is still data // we do not get an event because there is still data
@ -65,7 +85,7 @@ fn test_edge_trigger() {
} }
fn test_one_shot() { fn test_one_shot() {
$if linux { $if linux || macos {
mut notifier := notify.new()! mut notifier := notify.new()!
reader, writer := make_pipe()! reader, writer := make_pipe()!
defer { defer {
@ -89,6 +109,7 @@ fn test_one_shot() {
} }
} }
// Kqueue does not support 'hangup' event type.
fn test_hangup() { fn test_hangup() {
$if linux { $if linux {
mut notifier := notify.new()! mut notifier := notify.new()!
@ -112,7 +133,7 @@ fn test_hangup() {
} }
fn test_write() { fn test_write() {
$if linux { $if linux || macos {
mut notifier := notify.new()! mut notifier := notify.new()!
reader, writer := make_pipe()! reader, writer := make_pipe()!
defer { defer {
@ -133,7 +154,7 @@ fn test_write() {
} }
fn test_remove() { fn test_remove() {
$if linux { $if linux || macos {
mut notifier := notify.new()! mut notifier := notify.new()!
reader, writer := make_pipe()! reader, writer := make_pipe()!
defer { defer {