From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 21/57] ds: get rid of event_watch field
Date: Mon, 24 Jun 2019 02:52:22 +0000 [thread overview]
Message-ID: <20190624025258.25592-22-e@80x24.org> (raw)
In-Reply-To: <20190624025258.25592-1-e@80x24.org>
We don't need to keep track of that field since we always
know what events we're interested in when using one-shot
wakeups.
---
lib/PublicInbox/DS.pm | 64 ++++++++++++----------------------
lib/PublicInbox/EvCleanup.pm | 4 +--
lib/PublicInbox/HTTP.pm | 13 +++----
lib/PublicInbox/HTTPD/Async.pm | 10 +++---
lib/PublicInbox/NNTP.pm | 25 +++++--------
lib/PublicInbox/Syscall.pm | 6 ++--
6 files changed, 50 insertions(+), 72 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9c801214..f5986e55 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -29,8 +29,6 @@ use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
- 'event_watch', # bitmask of events the client is interested in
- # (EPOLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
@@ -318,6 +316,17 @@ sub PostEventLoop {
return $keep_running;
}
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD() | EV_ENABLE();
+ ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
+ } else {
+ EV_DISABLE();
+ }
+}
+
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
@@ -344,25 +353,21 @@ sub new {
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{event_watch} = $ev;
-
_InitPoller();
if ($HaveEpoll) {
retry:
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
+ $ev &= ~EPOLLEXCLUSIVE;
goto retry;
}
die "couldn't add epoll watch for $fd: $!\n";
}
}
elsif ($HaveKQueue) {
- my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
- $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
+ $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
@@ -454,7 +459,7 @@ next_buf:
}
} elsif ($! == EAGAIN) {
$self->{wbuf_off} = $off;
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
} else {
return $self->close;
@@ -467,7 +472,6 @@ next_buf:
} # while @$wbuf
delete $self->{wbuf};
- $self->watch_write(0);
1; # all done
}
@@ -544,7 +548,7 @@ sub write {
return $self->close;
}
$self->{wbuf} = [ tmpbuf($bref, $written) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
@@ -563,49 +567,27 @@ sub msg_more ($$) {
# queue up the unwritten substring:
$self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
$self->write(\($_[1]));
}
-sub watch_chg ($$$) {
- my ($self, $bits, $set) = @_;
+sub watch ($$) {
+ my ($self, $ev) = @_;
my $sock = $self->{sock} or return;
- my $cur = $self->{event_watch};
- my $changes = $cur;
- if ($set) {
- $changes |= $bits;
- } else {
- $changes &= ~$bits;
- }
- return if $changes == $cur;
my $fd = fileno($sock);
if ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
confess("EPOLL_CTL_MOD $!");
} elsif ($HaveKQueue) {
- my $flag = $set ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
- $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
+ $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
- $self->{event_watch} = $changes;
}
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
-
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index d60ac2cc..a9f6167d 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -6,6 +6,7 @@ package PublicInbox::EvCleanup;
use strict;
use warnings;
use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
my $ENABLED;
sub enabled { $ENABLED }
@@ -59,13 +60,12 @@ sub _run_later () {
# Called by PublicInbox::DS
sub event_step {
my ($self) = @_;
- $self->watch_write(0);
_run_asap();
}
sub _asap_timer () {
$singleton ||= once_init();
- $singleton->watch_write(1);
+ $singleton->watch(EPOLLOUT|EPOLLONESHOT);
1;
}
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index afa71ea5..773d77ba 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -20,6 +20,7 @@ use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
PublicInbox::DS->import(qw(msg_more write_in_full));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
@@ -56,7 +57,7 @@ sub http_date () {
sub new ($$$) {
my ($class, $sock, $addr, $httpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{httpd} = $httpd;
$self->{rbuf} = '';
($self->{remote_addr}, $self->{remote_port}) =
@@ -80,7 +81,8 @@ sub event_step { # called by PublicInbox::DS
return $self->close if $r == 0;
return rbuf_process($self);
}
- return if $!{EAGAIN}; # no need to call watch_read(1) again
+
+ return $self->watch_in1 if $!{EAGAIN};
# common for clients to break connections without warning,
# would be too noisy to log here:
@@ -100,7 +102,7 @@ sub rbuf_process {
($r == -2 && length($self->{rbuf}) > 0x4000)) {
return quit($self, 400);
}
- return $self->watch_read(1) if $r < 0; # incomplete
+ return $self->watch_in1 if $r < 0; # incomplete
$self->{rbuf} = substr($self->{rbuf}, $r);
my $len = input_prepare($self, \%env);
@@ -143,7 +145,6 @@ sub read_input ($) {
sub app_dispatch {
my ($self, $input) = @_;
- $self->watch_read(0);
my $env = $self->{env};
$env->{REMOTE_ADDR} = $self->{remote_addr};
$env->{REMOTE_PORT} = $self->{remote_port};
@@ -236,7 +237,7 @@ sub identity_wcb ($) {
sub next_request ($) {
my ($self) = @_;
if ($self->{rbuf} eq '') { # wait for next request
- $self->watch_read(1);
+ $self->watch_in1;
} else { # avoid recursion for pipelined requests
push @$pipelineq, $self;
$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
@@ -360,7 +361,7 @@ sub recv_err {
return $self->close if (defined $r && $r == 0);
if ($!{EAGAIN}) {
$self->{input_left} = $len;
- return;
+ return $self->watch_in1;
}
err($self, "error reading for input: $! ($len bytes remaining)");
quit($self, 500);
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index dae62e55..f32ef009 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -31,10 +31,12 @@ sub new {
$self;
}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
+
# fires after pending writes are complete:
sub restart_read_cb ($) {
my ($self) = @_;
- sub { $self->watch_read(1) }
+ sub { restart_read($self) }
}
sub main_cb ($$$) {
@@ -46,16 +48,16 @@ sub main_cb ($$$) {
$fh->write($$bref);
if ($http->{sock}) { # !closed
if ($http->{wbuf}) {
- $self->watch_read(0);
+ $self->watch(0);
$http->write(restart_read_cb($self));
}
- # stay in watch_read, but let other clients
+ # stay in EPOLLIN, but let other clients
# get some work done, too.
return;
}
# fall through to close below...
} elsif (!defined $r) {
- return if $!{EAGAIN} || $!{EINTR};
+ return restart_read($self) if $!{EAGAIN} || $!{EINTR};
}
# Done! Error handling will happen in $fh->close
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index eb1679a7..98f88410 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -24,6 +24,7 @@ use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -52,12 +53,6 @@ sub next_tick () {
# pipelined request, we bypassed socket-readiness
# checks to get here:
event_step($nntp);
-
- # maybe there's more pipelined data, or we'll have
- # to register it for socket-readiness notifications
- if (!$nntp->{long_res} && $nntp->{sock}) {
- check_read($nntp);
- }
}
}
}
@@ -97,7 +92,7 @@ sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
$self->{rbuf} = '';
@@ -624,11 +619,10 @@ sub long_response ($$) {
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
- $self->watch_read(0);
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || !$self->{sock}) {
+ if ($@ || !$self->{sock}) { # something bad happened...
$self->{long_res} = undef;
if ($@) {
@@ -922,10 +916,6 @@ sub do_write ($$) {
my $done = $self->write(\($_[1]));
return 0 unless $self->{sock};
- # Do not watch for readability if we have data in the queue,
- # instead re-enable watching for readability when we can
- $self->watch_read(0) if (!$done || $self->{long_res});
-
$done;
}
@@ -943,7 +933,6 @@ sub event_step {
my ($self) = @_;
return unless $self->flush_write && $self->{sock};
- return if $self->{long_res};
update_idle_time($self);
# only read more requests if we've drained the write buffer,
@@ -957,7 +946,7 @@ sub event_step {
my $off = length($$rbuf);
$r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
unless (defined $r) {
- return if $!{EAGAIN};
+ return $self->watch_in1 if $!{EAGAIN};
return $self->close;
}
return $self->close if $r == 0;
@@ -978,6 +967,10 @@ sub event_step {
my $len = length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
+
+ # maybe there's more pipelined data, or we'll have
+ # to register it for socket-readiness notifications
+ check_read($self) unless ($self->{long_res} || $self->{wbuf});
}
sub check_read {
@@ -993,7 +986,7 @@ sub check_read {
} else {
# no pipelined requests available, let the kernel know
# to wake us up if there's more
- $self->watch_read(1); # PublicInbox::DS::watch_read
+ $self->watch_in1; # PublicInbox::DS::watch_in1
}
}
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 17fd1398..f1988e61 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -24,11 +24,11 @@ $VERSION = "0.25";
@EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait
EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE);
+ EPOLLONESHOT EPOLLEXCLUSIVE);
%EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE)],
+ EPOLLONESHOT EPOLLEXCLUSIVE)],
sendfile => [qw(sendfile)],
);
@@ -38,7 +38,7 @@ use constant EPOLLOUT => 4;
# use constant EPOLLHUP => 16;
# use constant EPOLLRDBAND => 128;
use constant EPOLLEXCLUSIVE => (1 << 28);
-# use constant EPOLLONESHOT => (1 << 30);
+use constant EPOLLONESHOT => (1 << 30);
# use constant EPOLLET => (1 << 31);
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;
--
EW
next prev parent reply other threads:[~2019-06-24 2:55 UTC|newest]
Thread overview: 61+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
2019-06-24 2:52 ` [PATCH 01/57] ds: get rid of {closed} field Eric Wong
2019-06-24 2:52 ` [PATCH 02/57] ds: get rid of more unused debug instance methods Eric Wong
2019-06-24 2:52 ` [PATCH 03/57] ds: use and export monotonic now() Eric Wong
2019-06-24 2:52 ` [PATCH 04/57] AddTimer: avoid clock_gettime for the '0' case Eric Wong
2019-06-24 2:52 ` [PATCH 05/57] ds: get rid of on_incomplete_write wrapper Eric Wong
2019-06-24 2:52 ` [PATCH 06/57] ds: lazy initialize wbuf_off Eric Wong
2019-06-24 2:52 ` [PATCH 07/57] ds: split out from ->flush_write and ->write Eric Wong
2019-06-24 2:52 ` [PATCH 08/57] ds: lazy-initialize wbuf Eric Wong
2019-06-24 2:52 ` [PATCH 09/57] ds: don't pass `events' arg to EPOLL_CTL_DEL Eric Wong
2019-06-24 2:52 ` [PATCH 10/57] ds: remove support for DS->write(undef) Eric Wong
2019-06-24 2:52 ` [PATCH 11/57] http: favor DS->write(strref) when reasonable Eric Wong
2019-06-24 2:52 ` [PATCH 12/57] ds: share send(..., MSG_MORE) logic Eric Wong
2019-06-24 2:52 ` [PATCH 13/57] ds: switch write buffering to use a tempfile Eric Wong
2019-06-24 2:52 ` [PATCH 14/57] ds: get rid of redundant and unnecessary POLL* constants Eric Wong
2019-06-24 2:52 ` [PATCH 15/57] syscall: get rid of unused EPOLL* constants Eric Wong
2019-06-24 2:52 ` [PATCH 16/57] syscall: get rid of unnecessary uname local vars Eric Wong
2019-06-24 2:52 ` [PATCH 17/57] ds: set event flags directly at initialization Eric Wong
2019-06-24 2:52 ` [PATCH 18/57] ds: import IO::KQueue namespace Eric Wong
2019-06-24 2:52 ` [PATCH 19/57] ds: share watch_chg between watch_read/watch_write Eric Wong
2019-06-24 2:52 ` [PATCH 20/57] ds: remove IO::Poll support (for now) Eric Wong
2019-06-24 2:52 ` Eric Wong [this message]
2019-06-24 2:52 ` [PATCH 22/57] httpd/async: remove EINTR check Eric Wong
2019-06-24 2:52 ` [PATCH 23/57] spawn: remove `Blocking' flag handling Eric Wong
2019-06-24 2:52 ` [PATCH 24/57] qspawn: describe where `$rpipe' come from Eric Wong
2019-06-24 2:52 ` [PATCH 25/57] http|nntp: favor "$! == EFOO" over $!{EFOO} checks Eric Wong
2019-06-24 2:52 ` [PATCH 26/57] ds: favor `delete' over assigning fields to `undef' Eric Wong
2019-06-24 2:52 ` [PATCH 27/57] http: don't pass extra args to PublicInbox::DS::close Eric Wong
2019-06-24 2:52 ` [PATCH 28/57] ds: pass $self to code references Eric Wong
2019-06-24 2:52 ` [PATCH 29/57] evcleanup: replace _run_asap with `event_step' callback Eric Wong
2019-06-24 2:52 ` [PATCH 30/57] ds: remove pointless exit calls Eric Wong
2019-06-24 2:52 ` [PATCH 31/57] http|nntp: be explicit about bytes::length on rbuf Eric Wong
2019-06-24 2:52 ` [PATCH 32/57] ds: hoist out do_read from NNTP and HTTP Eric Wong
2019-06-24 2:52 ` [PATCH 33/57] nntp: simplify re-arming/requeue logic Eric Wong
2019-06-24 2:52 ` [PATCH 34/57] allow use of PerlIO layers for filesystem writes Eric Wong
2019-06-24 2:52 ` [PATCH 35/57] ds: deal better with FS-related errors IO buffers Eric Wong
2019-06-24 2:52 ` [PATCH 36/57] nntp: wait for writability before sending greeting Eric Wong
2019-06-24 2:52 ` [PATCH 37/57] nntp: NNTPS and NNTP+STARTTLS working Eric Wong
2019-06-24 2:52 ` [PATCH 38/57] certs/create-certs.perl: fix cert validity on 32-bit Eric Wong
2019-06-24 2:52 ` [PATCH 39/57] daemon: map inherited sockets to well-known schemes Eric Wong
2019-06-24 2:52 ` [PATCH 40/57] ds|nntp: use CORE::close on socket Eric Wong
2019-06-24 2:52 ` [PATCH 41/57] nntp: call SSL_shutdown in normal cases Eric Wong
2019-06-24 2:52 ` [PATCH 42/57] t/nntpd-tls: slow client connection test Eric Wong
2019-06-24 2:52 ` [PATCH 43/57] daemon: use SSL_MODE_RELEASE_BUFFERS Eric Wong
2019-06-24 2:52 ` [PATCH 44/57] ds: allow ->write callbacks to syswrite directly Eric Wong
2019-06-24 2:52 ` [PATCH 45/57] nntp: reduce allocations for greeting Eric Wong
2019-06-24 2:52 ` [PATCH 46/57] ds: always use EV_ADD with EV_SET Eric Wong
2019-06-24 2:52 ` [PATCH 47/57] nntp: simplify long response logic and fix nesting Eric Wong
2019-06-24 2:52 ` [PATCH 48/57] ds: flush_write runs ->write callbacks even if closed Eric Wong
2019-06-24 2:52 ` [PATCH 49/57] nntp: lazily allocate and stash rbuf Eric Wong
2019-06-24 2:52 ` [PATCH 50/57] ci: require IO::KQueue on FreeBSD, for now Eric Wong
2019-06-24 2:52 ` [PATCH 51/57] nntp: send greeting immediately for plain sockets Eric Wong
2019-06-24 2:52 ` [PATCH 52/57] daemon: set TCP_DEFER_ACCEPT on everything but NNTP Eric Wong
2019-06-24 2:52 ` [PATCH 53/57] daemon: use FreeBSD accept filters on non-NNTP Eric Wong
2019-06-24 2:52 ` [PATCH 54/57] ds: split out IO::KQueue-specific code Eric Wong
2019-06-24 5:24 ` Eric Wong
2019-06-24 2:52 ` [PATCH 55/57] ds: reimplement IO::Poll support to look like epoll Eric Wong
2019-06-24 2:52 ` [PATCH 56/57] Revert "ci: require IO::KQueue on FreeBSD, for now" Eric Wong
2019-06-24 2:52 ` [PATCH 57/57] ds: reduce overhead of tempfile creation Eric Wong
2019-06-24 5:25 ` [PATCH 58/57] Makefile: skip DSKQXS in global syntax check Eric Wong
2019-06-24 18:28 ` [PATCH 59/57] ds: ->write must not clobber empty wbuf array Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20190624025258.25592-22-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).