* [PATCH 01/57] ds: get rid of {closed} field
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 02/57] ds: get rid of more unused debug instance methods Eric Wong
` (57 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Merely checking the presence of the {sock} field is
enough, and having multiple sources of truth increases
confusion and the likelihood of bugs.
---
lib/PublicInbox/DS.pm | 52 ++++++++++++----------------------
lib/PublicInbox/HTTP.pm | 8 +++---
lib/PublicInbox/HTTPD/Async.pm | 2 +-
lib/PublicInbox/NNTP.pm | 30 +++++++++-----------
4 files changed, 37 insertions(+), 55 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 2b04886a..f4fe8793 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -28,7 +28,6 @@ use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write
'wbuf_off', # offset into first element of wbuf to start writing at
- 'closed', # bool: socket is closed
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
);
@@ -366,7 +365,7 @@ sub PostEventLoop {
$sock->close;
# and now we can finally remove the fd from the map. see
- # comment above in _cleanup.
+ # comment above in ->close.
delete $DescriptorMap{$fd};
}
@@ -411,7 +410,6 @@ sub new {
$self->{wbuf} = [];
$self->{wbuf_off} = 0;
- $self->{closed} = 0;
my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
@@ -457,28 +455,8 @@ Close the socket.
=cut
sub close {
- my PublicInbox::DS $self = $_[0];
- return if $self->{closed};
-
- # this does most of the work of closing us
- $self->_cleanup();
-
- # defer closing the actual socket until the event loop is done
- # processing this round of events. (otherwise we might reuse fds)
- if (my $sock = delete $self->{sock}) {
- push @ToClose, $sock;
- }
-
- return 0;
-}
-
-### METHOD: _cleanup()
-### Called by our closers so we can clean internal data structures.
-sub _cleanup {
- my PublicInbox::DS $self = $_[0];
-
- # we're effectively closed; we have no fd and sock when we leave here
- $self->{closed} = 1;
+ my ($self) = @_;
+ my $sock = delete $self->{sock} or return;
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
@@ -487,8 +465,8 @@ sub _cleanup {
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
- if ($HaveEpoll && $self->{sock}) {
- my $fd = fileno($self->{sock});
+ if ($HaveEpoll) {
+ my $fd = fileno($sock);
epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
confess("EPOLL_CTL_DEL: $!");
}
@@ -498,9 +476,15 @@ sub _cleanup {
# processing an epoll_wait/etc that returned hundreds of fds, one
# of which is not yet processed and is what we're closing. if we
# keep it in DescriptorMap, then the event harnesses can just
- # looked at $pob->{closed} and ignore it. but if it's an
+ # looked at $pob->{sock} == undef and ignore it. but if it's an
# un-accounted for fd, then it (understandably) freak out a bit
# and emit warnings, thinking their state got off.
+
+ # defer closing the actual socket until the event loop is done
+ # processing this round of events. (otherwise we might reuse fds)
+ push @ToClose, $sock;
+
+ return 0;
}
=head2 C<< $obj->sock() >>
@@ -533,7 +517,7 @@ sub write {
# now-dead object does its second write. that is this case. we
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
- return 1 if $self->{closed};
+ return 1 unless $self->{sock};
my $bref;
@@ -634,7 +618,7 @@ Turn 'readable' event notification on or off.
=cut
sub watch_read {
my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
+ my $sock = $self->{sock} or return;
my $val = shift;
my $event = $self->{event_watch};
@@ -642,7 +626,7 @@ sub watch_read {
$event &= ~POLLIN if ! $val;
$event |= POLLIN if $val;
- my $fd = fileno($self->{sock});
+ my $fd = fileno($sock);
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
@@ -664,14 +648,14 @@ Turn 'writable' event notification on or off.
=cut
sub watch_write {
my PublicInbox::DS $self = shift;
- return if $self->{closed} || !$self->{sock};
+ my $sock = $self->{sock} or return;
my $val = shift;
my $event = $self->{event_watch};
$event &= ~POLLOUT if ! $val;
$event |= POLLOUT if $val;
- my $fd = fileno($self->{sock});
+ my $fd = fileno($sock);
# If it changed, set it
if ($event != $self->{event_watch}) {
@@ -728,7 +712,7 @@ sub as_string {
my PublicInbox::DS $self = shift;
my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
- my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open");
+ my $ret = ref($self) . "$rw: " . ($self->{sock} ? 'open' : 'closed');
return $ret;
}
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 45bf23ec..dff59286 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -33,7 +33,7 @@ sub process_pipelineq () {
$pipet = undef;
$pipelineq = [];
foreach (@$q) {
- next if $_->{closed};
+ next unless $_->{sock};
rbuf_process($_);
}
}
@@ -70,7 +70,7 @@ sub event_step { # called by PublicInbox::DS
my $wbuf = $self->{wbuf};
if (@$wbuf) {
$self->write(undef);
- return if $self->{closed} || scalar(@$wbuf);
+ return if !$self->{sock} || scalar(@$wbuf);
}
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
@@ -266,7 +266,7 @@ sub getline_cb ($$$) {
my $buf = eval { $forward->getline };
if (defined $buf) {
$write->($buf); # may close in PublicInbox::DS::write
- unless ($self->{closed}) {
+ if ($self->{sock}) {
my $next = $self->{pull};
if (scalar @{$self->{wbuf}}) {
$self->write($next);
@@ -322,7 +322,7 @@ sub response_write {
use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
sub more ($$) {
my $self = $_[0];
- return if $self->{closed};
+ return unless $self->{sock};
if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
my $n = send($self->{sock}, $_[1], MSG_MORE);
if (defined $n) {
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 604627ab..261a01e0 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -45,7 +45,7 @@ sub main_cb ($$$) {
my $r = sysread($self->{sock}, $$bref, 8192);
if ($r) {
$fh->write($$bref);
- unless ($http->{closed}) { # PublicInbox::DS sets this
+ if ($http->{sock}) { # !closed
if (scalar @{$http->{wbuf}}) {
$self->watch_read(0);
$http->write(restart_read_cb($self));
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 796ac74d..107cbe31 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -57,7 +57,7 @@ sub next_tick () {
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- if (!$nntp->{long_res} && !$nntp->{closed}) {
+ if (!$nntp->{long_res} && $nntp->{sock}) {
check_read($nntp);
}
}
@@ -66,9 +66,8 @@ sub next_tick () {
sub update_idle_time ($) {
my ($self) = @_;
- my $sock = $self->{sock} or return;
- my $fd = fileno($sock);
- defined $fd and $EXPMAP->{$fd} = [ now(), $self ];
+ my $sock = $self->{sock} or return;
+ $EXPMAP->{fileno($sock)} = [ now(), $self ];
}
sub expire_old () {
@@ -134,7 +133,7 @@ sub process_line ($$) {
my $res = eval { $req->($self, @args) };
my $err = $@;
- if ($err && !$self->{closed}) {
+ if ($err && $self->{sock}) {
local $/ = "\n";
chomp($l);
err($self, 'error from: %s (%s)', $l, $err);
@@ -632,7 +631,7 @@ sub long_response ($$) {
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || $self->{closed}) {
+ if ($@ || !$self->{sock}) {
$self->{long_res} = undef;
if ($@) {
@@ -640,12 +639,12 @@ sub long_response ($$) {
"%s during long response[$fd] - %0.6f",
$@, now() - $t0);
}
- if ($self->{closed}) {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
- } else {
+ if ($self->{sock}) {
update_idle_time($self);
check_read($self);
+ } else {
+ out($self, " deferred[$fd] aborted - %0.6f",
+ now() - $t0);
}
} elsif ($more) { # scalar @{$self->{wbuf}}:
# no recursion, schedule another call ASAP
@@ -930,7 +929,7 @@ sub more ($$) {
sub do_write ($$) {
my ($self, $data) = @_;
my $done = $self->write($data);
- return 0 if $self->{closed};
+ return 0 unless $self->{sock};
# Do not watch for readability if we have data in the queue,
# instead re-enable watching for readability when we can
@@ -966,13 +965,13 @@ sub do_more ($$) {
sub event_step {
my ($self) = @_;
- return if $self->{closed};
+ return unless $self->{sock};
my $wbuf = $self->{wbuf};
if (@$wbuf) {
update_idle_time($self);
$self->write(undef);
- return if $self->{closed} || scalar(@$wbuf);
+ return if !$self->{sock} || scalar(@$wbuf);
}
return if $self->{long_res};
# only read more requests if we've drained the write buffer,
@@ -1028,9 +1027,8 @@ sub check_read {
sub not_idle_long ($$) {
my ($self, $now) = @_;
- my $sock = $self->{sock} or return;
- defined(my $fd = fileno($sock)) or return;
- my $ary = $EXPMAP->{$fd} or return;
+ my $sock = $self->{sock} or return;
+ my $ary = $EXPMAP->{fileno($sock)} or return;
my $exp_at = $ary->[0] + $EXPTIME;
$exp_at > $now;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 02/57] ds: get rid of more unused debug instance methods
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
2019-06-24 2:52 ` [PATCH 01/57] ds: get rid of {closed} field Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 03/57] ds: use and export monotonic now() Eric Wong
` (56 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Over a decade of using Danga::Socket and I never found the
built-in debug functionality useful.
---
lib/PublicInbox/DS.pm | 69 -------------------------------------------
1 file changed, 69 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index f4fe8793..9e24ed78 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -34,8 +34,6 @@ use fields ('sock', # underlying socket
use Errno qw(EAGAIN EINVAL);
use Carp qw(croak confess);
-use constant DebugLevel => 0;
-
use constant POLLIN => 1;
use constant POLLOUT => 4;
use constant POLLERR => 8;
@@ -105,18 +103,6 @@ sub SetLoopTimeout {
return $LoopTimeout = $_[1] + 0;
}
-=head2 C<< CLASS->DebugMsg( $format, @args ) >>
-
-Print the debugging message specified by the C<sprintf>-style I<format> and
-I<args>
-
-=cut
-sub DebugMsg {
- my ( $class, $fmt, @args ) = @_;
- chomp $fmt;
- printf STDERR ">>> $fmt\n", @args;
-}
-
=head2 C<< CLASS->AddTimer( $seconds, $coderef ) >>
Add a timer to occur $seconds from now. $seconds may be fractional, but timers
@@ -487,16 +473,6 @@ sub close {
return 0;
}
-=head2 C<< $obj->sock() >>
-
-Returns the underlying IO::Handle for the object.
-
-=cut
-sub sock {
- my PublicInbox::DS $self = shift;
- return $self->{sock};
-}
-
=head2 C<< $obj->write( $data ) >>
Write the specified data to the underlying handle. I<data> may be scalar,
@@ -671,51 +647,6 @@ sub watch_write {
}
}
-=head2 C<< $obj->dump_error( $message ) >>
-
-Prints to STDERR a backtrace with information about this socket and what lead
-up to the dump_error call.
-
-=cut
-sub dump_error {
- my $i = 0;
- my @list;
- while (my ($file, $line, $sub) = (caller($i++))[1..3]) {
- push @list, "\t$file:$line called $sub\n";
- }
-
- warn "ERROR: $_[1]\n" .
- "\t$_[0] = " . $_[0]->as_string . "\n" .
- join('', @list);
-}
-
-=head2 C<< $obj->debugmsg( $format, @args ) >>
-
-Print the debugging message specified by the C<sprintf>-style I<format> and
-I<args>.
-
-=cut
-sub debugmsg {
- my ( $self, $fmt, @args ) = @_;
- confess "Not an object" unless ref $self;
-
- chomp $fmt;
- printf STDERR ">>> $fmt\n", @args;
-}
-
-=head2 C<< $obj->as_string() >>
-
-Returns a string describing this socket.
-
-=cut
-sub as_string {
- my PublicInbox::DS $self = shift;
- my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') .
- ($self->{event_watch} & POLLOUT ? 'W' : '') . ")";
- my $ret = ref($self) . "$rw: " . ($self->{sock} ? 'open' : 'closed');
- return $ret;
-}
-
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 03/57] ds: use and export monotonic now()
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
2019-06-24 2:52 ` [PATCH 01/57] ds: get rid of {closed} field Eric Wong
2019-06-24 2:52 ` [PATCH 02/57] ds: get rid of more unused debug instance methods Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 04/57] AddTimer: avoid clock_gettime for the '0' case Eric Wong
` (55 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
All of our internal timing code should use monotonic clocks
for consistency against system clock adjustments.
This can be shared by our Daemon and NNTP packages.
---
lib/PublicInbox/DS.pm | 11 +++++++----
lib/PublicInbox/Daemon.pm | 9 ++++-----
lib/PublicInbox/NNTP.pm | 4 +---
3 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9e24ed78..e7db2034 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -17,10 +17,11 @@ package PublicInbox::DS;
use strict;
use bytes;
use POSIX ();
-use Time::HiRes ();
use IO::Handle qw();
use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
-
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+use parent qw(Exporter);
+our @EXPORT_OK = qw(now);
use warnings;
use PublicInbox::Syscall qw(:epoll);
@@ -115,7 +116,7 @@ sub AddTimer {
my $class = shift;
my ($secs, $coderef) = @_;
- my $fire_time = Time::HiRes::time() + $secs;
+ my $fire_time = now() + $secs;
my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
@@ -195,11 +196,13 @@ sub FirstTimeEventLoop {
}
}
+sub now () { clock_gettime(CLOCK_MONOTONIC) }
+
# runs timers and returns milliseconds for next one, or next event loop
sub RunTimers {
return $LoopTimeout unless @Timers;
- my $now = Time::HiRes::time();
+ my $now = now();
# Run expired timers
while (@Timers && $Timers[0][0] <= $now) {
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 227ba5f9..b8d6b572 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -9,10 +9,9 @@ use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
use IO::Handle;
use IO::Socket;
use Cwd qw/abs_path/;
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
STDOUT->autoflush(1);
STDERR->autoflush(1);
-require PublicInbox::DS;
+use PublicInbox::DS qw(now);
require PublicInbox::EvCleanup;
require POSIX;
require PublicInbox::Listener;
@@ -183,7 +182,7 @@ sub worker_quit {
PublicInbox::DS->SetPostLoopCallback(sub {
my ($dmap, undef) = @_;
my $n = 0;
- my $now = clock_gettime(CLOCK_MONOTONIC);
+ my $now = now();
foreach my $s (values %$dmap) {
$s->can('busy') or next;
@@ -195,9 +194,9 @@ sub worker_quit {
}
}
if ($n) {
- if (($warn + 5) < time) {
+ if (($warn + 5) < now()) {
warn "$$ quitting, $n client(s) left\n";
- $warn = time;
+ $warn = now();
}
unless (defined $proc_name) {
$proc_name = (split(/\s+/, $0))[0];
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 107cbe31..0a473e42 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -14,7 +14,7 @@ use PublicInbox::Git;
require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
-use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+PublicInbox::DS->import('now');
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
use constant {
@@ -25,8 +25,6 @@ use constant {
r430 => '430 No article with that message-id',
};
-sub now () { clock_gettime(CLOCK_MONOTONIC) };
-
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
my $LIST_HEADERS = join("\r\n", @OVERVIEW,
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 04/57] AddTimer: avoid clock_gettime for the '0' case
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (2 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 03/57] ds: use and export monotonic now() Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 05/57] ds: get rid of on_incomplete_write wrapper Eric Wong
` (54 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We rely on immediate timers often, so we can avoid the overhead
of an extra subroutine call to retrieve the monotonic time (and
a sometimes-system call on some platforms).
---
lib/PublicInbox/DS.pm | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e7db2034..ed04feb5 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -113,8 +113,13 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
=cut
sub AddTimer {
- my $class = shift;
- my ($secs, $coderef) = @_;
+ my ($class, $secs, $coderef) = @_;
+
+ if (!$secs) {
+ my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
+ unshift(@Timers, $timer);
+ return $timer;
+ }
my $fire_time = now() + $secs;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 05/57] ds: get rid of on_incomplete_write wrapper
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (3 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 04/57] AddTimer: avoid clock_gettime for the '0' case Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 06/57] ds: lazy initialize wbuf_off Eric Wong
` (53 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Wrong place to be wrapping this method.
---
lib/PublicInbox/DS.pm | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index ed04feb5..26c5251b 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -571,7 +571,7 @@ sub write {
# since connection has stuff to write, it should now be
# interested in pending writes:
$self->{wbuf_off} += $written;
- $self->on_incomplete_write;
+ $self->watch_write(1);
return 0;
} elsif ($written == $to_write) {
$self->{wbuf_off} = 0;
@@ -590,11 +590,6 @@ sub write {
}
}
-sub on_incomplete_write {
- my PublicInbox::DS $self = shift;
- $self->watch_write(1);
-}
-
=head2 C<< $obj->watch_read( $boolean ) >>
Turn 'readable' event notification on or off.
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 06/57] ds: lazy initialize wbuf_off
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (4 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 05/57] ds: get rid of on_incomplete_write wrapper Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 07/57] ds: split out from ->flush_write and ->write Eric Wong
` (52 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Since Perl 5.10+, "fields" makes a restricted hash, not a
compile-time-defined array (struct) with fixed offsets as
it did in Perl <= 5.8.
Thus in-use fields cost memory, and since the write buffer
offset is rarely needed, stop relying on it.
---
lib/PublicInbox/DS.pm | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 26c5251b..154fd4dd 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -403,7 +403,6 @@ sub new {
unless $sock && $fd;
$self->{wbuf} = [];
- $self->{wbuf_off} = 0;
my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
@@ -501,7 +500,7 @@ sub write {
# now-dead object does its second write. that is this case. we
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
- return 1 unless $self->{sock};
+ my $sock = $self->{sock} or return 1;
my $bref;
@@ -548,9 +547,9 @@ sub write {
die "Write error: $@ <$bref>";
}
- my $to_write = $len - $self->{wbuf_off};
- my $written = syswrite($self->{sock}, $$bref, $to_write,
- $self->{wbuf_off});
+ my $off = $self->{wbuf_off} // 0;
+ my $to_write = $len - $off;
+ my $written = syswrite($sock, $$bref, $to_write, $off);
if (! defined $written) {
if ($! == EAGAIN) {
@@ -570,11 +569,11 @@ sub write {
}
# since connection has stuff to write, it should now be
# interested in pending writes:
- $self->{wbuf_off} += $written;
+ $self->{wbuf_off} = $off + $written;
$self->watch_write(1);
return 0;
} elsif ($written == $to_write) {
- $self->{wbuf_off} = 0;
+ delete $self->{wbuf_off};
$self->watch_write(0);
# this was our only write, so we can return immediately
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 07/57] ds: split out from ->flush_write and ->write
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (5 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 06/57] ds: lazy initialize wbuf_off Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 08/57] ds: lazy-initialize wbuf Eric Wong
` (51 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Get rid of the confusing $need_queue variable and all
the associated documentation for it. Instead, make it
obvious that we're either skipping the write buffer or
flushing the write buffer by splitting the sub in two.
---
lib/PublicInbox/DS.pm | 141 +++++++++++++++++-------------------------
1 file changed, 58 insertions(+), 83 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 154fd4dd..f1b7bab7 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -480,6 +480,42 @@ sub close {
return 0;
}
+# returns 1 if done, 0 if incomplete
+sub flush_write ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return 1;
+ my $wbuf = $self->{wbuf};
+
+ while (my $bref = $wbuf->[0]) {
+ my $ref = ref($bref);
+ if ($ref eq 'SCALAR') {
+ my $len = bytes::length($$bref);
+ my $off = $self->{wbuf_off} || 0;
+ my $to_write = $len - $off;
+ my $written = syswrite($sock, $$bref, $to_write, $off);
+ if (defined $written) {
+ if ($written == $to_write) {
+ shift @$wbuf;
+ } else {
+ $self->{wbuf_off} = $off + $written;
+ }
+ next; # keep going until EAGAIN
+ } elsif ($! == EAGAIN) {
+ $self->watch_write(1);
+ } else {
+ $self->close;
+ }
+ return 0;
+ } else { #($ref eq 'CODE') {
+ shift @$wbuf;
+ $bref->();
+ }
+ } # while @$wbuf
+
+ $self->watch_write(0);
+ 1; # all done
+}
+
=head2 C<< $obj->write( $data ) >>
Write the specified data to the underlying handle. I<data> may be scalar,
@@ -489,9 +525,8 @@ it returns 1, caller should stop waiting for 'writable' events)
=cut
sub write {
- my PublicInbox::DS $self;
- my $data;
- ($self, $data) = @_;
+ my ($self, $data) = @_;
+ return flush_write($self) unless defined $data;
# nobody should be writing to closed sockets, but caller code can
# do two writes within an event, have the first fail and
@@ -501,91 +536,31 @@ sub write {
# just lie and say it worked. it'll be dead soon and won't be
# hurt by this lie.
my $sock = $self->{sock} or return 1;
-
- my $bref;
-
- # just queue data if there's already a wait
- my $need_queue;
+ my $ref = ref $data;
+ my $bref = $ref ? $data : \$data;
my $wbuf = $self->{wbuf};
+ if (@$wbuf) { # already buffering, can't write more...
+ push @$wbuf, $bref;
+ return 0;
+ } elsif ($ref eq 'CODE') {
+ $bref->();
+ return 1;
+ } else {
+ my $to_write = bytes::length($$bref);
+ my $written = syswrite($sock, $$bref, $to_write);
- if (defined $data) {
- $bref = ref $data ? $data : \$data;
- if (scalar @$wbuf) {
+ if (defined $written) {
+ return 1 if $written == $to_write;
+ $self->{wbuf_off} = $written;
+ push @$wbuf, $bref;
+ return flush_write($self); # try until EAGAIN
+ } elsif ($! == EAGAIN) {
push @$wbuf, $bref;
- return 0;
- }
-
- # this flag says we're bypassing the queue system, knowing we're the
- # only outstanding write, and hoping we don't ever need to use it.
- # if so later, though, we'll need to queue
- $need_queue = 1;
- }
-
- WRITE:
- while (1) {
- return 1 unless $bref ||= $wbuf->[0];
-
- my $len;
- eval {
- $len = length($$bref); # this will die if $bref is a code ref, caught below
- };
- if ($@) {
- if (UNIVERSAL::isa($bref, "CODE")) {
- unless ($need_queue) {
- shift @$wbuf;
- }
- $bref->();
-
- # code refs are just run and never get reenqueued
- # (they're one-shot), so turn off the flag indicating the
- # outstanding data needs queueing.
- $need_queue = 0;
-
- undef $bref;
- next WRITE;
- }
- die "Write error: $@ <$bref>";
- }
-
- my $off = $self->{wbuf_off} // 0;
- my $to_write = $len - $off;
- my $written = syswrite($sock, $$bref, $to_write, $off);
-
- if (! defined $written) {
- if ($! == EAGAIN) {
- # since connection has stuff to write, it should now be
- # interested in pending writes:
- if ($need_queue) {
- push @$wbuf, $bref;
- }
- $self->watch_write(1);
- return 0;
- }
-
- return $self->close;
- } elsif ($written != $to_write) {
- if ($need_queue) {
- push @$wbuf, $bref;
- }
- # since connection has stuff to write, it should now be
- # interested in pending writes:
- $self->{wbuf_off} = $off + $written;
$self->watch_write(1);
- return 0;
- } elsif ($written == $to_write) {
- delete $self->{wbuf_off};
- $self->watch_write(0);
-
- # this was our only write, so we can return immediately
- # since we avoided incrementing the buffer size or
- # putting it in the buffer. we also know there
- # can't be anything else to write.
- return 1 if $need_queue;
-
- shift @$wbuf;
- undef $bref;
- next WRITE;
+ } else {
+ $self->close;
}
+ return 0;
}
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 08/57] ds: lazy-initialize wbuf
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (6 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 07/57] ds: split out from ->flush_write and ->write Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 09/57] ds: don't pass `events' arg to EPOLL_CTL_DEL Eric Wong
` (50 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We don't need write buffering unless we encounter slow clients
requesting large responses. So don't waste a hash slot or
(empty) arrayref for it.
---
lib/PublicInbox/DS.pm | 14 ++++++--------
lib/PublicInbox/EvCleanup.pm | 2 +-
lib/PublicInbox/HTTP.pm | 13 +++++--------
lib/PublicInbox/HTTPD/Async.pm | 2 +-
lib/PublicInbox/NNTP.pm | 16 ++++++----------
5 files changed, 19 insertions(+), 28 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index f1b7bab7..d07620a8 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -402,8 +402,6 @@ sub new {
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{wbuf} = [];
-
my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
_InitPoller();
@@ -454,7 +452,7 @@ sub close {
# we need to flush our write buffer, as there may
# be self-referential closures (sub { $client->close })
# preventing the object from being destroyed
- @{$self->{wbuf}} = ();
+ delete $self->{wbuf};
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
@@ -483,8 +481,8 @@ sub close {
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
+ my $wbuf = $self->{wbuf} or return 1;
my $sock = $self->{sock} or return 1;
- my $wbuf = $self->{wbuf};
while (my $bref = $wbuf->[0]) {
my $ref = ref($bref);
@@ -512,6 +510,7 @@ sub flush_write ($) {
}
} # while @$wbuf
+ delete $self->{wbuf};
$self->watch_write(0);
1; # all done
}
@@ -538,8 +537,7 @@ sub write {
my $sock = $self->{sock} or return 1;
my $ref = ref $data;
my $bref = $ref ? $data : \$data;
- my $wbuf = $self->{wbuf};
- if (@$wbuf) { # already buffering, can't write more...
+ if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
push @$wbuf, $bref;
return 0;
} elsif ($ref eq 'CODE') {
@@ -552,10 +550,10 @@ sub write {
if (defined $written) {
return 1 if $written == $to_write;
$self->{wbuf_off} = $written;
- push @$wbuf, $bref;
+ $self->{wbuf} = [ $bref ];
return flush_write($self); # try until EAGAIN
} elsif ($! == EAGAIN) {
- push @$wbuf, $bref;
+ $self->{wbuf} = [ $bref ];
$self->watch_write(1);
} else {
$self->close;
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index c64e2388..bd4dda11 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -29,7 +29,7 @@ sub once_init () {
# never drains wbuf. We can avoid wasting a hash slot by
# stuffing the read-end of the pipe into the never-to-be-touched
# wbuf
- push @{$self->{wbuf}}, $r;
+ $self->{wbuf} = $r;
$self;
}
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index dff59286..9a43069f 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -67,11 +67,8 @@ sub new ($$$) {
sub event_step { # called by PublicInbox::DS
my ($self) = @_;
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- $self->write(undef);
- return if !$self->{sock} || scalar(@$wbuf);
- }
+ return unless $self->flush_write && $self->{sock};
+
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
@@ -268,7 +265,7 @@ sub getline_cb ($$$) {
$write->($buf); # may close in PublicInbox::DS::write
if ($self->{sock}) {
my $next = $self->{pull};
- if (scalar @{$self->{wbuf}}) {
+ if ($self->{wbuf}) {
$self->write($next);
} else {
PublicInbox::EvCleanup::asap($next);
@@ -323,7 +320,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
sub more ($$) {
my $self = $_[0];
return unless $self->{sock};
- if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
+ if (MSG_MORE && !$self->{wbuf}) {
my $n = send($self->{sock}, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = length($_[1]) - $n;
@@ -490,7 +487,7 @@ sub close {
# for graceful shutdown in PublicInbox::Daemon:
sub busy () {
my ($self) = @_;
- ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}}));
+ ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
}
1;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 261a01e0..46ea188c 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -46,7 +46,7 @@ sub main_cb ($$$) {
if ($r) {
$fh->write($$bref);
if ($http->{sock}) { # !closed
- if (scalar @{$http->{wbuf}}) {
+ if ($http->{wbuf}) {
$self->watch_read(0);
$http->write(restart_read_cb($self));
}
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 0a473e42..d9097cc7 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -644,7 +644,7 @@ sub long_response ($$) {
out($self, " deferred[$fd] aborted - %0.6f",
now() - $t0);
}
- } elsif ($more) { # scalar @{$self->{wbuf}}:
+ } elsif ($more) { # $self->{wbuf}:
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
@@ -950,7 +950,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
sub do_more ($$) {
my ($self, $data) = @_;
- if (MSG_MORE && !scalar(@{$self->{wbuf}})) {
+ if (MSG_MORE && !$self->{wbuf}) {
my $n = send($self->{sock}, $data, MSG_MORE);
if (defined $n) {
my $dlen = length($data);
@@ -963,15 +963,11 @@ sub do_more ($$) {
sub event_step {
my ($self) = @_;
- return unless $self->{sock};
- my $wbuf = $self->{wbuf};
- if (@$wbuf) {
- update_idle_time($self);
- $self->write(undef);
- return if !$self->{sock} || scalar(@$wbuf);
- }
+ return unless $self->flush_write && $self->{sock};
return if $self->{long_res};
+
+ update_idle_time($self);
# only read more requests if we've drained the write buffer,
# otherwise we can be buffering infinitely w/o backpressure
@@ -1035,7 +1031,7 @@ sub not_idle_long ($$) {
sub busy {
my ($self, $now) = @_;
($self->{rbuf} ne '' || $self->{long_res} ||
- scalar(@{$self->{wbuf}}) || not_idle_long($self, $now));
+ $self->{wbuf} || not_idle_long($self, $now));
}
1;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 09/57] ds: don't pass `events' arg to EPOLL_CTL_DEL
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (7 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 08/57] ds: lazy-initialize wbuf Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 10/57] ds: remove support for DS->write(undef) Eric Wong
` (49 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
There's no point in passing a mask of interesting events
when removing an item from the epoll watch set.
---
lib/PublicInbox/DS.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index d07620a8..8fc49eee 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -458,7 +458,7 @@ sub close {
# notifications about it
if ($HaveEpoll) {
my $fd = fileno($sock);
- epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and
+ epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
confess("EPOLL_CTL_DEL: $!");
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 10/57] ds: remove support for DS->write(undef)
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (8 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 09/57] ds: don't pass `events' arg to EPOLL_CTL_DEL Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 11/57] http: favor DS->write(strref) when reasonable Eric Wong
` (48 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We call ->flush_write directly now, so we can eliminate a
needless check.
---
lib/PublicInbox/DS.pm | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8fc49eee..ba8bd95f 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -518,14 +518,13 @@ sub flush_write ($) {
=head2 C<< $obj->write( $data ) >>
Write the specified data to the underlying handle. I<data> may be scalar,
-scalar ref, code ref (to run when there), or undef just to kick-start.
+scalar ref, code ref (to run when there).
Returns 1 if writes all went through, or 0 if there are writes in queue. If
it returns 1, caller should stop waiting for 'writable' events)
=cut
sub write {
my ($self, $data) = @_;
- return flush_write($self) unless defined $data;
# nobody should be writing to closed sockets, but caller code can
# do two writes within an event, have the first fail and
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 11/57] http: favor DS->write(strref) when reasonable
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (9 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 10/57] ds: remove support for DS->write(undef) Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 12/57] ds: share send(..., MSG_MORE) logic Eric Wong
` (47 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
This can avoid large memory copies when strings can't be
copy-on-write and saves us the trouble of creating new
refs in the code.
---
lib/PublicInbox/HTTP.pm | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 9a43069f..4f1f88fe 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -209,7 +209,7 @@ sub response_header_write {
if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') {
more($self, $h);
} else {
- $self->write($h);
+ $self->write(\$h);
}
$alive;
}
@@ -222,7 +222,7 @@ sub chunked_wcb ($) {
more($self, sprintf("%x\r\n", bytes::length($_[0])));
more($self, $_[0]);
- # use $self->write("\n\n") if you care about real-time
+ # use $self->write(\"\n\n") if you care about real-time
# streaming responses, public-inbox WWW does not.
more($self, "\r\n");
}
@@ -248,7 +248,7 @@ sub response_done_cb ($$) {
sub {
my $env = $self->{env};
$self->{env} = undef;
- $self->write("0\r\n\r\n") if $alive == 2;
+ $self->write(\"0\r\n\r\n") if $alive == 2;
$self->write(sub{$alive ? next_request($self) : $self->close});
}
}
@@ -330,7 +330,7 @@ sub more ($$) {
return $self->write(substr($_[1], $n, $nlen));
}
}
- $self->write($_[1]);
+ $self->write(\($_[1]));
}
sub input_prepare {
@@ -467,7 +467,7 @@ sub read_input_chunked { # unlikely...
sub quit {
my ($self, $status) = @_;
my $h = "HTTP/1.1 $status " . status_message($status) . "\r\n\r\n";
- $self->write($h);
+ $self->write(\$h);
$self->close;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 12/57] ds: share send(..., MSG_MORE) logic
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (10 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 11/57] http: favor DS->write(strref) when reasonable Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 13/57] ds: switch write buffering to use a tempfile Eric Wong
` (46 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
There is no sense in having similar Linux-specific functionality
in both our NNTP.pm and HTTP.pm.
---
lib/PublicInbox/DS.pm | 21 +++++++++++++++++-
lib/PublicInbox/HTTP.pm | 26 +++++------------------
lib/PublicInbox/NNTP.pm | 47 ++++++++++++-----------------------------
3 files changed, 38 insertions(+), 56 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index ba8bd95f..3e8b0b1a 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -21,7 +21,7 @@ use IO::Handle qw();
use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
-our @EXPORT_OK = qw(now);
+our @EXPORT_OK = qw(now msg_more);
use warnings;
use PublicInbox::Syscall qw(:epoll);
@@ -561,6 +561,25 @@ sub write {
}
}
+use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
+
+sub msg_more ($$) {
+ my $self = $_[0];
+ my $sock = $self->{sock} or return 1;
+
+ if (MSG_MORE && !$self->{wbuf}) {
+ my $n = send($sock, $_[1], MSG_MORE);
+ if (defined $n) {
+ my $nlen = bytes::length($_[1]) - $n;
+ return 1 if $nlen == 0; # all done!
+
+ # PublicInbox::DS::write queues the unwritten substring:
+ return $self->write(substr($_[1], $n, $nlen));
+ }
+ }
+ $self->write(\($_[1]));
+}
+
=head2 C<< $obj->watch_read( $boolean ) >>
Turn 'readable' event notification on or off.
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 4f1f88fe..a669eb6e 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -19,6 +19,7 @@ use HTTP::Status qw(status_message);
use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
+PublicInbox::DS->import('msg_more');
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
@@ -207,7 +208,7 @@ sub response_header_write {
$h .= 'Date: ' . http_date() . "\r\n\r\n";
if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') {
- more($self, $h);
+ msg_more($self, $h);
} else {
$self->write(\$h);
}
@@ -219,12 +220,12 @@ sub chunked_wcb ($) {
my ($self) = @_;
sub {
return if $_[0] eq '';
- more($self, sprintf("%x\r\n", bytes::length($_[0])));
- more($self, $_[0]);
+ msg_more($self, sprintf("%x\r\n", bytes::length($_[0])));
+ msg_more($self, $_[0]);
# use $self->write(\"\n\n") if you care about real-time
# streaming responses, public-inbox WWW does not.
- more($self, "\r\n");
+ msg_more($self, "\r\n");
}
}
@@ -316,23 +317,6 @@ sub response_write {
}
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-sub more ($$) {
- my $self = $_[0];
- return unless $self->{sock};
- if (MSG_MORE && !$self->{wbuf}) {
- my $n = send($self->{sock}, $_[1], MSG_MORE);
- if (defined $n) {
- my $nlen = length($_[1]) - $n;
- return 1 if $nlen == 0; # all done!
-
- # PublicInbox::DS::write queues the unwritten substring:
- return $self->write(substr($_[1], $n, $nlen));
- }
- }
- $self->write(\($_[1]));
-}
-
sub input_prepare {
my ($self, $env) = @_;
my $input;
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index d9097cc7..fe01627f 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -14,7 +14,7 @@ use PublicInbox::Git;
require PublicInbox::EvCleanup;
use Email::Simple;
use POSIX qw(strftime);
-PublicInbox::DS->import('now');
+PublicInbox::DS->import(qw(now msg_more));
use Digest::SHA qw(sha1_hex);
use Time::Local qw(timegm timelocal);
use constant {
@@ -159,12 +159,12 @@ sub cmd_xgtitle ($;$) {
sub list_overview_fmt ($) {
my ($self) = @_;
- do_more($self, $OVERVIEW_FMT);
+ msg_more($self, $OVERVIEW_FMT);
}
sub list_headers ($;$) {
my ($self) = @_;
- do_more($self, $LIST_HEADERS);
+ msg_more($self, $LIST_HEADERS);
}
sub list_active ($;$) {
@@ -519,8 +519,8 @@ sub simple_body_write ($$) {
$s->body_set('');
$body =~ s/^\./../smg;
$body =~ s/(?<!\r)\n/\r\n/sg;
- do_more($self, $body);
- do_more($self, "\r\n") unless $body =~ /\r\n\z/s;
+ msg_more($self, $body);
+ msg_more($self, "\r\n") unless $body =~ /\r\n\z/s;
'.'
}
@@ -550,8 +550,8 @@ sub cmd_article ($;$) {
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "220 $n <$mid> article retrieved - head and body follow");
- do_more($self, _header($s));
- do_more($self, "\r\n");
+ msg_more($self, _header($s));
+ msg_more($self, "\r\n");
simple_body_write($self, $s);
}
@@ -562,7 +562,7 @@ sub cmd_head ($;$) {
my ($n, $mid, $s) = @$r;
set_art($self, $art);
more($self, "221 $n <$mid> article retrieved - head follows");
- do_more($self, _header($s));
+ msg_more($self, _header($s));
'.'
}
@@ -762,7 +762,7 @@ sub hdr_searchmsg ($$$$) {
$tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
}
utf8::encode($tmp);
- do_more($self, $tmp);
+ msg_more($self, $tmp);
$cur = $msgs->[-1]->{num} + 1;
});
}
@@ -914,19 +914,13 @@ sub cmd_xpath ($$) {
'223 '.join(' ', @paths);
}
-sub res ($$) {
- my ($self, $line) = @_;
- do_write($self, $line . "\r\n");
-}
+sub res ($$) { do_write($_[0], $_[1] . "\r\n") }
-sub more ($$) {
- my ($self, $line) = @_;
- do_more($self, $line . "\r\n");
-}
+sub more ($$) { msg_more($_[0], $_[1] . "\r\n") }
sub do_write ($$) {
- my ($self, $data) = @_;
- my $done = $self->write($data);
+ my $self = $_[0];
+ my $done = $self->write(\($_[1]));
return 0 unless $self->{sock};
# Do not watch for readability if we have data in the queue,
@@ -946,21 +940,6 @@ sub out ($$;@) {
printf { $self->{nntpd}->{out} } $fmt."\n", @args;
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub do_more ($$) {
- my ($self, $data) = @_;
- if (MSG_MORE && !$self->{wbuf}) {
- my $n = send($self->{sock}, $data, MSG_MORE);
- if (defined $n) {
- my $dlen = length($data);
- return 1 if $n == $dlen; # all done!
- $data = substr($data, $n, $dlen - $n);
- }
- }
- do_write($self, $data);
-}
-
sub event_step {
my ($self) = @_;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 13/57] ds: switch write buffering to use a tempfile
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (11 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 12/57] ds: share send(..., MSG_MORE) logic Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 14/57] ds: get rid of redundant and unnecessary POLL* constants Eric Wong
` (45 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Data which can't fit into a generously-sized socket buffer
has no business being stored on the heap.
---
lib/PublicInbox/DS.pm | 110 ++++++++++++++++++++++++++++++----------
lib/PublicInbox/HTTP.pm | 20 ++------
2 files changed, 85 insertions(+), 45 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 3e8b0b1a..eb468f57 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -18,22 +18,23 @@ use strict;
use bytes;
use POSIX ();
use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD);
+use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more);
+our @EXPORT_OK = qw(now msg_more write_in_full);
use warnings;
use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
- 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write
+ 'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
use Carp qw(croak confess);
+use File::Temp qw(tempfile);
use constant POLLIN => 1;
use constant POLLOUT => 4;
@@ -478,32 +479,51 @@ sub close {
return 0;
}
+# portable, non-thread-safe sendfile emulation (no pread, yet)
+sub psendfile ($$$) {
+ my ($sock, $fh, $off) = @_;
+
+ sysseek($fh, $$off, SEEK_SET) or return;
+ defined(my $to_write = sysread($fh, my $buf, 16384)) or return;
+ my $written = 0;
+ while ($to_write > 0) {
+ if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
+ $written += $w;
+ $to_write -= $w;
+ } else {
+ return if $written == 0;
+ last;
+ }
+ }
+ $$off += $written;
+ $written;
+}
+
# returns 1 if done, 0 if incomplete
sub flush_write ($) {
my ($self) = @_;
my $wbuf = $self->{wbuf} or return 1;
my $sock = $self->{sock} or return 1;
+next_buf:
while (my $bref = $wbuf->[0]) {
- my $ref = ref($bref);
- if ($ref eq 'SCALAR') {
- my $len = bytes::length($$bref);
- my $off = $self->{wbuf_off} || 0;
- my $to_write = $len - $off;
- my $written = syswrite($sock, $$bref, $to_write, $off);
- if (defined $written) {
- if ($written == $to_write) {
- shift @$wbuf;
+ if (ref($bref) ne 'CODE') {
+ my $off = delete($self->{wbuf_off}) // 0;
+ while (1) {
+ my $w = psendfile($sock, $bref, \$off);
+ if (defined $w) {
+ if ($w == 0) {
+ shift @$wbuf;
+ goto next_buf;
+ }
+ } elsif ($! == EAGAIN) {
+ $self->{wbuf_off} = $off;
+ watch_write($self, 1);
+ return 0;
} else {
- $self->{wbuf_off} = $off + $written;
+ return $self->close;
}
- next; # keep going until EAGAIN
- } elsif ($! == EAGAIN) {
- $self->watch_write(1);
- } else {
- $self->close;
}
- return 0;
} else { #($ref eq 'CODE') {
shift @$wbuf;
$bref->();
@@ -515,6 +535,31 @@ sub flush_write ($) {
1; # all done
}
+sub write_in_full ($$$$) {
+ my ($fh, $bref, $len, $off) = @_;
+ my $rv = 0;
+ while ($len > 0) {
+ my $w = syswrite($fh, $$bref, $len, $off);
+ return ($rv ? $rv : $w) unless $w; # undef or 0
+ $rv += $w;
+ $len -= $w;
+ $off += $w;
+ }
+ $rv
+}
+
+sub tmpbuf ($$) {
+ my ($bref, $off) = @_;
+ # open(my $fh, '+>>', undef) doesn't set O_APPEND
+ my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
+ open $fh, '+>>', $path or die "open: $!";
+ unlink $path;
+ my $to_write = bytes::length($$bref) - $off;
+ my $w = write_in_full($fh, $bref, $to_write, $off);
+ die "write_in_full ($to_write): $!" unless defined $w;
+ $w == $to_write ? $fh : die("short write $w < $to_write");
+}
+
=head2 C<< $obj->write( $data ) >>
Write the specified data to the underlying handle. I<data> may be scalar,
@@ -537,7 +582,16 @@ sub write {
my $ref = ref $data;
my $bref = $ref ? $data : \$data;
if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
- push @$wbuf, $bref;
+ if ($ref eq 'CODE') {
+ push @$wbuf, $bref;
+ } else {
+ my $last = $wbuf->[-1];
+ if (ref($last) eq 'GLOB') { # append to tmp file buffer
+ write_in_full($last, $bref, bytes::length($$bref), 0);
+ } else {
+ push @$wbuf, tmpbuf($bref, 0);
+ }
+ }
return 0;
} elsif ($ref eq 'CODE') {
$bref->();
@@ -548,15 +602,13 @@ sub write {
if (defined $written) {
return 1 if $written == $to_write;
- $self->{wbuf_off} = $written;
- $self->{wbuf} = [ $bref ];
- return flush_write($self); # try until EAGAIN
} elsif ($! == EAGAIN) {
- $self->{wbuf} = [ $bref ];
- $self->watch_write(1);
+ $written = 0;
} else {
- $self->close;
+ return $self->close;
}
+ $self->{wbuf} = [ tmpbuf($bref, $written) ];
+ watch_write($self, 1);
return 0;
}
}
@@ -573,8 +625,10 @@ sub msg_more ($$) {
my $nlen = bytes::length($_[1]) - $n;
return 1 if $nlen == 0; # all done!
- # PublicInbox::DS::write queues the unwritten substring:
- return $self->write(substr($_[1], $n, $nlen));
+ # queue up the unwritten substring:
+ $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+ watch_write($self, 1);
+ return 0;
}
}
$self->write(\($_[1]));
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index a669eb6e..fcb5eb6c 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -19,7 +19,7 @@ use HTTP::Status qw(status_message);
use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
-PublicInbox::DS->import('msg_more');
+PublicInbox::DS->import(qw(msg_more write_in_full));
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
@@ -125,7 +125,7 @@ sub read_input ($) {
while ($len > 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len);
+ my $w = write_in_full($input, $rbuf, $len, 0);
return write_err($self, $len) unless $w;
$len -= $w;
die "BUG: $len < 0 (w=$w)" if $len < 0;
@@ -367,20 +367,6 @@ sub recv_err {
quit($self, 500);
}
-sub write_in_full {
- my ($fh, $rbuf, $len) = @_;
- my $rv = 0;
- my $off = 0;
- while ($len > 0) {
- my $w = syswrite($fh, $$rbuf, $len, $off);
- return ($rv ? $rv : $w) unless $w; # undef or 0
- $rv += $w;
- $off += $w;
- $len -= $w;
- }
- $rv
-}
-
sub read_input_chunked { # unlikely...
my ($self) = @_;
my $input = $self->{env}->{'psgi.input'};
@@ -425,7 +411,7 @@ sub read_input_chunked { # unlikely...
# drain the current chunk
until ($len <= 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len);
+ my $w = write_in_full($input, $rbuf, $len, 0);
return write_err($self, "$len chunk") if !$w;
$len -= $w;
if ($len == 0) {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 14/57] ds: get rid of redundant and unnecessary POLL* constants
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (12 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 13/57] ds: switch write buffering to use a tempfile Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 15/57] syscall: get rid of unused EPOLL* constants Eric Wong
` (44 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
EPOLL* constants already match their POLL* counterparts, and
there's no way Linux can ever diverge or change the values
of those constants. So we'll favor the EPOLL* ones, since we
already use EPOLLEXCLUSIVE.
For weird stuff like kqueue, we'd need to keep maintaining
the mapping anyway.
---
lib/PublicInbox/DS.pm | 23 +++++++++--------------
1 file changed, 9 insertions(+), 14 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index eb468f57..bff12de5 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -29,19 +29,14 @@ use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
- 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.)
+ 'event_watch', # bitmask of events the client is interested in
+ # (EPOLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
use Carp qw(croak confess);
use File::Temp qw(tempfile);
-use constant POLLIN => 1;
-use constant POLLOUT => 4;
-use constant POLLERR => 8;
-use constant POLLHUP => 16;
-use constant POLLNVAL => 32;
-
our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
our (
@@ -403,19 +398,19 @@ sub new {
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL;
+ my $ev = $self->{event_watch} = 0;
_InitPoller();
if ($HaveEpoll) {
if ($exclusive) {
- $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP|$EPOLLEXCLUSIVE;
+ $ev = $self->{event_watch} = EPOLLIN|$EPOLLEXCLUSIVE;
}
retry:
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) {
$EPOLLEXCLUSIVE = 0; # old kernel
- $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP;
+ $ev = $self->{event_watch} = EPOLLIN;
goto retry;
}
die "couldn't add epoll watch for $fd: $!\n";
@@ -646,8 +641,8 @@ sub watch_read {
my $val = shift;
my $event = $self->{event_watch};
- $event &= ~POLLIN if ! $val;
- $event |= POLLIN if $val;
+ $event &= ~EPOLLIN if ! $val;
+ $event |= EPOLLIN if $val;
my $fd = fileno($sock);
# If it changed, set it
@@ -676,8 +671,8 @@ sub watch_write {
my $val = shift;
my $event = $self->{event_watch};
- $event &= ~POLLOUT if ! $val;
- $event |= POLLOUT if $val;
+ $event &= ~EPOLLOUT if ! $val;
+ $event |= EPOLLOUT if $val;
my $fd = fileno($sock);
# If it changed, set it
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 15/57] syscall: get rid of unused EPOLL* constants
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (13 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 14/57] ds: get rid of redundant and unnecessary POLL* constants Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 16/57] syscall: get rid of unnecessary uname local vars Eric Wong
` (43 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
EPOLLRDBAND is used for DECnet, and I'm pretty sure I won't be
updating any of our code to work with DECnet.
I've never found a use for EPOLLHUP or EPOLLERR either, so
disable those for now and add comments for things I might
actually use: EPOLLET and EPOLLONESHOT.
---
lib/PublicInbox/Syscall.pm | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 4ef64cc3..98110eaf 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -22,11 +22,11 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION);
$VERSION = "0.25";
@ISA = qw(Exporter);
@EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait
- EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND
+ EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
EPOLLEXCLUSIVE);
%EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
- EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND
+ EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
EPOLLEXCLUSIVE)],
sendfile => [qw(sendfile)],
@@ -34,10 +34,12 @@ $VERSION = "0.25";
use constant EPOLLIN => 1;
use constant EPOLLOUT => 4;
-use constant EPOLLERR => 8;
-use constant EPOLLHUP => 16;
-use constant EPOLLRDBAND => 128;
+# use constant EPOLLERR => 8;
+# use constant EPOLLHUP => 16;
+# use constant EPOLLRDBAND => 128;
use constant EPOLLEXCLUSIVE => (1 << 28);
+# use constant EPOLLONESHOT => (1 << 30);
+# use constant EPOLLET => (1 << 31);
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;
use constant EPOLL_CTL_MOD => 3;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 16/57] syscall: get rid of unnecessary uname local vars
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (14 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 15/57] syscall: get rid of unused EPOLL* constants Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 17/57] ds: set event flags directly at initialization Eric Wong
` (42 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We don't need to keep information from uname(2) around outside
of startup.
---
lib/PublicInbox/Syscall.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 98110eaf..17fd1398 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -59,7 +59,6 @@ sub _load_syscall {
return $rv;
}
-our ($sysname, $nodename, $release, $version, $machine) = POSIX::uname();
our (
$SYS_epoll_create,
@@ -71,6 +70,7 @@ our (
our $no_deprecated = 0;
if ($^O eq "linux") {
+ my $machine = (POSIX::uname())[-1];
# whether the machine requires 64-bit numbers to be on 8-byte
# boundaries.
my $u64_mod_8 = 0;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 17/57] ds: set event flags directly at initialization
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (15 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 16/57] syscall: get rid of unnecessary uname local vars Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 18/57] ds: import IO::KQueue namespace Eric Wong
` (41 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We can avoid the EPOLL_CTL_ADD && EPOLL_CTL_MOD sequence with
a single EPOLL_CTL_ADD.
---
lib/PublicInbox/DS.pm | 23 ++++++++++-------------
lib/PublicInbox/EvCleanup.pm | 2 +-
lib/PublicInbox/HTTP.pm | 3 +--
lib/PublicInbox/HTTPD/Async.pm | 3 +--
lib/PublicInbox/Listener.pm | 4 ++--
lib/PublicInbox/NNTP.pm | 3 +--
lib/PublicInbox/ParentPipe.pm | 3 +--
7 files changed, 17 insertions(+), 24 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index bff12de5..2e0aa1e0 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -55,8 +55,6 @@ our (
@Timers, # timers
);
-# this may be set to zero with old kernels
-our $EPOLLEXCLUSIVE = EPOLLEXCLUSIVE;
Reset();
#####################################################################
@@ -389,7 +387,7 @@ This is normally (always?) called from your subclass via:
=cut
sub new {
- my ($self, $sock, $exclusive) = @_;
+ my ($self, $sock, $ev) = @_;
$self = fields::new($self) unless ref $self;
$self->{sock} = $sock;
@@ -398,30 +396,29 @@ sub new {
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- my $ev = $self->{event_watch} = 0;
+ $self->{event_watch} = $ev;
_InitPoller();
if ($HaveEpoll) {
- if ($exclusive) {
- $ev = $self->{event_watch} = EPOLLIN|$EPOLLEXCLUSIVE;
- }
retry:
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
- if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) {
- $EPOLLEXCLUSIVE = 0; # old kernel
- $ev = $self->{event_watch} = EPOLLIN;
+ if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
+ $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
goto retry;
}
die "couldn't add epoll watch for $fd: $!\n";
}
}
elsif ($HaveKQueue) {
- # Add them to the queue but disabled for now
+ my $f = $ev & EPOLLIN ? IO::KQueue::EV_ENABLE()
+ : IO::KQueue::EV_DISABLE();
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
- IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
+ IO::KQueue::EV_ADD() | $f);
+ $f = $ev & EPOLLOUT ? IO::KQueue::EV_ENABLE()
+ : IO::KQueue::EV_DISABLE();
$KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
- IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE());
+ IO::KQueue::EV_ADD() | $f);
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index bd4dda11..d60ac2cc 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -23,7 +23,7 @@ sub once_init () {
# fires in the next event loop iteration.
pipe($r, $w) or die "pipe: $!";
fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
- $self->SUPER::new($w);
+ $self->SUPER::new($w, 0);
# always writable, since PublicInbox::EvCleanup::event_step
# never drains wbuf. We can avoid wasting a hash slot by
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index fcb5eb6c..afa71ea5 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -56,12 +56,11 @@ sub http_date () {
sub new ($$$) {
my ($class, $sock, $addr, $httpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
$self->{httpd} = $httpd;
$self->{rbuf} = '';
($self->{remote_addr}, $self->{remote_port}) =
PublicInbox::Daemon::host_with_port($addr);
- $self->watch_read(1);
$self;
}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 46ea188c..dae62e55 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -25,10 +25,9 @@ sub new {
my $self = fields::new($class);
IO::Handle::blocking($io, 0);
- $self->SUPER::new($io);
+ $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
$self->{cb} = $cb;
$self->{cleanup} = $cleanup;
- $self->watch_read(1);
$self;
}
diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm
index 6ee3abb1..94b2aed4 100644
--- a/lib/PublicInbox/Listener.pm
+++ b/lib/PublicInbox/Listener.pm
@@ -17,8 +17,8 @@ sub new ($$$) {
listen($s, 1024);
IO::Handle::blocking($s, 0);
my $self = fields::new($class);
- $self->SUPER::new($s, 1); # calls epoll_create for the first socket
- $self->watch_read(1);
+ $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()|
+ PublicInbox::DS::EPOLLEXCLUSIVE());
$self->{post_accept} = $cb;
$self
}
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index fe01627f..eb1679a7 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -97,11 +97,10 @@ sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock);
+ $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
$self->{nntpd} = $nntpd;
res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
$self->{rbuf} = '';
- $self->watch_read(1);
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm
index a9f05fc1..ccc0815e 100644
--- a/lib/PublicInbox/ParentPipe.pm
+++ b/lib/PublicInbox/ParentPipe.pm
@@ -10,9 +10,8 @@ use fields qw(cb);
sub new ($$$) {
my ($class, $pipe, $cb) = @_;
my $self = fields::new($class);
- $self->SUPER::new($pipe);
+ $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN());
$self->{cb} = $cb;
- $self->watch_read(1);
$self;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 18/57] ds: import IO::KQueue namespace
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (16 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 17/57] ds: set event flags directly at initialization Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 19/57] ds: share watch_chg between watch_read/watch_write Eric Wong
` (40 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Make the rest of our IO::KQueue-using code less verbose and
closer to the C equivalent.
---
lib/PublicInbox/DS.pm | 22 +++++++++-------------
1 file changed, 9 insertions(+), 13 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 2e0aa1e0..00e2e5c6 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -37,7 +37,7 @@ use Errno qw(EAGAIN EINVAL);
use Carp qw(croak confess);
use File::Temp qw(tempfile);
-our $HAVE_KQUEUE = eval { require IO::KQueue; 1 };
+our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
our (
$HaveEpoll, # Flag -- is epoll available? initially undefined.
@@ -411,14 +411,10 @@ retry:
}
}
elsif ($HaveKQueue) {
- my $f = $ev & EPOLLIN ? IO::KQueue::EV_ENABLE()
- : IO::KQueue::EV_DISABLE();
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
- IO::KQueue::EV_ADD() | $f);
- $f = $ev & EPOLLOUT ? IO::KQueue::EV_ENABLE()
- : IO::KQueue::EV_DISABLE();
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
- IO::KQueue::EV_ADD() | $f);
+ my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
+ $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
+ $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
@@ -645,8 +641,8 @@ sub watch_read {
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(),
- $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
+ $KQueue->EV_SET($fd, EVFILT_READ(),
+ $val ? EV_ENABLE() : EV_DISABLE());
}
elsif ($HaveEpoll) {
epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
@@ -675,8 +671,8 @@ sub watch_write {
# If it changed, set it
if ($event != $self->{event_watch}) {
if ($HaveKQueue) {
- $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(),
- $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE());
+ $KQueue->EV_SET($fd, EVFILT_WRITE(),
+ $val ? EV_ENABLE() : EV_DISABLE());
}
elsif ($HaveEpoll) {
epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 19/57] ds: share watch_chg between watch_read/watch_write
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (17 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 18/57] ds: import IO::KQueue namespace Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 20/57] ds: remove IO::Poll support (for now) Eric Wong
` (39 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
There was much duplicate logic between watch_read and
watch_write. Share that logic, and give us room to enable
edge-triggered or one-shot notifications in the future.
---
lib/PublicInbox/DS.pm | 73 +++++++++++++++----------------------------
1 file changed, 25 insertions(+), 48 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 00e2e5c6..943e30b5 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -622,65 +622,42 @@ sub msg_more ($$) {
$self->write(\($_[1]));
}
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read {
- my PublicInbox::DS $self = shift;
+sub watch_chg ($$$) {
+ my ($self, $bits, $set) = @_;
my $sock = $self->{sock} or return;
-
- my $val = shift;
- my $event = $self->{event_watch};
-
- $event &= ~EPOLLIN if ! $val;
- $event |= EPOLLIN if $val;
-
+ my $cur = $self->{event_watch};
+ my $changes = $cur;
+ if ($set) {
+ $changes |= $bits;
+ } else {
+ $changes &= ~$bits;
+ }
+ return if $changes == $cur;
my $fd = fileno($sock);
- # If it changed, set it
- if ($event != $self->{event_watch}) {
- if ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_READ(),
- $val ? EV_ENABLE() : EV_DISABLE());
- }
- elsif ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
- confess("EPOLL_CTL_MOD: $!");
- }
- $self->{event_watch} = $event;
+ if ($HaveEpoll) {
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+ confess("EPOLL_CTL_MOD $!");
+ } elsif ($HaveKQueue) {
+ my $flag = $set ? EV_ENABLE() : EV_DISABLE();
+ $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
}
+ $self->{event_watch} = $changes;
}
-=head2 C<< $obj->watch_write( $boolean ) >>
+=head2 C<< $obj->watch_read( $boolean ) >>
-Turn 'writable' event notification on or off.
+Turn 'readable' event notification on or off.
=cut
-sub watch_write {
- my PublicInbox::DS $self = shift;
- my $sock = $self->{sock} or return;
+sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
- my $val = shift;
- my $event = $self->{event_watch};
+=head2 C<< $obj->watch_write( $boolean ) >>
- $event &= ~EPOLLOUT if ! $val;
- $event |= EPOLLOUT if $val;
- my $fd = fileno($sock);
+Turn 'writable' event notification on or off.
- # If it changed, set it
- if ($event != $self->{event_watch}) {
- if ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_WRITE(),
- $val ? EV_ENABLE() : EV_DISABLE());
- }
- elsif ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and
- confess "EPOLL_CTL_MOD: $!";
- }
- $self->{event_watch} = $event;
- }
-}
+=cut
+sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 20/57] ds: remove IO::Poll support (for now)
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (18 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 19/57] ds: share watch_chg between watch_read/watch_write Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 21/57] ds: get rid of event_watch field Eric Wong
` (38 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
It may be reinstated at a later time if there's interest; but I
want to be able to use one-shot notifications for certain events
while retaining level-triggered notifications for others.
OTOH, I intend to fully support kqueue; via IO::KQueue for now,
but via syscall() eventually to take advantage of the syscall
reduction kevent(2) can provide over (current) epoll APIs.
---
lib/PublicInbox/DS.pm | 52 -------------------------------------------
1 file changed, 52 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 943e30b5..9c801214 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -168,11 +168,6 @@ sub _InitPoller
*EventLoop = *EpollEventLoop;
}
}
-
- if (!$HaveEpoll && !$HaveKQueue) {
- require IO::Poll;
- *EventLoop = *PollEventLoop;
- }
}
=head2 C<< CLASS->EventLoop() >>
@@ -190,8 +185,6 @@ sub FirstTimeEventLoop {
EpollEventLoop($class);
} elsif ($HaveKQueue) {
KQueueEventLoop($class);
- } else {
- PollEventLoop($class);
}
}
@@ -250,51 +243,6 @@ sub EpollEventLoop {
exit 0;
}
-### The fallback IO::Poll-based event loop. Gets installed as EventLoop if
-### IO::Epoll fails to load.
-sub PollEventLoop {
- my $class = shift;
-
- my PublicInbox::DS $pob;
-
- while (1) {
- my $timeout = RunTimers();
-
- # the following sets up @poll as a series of ($poll,$event_mask)
- # items, then uses IO::Poll::_poll, implemented in XS, which
- # modifies the array in place with the even elements being
- # replaced with the event masks that occured.
- my @poll;
- while ( my ($fd, $sock) = each %DescriptorMap ) {
- push @poll, $fd, $sock->{event_watch};
- }
-
- # if nothing to poll, either end immediately (if no timeout)
- # or just keep calling the callback
- unless (@poll) {
- select undef, undef, undef, ($timeout / 1000);
- return unless PostEventLoop();
- next;
- }
-
- my $count = IO::Poll::_poll($timeout, @poll);
- unless ($count >= 0) {
- return unless PostEventLoop();
- next;
- }
-
- # Fetch handles with read events
- while (@poll) {
- my ($fd, $state) = splice(@poll, 0, 2);
- $DescriptorMap{$fd}->event_step if $state;
- }
-
- return unless PostEventLoop();
- }
-
- exit 0;
-}
-
### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
### okay.
sub KQueueEventLoop {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 21/57] ds: get rid of event_watch field
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (19 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 20/57] ds: remove IO::Poll support (for now) Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 22/57] httpd/async: remove EINTR check Eric Wong
` (37 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We don't need to keep track of that field since we always
know what events we're interested in when using one-shot
wakeups.
---
lib/PublicInbox/DS.pm | 64 ++++++++++++----------------------
lib/PublicInbox/EvCleanup.pm | 4 +--
lib/PublicInbox/HTTP.pm | 13 +++----
lib/PublicInbox/HTTPD/Async.pm | 10 +++---
lib/PublicInbox/NNTP.pm | 25 +++++--------
lib/PublicInbox/Syscall.pm | 6 ++--
6 files changed, 50 insertions(+), 72 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9c801214..f5986e55 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -29,8 +29,6 @@ use PublicInbox::Syscall qw(:epoll);
use fields ('sock', # underlying socket
'wbuf', # arrayref of coderefs or GLOB refs
'wbuf_off', # offset into first element of wbuf to start writing at
- 'event_watch', # bitmask of events the client is interested in
- # (EPOLLIN,OUT,etc.)
);
use Errno qw(EAGAIN EINVAL);
@@ -318,6 +316,17 @@ sub PostEventLoop {
return $keep_running;
}
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD() | EV_ENABLE();
+ ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
+ } else {
+ EV_DISABLE();
+ }
+}
+
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
@@ -344,25 +353,21 @@ sub new {
Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || ""))
unless $sock && $fd;
- $self->{event_watch} = $ev;
-
_InitPoller();
if ($HaveEpoll) {
retry:
if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE);
+ $ev &= ~EPOLLEXCLUSIVE;
goto retry;
}
die "couldn't add epoll watch for $fd: $!\n";
}
}
elsif ($HaveKQueue) {
- my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f);
- $f = $ev & EPOLLOUT ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f);
+ $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
@@ -454,7 +459,7 @@ next_buf:
}
} elsif ($! == EAGAIN) {
$self->{wbuf_off} = $off;
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
} else {
return $self->close;
@@ -467,7 +472,6 @@ next_buf:
} # while @$wbuf
delete $self->{wbuf};
- $self->watch_write(0);
1; # all done
}
@@ -544,7 +548,7 @@ sub write {
return $self->close;
}
$self->{wbuf} = [ tmpbuf($bref, $written) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
@@ -563,49 +567,27 @@ sub msg_more ($$) {
# queue up the unwritten substring:
$self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
- watch_write($self, 1);
+ watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
}
$self->write(\($_[1]));
}
-sub watch_chg ($$$) {
- my ($self, $bits, $set) = @_;
+sub watch ($$) {
+ my ($self, $ev) = @_;
my $sock = $self->{sock} or return;
- my $cur = $self->{event_watch};
- my $changes = $cur;
- if ($set) {
- $changes |= $bits;
- } else {
- $changes &= ~$bits;
- }
- return if $changes == $cur;
my $fd = fileno($sock);
if ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
confess("EPOLL_CTL_MOD $!");
} elsif ($HaveKQueue) {
- my $flag = $set ? EV_ENABLE() : EV_DISABLE();
- $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN;
- $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT;
+ $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
- $self->{event_watch} = $changes;
}
-=head2 C<< $obj->watch_read( $boolean ) >>
-
-Turn 'readable' event notification on or off.
-
-=cut
-sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) };
-
-=head2 C<< $obj->watch_write( $boolean ) >>
-
-Turn 'writable' event notification on or off.
-
-=cut
-sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) };
+sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index d60ac2cc..a9f6167d 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -6,6 +6,7 @@ package PublicInbox::EvCleanup;
use strict;
use warnings;
use base qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
my $ENABLED;
sub enabled { $ENABLED }
@@ -59,13 +60,12 @@ sub _run_later () {
# Called by PublicInbox::DS
sub event_step {
my ($self) = @_;
- $self->watch_write(0);
_run_asap();
}
sub _asap_timer () {
$singleton ||= once_init();
- $singleton->watch_write(1);
+ $singleton->watch(EPOLLOUT|EPOLLONESHOT);
1;
}
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index afa71ea5..773d77ba 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -20,6 +20,7 @@ use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
PublicInbox::DS->import(qw(msg_more write_in_full));
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
CHUNK_END => -2, # \r\n
@@ -56,7 +57,7 @@ sub http_date () {
sub new ($$$) {
my ($class, $sock, $addr, $httpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{httpd} = $httpd;
$self->{rbuf} = '';
($self->{remote_addr}, $self->{remote_port}) =
@@ -80,7 +81,8 @@ sub event_step { # called by PublicInbox::DS
return $self->close if $r == 0;
return rbuf_process($self);
}
- return if $!{EAGAIN}; # no need to call watch_read(1) again
+
+ return $self->watch_in1 if $!{EAGAIN};
# common for clients to break connections without warning,
# would be too noisy to log here:
@@ -100,7 +102,7 @@ sub rbuf_process {
($r == -2 && length($self->{rbuf}) > 0x4000)) {
return quit($self, 400);
}
- return $self->watch_read(1) if $r < 0; # incomplete
+ return $self->watch_in1 if $r < 0; # incomplete
$self->{rbuf} = substr($self->{rbuf}, $r);
my $len = input_prepare($self, \%env);
@@ -143,7 +145,6 @@ sub read_input ($) {
sub app_dispatch {
my ($self, $input) = @_;
- $self->watch_read(0);
my $env = $self->{env};
$env->{REMOTE_ADDR} = $self->{remote_addr};
$env->{REMOTE_PORT} = $self->{remote_port};
@@ -236,7 +237,7 @@ sub identity_wcb ($) {
sub next_request ($) {
my ($self) = @_;
if ($self->{rbuf} eq '') { # wait for next request
- $self->watch_read(1);
+ $self->watch_in1;
} else { # avoid recursion for pipelined requests
push @$pipelineq, $self;
$pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
@@ -360,7 +361,7 @@ sub recv_err {
return $self->close if (defined $r && $r == 0);
if ($!{EAGAIN}) {
$self->{input_left} = $len;
- return;
+ return $self->watch_in1;
}
err($self, "error reading for input: $! ($len bytes remaining)");
quit($self, 500);
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index dae62e55..f32ef009 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -31,10 +31,12 @@ sub new {
$self;
}
+sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
+
# fires after pending writes are complete:
sub restart_read_cb ($) {
my ($self) = @_;
- sub { $self->watch_read(1) }
+ sub { restart_read($self) }
}
sub main_cb ($$$) {
@@ -46,16 +48,16 @@ sub main_cb ($$$) {
$fh->write($$bref);
if ($http->{sock}) { # !closed
if ($http->{wbuf}) {
- $self->watch_read(0);
+ $self->watch(0);
$http->write(restart_read_cb($self));
}
- # stay in watch_read, but let other clients
+ # stay in EPOLLIN, but let other clients
# get some work done, too.
return;
}
# fall through to close below...
} elsif (!defined $r) {
- return if $!{EAGAIN} || $!{EINTR};
+ return restart_read($self) if $!{EAGAIN} || $!{EINTR};
}
# Done! Error handling will happen in $fh->close
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index eb1679a7..98f88410 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -24,6 +24,7 @@ use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -52,12 +53,6 @@ sub next_tick () {
# pipelined request, we bypassed socket-readiness
# checks to get here:
event_step($nntp);
-
- # maybe there's more pipelined data, or we'll have
- # to register it for socket-readiness notifications
- if (!$nntp->{long_res} && $nntp->{sock}) {
- check_read($nntp);
- }
}
}
}
@@ -97,7 +92,7 @@ sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN());
+ $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
$self->{rbuf} = '';
@@ -624,11 +619,10 @@ sub long_response ($$) {
# make sure we disable reading during a long response,
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
- $self->watch_read(0);
my $t0 = now();
$self->{long_res} = sub {
my $more = eval { $cb->() };
- if ($@ || !$self->{sock}) {
+ if ($@ || !$self->{sock}) { # something bad happened...
$self->{long_res} = undef;
if ($@) {
@@ -922,10 +916,6 @@ sub do_write ($$) {
my $done = $self->write(\($_[1]));
return 0 unless $self->{sock};
- # Do not watch for readability if we have data in the queue,
- # instead re-enable watching for readability when we can
- $self->watch_read(0) if (!$done || $self->{long_res});
-
$done;
}
@@ -943,7 +933,6 @@ sub event_step {
my ($self) = @_;
return unless $self->flush_write && $self->{sock};
- return if $self->{long_res};
update_idle_time($self);
# only read more requests if we've drained the write buffer,
@@ -957,7 +946,7 @@ sub event_step {
my $off = length($$rbuf);
$r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
unless (defined $r) {
- return if $!{EAGAIN};
+ return $self->watch_in1 if $!{EAGAIN};
return $self->close;
}
return $self->close if $r == 0;
@@ -978,6 +967,10 @@ sub event_step {
my $len = length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
+
+ # maybe there's more pipelined data, or we'll have
+ # to register it for socket-readiness notifications
+ check_read($self) unless ($self->{long_res} || $self->{wbuf});
}
sub check_read {
@@ -993,7 +986,7 @@ sub check_read {
} else {
# no pipelined requests available, let the kernel know
# to wake us up if there's more
- $self->watch_read(1); # PublicInbox::DS::watch_read
+ $self->watch_in1; # PublicInbox::DS::watch_in1
}
}
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 17fd1398..f1988e61 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -24,11 +24,11 @@ $VERSION = "0.25";
@EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait
EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE);
+ EPOLLONESHOT EPOLLEXCLUSIVE);
%EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
EPOLLIN EPOLLOUT
EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
- EPOLLEXCLUSIVE)],
+ EPOLLONESHOT EPOLLEXCLUSIVE)],
sendfile => [qw(sendfile)],
);
@@ -38,7 +38,7 @@ use constant EPOLLOUT => 4;
# use constant EPOLLHUP => 16;
# use constant EPOLLRDBAND => 128;
use constant EPOLLEXCLUSIVE => (1 << 28);
-# use constant EPOLLONESHOT => (1 << 30);
+use constant EPOLLONESHOT => (1 << 30);
# use constant EPOLLET => (1 << 31);
use constant EPOLL_CTL_ADD => 1;
use constant EPOLL_CTL_DEL => 2;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 22/57] httpd/async: remove EINTR check
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (20 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 21/57] ds: get rid of event_watch field Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 23/57] spawn: remove `Blocking' flag handling Eric Wong
` (36 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
This pipe is always non-blocking when run under public-inbox-httpd
and it won't fail with EINTR in that case.
---
lib/PublicInbox/HTTPD/Async.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index f32ef009..3eb7f75a 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -57,7 +57,7 @@ sub main_cb ($$$) {
}
# fall through to close below...
} elsif (!defined $r) {
- return restart_read($self) if $!{EAGAIN} || $!{EINTR};
+ return restart_read($self) if $!{EAGAIN};
}
# Done! Error handling will happen in $fh->close
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 23/57] spawn: remove `Blocking' flag handling
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (21 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 22/57] httpd/async: remove EINTR check Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 24/57] qspawn: describe where `$rpipe' come from Eric Wong
` (35 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Instead, the O_NONBLOCK flag is set by PublicInbox::HTTPD::Async;
and we won't be setting it elsewhere.
---
lib/PublicInbox/Spawn.pm | 2 --
t/spawn.t | 11 -----------
2 files changed, 13 deletions(-)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 66b916df..9161bb5b 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -229,8 +229,6 @@ sub popen_rd {
my ($cmd, $env, $opts) = @_;
pipe(my ($r, $w)) or die "pipe: $!\n";
$opts ||= {};
- my $blocking = $opts->{Blocking};
- IO::Handle::blocking($r, $blocking) if defined $blocking;
$opts->{1} = fileno($w);
my $pid = spawn($cmd, $env, $opts);
return unless defined $pid;
diff --git a/t/spawn.t b/t/spawn.t
index 88404282..1d71b26d 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -81,17 +81,6 @@ use PublicInbox::Spawn qw(which spawn popen_rd);
isnt($?, 0, '$? set properly: '.$?);
}
-{
- my ($fh, $pid) = popen_rd([qw(sleep 60)], undef, { Blocking => 0 });
- ok(defined $pid && $pid > 0, 'returned pid when array requested');
- is(kill(0, $pid), 1, 'child process is running');
- ok(!defined(sysread($fh, my $buf, 1)) && $!{EAGAIN},
- 'sysread returned quickly with EAGAIN');
- is(kill(9, $pid), 1, 'child process killed early');
- is(waitpid($pid, 0), $pid, 'child process reapable');
- isnt($?, 0, '$? set properly: '.$?);
-}
-
SKIP: {
eval {
require BSD::Resource;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 24/57] qspawn: describe where `$rpipe' come from
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (22 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 23/57] spawn: remove `Blocking' flag handling Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 25/57] http|nntp: favor "$! == EFOO" over $!{EFOO} checks Eric Wong
` (34 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
It wasn't immediately obvious to me after several months of
not looking at this code.
---
lib/PublicInbox/Qspawn.pm | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9aede103..943ee801 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -122,7 +122,7 @@ sub psgi_qx {
eval { $qx_cb->($qx) };
$qx = undef;
};
- my $rpipe;
+ my $rpipe; # comes from popen_rd
my $async = $env->{'pi-httpd.async'};
my $cb = sub {
my $r = sysread($rpipe, my $buf, 8192);
@@ -137,7 +137,7 @@ sub psgi_qx {
};
$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
$self->start($limiter, sub { # may run later, much later...
- ($rpipe) = @_;
+ ($rpipe) = @_; # popen_rd result
if ($async) {
# PublicInbox::HTTPD::Async->new($rpipe, $cb, $end)
$async = $async->($rpipe, $cb, $end);
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 25/57] http|nntp: favor "$! == EFOO" over $!{EFOO} checks
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (23 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 24/57] qspawn: describe where `$rpipe' come from Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 26/57] ds: favor `delete' over assigning fields to `undef' Eric Wong
` (33 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Integer comparisons of "$!" are faster than hash lookups.
See commit 6fa2b29fcd0477d126ebb7db7f97b334f74bbcbc
("ds: cleanup Errno imports and favor constant comparisons")
for benchmarks.
---
lib/PublicInbox/HTTP.pm | 7 +++----
lib/PublicInbox/HTTPD/Async.pm | 3 ++-
lib/PublicInbox/NNTP.pm | 4 ++--
lib/PublicInbox/Qspawn.pm | 7 +++++--
4 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 773d77ba..4738e156 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -27,6 +27,7 @@ use constant {
CHUNK_ZEND => -3, # \r\n
CHUNK_MAX_HDR => 256,
};
+use Errno qw(EAGAIN);
my $pipelineq = [];
my $pipet;
@@ -82,11 +83,9 @@ sub event_step { # called by PublicInbox::DS
return rbuf_process($self);
}
- return $self->watch_in1 if $!{EAGAIN};
-
# common for clients to break connections without warning,
# would be too noisy to log here:
- return $self->close;
+ $! == EAGAIN ? $self->watch_in1 : $self->close;
}
sub rbuf_process {
@@ -359,7 +358,7 @@ sub write_err {
sub recv_err {
my ($self, $r, $len) = @_;
return $self->close if (defined $r && $r == 0);
- if ($!{EAGAIN}) {
+ if ($! == EAGAIN) {
$self->{input_left} = $len;
return $self->watch_in1;
}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 3eb7f75a..9cc41f17 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -11,6 +11,7 @@ use warnings;
use base qw(PublicInbox::DS);
use fields qw(cb cleanup);
require PublicInbox::EvCleanup;
+use Errno qw(EAGAIN);
sub new {
my ($class, $io, $cb, $cleanup) = @_;
@@ -57,7 +58,7 @@ sub main_cb ($$$) {
}
# fall through to close below...
} elsif (!defined $r) {
- return restart_read($self) if $!{EAGAIN};
+ return restart_read($self) if $! == EAGAIN;
}
# Done! Error handling will happen in $fh->close
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 98f88410..fbdf1364 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -25,6 +25,7 @@ use constant {
r430 => '430 No article with that message-id',
};
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n";
@@ -946,8 +947,7 @@ sub event_step {
my $off = length($$rbuf);
$r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
unless (defined $r) {
- return $self->watch_in1 if $!{EAGAIN};
- return $self->close;
+ return $! == EAGAIN ? $self->watch_in1 : $self->close;
}
return $self->close if $r == 0;
}
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 943ee801..f2630a0f 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -29,6 +29,9 @@ use warnings;
use PublicInbox::Spawn qw(popen_rd);
require Plack::Util;
+# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
+use Errno qw(EAGAIN EINTR);
+
my $def_limiter;
# declares a command to spawn (but does not spawn it).
@@ -131,7 +134,7 @@ sub psgi_qx {
} elsif (defined $r) {
$r ? $qx->write($buf) : $end->();
} else {
- return if $!{EAGAIN} || $!{EINTR}; # loop again
+ return if $! == EAGAIN || $! == EINTR; # loop again
$end->();
}
};
@@ -193,7 +196,7 @@ sub psgi_return {
my $buf = '';
my $rd_hdr = sub {
my $r = sysread($rpipe, $buf, 1024, length($buf));
- return if !defined($r) && ($!{EINTR} || $!{EAGAIN});
+ return if !defined($r) && ($! == EAGAIN || $! == EINTR);
$parse_hdr->($r, \$buf);
};
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 26/57] ds: favor `delete' over assigning fields to `undef'
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (24 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 25/57] http|nntp: favor "$! == EFOO" over $!{EFOO} checks Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 27/57] http: don't pass extra args to PublicInbox::DS::close Eric Wong
` (32 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
This is cleaner in most cases and may allow Perl to reuse memory
from unused fields.
We can do this now that we no longer support Perl 5.8.
Danga::Socket was written with struct-like pseudo-hash support
in mind, and Perl 5.9+ dropped support for pseudo-hashes over
a decade ago.
---
lib/PublicInbox/DS.pm | 1 +
lib/PublicInbox/HTTP.pm | 21 +++++++++------------
lib/PublicInbox/HTTPD/Async.pm | 9 +++++----
lib/PublicInbox/NNTP.pm | 4 ++--
4 files changed, 17 insertions(+), 18 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index f5986e55..482710f7 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -23,6 +23,7 @@ use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
our @EXPORT_OK = qw(now msg_more write_in_full);
use warnings;
+use 5.010_001;
use PublicInbox::Syscall qw(:epoll);
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 4738e156..94972054 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -118,8 +118,7 @@ sub read_input ($) {
# env->{CONTENT_LENGTH} (identity)
my $sock = $self->{sock};
- my $len = $self->{input_left};
- $self->{input_left} = undef;
+ my $len = delete $self->{input_left};
my $rbuf = \($self->{rbuf});
my $input = $env->{'psgi.input'};
@@ -246,8 +245,7 @@ sub next_request ($) {
sub response_done_cb ($$) {
my ($self, $alive) = @_;
sub {
- my $env = $self->{env};
- $self->{env} = undef;
+ my $env = delete $self->{env};
$self->write(\"0\r\n\r\n") if $alive == 2;
$self->write(sub{$alive ? next_request($self) : $self->close});
}
@@ -279,7 +277,7 @@ sub getline_cb ($$$) {
}
}
- $self->{forward} = $self->{pull} = undef;
+ delete @$self{qw(forward pull)};
# avoid recursion
if ($forward) {
eval { $forward->close };
@@ -370,8 +368,7 @@ sub read_input_chunked { # unlikely...
my ($self) = @_;
my $input = $self->{env}->{'psgi.input'};
my $sock = $self->{sock};
- my $len = $self->{input_left};
- $self->{input_left} = undef;
+ my $len = delete $self->{input_left};
my $rbuf = \($self->{rbuf});
while (1) { # chunk start
@@ -442,11 +439,11 @@ sub quit {
sub close {
my $self = shift;
- my $forward = $self->{forward};
- my $env = $self->{env};
- delete $env->{'psgix.io'} if $env; # prevent circular references
- $self->{pull} = $self->{forward} = $self->{env} = undef;
- if ($forward) {
+ if (my $env = delete $self->{env}) {
+ delete $env->{'psgix.io'}; # prevent circular references
+ }
+ delete $self->{pull};
+ if (my $forward = delete $self->{forward}) {
eval { $forward->close };
err($self, "forward ->close error: $@") if $@;
}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 9cc41f17..bec49337 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -63,7 +63,7 @@ sub main_cb ($$$) {
# Done! Error handling will happen in $fh->close
# called by the {cleanup} handler
- $http->{forward} = undef;
+ delete $http->{forward};
$self->close;
}
}
@@ -81,12 +81,13 @@ sub event_step { $_[0]->{cb}->(@_) }
sub close {
my $self = shift;
- my $cleanup = $self->{cleanup};
- $self->{cleanup} = $self->{cb} = undef;
+ delete $self->{cb};
$self->SUPER::close(@_);
# we defer this to the next timer loop since close is deferred
- PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+ if (my $cleanup = delete $self->{cleanup}) {
+ PublicInbox::EvCleanup::next_tick($cleanup);
+ }
}
1;
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index fbdf1364..6a582ea4 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -624,7 +624,7 @@ sub long_response ($$) {
$self->{long_res} = sub {
my $more = eval { $cb->() };
if ($@ || !$self->{sock}) { # something bad happened...
- $self->{long_res} = undef;
+ delete $self->{long_res};
if ($@) {
err($self,
@@ -646,7 +646,7 @@ sub long_response ($$) {
push @$nextq, $self;
$nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
} else { # all done!
- $self->{long_res} = undef;
+ delete $self->{long_res};
check_read($self);
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 27/57] http: don't pass extra args to PublicInbox::DS::close
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (25 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 26/57] ds: favor `delete' over assigning fields to `undef' Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 28/57] ds: pass $self to code references Eric Wong
` (31 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
YAGNI
Followup-to: commit 30ab5cf82b9d47242640f748a0f9a088ca783e32
("ds: reduce Errno imports and drop ->close reason")
---
lib/PublicInbox/HTTP.pm | 4 ++--
lib/PublicInbox/HTTPD/Async.pm | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 94972054..c81aeacd 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -438,7 +438,7 @@ sub quit {
}
sub close {
- my $self = shift;
+ my $self = $_[0];
if (my $env = delete $self->{env}) {
delete $env->{'psgix.io'}; # prevent circular references
}
@@ -447,7 +447,7 @@ sub close {
eval { $forward->close };
err($self, "forward ->close error: $@") if $@;
}
- $self->SUPER::close(@_);
+ $self->SUPER::close; # PublicInbox::DS::close
}
# for graceful shutdown in PublicInbox::Daemon:
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index bec49337..e6df58eb 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -80,9 +80,9 @@ sub async_pass {
sub event_step { $_[0]->{cb}->(@_) }
sub close {
- my $self = shift;
+ my $self = $_[0];
delete $self->{cb};
- $self->SUPER::close(@_);
+ $self->SUPER::close;
# we defer this to the next timer loop since close is deferred
if (my $cleanup = delete $self->{cleanup}) {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 28/57] ds: pass $self to code references
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (26 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 27/57] http: don't pass extra args to PublicInbox::DS::close Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 29/57] evcleanup: replace _run_asap with `event_step' callback Eric Wong
` (30 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We can reduce the amount of short-lived anonymous subs we
create by passing $self to code references.
---
lib/PublicInbox/DS.pm | 4 ++--
lib/PublicInbox/HTTP.pm | 9 ++++++++-
lib/PublicInbox/HTTPD/Async.pm | 17 +++++++++--------
3 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 482710f7..7b87cd56 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -468,7 +468,7 @@ next_buf:
}
} else { #($ref eq 'CODE') {
shift @$wbuf;
- $bref->();
+ $bref->($self);
}
} # while @$wbuf
@@ -535,7 +535,7 @@ sub write {
}
return 0;
} elsif ($ref eq 'CODE') {
- $bref->();
+ $bref->($self);
return 1;
} else {
my $to_write = bytes::length($$bref);
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index c81aeacd..e132c610 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -247,7 +247,7 @@ sub response_done_cb ($$) {
sub {
my $env = delete $self->{env};
$self->write(\"0\r\n\r\n") if $alive == 2;
- $self->write(sub{$alive ? next_request($self) : $self->close});
+ $self->write($alive ? \&next_request : \&close);
}
}
@@ -456,4 +456,11 @@ sub busy () {
($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
}
+# fires after pending writes are complete:
+sub restart_pass ($) {
+ $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
+}
+
+sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
+
1;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index e6df58eb..b46baeb2 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -34,23 +34,24 @@ sub new {
sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
-# fires after pending writes are complete:
-sub restart_read_cb ($) {
- my ($self) = @_;
- sub { restart_read($self) }
-}
-
sub main_cb ($$$) {
my ($http, $fh, $bref) = @_;
sub {
my ($self) = @_;
my $r = sysread($self->{sock}, $$bref, 8192);
if ($r) {
- $fh->write($$bref);
+ $fh->write($$bref); # may call $http->close
+
if ($http->{sock}) { # !closed
if ($http->{wbuf}) {
+ # HTTP client could not keep up, so
+ # stop reading and buffering.
$self->watch(0);
- $http->write(restart_read_cb($self));
+
+ # Tell the HTTP socket to restart us
+ # when HTTP client is done draining
+ # $http->{wbuf}:
+ $http->enqueue_restart_pass;
}
# stay in EPOLLIN, but let other clients
# get some work done, too.
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 29/57] evcleanup: replace _run_asap with `event_step' callback
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (27 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 28/57] ds: pass $self to code references Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 30/57] ds: remove pointless exit calls Eric Wong
` (29 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
No point in keeping a one-line wrapper sub around.
---
lib/PublicInbox/EvCleanup.pm | 12 ++++--------
1 file changed, 4 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index a9f6167d..33b54ebc 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -46,7 +46,9 @@ sub _run_all ($) {
# ensure PublicInbox::DS::ToClose processing after timers fire
sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-sub _run_asap () { _run_all($asapq) }
+# Called by PublicInbox::DS
+sub event_step { _run_all($asapq) }
+
sub _run_next () {
_run_all($nextq);
_asap_close();
@@ -57,12 +59,6 @@ sub _run_later () {
_asap_close();
}
-# Called by PublicInbox::DS
-sub event_step {
- my ($self) = @_;
- _run_asap();
-}
-
sub _asap_timer () {
$singleton ||= once_init();
$singleton->watch(EPOLLOUT|EPOLLONESHOT);
@@ -88,7 +84,7 @@ sub later ($) {
}
END {
- _run_asap();
+ event_step();
_run_all($nextq);
_run_all($laterq);
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 30/57] ds: remove pointless exit calls
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (28 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 29/57] evcleanup: replace _run_asap with `event_step' callback Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 31/57] http|nntp: be explicit about bytes::length on rbuf Eric Wong
` (28 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
They're never called; the only way to break out of that loop
is the PostEventLoop callback.
---
lib/PublicInbox/DS.pm | 3 ---
1 file changed, 3 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 7b87cd56..9811405b 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -239,7 +239,6 @@ sub EpollEventLoop {
}
return unless PostEventLoop();
}
- exit 0;
}
### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
@@ -264,8 +263,6 @@ sub KQueueEventLoop {
}
return unless PostEventLoop();
}
-
- exit(0);
}
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 31/57] http|nntp: be explicit about bytes::length on rbuf
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (29 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 30/57] ds: remove pointless exit calls Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 32/57] ds: hoist out do_read from NNTP and HTTP Eric Wong
` (27 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
It should not matter because our rbuf is always from
a socket without encoding layers, but this makes things
easier to follow.
---
lib/PublicInbox/HTTP.pm | 12 ++++++------
lib/PublicInbox/NNTP.pm | 6 +++---
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index e132c610..fbca9a54 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -76,7 +76,7 @@ sub event_step { # called by PublicInbox::DS
return read_input($self) if defined $self->{env};
- my $off = length($self->{rbuf});
+ my $off = bytes::length($self->{rbuf});
my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off);
if (defined $r) {
return $self->close if $r == 0;
@@ -98,7 +98,7 @@ sub rbuf_process {
# (they are rarely-used and git (as of 2.7.2) does not use them)
if ($r == -1 || $env{HTTP_TRAILER} ||
# this length-check is necessary for PURE_PERL=1:
- ($r == -2 && length($self->{rbuf}) > 0x4000)) {
+ ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) {
return quit($self, 400);
}
return $self->watch_in1 if $r < 0; # incomplete
@@ -375,12 +375,12 @@ sub read_input_chunked { # unlikely...
if ($len == CHUNK_ZEND) {
$$rbuf =~ s/\A\r\n//s and
return app_dispatch($self, $input);
- return quit($self, 400) if length($$rbuf) > 2;
+ return quit($self, 400) if bytes::length($$rbuf) > 2;
}
if ($len == CHUNK_END) {
if ($$rbuf =~ s/\A\r\n//s) {
$len = CHUNK_START;
- } elsif (length($$rbuf) > 2) {
+ } elsif (bytes::length($$rbuf) > 2) {
return quit($self, 400);
}
}
@@ -390,14 +390,14 @@ sub read_input_chunked { # unlikely...
if (($len + -s $input) > $MAX_REQUEST_BUFFER) {
return quit($self, 413);
}
- } elsif (length($$rbuf) > CHUNK_MAX_HDR) {
+ } elsif (bytes::length($$rbuf) > CHUNK_MAX_HDR) {
return quit($self, 400);
}
# will break from loop since $len >= 0
}
if ($len < 0) { # chunk header is trickled, read more
- my $off = length($$rbuf);
+ my $off = bytes::length($$rbuf);
my $r = sysread($sock, $$rbuf, 8192, $off);
return recv_err($self, $r, $len) unless $r;
# (implicit) goto chunk_start if $r > 0;
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 6a582ea4..a9e54a68 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -243,7 +243,7 @@ sub parse_time ($$;$) {
}
my @now = $gmt ? gmtime : localtime;
my ($YYYY, $MM, $DD);
- if (length($date) == 8) { # RFC 3977 allows YYYYMMDD
+ if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD
($YYYY, $MM, $DD) = unpack('A4A2A2', $date);
} else { # legacy clients send YYMMDD
($YYYY, $MM, $DD) = unpack('A2A2A2', $date);
@@ -944,7 +944,7 @@ sub event_step {
my $r;
if (index($$rbuf, "\n") < 0) {
- my $off = length($$rbuf);
+ my $off = bytes::length($$rbuf);
$r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
unless (defined $r) {
return $! == EAGAIN ? $self->watch_in1 : $self->close;
@@ -964,7 +964,7 @@ sub event_step {
}
return $self->close if $r < 0;
- my $len = length($$rbuf);
+ my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
update_idle_time($self);
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 32/57] ds: hoist out do_read from NNTP and HTTP
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (30 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 31/57] http|nntp: be explicit about bytes::length on rbuf Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 33/57] nntp: simplify re-arming/requeue logic Eric Wong
` (26 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Both NNTP and HTTP have common needs and we can factor
out some common code to make dealing with IO::Socket::SSL
easier.
---
lib/PublicInbox/DS.pm | 10 ++++++++++
lib/PublicInbox/HTTP.pm | 14 +++-----------
lib/PublicInbox/NNTP.pm | 9 ++-------
3 files changed, 15 insertions(+), 18 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9811405b..8735e888 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -473,6 +473,15 @@ next_buf:
1; # all done
}
+sub do_read ($$$$) {
+ my ($self, $rbuf, $len, $off) = @_;
+ my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+ return ($r == 0 ? $self->close : $r) if defined $r;
+ # common for clients to break connections without warning,
+ # would be too noisy to log here:
+ $! == EAGAIN ? $self->watch_in1 : $self->close;
+}
+
sub write_in_full ($$$$) {
my ($fh, $bref, $len, $off) = @_;
my $rv = 0;
@@ -583,6 +592,7 @@ sub watch ($$) {
$KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
$KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
+ 0;
}
sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index fbca9a54..7697ac5c 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -75,17 +75,9 @@ sub event_step { # called by PublicInbox::DS
# otherwise we can be buffering infinitely w/o backpressure
return read_input($self) if defined $self->{env};
-
- my $off = bytes::length($self->{rbuf});
- my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off);
- if (defined $r) {
- return $self->close if $r == 0;
- return rbuf_process($self);
- }
-
- # common for clients to break connections without warning,
- # would be too noisy to log here:
- $! == EAGAIN ? $self->watch_in1 : $self->close;
+ my $rbuf = \($self->{rbuf});
+ my $off = bytes::length($$rbuf);
+ $self->do_read($rbuf, 8192, $off) and rbuf_process($self);
}
sub rbuf_process {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index a9e54a68..42fbb255 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -941,17 +941,12 @@ sub event_step {
use constant LINE_MAX => 512; # RFC 977 section 2.3
my $rbuf = \($self->{rbuf});
- my $r;
+ my $r = 1;
if (index($$rbuf, "\n") < 0) {
my $off = bytes::length($$rbuf);
- $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off);
- unless (defined $r) {
- return $! == EAGAIN ? $self->watch_in1 : $self->close;
- }
- return $self->close if $r == 0;
+ $r = $self->do_read($rbuf, LINE_MAX, $off) or return;
}
- $r = 1;
while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) {
my $line = $1;
return $self->close if $line =~ /[[:cntrl:]]/s;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 33/57] nntp: simplify re-arming/requeue logic
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (31 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 32/57] ds: hoist out do_read from NNTP and HTTP Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 34/57] allow use of PerlIO layers for filesystem writes Eric Wong
` (25 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We can be smarter about requeuing clients to run and avoid
excessive epoll_ctl calls since we can trust event_step to do
the right thing depending on the state of the client.
---
lib/PublicInbox/NNTP.pm | 33 ++++++++++-----------------------
1 file changed, 10 insertions(+), 23 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 42fbb255..468a22f5 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -58,6 +58,11 @@ sub next_tick () {
}
}
+sub requeue ($) {
+ push @$nextq, $_[0];
+ $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+}
+
sub update_idle_time ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
@@ -633,7 +638,7 @@ sub long_response ($$) {
}
if ($self->{sock}) {
update_idle_time($self);
- check_read($self);
+ requeue($self);
} else {
out($self, " deferred[$fd] aborted - %0.6f",
now() - $t0);
@@ -642,14 +647,12 @@ sub long_response ($$) {
# no recursion, schedule another call ASAP
# but only after all pending writes are done
update_idle_time($self);
-
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
+ requeue($self);
} else { # all done!
delete $self->{long_res};
- check_read($self);
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
+ requeue($self);
}
};
$self->{long_res}->(); # kick off!
@@ -930,6 +933,7 @@ sub out ($$;@) {
printf { $self->{nntpd}->{out} } $fmt."\n", @args;
}
+# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err)
sub event_step {
my ($self) = @_;
@@ -965,24 +969,7 @@ sub event_step {
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- check_read($self) unless ($self->{long_res} || $self->{wbuf});
-}
-
-sub check_read {
- my ($self) = @_;
- if (index($self->{rbuf}, "\n") >= 0) {
- # Force another read if there is a pipelined request.
- # We don't know if the socket has anything for us to read,
- # and we must double-check again by the time the timer fires
- # in case we really did dispatch a read event and started
- # another long response.
- push @$nextq, $self;
- $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
- } else {
- # no pipelined requests available, let the kernel know
- # to wake us up if there's more
- $self->watch_in1; # PublicInbox::DS::watch_in1
- }
+ requeue($self) unless ($self->{long_res} || $self->{wbuf});
}
sub not_idle_long ($$) {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 34/57] allow use of PerlIO layers for filesystem writes
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (32 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 33/57] nntp: simplify re-arming/requeue logic Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 35/57] ds: deal better with FS-related errors IO buffers Eric Wong
` (24 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
It may make sense to use PerlIO::mmap or PerlIO::scalar for
DS write buffering with IO::Socket::SSL or similar (since we can't
use MSG_MORE), so that means we need to go through buffering
in userspace for the common case; while still being easily
compatible with slow clients.
And it also simplifies GitHTTPBackend slightly.
Maybe it can make sense for HTTP input buffering, too...
---
lib/PublicInbox/DS.pm | 32 ++++++++++++-------------------
lib/PublicInbox/GitHTTPBackend.pm | 18 ++++++++---------
lib/PublicInbox/HTTP.pm | 24 ++++++++++++++++++-----
3 files changed, 39 insertions(+), 35 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8735e888..486af40e 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -21,7 +21,7 @@ use IO::Handle qw();
use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more write_in_full);
+our @EXPORT_OK = qw(now msg_more);
use warnings;
use 5.010_001;
@@ -422,8 +422,8 @@ sub close {
sub psendfile ($$$) {
my ($sock, $fh, $off) = @_;
- sysseek($fh, $$off, SEEK_SET) or return;
- defined(my $to_write = sysread($fh, my $buf, 16384)) or return;
+ seek($fh, $$off, SEEK_SET) or return;
+ defined(my $to_write = read($fh, my $buf, 16384)) or return;
my $written = 0;
while ($to_write > 0) {
if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
@@ -482,29 +482,18 @@ sub do_read ($$$$) {
$! == EAGAIN ? $self->watch_in1 : $self->close;
}
-sub write_in_full ($$$$) {
- my ($fh, $bref, $len, $off) = @_;
- my $rv = 0;
- while ($len > 0) {
- my $w = syswrite($fh, $$bref, $len, $off);
- return ($rv ? $rv : $w) unless $w; # undef or 0
- $rv += $w;
- $len -= $w;
- $off += $w;
- }
- $rv
-}
-
+# n.b.: use ->write/->read for this buffer to allow compatibility with
+# PerlIO::mmap or PerlIO::scalar if needed
sub tmpbuf ($$) {
my ($bref, $off) = @_;
# open(my $fh, '+>>', undef) doesn't set O_APPEND
my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
open $fh, '+>>', $path or die "open: $!";
+ $fh->autoflush(1);
unlink $path;
my $to_write = bytes::length($$bref) - $off;
- my $w = write_in_full($fh, $bref, $to_write, $off);
- die "write_in_full ($to_write): $!" unless defined $w;
- $w == $to_write ? $fh : die("short write $w < $to_write");
+ $fh->write($$bref, $to_write, $off) or die "write ($to_write): $!";
+ $fh;
}
=head2 C<< $obj->write( $data ) >>
@@ -534,7 +523,10 @@ sub write {
} else {
my $last = $wbuf->[-1];
if (ref($last) eq 'GLOB') { # append to tmp file buffer
- write_in_full($last, $bref, bytes::length($$bref), 0);
+ unless ($last->print($$bref)) {
+ warn "error buffering: $!";
+ return $self->close;
+ }
} else {
push @$wbuf, tmpbuf($bref, 0);
}
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index a2a81f8e..303d5073 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -231,18 +231,16 @@ sub input_prepare {
return;
}
last if $r == 0;
- my $off = 0;
- while ($r > 0) {
- my $w = syswrite($in, $buf, $r, $off);
- if (defined $w) {
- $r -= $w;
- $off += $w;
- } else {
- err($env, "error writing temporary file: $!");
- return;
- }
+ unless (print $in $buf) {
+ err($env, "error writing temporary file: $!");
+ return;
}
}
+ # ensure it's visible to git-http-backend(1):
+ unless ($in->flush) {
+ err($env, "error writing temporary file: $!");
+ return;
+ }
unless (defined(sysseek($in, 0, SEEK_SET))) {
err($env, "error seeking temporary file: $!");
return;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index 7697ac5c..a1cb4aca 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -19,7 +19,7 @@ use HTTP::Status qw(status_message);
use HTTP::Date qw(time2str);
use IO::Handle;
require PublicInbox::EvCleanup;
-PublicInbox::DS->import(qw(msg_more write_in_full));
+PublicInbox::DS->import(qw(msg_more));
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use constant {
CHUNK_START => -1, # [a-f0-9]+\r\n
@@ -102,6 +102,15 @@ sub rbuf_process {
$len ? read_input($self) : app_dispatch($self);
}
+# IO::Handle::write returns boolean, this returns bytes written:
+sub xwrite ($$$) {
+ my ($fh, $rbuf, $max) = @_;
+ my $w = bytes::length($$rbuf);
+ $w = $max if $w > $max;
+ $fh->write($$rbuf, $w) or return;
+ $w;
+}
+
sub read_input ($) {
my ($self) = @_;
my $env = $self->{env};
@@ -116,7 +125,7 @@ sub read_input ($) {
while ($len > 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len, 0);
+ my $w = xwrite($input, $rbuf, $len);
return write_err($self, $len) unless $w;
$len -= $w;
die "BUG: $len < 0 (w=$w)" if $len < 0;
@@ -306,6 +315,11 @@ sub response_write {
}
}
+sub input_tmpfile ($) {
+ open($_[0], '+>', undef);
+ $_[0]->autoflush(1);
+}
+
sub input_prepare {
my ($self, $env) = @_;
my $input;
@@ -315,10 +329,10 @@ sub input_prepare {
quit($self, 413);
return;
}
- open($input, '+>', undef);
+ input_tmpfile($input);
} elsif (env_chunked($env)) {
$len = CHUNK_START;
- open($input, '+>', undef);
+ input_tmpfile($input);
} else {
$input = $null_io;
}
@@ -399,7 +413,7 @@ sub read_input_chunked { # unlikely...
# drain the current chunk
until ($len <= 0) {
if ($$rbuf ne '') {
- my $w = write_in_full($input, $rbuf, $len, 0);
+ my $w = xwrite($input, $rbuf, $len);
return write_err($self, "$len chunk") if !$w;
$len -= $w;
if ($len == 0) {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 35/57] ds: deal better with FS-related errors IO buffers
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (33 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 34/57] allow use of PerlIO layers for filesystem writes Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 36/57] nntp: wait for writability before sending greeting Eric Wong
` (23 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Instead of ENOMEM (or fragmentation/swap storms), using tempfile
buffers opens us up to filesystem and storage-related errors
(e.g. ENOSPC, EFBIG, EIO, EROFS). Log these errors, drop the
particular client, and try to limp by with whatever we have left.
---
lib/PublicInbox/DS.pm | 42 +++++++++++++++++++++++++-----------------
1 file changed, 25 insertions(+), 17 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 486af40e..1a1ef7d3 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -33,7 +33,7 @@ use fields ('sock', # underlying socket
);
use Errno qw(EAGAIN EINVAL);
-use Carp qw(croak confess);
+use Carp qw(croak confess carp);
use File::Temp qw(tempfile);
our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
@@ -482,18 +482,27 @@ sub do_read ($$$$) {
$! == EAGAIN ? $self->watch_in1 : $self->close;
}
+# drop the socket if we hit unrecoverable errors on our system which
+# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop {
+ my $self = shift;
+ carp(@_);
+ $self->close;
+}
+
# n.b.: use ->write/->read for this buffer to allow compatibility with
# PerlIO::mmap or PerlIO::scalar if needed
-sub tmpbuf ($$) {
- my ($bref, $off) = @_;
+sub tmpio ($$$) {
+ my ($self, $bref, $off) = @_;
# open(my $fh, '+>>', undef) doesn't set O_APPEND
- my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1);
- open $fh, '+>>', $path or die "open: $!";
+ my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
+ $fh or return drop($self, "tempfile: $@");
+ open($fh, '+>>', $path) or return drop($self, "open: $!");
$fh->autoflush(1);
- unlink $path;
- my $to_write = bytes::length($$bref) - $off;
- $fh->write($$bref, $to_write, $off) or die "write ($to_write): $!";
- $fh;
+ unlink($path) or return drop($self, "unlink: $!");
+ my $len = bytes::length($$bref) - $off;
+ $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
+ $fh
}
=head2 C<< $obj->write( $data ) >>
@@ -523,12 +532,10 @@ sub write {
} else {
my $last = $wbuf->[-1];
if (ref($last) eq 'GLOB') { # append to tmp file buffer
- unless ($last->print($$bref)) {
- warn "error buffering: $!";
- return $self->close;
- }
+ $last->print($$bref) or return drop($self, "print: $!");
} else {
- push @$wbuf, tmpbuf($bref, 0);
+ my $tmpio = tmpio($self, $bref, 0) or return 0;
+ push @$wbuf, $tmpio;
}
}
return 0;
@@ -546,7 +553,8 @@ sub write {
} else {
return $self->close;
}
- $self->{wbuf} = [ tmpbuf($bref, $written) ];
+ my $tmpio = tmpio($self, $bref, $written) or return 0;
+ $self->{wbuf} = [ $tmpio ];
watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
@@ -563,9 +571,9 @@ sub msg_more ($$) {
if (defined $n) {
my $nlen = bytes::length($_[1]) - $n;
return 1 if $nlen == 0; # all done!
-
# queue up the unwritten substring:
- $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ];
+ my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+ $self->{wbuf} = [ $tmpio ];
watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 36/57] nntp: wait for writability before sending greeting
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (34 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 35/57] ds: deal better with FS-related errors IO buffers Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 37/57] nntp: NNTPS and NNTP+STARTTLS working Eric Wong
` (22 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
This will be needed for NNTPS support, since we need
to negotiate the TLS connection before writing the
greeting and we can reuse the existing buffer layer
to enqueue writes.
---
lib/PublicInbox/NNTP.pm | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 468a22f5..a18641d3 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -24,7 +24,7 @@ use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
-use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
@@ -98,9 +98,11 @@ sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
+ $self->SUPER::new($sock, EPOLLOUT | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
- res($self, '201 ' . $nntpd->{servername} . ' ready - post via email');
+ my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
+ open my $fh, '<:scalar', \$greet or die "open :scalar: $!";
+ $self->{wbuf} = [ $fh ];
$self->{rbuf} = '';
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 37/57] nntp: NNTPS and NNTP+STARTTLS working
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (35 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 36/57] nntp: wait for writability before sending greeting Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 38/57] certs/create-certs.perl: fix cert validity on 32-bit Eric Wong
` (21 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
It kinda, barely works, and I'm most happy I got it working
without any modifications to the main NNTP::event_step callback
thanks to the DS->write(CODE) support we inherited from
Danga::Socket.
---
MANIFEST | 4 +
certs/.gitignore | 4 +
certs/create-certs.perl | 132 ++++++++++++++++++++++++++++++++
lib/PublicInbox/DS.pm | 28 ++++++-
lib/PublicInbox/Daemon.pm | 82 ++++++++++++++++++--
lib/PublicInbox/NNTP.pm | 27 ++++++-
lib/PublicInbox/NNTPD.pm | 1 +
lib/PublicInbox/TLS.pm | 24 ++++++
script/public-inbox-nntpd | 3 +-
t/nntpd-tls.t | 156 ++++++++++++++++++++++++++++++++++++++
t/nntpd.t | 2 +
11 files changed, 450 insertions(+), 13 deletions(-)
create mode 100644 certs/.gitignore
create mode 100755 certs/create-certs.perl
create mode 100644 lib/PublicInbox/TLS.pm
create mode 100644 t/nntpd-tls.t
diff --git a/MANIFEST b/MANIFEST
index c7693976..26ff0d0d 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -31,6 +31,8 @@ MANIFEST
Makefile.PL
README
TODO
+certs/.gitignore
+certs/create-certs.perl
ci/README
ci/deps.perl
ci/profiles.sh
@@ -129,6 +131,7 @@ lib/PublicInbox/Spamcheck/Spamc.pm
lib/PublicInbox/Spawn.pm
lib/PublicInbox/SpawnPP.pm
lib/PublicInbox/Syscall.pm
+lib/PublicInbox/TLS.pm
lib/PublicInbox/Unsubscribe.pm
lib/PublicInbox/UserContent.pm
lib/PublicInbox/V2Writable.pm
@@ -222,6 +225,7 @@ t/msg_iter.t
t/msgmap.t
t/msgtime.t
t/nntp.t
+t/nntpd-tls.t
t/nntpd.t
t/nulsubject.t
t/over.t
diff --git a/certs/.gitignore b/certs/.gitignore
new file mode 100644
index 00000000..0b3a547b
--- /dev/null
+++ b/certs/.gitignore
@@ -0,0 +1,4 @@
+*.pem
+*.der
+*.enc
+*.p12
diff --git a/certs/create-certs.perl b/certs/create-certs.perl
new file mode 100755
index 00000000..bfd8e5f1
--- /dev/null
+++ b/certs/create-certs.perl
@@ -0,0 +1,132 @@
+#!/usr/bin/perl -w
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# from IO::Socket::SSL 2.063 / https://github.com/noxxi/p5-io-socket-ssl
+use strict;
+use warnings;
+use IO::Socket::SSL::Utils;
+use Net::SSLeay;
+
+my $dir = "./";
+my $now = time();
+my $later = $now + 100*365*86400;
+
+Net::SSLeay::SSLeay_add_ssl_algorithms();
+my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die;
+my $printfp = sub {
+ my ($w,$cert) = @_;
+ print $w.' sha256$'.unpack('H*',Net::SSLeay::X509_digest($cert, $sha256))."\n"
+};
+
+my %time_valid = (not_before => $now, not_after => $later);
+
+my @ca = CERT_create(
+ CA => 1,
+ subject => { CN => 'IO::Socket::SSL Demo CA' },
+ %time_valid,
+);
+save('test-ca.pem',PEM_cert2string($ca[0]));
+
+my @server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ %time_valid,
+);
+save('server-cert.pem',PEM_cert2string($server[0]));
+save('server-key.pem',PEM_key2string($server[1]));
+$printfp->(server => $server[0]);
+
+@server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server2.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ %time_valid,
+);
+save('server2-cert.pem',PEM_cert2string($server[0]));
+save('server2-key.pem',PEM_key2string($server[1]));
+$printfp->(server2 => $server[0]);
+
+@server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server-ecc.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ key => KEY_create_ec(),
+ %time_valid,
+);
+save('server-ecc-cert.pem',PEM_cert2string($server[0]));
+save('server-ecc-key.pem',PEM_key2string($server[1]));
+$printfp->('server-ecc' => $server[0]);
+
+
+my @client = CERT_create(
+ CA => 0,
+ subject => { CN => 'client.local' },
+ purpose => 'client',
+ issuer => \@ca,
+ %time_valid,
+);
+save('client-cert.pem',PEM_cert2string($client[0]));
+save('client-key.pem',PEM_key2string($client[1]));
+$printfp->(client => $client[0]);
+
+my @swc = CERT_create(
+ CA => 0,
+ subject => { CN => 'server.local' },
+ purpose => 'server',
+ issuer => \@ca,
+ subjectAltNames => [
+ [ DNS => '*.server.local' ],
+ [ IP => '127.0.0.1' ],
+ [ DNS => 'www*.other.local' ],
+ [ DNS => 'smtp.mydomain.local' ],
+ [ DNS => 'xn--lwe-sna.idntest.local' ]
+ ],
+ %time_valid,
+);
+save('server-wildcard.pem',PEM_cert2string($swc[0]),PEM_key2string($swc[1]));
+
+
+my @subca = CERT_create(
+ CA => 1,
+ issuer => \@ca,
+ subject => { CN => 'IO::Socket::SSL Demo Sub CA' },
+ %time_valid,
+);
+save('test-subca.pem',PEM_cert2string($subca[0]));
+@server = CERT_create(
+ CA => 0,
+ subject => { CN => 'server.local' },
+ purpose => 'server',
+ issuer => \@subca,
+ %time_valid,
+);
+save('sub-server.pem',PEM_cert2string($server[0]).PEM_key2string($server[1]));
+
+
+
+my @cap = CERT_create(
+ CA => 1,
+ subject => { CN => 'IO::Socket::SSL::Intercept' },
+ %time_valid,
+);
+save('proxyca.pem',PEM_cert2string($cap[0]).PEM_key2string($cap[1]));
+
+sub save {
+ my $file = shift;
+ open(my $fd,'>',$dir.$file) or die $!;
+ print $fd @_;
+}
+
+system(<<CMD);
+cd $dir
+set -x
+openssl x509 -in server-cert.pem -out server-cert.der -outform der
+openssl rsa -in server-key.pem -out server-key.der -outform der
+openssl rsa -in server-key.pem -out server-key.enc -passout pass:bluebell
+openssl rsa -in client-key.pem -out client-key.enc -passout pass:opossum
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server.p12 -passout pass:
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server_enc.p12 -passout pass:bluebell
+CMD
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 1a1ef7d3..044b991c 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -465,7 +465,11 @@ next_buf:
}
} else { #($ref eq 'CODE') {
shift @$wbuf;
+ my $before = scalar(@$wbuf);
$bref->($self);
+
+ # bref may be enqueueing more CODE to call (see accept_tls_step)
+ return 0 if (scalar(@$wbuf) > $before);
}
} # while @$wbuf
@@ -479,7 +483,14 @@ sub do_read ($$$$) {
return ($r == 0 ? $self->close : $r) if defined $r;
# common for clients to break connections without warning,
# would be too noisy to log here:
- $! == EAGAIN ? $self->watch_in1 : $self->close;
+ if (ref($self) eq 'IO::Socket::SSL') {
+ my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+ watch($self, $ev | EPOLLONESHOT);
+ } elsif ($! == EAGAIN) {
+ watch($self, EPOLLIN | EPOLLONESHOT);
+ } else {
+ $self->close;
+ }
}
# drop the socket if we hit unrecoverable errors on our system which
@@ -566,7 +577,7 @@ sub msg_more ($$) {
my $self = $_[0];
my $sock = $self->{sock} or return 1;
- if (MSG_MORE && !$self->{wbuf}) {
+ if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') {
my $n = send($sock, $_[1], MSG_MORE);
if (defined $n) {
my $nlen = bytes::length($_[1]) - $n;
@@ -597,6 +608,19 @@ sub watch ($$) {
sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
+# return true if complete, false if incomplete (or failure)
+sub accept_tls_step ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return 1 if $sock->accept_SSL;
+ return $self->close if $! != EAGAIN;
+ if (my $ev = PublicInbox::TLS::epollbit()) {
+ unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+ return watch($self, $ev | EPOLLONESHOT);
+ }
+ drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+}
+
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index b8d6b572..24c13ad2 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -22,12 +22,48 @@ my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
my $worker_processes = 1;
my @listeners;
my %pids;
-my %listener_names;
+my %listener_names; # sockname => IO::Handle
+my %tls_opt; # scheme://sockname => args for IO::Socket::SSL->start_SSL
my $reexec_pid;
my $cleanup;
my ($uid, $gid);
+my ($default_cert, $default_key);
END { $cleanup->() if $cleanup };
+sub tls_listen ($$$) {
+ my ($scheme, $sockname, $opt_str) = @_;
+ # opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value)
+ require PublicInbox::TLS;
+ my $o = {};
+ # allow ',' as delimiter since '&' is shell-unfriendly
+ foreach (split(/[,&]/, $opt_str)) {
+ my ($k, $v) = split(/=/, $_, 2);
+ push @{$o->{$k} ||= []}, $v;
+ }
+
+ # key may be a part of cert. At least
+ # p5-io-socket-ssl/example/ssl_server.pl has this fallback:
+ $o->{cert} //= [ $default_cert ];
+ $o->{key} //= defined($default_key) ? [ $default_key ] : $o->{cert};
+ my %ctx_opt = (SSL_server => 1);
+ # parse out hostname:/path/to/ mappings:
+ foreach my $k (qw(cert key)) {
+ my $x = $ctx_opt{'SSL_'.$k.'_file'} = {};
+ foreach my $path (@{$o->{$k}}) {
+ my $host = '';
+ $path =~ s/\A([^:]+):// and $host = $1;
+ $x->{$host} = $path;
+ }
+ }
+ my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or
+ die 'SSL_Context->new: '.PublicInbox::TLS::err();
+ $tls_opt{"$scheme://$sockname"} = {
+ SSL_server => 1,
+ SSL_startHandshake => 0,
+ SSL_reuse_ctx => $ctx
+ };
+}
+
sub daemon_prepare ($) {
my ($default_listen) = @_;
@CMD = ($0, @ARGV);
@@ -42,6 +78,8 @@ sub daemon_prepare ($) {
'u|user=s' => \$user,
'g|group=s' => \$group,
'D|daemonize' => \$daemonize,
+ 'cert=s' => \$default_cert,
+ 'key=s' => \$default_key,
);
GetOptions(%opts) or die "bad command-line args\n";
@@ -55,6 +93,18 @@ sub daemon_prepare ($) {
push @cfg_listen, $default_listen unless (@listeners || @cfg_listen);
foreach my $l (@cfg_listen) {
+ my $orig = $l;
+ my $scheme = '';
+ $l =~ s!\A([^:]+)://!! and $scheme = $1;
+ if ($l =~ s!/?\?(.+)\z!!) {
+ tls_listen($scheme, $l, $1);
+ } elsif (defined($default_cert)) {
+ tls_listen($scheme, $l, '');
+ } elsif ($scheme =~ /\A(?:nntps|https)\z/) {
+ die "$orig specified w/o cert=\n";
+ }
+ # TODO: use scheme to load either NNTP.pm or HTTP.pm
+
next if $listener_names{$l}; # already inherited
my (%o, $sock_pkg);
if (index($l, '/') == 0) {
@@ -461,9 +511,26 @@ sub master_loop {
exit # never gets here, just for documentation
}
-sub daemon_loop ($$) {
- my ($refresh, $post_accept) = @_;
+sub tls_start_cb ($$) {
+ my ($opt, $orig_post_accept) = @_;
+ sub {
+ my ($io, $addr, $srv) = @_;
+ my $ssl = IO::Socket::SSL->start_SSL($io, %$opt);
+ $orig_post_accept->($ssl, $addr, $srv);
+ }
+}
+
+sub daemon_loop ($$$) {
+ my ($refresh, $post_accept, $nntpd) = @_;
PublicInbox::EvCleanup::enable(); # early for $refresh
+ my %post_accept;
+ while (my ($k, $v) = each %tls_opt) {
+ if ($k =~ s!\A(?:nntps|https)://!!) {
+ $post_accept{$k} = tls_start_cb($v, $post_accept);
+ } elsif ($nntpd) { # STARTTLS, $k eq '' is OK
+ $nntpd->{accept_tls} = $v;
+ }
+ }
my $parent_pipe;
if ($worker_processes > 0) {
$refresh->(); # preload by default
@@ -484,18 +551,19 @@ sub daemon_loop ($$) {
$SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH);
# this calls epoll_create:
@listeners = map {
- PublicInbox::Listener->new($_, $post_accept)
+ PublicInbox::Listener->new($_,
+ $post_accept{sockname($_)} || $post_accept)
} @listeners;
PublicInbox::DS->EventLoop;
$parent_pipe = undef;
}
-sub run ($$$) {
- my ($default, $refresh, $post_accept) = @_;
+sub run ($$$;$) {
+ my ($default, $refresh, $post_accept, $nntpd) = @_;
daemon_prepare($default);
daemonize();
- daemon_loop($refresh, $post_accept);
+ daemon_loop($refresh, $post_accept, $nntpd);
}
sub do_chown ($) {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index a18641d3..659e44d5 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2015-2019 all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# Each instance of this represents a NNTP client socket
@@ -98,11 +98,19 @@ sub expire_old () {
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- $self->SUPER::new($sock, EPOLLOUT | EPOLLONESHOT);
+ my $ev = EPOLLOUT | EPOLLONESHOT;
+ my $wbuf = [];
+ if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+ $ev = PublicInbox::TLS::epollbit() or return $sock->close;
+ $ev |= EPOLLONESHOT;
+ $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+ }
+ $self->SUPER::new($sock, $ev);
$self->{nntpd} = $nntpd;
my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
open my $fh, '<:scalar', \$greet or die "open :scalar: $!";
- $self->{wbuf} = [ $fh ];
+ push @$wbuf, $fh;
+ $self->{wbuf} = $wbuf;
$self->{rbuf} = '';
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
@@ -900,6 +908,19 @@ sub cmd_xover ($;$) {
});
}
+sub cmd_starttls ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ # RFC 4642 2.2.1
+ (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+ my $opt = $self->{nntpd}->{accept_tls} or
+ return '580 can not initiate TLS negotiation';
+ res($self, '382 Continue with TLS negotiation');
+ $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
+ requeue($self) if PublicInbox::DS::accept_tls_step($self);
+ undef;
+}
+
sub cmd_xpath ($$) {
my ($self, $mid) = @_;
return r501 unless $mid =~ /\A<(.+)>\z/;
diff --git a/lib/PublicInbox/NNTPD.pm b/lib/PublicInbox/NNTPD.pm
index 32848d7c..6d9ffd5f 100644
--- a/lib/PublicInbox/NNTPD.pm
+++ b/lib/PublicInbox/NNTPD.pm
@@ -25,6 +25,7 @@ sub new {
out => \*STDOUT,
grouplist => [],
servername => $name,
+ # accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... }
}, $class;
}
diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm
new file mode 100644
index 00000000..576c11d7
--- /dev/null
+++ b/lib/PublicInbox/TLS.pm
@@ -0,0 +1,24 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# IO::Socket::SSL support code
+package PublicInbox::TLS;
+use strict;
+use IO::Socket::SSL;
+require Carp;
+use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLOUT);
+
+sub err () { $SSL_ERROR }
+
+# returns the EPOLL event bit which matches the existing SSL error
+sub epollbit () {
+ if ($! == EAGAIN) {
+ return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
+ return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
+ die "unexpected SSL error: $SSL_ERROR";
+ }
+ 0;
+}
+
+1;
diff --git a/script/public-inbox-nntpd b/script/public-inbox-nntpd
index 484ce8d6..55bf330e 100755
--- a/script/public-inbox-nntpd
+++ b/script/public-inbox-nntpd
@@ -11,4 +11,5 @@ require PublicInbox::NNTPD;
my $nntpd = PublicInbox::NNTPD->new;
PublicInbox::Daemon::run('0.0.0.0:119',
sub { $nntpd->refresh_groups }, # refresh
- sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }); # post_accept
+ sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }, # post_accept
+ $nntpd);
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
new file mode 100644
index 00000000..53890ff2
--- /dev/null
+++ b/t/nntpd-tls.t
@@ -0,0 +1,156 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+use File::Temp qw(tempdir);
+use Socket qw(SOCK_STREAM);
+foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for $0" if $@;
+}
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+unless (-r $key && -r $cert) {
+ plan skip_all =>
+ "certs/ missing for $0, run ./create-certs.perl in certs/";
+}
+
+use_ok 'PublicInbox::TLS';
+use_ok 'IO::Socket::SSL';
+require './t/common.perl';
+require PublicInbox::InboxWritable;
+require PublicInbox::MIME;
+require PublicInbox::SearchIdx;
+my $version = 2; # v2 needs newer git
+require_git('2.6') if $version >= 2;
+my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+my $err = "$tmpdir/stderr.log";
+my $out = "$tmpdir/stdout.log";
+my $mainrepo = "$tmpdir";
+my $pi_config = "$tmpdir/pi_config";
+my $group = 'test-nntpd-tls';
+my $addr = $group . '@example.com';
+my $nntpd = 'blib/script/public-inbox-nntpd';
+my %opts = (
+ LocalAddr => '127.0.0.1',
+ ReuseAddr => 1,
+ Proto => 'tcp',
+ Type => SOCK_STREAM,
+ Listen => 1024,
+);
+my $starttls = IO::Socket::INET->new(%opts);
+my $nntps = IO::Socket::INET->new(%opts);
+my ($pid, $tail_pid);
+END {
+ foreach ($pid, $tail_pid) {
+ kill 'TERM', $_ if defined $_;
+ }
+};
+
+my $ibx = PublicInbox::Inbox->new({
+ mainrepo => $mainrepo,
+ name => 'nntpd-tls',
+ version => $version,
+ -primary_address => $addr,
+ indexlevel => 'basic',
+});
+$ibx = PublicInbox::InboxWritable->new($ibx, {nproc=>1});
+$ibx->init_inbox(0);
+{
+ open my $fh, '>', $pi_config or die "open: $!\n";
+ print $fh <<EOF
+[publicinbox "nntpd-tls"]
+ mainrepo = $mainrepo
+ address = $addr
+ indexlevel = basic
+ newsgroup = $group
+EOF
+ ;
+ close $fh or die "close: $!\n";
+}
+
+{
+ my $im = $ibx->importer(0);
+ my $mime = PublicInbox::MIME->new(do {
+ open my $fh, '<', 't/data/0001.patch' or die;
+ local $/;
+ <$fh>
+ });
+ ok($im->add($mime), 'message added');
+ $im->done;
+ if ($version == 1) {
+ my $s = PublicInbox::SearchIdx->new($ibx, 1);
+ $s->index_sync;
+ }
+}
+
+my $nntps_addr = $nntps->sockhost . ':' . $nntps->sockport;
+my $starttls_addr = $starttls->sockhost . ':' . $starttls->sockport;
+my $env = { PI_CONFIG => $pi_config };
+
+for my $args (
+ [ "--cert=$cert", "--key=$key",
+ "-lnntps://$nntps_addr",
+ "-lnntp://$starttls_addr" ],
+) {
+ for ($out, $err) {
+ open my $fh, '>', $_ or die "truncate: $!";
+ }
+ if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+ $tail_pid = fork;
+ if (defined $tail_pid && $tail_pid == 0) {
+ exec(split(' ', $tail_cmd), $out, $err);
+ }
+ }
+ my $cmd = [ $nntpd, '-W0', @$args, "--stdout=$out", "--stderr=$err" ];
+ $pid = spawn_listener($env, $cmd, [ $starttls, $nntps ]);
+ my %o = (
+ SSL_hostname => 'server.local',
+ SSL_verifycn_name => 'server.local',
+ SSL => 1,
+ SSL_verify_mode => SSL_VERIFY_PEER(),
+ SSL_ca_file => 'certs/test-ca.pem',
+ );
+ my $expect = { $group => [qw(1 1 n)] };
+
+ # NNTPS
+ my $c = Net::NNTP->new($nntps_addr, %o);
+ my $list = $c->list;
+ is_deeply($list, $expect, 'NNTPS LIST works');
+
+ # STARTTLS
+ delete $o{SSL};
+ $c = Net::NNTP->new($starttls_addr, %o);
+ $list = $c->list;
+ is_deeply($list, $expect, 'plain LIST works');
+ ok($c->starttls, 'STARTTLS succeeds');
+ is($c->code, 382, 'got 382 for STARTTLS');
+ $list = $c->list;
+ is_deeply($list, $expect, 'LIST works after STARTTLS');
+
+ # Net::NNTP won't let us do dumb things, but we need to test
+ # dumb things, so use Net::Cmd directly:
+ my $n = $c->command('STARTTLS')->response();
+ is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again');
+ is($c->code, 502, '502 according to RFC 4642 sec#2.2.1');
+
+ $c = undef;
+ kill('TERM', $pid);
+ is($pid, waitpid($pid, 0), 'nntpd exited successfully');
+ is($?, 0, 'no error in exited process');
+ $pid = undef;
+ my $eout = eval {
+ open my $fh, '<', $err or die "open $err failed: $!";
+ local $/;
+ <$fh>;
+ };
+ unlike($eout, qr/wide/i, 'no Wide character warnings');
+ if (defined $tail_pid) {
+ kill 'TERM', $tail_pid;
+ waitpid($tail_pid, 0);
+ $tail_pid = undef;
+ }
+}
+done_testing();
+1;
diff --git a/t/nntpd.t b/t/nntpd.t
index c37880bf..6cba2be4 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -106,6 +106,8 @@ EOF
is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works');
is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works');
is_deeply($n->listgroup($group), [1], 'listgroup OK');
+ ok(!$n->starttls, 'STARTTLS fails when unconfigured');
+ is($n->code, 580, 'got 580 code on server w/o TLS');
%opts = (
PeerAddr => $host_port,
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 38/57] certs/create-certs.perl: fix cert validity on 32-bit
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (36 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 37/57] nntp: NNTPS and NNTP+STARTTLS working Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 39/57] daemon: map inherited sockets to well-known schemes Eric Wong
` (20 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
If I'm still alive, I won't be coding after 2038 :<
---
certs/create-certs.perl | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/certs/create-certs.perl b/certs/create-certs.perl
index bfd8e5f1..476be4d7 100755
--- a/certs/create-certs.perl
+++ b/certs/create-certs.perl
@@ -8,7 +8,7 @@ use Net::SSLeay;
my $dir = "./";
my $now = time();
-my $later = $now + 100*365*86400;
+my $later = 0x7fffffff; # 2038 problems on 32-bit :<
Net::SSLeay::SSLeay_add_ssl_algorithms();
my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 39/57] daemon: map inherited sockets to well-known schemes
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (37 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 38/57] certs/create-certs.perl: fix cert validity on 32-bit Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 40/57] ds|nntp: use CORE::close on socket Eric Wong
` (19 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
I don't want to specify "--listen" in my systemd .service files,
so map 563 to NNTPS automatically (and 443 to HTTPS, but HTTPS
support doesn't work, yet).
---
lib/PublicInbox/Daemon.pm | 42 +++++++++++++++++++++++++++++----------
1 file changed, 32 insertions(+), 10 deletions(-)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 24c13ad2..55103f40 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -29,9 +29,11 @@ my $cleanup;
my ($uid, $gid);
my ($default_cert, $default_key);
END { $cleanup->() if $cleanup };
+my %KNOWN_TLS = ( 443 => 'https', 563 => 'nntps' );
+my %KNOWN_STARTTLS = ( 119 => 'nntp' );
-sub tls_listen ($$$) {
- my ($scheme, $sockname, $opt_str) = @_;
+sub accept_tls_opt ($) {
+ my ($opt_str) = @_;
# opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value)
require PublicInbox::TLS;
my $o = {};
@@ -57,11 +59,7 @@ sub tls_listen ($$$) {
}
my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or
die 'SSL_Context->new: '.PublicInbox::TLS::err();
- $tls_opt{"$scheme://$sockname"} = {
- SSL_server => 1,
- SSL_startHandshake => 0,
- SSL_reuse_ctx => $ctx
- };
+ { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
}
sub daemon_prepare ($) {
@@ -87,6 +85,11 @@ sub daemon_prepare ($) {
die "--pid-file cannot end with '.oldbin'\n";
}
@listeners = inherit();
+
+ # allow socket-activation users to set certs once and not
+ # have to configure each socket:
+ my @inherited_names = keys(%listener_names) if defined($default_cert);
+
# ignore daemonize when inheriting
$daemonize = undef if scalar @listeners;
@@ -95,11 +98,16 @@ sub daemon_prepare ($) {
foreach my $l (@cfg_listen) {
my $orig = $l;
my $scheme = '';
- $l =~ s!\A([^:]+)://!! and $scheme = $1;
+ if ($l =~ s!\A([^:]+)://!!) {
+ $scheme = $1;
+ } elsif ($l =~ /\A(?:\[[^\]]+\]|[^:]+):([0-9])+/) {
+ my $s = $KNOWN_TLS{$1} // $KNOWN_STARTTLS{$1};
+ $scheme = $s if defined $s;
+ }
if ($l =~ s!/?\?(.+)\z!!) {
- tls_listen($scheme, $l, $1);
+ $tls_opt{"$scheme://$l"} = accept_tls_opt($1);
} elsif (defined($default_cert)) {
- tls_listen($scheme, $l, '');
+ $tls_opt{"$scheme://$l"} = accept_tls_opt('');
} elsif ($scheme =~ /\A(?:nntps|https)\z/) {
die "$orig specified w/o cert=\n";
}
@@ -141,6 +149,20 @@ sub daemon_prepare ($) {
push @listeners, $s;
}
}
+
+ # cert/key options in @cfg_listen takes precedence when inheriting,
+ # but map well-known inherited ports if --listen isn't specified
+ # at all
+ for my $sockname (@inherited_names) {
+ $sockname =~ /:([0-9]+)\z/ or next;
+ if (my $scheme = $KNOWN_TLS{$1}) {
+ $tls_opt{"$scheme://$sockname"} ||= accept_tls_opt('');
+ } elsif (($scheme = $KNOWN_STARTTLS{$1})) {
+ next if $tls_opt{"$scheme://$sockname"};
+ $tls_opt{''} ||= accept_tls_opt('');
+ }
+ }
+
die "No listeners bound\n" unless @listeners;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 40/57] ds|nntp: use CORE::close on socket
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (38 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 39/57] daemon: map inherited sockets to well-known schemes Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 41/57] nntp: call SSL_shutdown in normal cases Eric Wong
` (18 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
IO::Socket::SSL will try to re-bless back to the original class
on TLS negotiation failure. Unfortunately, the original class
is 'GLOB', and re-blessing to 'GLOB' takes away all the IO::Handle
methods, because Filehandle/IO are a special case in Perl5.
Anyways, since we already use syswrite() and sysread() as functions
on our socket, we might as well use CORE::close(), as well (and
it plays nicely with tied classes).
---
lib/PublicInbox/DS.pm | 4 ++--
lib/PublicInbox/NNTP.pm | 2 +-
t/nntpd-tls.t | 17 +++++++++++++++++
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 044b991c..2c886b4e 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -293,8 +293,8 @@ sub PostEventLoop {
while (my $sock = shift @ToClose) {
my $fd = fileno($sock);
- # close the socket. (not a PublicInbox::DS close)
- $sock->close;
+ # close the socket. (not a PublicInbox::DS close)
+ CORE::close($sock);
# and now we can finally remove the fd from the map. see
# comment above in ->close.
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 659e44d5..8840adbb 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -101,7 +101,7 @@ sub new ($$$) {
my $ev = EPOLLOUT | EPOLLONESHOT;
my $wbuf = [];
if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
- $ev = PublicInbox::TLS::epollbit() or return $sock->close;
+ $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
$ev |= EPOLLONESHOT;
$wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
}
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index 53890ff2..4727ee5b 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -135,6 +135,23 @@ for my $args (
is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again');
is($c->code, 502, '502 according to RFC 4642 sec#2.2.1');
+ # STARTTLS with bad hostname
+ $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.invalid';
+ $c = Net::NNTP->new($starttls_addr, %o);
+ $list = $c->list;
+ is_deeply($list, $expect, 'plain LIST works again');
+ ok(!$c->starttls, 'STARTTLS fails with bad hostname');
+ $c = Net::NNTP->new($starttls_addr, %o);
+ $list = $c->list;
+ is_deeply($list, $expect, 'not broken after bad negotiation');
+
+ # NNTPS with bad hostname
+ $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+ is($c, undef, 'NNTPS fails with bad hostname');
+ $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local';
+ $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
+ ok($c, 'NNTPS succeeds again with valid hostname');
+
$c = undef;
kill('TERM', $pid);
is($pid, waitpid($pid, 0), 'nntpd exited successfully');
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 41/57] nntp: call SSL_shutdown in normal cases
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (39 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 40/57] ds|nntp: use CORE::close on socket Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 42/57] t/nntpd-tls: slow client connection test Eric Wong
` (17 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
This is in accordance with TLS standards and will be needed
to support session caching/reuse in the future. However, we
don't issue shutdown(2) since we know not to inadvertently
share our sockets with other processes.
---
lib/PublicInbox/DS.pm | 24 ++++++++++++++++++++++++
lib/PublicInbox/NNTP.pm | 12 +++++++++---
t/nntpd-tls.t | 2 ++
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 2c886b4e..2aa9e3d2 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -621,6 +621,30 @@ sub accept_tls_step ($) {
drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
}
+sub shutdn_tls_step ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
+ return $self->close if $! != EAGAIN;
+ if (my $ev = PublicInbox::TLS::epollbit()) {
+ unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+ return watch($self, $ev | EPOLLONESHOT);
+ }
+ drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+}
+
+# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
+# or fork w/o exec, so no inadvertant socket sharing
+sub shutdn ($) {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ if (ref($sock) eq 'IO::Socket::SSL') {
+ shutdn_tls_step($self);
+ } else {
+ $self->close;
+ }
+}
+
package PublicInbox::DS::Timer;
# [$abs_float_firetime, $coderef];
sub cancel {
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 8840adbb..53de2bca 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -74,11 +74,17 @@ sub expire_old () {
my $exp = $EXPTIME;
my $old = $now - $exp;
my $nr = 0;
+ my $closed = 0;
my %new;
while (my ($fd, $v) = each %$EXPMAP) {
my ($idle_time, $nntp) = @$v;
if ($idle_time < $old) {
- $nntp->close; # idempotent
+ if ($nntp->shutdn) {
+ $closed++;
+ } else {
+ ++$nr;
+ $new{$fd} = $v;
+ }
} else {
++$nr;
$new{$fd} = $v;
@@ -91,7 +97,7 @@ sub expire_old () {
$expt = undef;
# noop to kick outselves out of the loop ASAP so descriptors
# really get closed
- PublicInbox::EvCleanup::asap(sub {});
+ PublicInbox::EvCleanup::asap(sub {}) if $closed;
}
}
@@ -410,7 +416,7 @@ sub cmd_post ($) {
sub cmd_quit ($) {
my ($self) = @_;
res($self, '205 closing connection - goodbye!');
- $self->close;
+ $self->shutdn;
undef;
}
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index 4727ee5b..00b03b66 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -118,6 +118,8 @@ for my $args (
my $c = Net::NNTP->new($nntps_addr, %o);
my $list = $c->list;
is_deeply($list, $expect, 'NNTPS LIST works');
+ is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works');
+ is(0, sysread($c, my $buf, 1), 'got EOF after QUIT');
# STARTTLS
delete $o{SSL};
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 42/57] t/nntpd-tls: slow client connection test
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (40 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 41/57] nntp: call SSL_shutdown in normal cases Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 43/57] daemon: use SSL_MODE_RELEASE_BUFFERS Eric Wong
` (16 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We need to ensure slowly negotiating TLS clients don't block
the event loop. This is why I added the size check of
{wbuf} before and after calling the CODE ref in DS::flush_write.
---
t/nntpd-tls.t | 36 ++++++++++++++++++++++++++++++++----
1 file changed, 32 insertions(+), 4 deletions(-)
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index 00b03b66..e8fb63b4 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -5,7 +5,9 @@ use warnings;
use Test::More;
use File::Temp qw(tempdir);
use Socket qw(SOCK_STREAM);
-foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP)) {
+# IO::Poll and Net::NNTP are part of the standard library, but
+# distros may split them off...
+foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) {
eval "require $mod";
plan skip_all => "$mod missing for $0" if $@;
}
@@ -108,21 +110,32 @@ for my $args (
my %o = (
SSL_hostname => 'server.local',
SSL_verifycn_name => 'server.local',
- SSL => 1,
SSL_verify_mode => SSL_VERIFY_PEER(),
SSL_ca_file => 'certs/test-ca.pem',
);
my $expect = { $group => [qw(1 1 n)] };
+ # start negotiating a slow TLS connection
+ my $slow = IO::Socket::INET->new(
+ Proto => 'tcp',
+ PeerAddr => $nntps_addr,
+ Type => SOCK_STREAM,
+ Blocking => 0,
+ );
+ $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o);
+ my $slow_done = $slow->connect_SSL;
+ diag('W: connect_SSL early OK, slow client test invalid') if $slow_done;
+ my @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+ # we should call connect_SSL much later...
+
# NNTPS
- my $c = Net::NNTP->new($nntps_addr, %o);
+ my $c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
my $list = $c->list;
is_deeply($list, $expect, 'NNTPS LIST works');
is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works');
is(0, sysread($c, my $buf, 1), 'got EOF after QUIT');
# STARTTLS
- delete $o{SSL};
$c = Net::NNTP->new($starttls_addr, %o);
$list = $c->list;
is_deeply($list, $expect, 'plain LIST works');
@@ -154,6 +167,21 @@ for my $args (
$c = Net::NNTP->new($nntps_addr, %o, SSL => 1);
ok($c, 'NNTPS succeeds again with valid hostname');
+ # slow TLS connection did not block the other fast clients while
+ # connecting, finish it off:
+ until ($slow_done) {
+ IO::Poll::_poll(-1, @poll);
+ $slow_done = $slow->connect_SSL and last;
+ @poll = (fileno($slow), PublicInbox::TLS::epollbit());
+ }
+ $slow->blocking(1);
+ ok(sysread($slow, my $greet, 4096) > 0, 'slow got greeting');
+ like($greet, qr/\A201 /, 'got expected greeting');
+ is(syswrite($slow, "QUIT\r\n"), 6, 'slow wrote QUIT');
+ ok(sysread($slow, my $end, 4096) > 0, 'got EOF');
+ is(sysread($slow, my $eof, 4096), 0, 'got EOF');
+ $slow = undef;
+
$c = undef;
kill('TERM', $pid);
is($pid, waitpid($pid, 0), 'nntpd exited successfully');
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 43/57] daemon: use SSL_MODE_RELEASE_BUFFERS
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (41 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 42/57] t/nntpd-tls: slow client connection test Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 44/57] ds: allow ->write callbacks to syswrite directly Eric Wong
` (15 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
34K per idle connection adds up to large amounts of memory, and
holding on to those buffers is not worth it: malloc is fast
nowadays compared to the cost of cache misses or, worse, swapping.
---
lib/PublicInbox/Daemon.pm | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 55103f40..c4481555 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -59,6 +59,16 @@ sub accept_tls_opt ($) {
}
my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or
die 'SSL_Context->new: '.PublicInbox::TLS::err();
+
+ # save ~34K per idle connection (cf. SSL_CTX_set_mode(3ssl))
+ # RSS goes from 346MB to 171MB with 10K idle NNTPS clients on amd64
+ # cf. https://rt.cpan.org/Ticket/Display.html?id=129463
+ my $mode = eval { Net::SSLeay::MODE_RELEASE_BUFFERS() };
+ if ($mode && $ctx->{context}) {
+ eval { Net::SSLeay::CTX_set_mode($ctx->{context}, $mode) };
+ warn "W: $@ (setting SSL_MODE_RELEASE_BUFFERS)\n" if $@;
+ }
+
{ SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx };
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 44/57] ds: allow ->write callbacks to syswrite directly
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (42 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 43/57] daemon: use SSL_MODE_RELEASE_BUFFERS Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 45/57] nntp: reduce allocations for greeting Eric Wong
` (14 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
When ->write is called from within a CODE reference which was
itself passed to ->write, we can bypass buffering if wbuf is empty.
---
lib/PublicInbox/DS.pm | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 2aa9e3d2..0e48ed07 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -537,7 +537,8 @@ sub write {
my $sock = $self->{sock} or return 1;
my $ref = ref $data;
my $bref = $ref ? $data : \$data;
- if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more...
+ my $wbuf = $self->{wbuf};
+ if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more...
if ($ref eq 'CODE') {
push @$wbuf, $bref;
} else {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 45/57] nntp: reduce allocations for greeting
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (43 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 44/57] ds: allow ->write callbacks to syswrite directly Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 46/57] ds: always use EV_ADD with EV_SET Eric Wong
` (13 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
No need to allocate a new PerlIO::scalar filehandle for every
client, instead we can now pass the same CODE reference which
calls DS->write on a reused string reference.
---
lib/PublicInbox/NNTP.pm | 6 +++---
lib/PublicInbox/NNTPD.pm | 1 +
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 53de2bca..12ce4e68 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -101,6 +101,8 @@ sub expire_old () {
}
}
+sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
+
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
@@ -113,9 +115,7 @@ sub new ($$$) {
}
$self->SUPER::new($sock, $ev);
$self->{nntpd} = $nntpd;
- my $greet = "201 $nntpd->{servername} ready - post via email\r\n";
- open my $fh, '<:scalar', \$greet or die "open :scalar: $!";
- push @$wbuf, $fh;
+ push @$wbuf, \&greet;
$self->{wbuf} = $wbuf;
$self->{rbuf} = '';
update_idle_time($self);
diff --git a/lib/PublicInbox/NNTPD.pm b/lib/PublicInbox/NNTPD.pm
index 6d9ffd5f..4f30c5d9 100644
--- a/lib/PublicInbox/NNTPD.pm
+++ b/lib/PublicInbox/NNTPD.pm
@@ -25,6 +25,7 @@ sub new {
out => \*STDOUT,
grouplist => [],
servername => $name,
+ greet => \"201 $name ready - post via email\r\n",
# accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... }
}, $class;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 46/57] ds: always use EV_ADD with EV_SET
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (44 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 45/57] nntp: reduce allocations for greeting Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 47/57] nntp: simplify long response logic and fix nesting Eric Wong
` (12 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
kqueue EV_ONESHOT semantics are different than epoll
EPOLLONESHOT. epoll only disables watches for that event while
keeping the item in the rbtree for future EPOLL_CTL_MOD. kqueue
removes the watch from the filter set entirely, necessitating
the use of EV_ADD for future modifications.
---
lib/PublicInbox/DS.pm | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 0e48ed07..8f77ce24 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -321,7 +321,7 @@ sub kq_flag ($$) {
my $fl = EV_ADD() | EV_ENABLE();
($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
} else {
- EV_DISABLE();
+ EV_ADD() | EV_DISABLE();
}
}
@@ -364,8 +364,8 @@ retry:
}
}
elsif ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev));
- $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev));
+ $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
+ $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
}
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 47/57] nntp: simplify long response logic and fix nesting
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (45 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 46/57] ds: always use EV_ADD with EV_SET Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 48/57] ds: flush_write runs ->write callbacks even if closed Eric Wong
` (11 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We can get rid of the {long_res} field and reuse the write
buffer ordering logic to prevent nesting of responses from
requeue.
On FreeBSD, this fixes a problem of callbacks firing twice
with kqueue, since event_step is now our only callback entry
point.
There's a slight change in the stdout "logging" format, in
that we can no longer distinguish between writes blocked
due to slow clients or deferred long responses. Not sure
if this affects anybody parsing logs or not, but preserving
the old format could prove expensive and not worth the
effort.
---
lib/PublicInbox/NNTP.pm | 61 +++++++++++++++++------------------------
1 file changed, 25 insertions(+), 36 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 12ce4e68..6acfcc1b 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
use strict;
use warnings;
use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng long_res);
+use fields qw(nntpd article rbuf ng);
use PublicInbox::Search;
use PublicInbox::Msgmap;
use PublicInbox::MID qw(mid_escape);
@@ -45,17 +45,7 @@ sub next_tick () {
$nextt = undef;
my $q = $nextq;
$nextq = [];
- foreach my $nntp (@$q) {
- # for request && response protocols, always finish writing
- # before finishing reading:
- if (my $long_cb = $nntp->{long_res}) {
- $nntp->write($long_cb);
- } else {
- # pipelined request, we bypassed socket-readiness
- # checks to get here:
- event_step($nntp);
- }
- }
+ event_step($_) for @$q;
}
sub requeue ($) {
@@ -633,8 +623,7 @@ sub get_range ($$) {
}
sub long_response ($$) {
- my ($self, $cb) = @_;
- die "BUG: nested long response" if $self->{long_res};
+ my ($self, $cb) = @_; # cb returns true if more, false if done
my $fd = fileno($self->{sock});
defined $fd or return;
@@ -642,36 +631,38 @@ sub long_response ($$) {
# clients should not be sending us stuff and making us do more
# work while we are stream a response to them
my $t0 = now();
- $self->{long_res} = sub {
+ my $long_cb; # DANGER: self-referential
+ $long_cb = sub {
+ # wbuf is unset or empty, here; $cb may add to it
my $more = eval { $cb->() };
if ($@ || !$self->{sock}) { # something bad happened...
- delete $self->{long_res};
-
+ $long_cb = undef;
+ my $diff = now() - $t0;
if ($@) {
err($self,
"%s during long response[$fd] - %0.6f",
- $@, now() - $t0);
- }
- if ($self->{sock}) {
- update_idle_time($self);
- requeue($self);
- } else {
- out($self, " deferred[$fd] aborted - %0.6f",
- now() - $t0);
+ $@, $diff);
}
+ out($self, " deferred[$fd] aborted - %0.6f", $diff);
+ $self->close;
} elsif ($more) { # $self->{wbuf}:
+ update_idle_time($self);
+
# no recursion, schedule another call ASAP
# but only after all pending writes are done
- update_idle_time($self);
- requeue($self);
+ my $wbuf = $self->{wbuf} ||= [];
+ push @$wbuf, $long_cb;
+
+ # wbuf may be populated by $cb, no need to rearm if so:
+ requeue($self) if scalar(@$wbuf) == 1;
} else { # all done!
- delete $self->{long_res};
+ $long_cb = undef;
res($self, '.');
out($self, " deferred[$fd] done - %0.6f", now() - $t0);
- requeue($self);
+ requeue($self) unless $self->{wbuf};
}
};
- $self->{long_res}->(); # kick off!
+ $self->write($long_cb); # kick off!
undef;
}
@@ -986,9 +977,8 @@ sub event_step {
my $t0 = now();
my $fd = fileno($self->{sock});
$r = eval { process_line($self, $line) };
- my $d = $self->{long_res} ?
- " deferred[$fd]" : '';
- out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0);
+ my $pending = $self->{wbuf} ? ' pending' : '';
+ out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0);
}
return $self->close if $r < 0;
@@ -998,7 +988,7 @@ sub event_step {
# maybe there's more pipelined data, or we'll have
# to register it for socket-readiness notifications
- requeue($self) unless ($self->{long_res} || $self->{wbuf});
+ requeue($self) unless $self->{wbuf};
}
sub not_idle_long ($$) {
@@ -1012,8 +1002,7 @@ sub not_idle_long ($$) {
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{long_res} ||
- $self->{wbuf} || not_idle_long($self, $now));
+ ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now));
}
1;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 48/57] ds: flush_write runs ->write callbacks even if closed
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (46 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 47/57] nntp: simplify long response logic and fix nesting Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 49/57] nntp: lazily allocate and stash rbuf Eric Wong
` (10 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We may need to rely on cleanup code running in enqueued
callbacks, so ensure we call it when flush_write happens.
---
lib/PublicInbox/DS.pm | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8f77ce24..d38e2d20 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -442,13 +442,13 @@ sub psendfile ($$$) {
sub flush_write ($) {
my ($self) = @_;
my $wbuf = $self->{wbuf} or return 1;
- my $sock = $self->{sock} or return 1;
+ my $sock = $self->{sock};
next_buf:
while (my $bref = $wbuf->[0]) {
if (ref($bref) ne 'CODE') {
my $off = delete($self->{wbuf_off}) // 0;
- while (1) {
+ while ($sock) {
my $w = psendfile($sock, $bref, \$off);
if (defined $w) {
if ($w == 0) {
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 49/57] nntp: lazily allocate and stash rbuf
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (47 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 48/57] ds: flush_write runs ->write callbacks even if closed Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 50/57] ci: require IO::KQueue on FreeBSD, for now Eric Wong
` (9 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Allocating a per-client buffer up front is unnecessary and
wastes a hash slot. For the majority of (non-malicious)
clients, we won't need to store rbuf in a long-lived object
associated with a client socket at all.
This saves around 10M on 64-bit with 20K connected-but-idle
clients.
---
lib/PublicInbox/NNTP.pm | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 6acfcc1b..10a2e158 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -107,7 +107,6 @@ sub new ($$$) {
$self->{nntpd} = $nntpd;
push @$wbuf, \&greet;
$self->{wbuf} = $wbuf;
- $self->{rbuf} = '';
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
@@ -964,7 +963,7 @@ sub event_step {
# otherwise we can be buffering infinitely w/o backpressure
use constant LINE_MAX => 512; # RFC 977 section 2.3
- my $rbuf = \($self->{rbuf});
+ my $rbuf = $self->{rbuf} // (\(my $x = ''));
my $r = 1;
if (index($$rbuf, "\n") < 0) {
@@ -984,6 +983,11 @@ sub event_step {
return $self->close if $r < 0;
my $len = bytes::length($$rbuf);
return $self->close if ($len >= LINE_MAX);
+ if ($len) {
+ $self->{rbuf} = $rbuf;
+ } else {
+ delete $self->{rbuf};
+ }
update_idle_time($self);
# maybe there's more pipelined data, or we'll have
@@ -1002,7 +1006,7 @@ sub not_idle_long ($$) {
# for graceful shutdown in PublicInbox::Daemon:
sub busy {
my ($self, $now) = @_;
- ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now));
+ ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
}
1;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 50/57] ci: require IO::KQueue on FreeBSD, for now
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (48 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 49/57] nntp: lazily allocate and stash rbuf Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 51/57] nntp: send greeting immediately for plain sockets Eric Wong
` (8 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We'll likely replace IO::KQueue (at least on FreeBSD) using
a pure-Perl syscall()-based version since syscall numbers are
consistent across architectures on FreeBSD and easy to maintain.
IO::KQueue->EV_SET is also shockingly inefficient in that it
calls kqueue() as much as epoll_ctl.
---
ci/deps.perl | 5 +----
ci/profiles.sh | 3 +--
2 files changed, 2 insertions(+), 6 deletions(-)
diff --git a/ci/deps.perl b/ci/deps.perl
index 62870c1f..ad2c11d2 100755
--- a/ci/deps.perl
+++ b/ci/deps.perl
@@ -60,7 +60,7 @@ my $profiles = {
# account for granularity differences between package systems and OSes
my @precious;
if ($^O eq 'freebsd') {
- @precious = qw(perl curl Socket6 IO::Compress::Gzip);
+ @precious = qw(perl curl Socket6 IO::Compress::Gzip IO::KQueue);
} elsif ($pkg_fmt eq 'rpm') {
@precious = qw(perl curl);
}
@@ -149,9 +149,6 @@ my (@pkg_install, @pkg_remove, %all);
for my $ary (values %$profiles) {
$all{$_} = \@pkg_remove for @$ary;
}
-if ($^O eq 'freebsd') {
- $all{'IO::KQueue'} = \@pkg_remove;
-}
$profiles->{all} = [ keys %all ]; # pseudo-profile for all packages
# parse the profile list from the command-line
diff --git a/ci/profiles.sh b/ci/profiles.sh
index d559ec5f..1ddf7891 100755
--- a/ci/profiles.sh
+++ b/ci/profiles.sh
@@ -54,8 +54,7 @@ esac
case $ID-$VERSION_ID in
freebsd-11|freebsd-12) sed "s/^/$PKG_FMT /" <<EOF
all devtest-
-all devtest IO::KQueue-
-all devtest IO::KQueue
+all devtest
v2essential
essential
essential devtest-
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 51/57] nntp: send greeting immediately for plain sockets
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (49 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 50/57] ci: require IO::KQueue on FreeBSD, for now Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 52/57] daemon: set TCP_DEFER_ACCEPT on everything but NNTP Eric Wong
` (7 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
A tiny write() for the greeting on a just accept()-ed TCP socket
won't fail with EAGAIN, so we can avoid the extra epoll syscall
traffic with plain sockets.
---
lib/PublicInbox/NNTP.pm | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 10a2e158..53e18281 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -24,7 +24,7 @@ use constant {
r225 => '225 Headers follow (multi-line)',
r430 => '430 No article with that message-id',
};
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
use Errno qw(EAGAIN);
my @OVERVIEW = qw(Subject From Date Message-ID References Xref);
@@ -96,17 +96,19 @@ sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
sub new ($$$) {
my ($class, $sock, $nntpd) = @_;
my $self = fields::new($class);
- my $ev = EPOLLOUT | EPOLLONESHOT;
- my $wbuf = [];
+ my $ev = EPOLLIN;
+ my $wbuf;
if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
$ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
- $ev |= EPOLLONESHOT;
- $wbuf->[0] = \&PublicInbox::DS::accept_tls_step;
+ $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
}
- $self->SUPER::new($sock, $ev);
+ $self->SUPER::new($sock, $ev | EPOLLONESHOT);
$self->{nntpd} = $nntpd;
- push @$wbuf, \&greet;
- $self->{wbuf} = $wbuf;
+ if ($wbuf) {
+ $self->{wbuf} = $wbuf;
+ } else {
+ greet($self);
+ }
update_idle_time($self);
$expt ||= PublicInbox::EvCleanup::later(*expire_old);
$self;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 52/57] daemon: set TCP_DEFER_ACCEPT on everything but NNTP
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (50 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 51/57] nntp: send greeting immediately for plain sockets Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 53/57] daemon: use FreeBSD accept filters on non-NNTP Eric Wong
` (6 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
This Linux-specific option can save us some wakeups during
the TLS negotiation phase, and it can help with ordinary HTTP,
too.
Plain NNTP (and in the future, POP3) are the only protocols which
require the server to send messages first.
---
lib/PublicInbox/Daemon.pm | 26 ++++++++++++++++++++++----
t/httpd-corner.t | 19 +++++++++++++++++++
t/httpd.t | 8 ++++++++
t/nntpd-tls.t | 11 ++++++++++-
4 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index c4481555..8b59b65f 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -8,6 +8,7 @@ use warnings;
use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
use IO::Handle;
use IO::Socket;
+use Socket qw(IPPROTO_TCP);
use Cwd qw/abs_path/;
STDOUT->autoflush(1);
STDERR->autoflush(1);
@@ -552,6 +553,18 @@ sub tls_start_cb ($$) {
}
}
+sub defer_accept ($) {
+ if ($^O eq 'linux') {
+ my ($s) = @_;
+ my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
+ return unless defined $x; # may be Unix socket
+ my $sec = unpack('i', $x);
+ return if $sec > 0; # systemd users may set a higher value
+ setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1);
+ }
+ # TODO FreeBSD accf_http / accf_data
+}
+
sub daemon_loop ($$$) {
my ($refresh, $post_accept, $nntpd) = @_;
PublicInbox::EvCleanup::enable(); # early for $refresh
@@ -581,10 +594,15 @@ sub daemon_loop ($$$) {
$SIG{HUP} = $refresh;
$SIG{CHLD} = 'DEFAULT';
$SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH);
- # this calls epoll_create:
- @listeners = map {
- PublicInbox::Listener->new($_,
- $post_accept{sockname($_)} || $post_accept)
+ @listeners = map {;
+ my $tls_cb = $post_accept{sockname($_)};
+
+ # NNTPS, HTTPS, HTTP, and POP3S are client-first traffic
+ # NNTP and POP3 are server-first
+ defer_accept($_) if $tls_cb || !$nntpd;
+
+ # this calls epoll_create:
+ PublicInbox::Listener->new($_, $tls_cb || $post_accept)
} @listeners;
PublicInbox::DS->EventLoop;
$parent_pipe = undef;
diff --git a/t/httpd-corner.t b/t/httpd-corner.t
index c1dc77db..13befcf1 100644
--- a/t/httpd-corner.t
+++ b/t/httpd-corner.t
@@ -36,6 +36,17 @@ my %opts = (
Listen => 1024,
);
my $sock = IO::Socket::INET->new(%opts);
+my $defer_accept_val;
+if ($^O eq 'linux') {
+ setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die;
+ my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
+ defined $x or die "getsockopt: $!";
+ $defer_accept_val = unpack('i', $x);
+ if ($defer_accept_val <= 0) {
+ die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val";
+ }
+}
+
my $upath = "$tmpdir/s";
my $unix = IO::Socket::UNIX->new(
Listen => 1024,
@@ -497,6 +508,14 @@ SKIP: {
is($body, sha1_hex(''), 'read expected body #2');
}
+SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
+ is(unpack('i', $x), $defer_accept_val,
+ 'TCP_DEFER_ACCEPT unchanged if previously set');
+};
+
done_testing();
sub capture {
diff --git a/t/httpd.t b/t/httpd.t
index c061031c..8c2a3173 100644
--- a/t/httpd.t
+++ b/t/httpd.t
@@ -10,6 +10,7 @@ foreach my $mod (qw(Plack::Util Plack::Builder HTTP::Date HTTP::Status)) {
}
use File::Temp qw/tempdir/;
use IO::Socket::INET;
+use Socket qw(IPPROTO_TCP);
require './t/common.perl';
# FIXME: too much setup
@@ -99,6 +100,13 @@ EOF
'fsck on cloned directory successful');
}
+SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
+ ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set');
+};
+
done_testing();
1;
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index e8fb63b4..ef683cab 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -4,7 +4,7 @@ use strict;
use warnings;
use Test::More;
use File::Temp qw(tempdir);
-use Socket qw(SOCK_STREAM);
+use Socket qw(SOCK_STREAM IPPROTO_TCP);
# IO::Poll and Net::NNTP are part of the standard library, but
# distros may split them off...
foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) {
@@ -182,6 +182,15 @@ for my $args (
is(sysread($slow, my $eof, 4096), 0, 'got EOF');
$slow = undef;
+ SKIP: {
+ skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux';
+ my $var = Socket::TCP_DEFER_ACCEPT();
+ defined(my $x = getsockopt($nntps, IPPROTO_TCP, $var)) or die;
+ ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on NNTPS');
+ defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die;
+ is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP');
+ };
+
$c = undef;
kill('TERM', $pid);
is($pid, waitpid($pid, 0), 'nntpd exited successfully');
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 53/57] daemon: use FreeBSD accept filters on non-NNTP
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (51 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 52/57] daemon: set TCP_DEFER_ACCEPT on everything but NNTP Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 54/57] ds: split out IO::KQueue-specific code Eric Wong
` (5 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Similar to TCP_DEFER_ACCEPT on Linux, FreeBSD has a 'dataready'
accept filter which we can use to reduce wakeups when doing
TLS negotiation or plain HTTP. There's also a 'httpready'
which we can use for plain HTTP connections.
---
lib/PublicInbox/Daemon.pm | 23 +++++++++++++++--------
t/httpd-corner.t | 21 ++++++++++++++++++---
t/httpd.t | 10 ++++++++++
t/nntpd-tls.t | 14 +++++++++++++-
4 files changed, 56 insertions(+), 12 deletions(-)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 8b59b65f..cf011a20 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -8,7 +8,8 @@ use warnings;
use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
use IO::Handle;
use IO::Socket;
-use Socket qw(IPPROTO_TCP);
+use Socket qw(IPPROTO_TCP SOL_SOCKET);
+sub SO_ACCEPTFILTER () { 0x1000 }
use Cwd qw/abs_path/;
STDOUT->autoflush(1);
STDERR->autoflush(1);
@@ -553,20 +554,25 @@ sub tls_start_cb ($$) {
}
}
-sub defer_accept ($) {
+sub defer_accept ($$) {
+ my ($s, $af_name) = @_;
+ return unless defined $af_name;
if ($^O eq 'linux') {
- my ($s) = @_;
my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
return unless defined $x; # may be Unix socket
my $sec = unpack('i', $x);
return if $sec > 0; # systemd users may set a higher value
setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1);
+ } elsif ($^O eq 'freebsd') {
+ my $x = getsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER);
+ return if defined $x; # don't change if set
+ my $accf_arg = pack('a16a240', $af_name, '');
+ setsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER, $accf_arg);
}
- # TODO FreeBSD accf_http / accf_data
}
-sub daemon_loop ($$$) {
- my ($refresh, $post_accept, $nntpd) = @_;
+sub daemon_loop ($$$$) {
+ my ($refresh, $post_accept, $nntpd, $af_default) = @_;
PublicInbox::EvCleanup::enable(); # early for $refresh
my %post_accept;
while (my ($k, $v) = each %tls_opt) {
@@ -599,7 +605,7 @@ sub daemon_loop ($$$) {
# NNTPS, HTTPS, HTTP, and POP3S are client-first traffic
# NNTP and POP3 are server-first
- defer_accept($_) if $tls_cb || !$nntpd;
+ defer_accept($_, $tls_cb ? 'dataready' : $af_default);
# this calls epoll_create:
PublicInbox::Listener->new($_, $tls_cb || $post_accept)
@@ -612,8 +618,9 @@ sub daemon_loop ($$$) {
sub run ($$$;$) {
my ($default, $refresh, $post_accept, $nntpd) = @_;
daemon_prepare($default);
+ my $af_default = $default =~ /:8080\z/ ? 'httpready' : undef;
daemonize();
- daemon_loop($refresh, $post_accept, $nntpd);
+ daemon_loop($refresh, $post_accept, $nntpd, $af_default);
}
sub do_chown ($) {
diff --git a/t/httpd-corner.t b/t/httpd-corner.t
index 13befcf1..1cfc2565 100644
--- a/t/httpd-corner.t
+++ b/t/httpd-corner.t
@@ -18,7 +18,7 @@ use File::Temp qw/tempdir/;
use IO::Socket;
use IO::Socket::UNIX;
use Fcntl qw(:seek);
-use Socket qw(IPPROTO_TCP TCP_NODELAY);
+use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);
use POSIX qw(mkfifo);
require './t/common.perl';
my $tmpdir = tempdir('httpd-corner-XXXXXX', TMPDIR => 1, CLEANUP => 1);
@@ -36,7 +36,10 @@ my %opts = (
Listen => 1024,
);
my $sock = IO::Socket::INET->new(%opts);
-my $defer_accept_val;
+
+# Make sure we don't clobber socket options set by systemd or similar
+# using socket activation:
+my ($defer_accept_val, $accf_arg);
if ($^O eq 'linux') {
setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die;
my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT());
@@ -45,6 +48,11 @@ if ($^O eq 'linux') {
if ($defer_accept_val <= 0) {
die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val";
}
+} elsif ($^O eq 'freebsd' && system('kldstat -m accf_data >/dev/null') == 0) {
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ $accf_arg = pack('a16a240', 'dataready', '');
+ setsockopt($sock, SOL_SOCKET, $var, $accf_arg) or die "setsockopt: $!";
}
my $upath = "$tmpdir/s";
@@ -100,7 +108,7 @@ my $spawn_httpd = sub {
is(scalar(grep(/CLOSE FAIL/, @$after)), 1, 'body->close not called');
}
-{
+SKIP: {
my $conn = conn_for($sock, 'excessive header');
$SIG{PIPE} = 'IGNORE';
$conn->write("GET /callback HTTP/1.0\r\n");
@@ -515,6 +523,13 @@ SKIP: {
is(unpack('i', $x), $defer_accept_val,
'TCP_DEFER_ACCEPT unchanged if previously set');
};
+SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd';
+ skip 'accf_data not loaded: kldload accf_data' if !defined $accf_arg;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ defined(my $x = getsockopt($sock, SOL_SOCKET, $var)) or die;
+ is($x, $accf_arg, 'SO_ACCEPTFILTER unchanged if previously set');
+};
done_testing();
diff --git a/t/httpd.t b/t/httpd.t
index 8c2a3173..e085c4b9 100644
--- a/t/httpd.t
+++ b/t/httpd.t
@@ -106,6 +106,16 @@ SKIP: {
defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die;
ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set');
};
+SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd';
+ if (system('kldstat -m accf_http >/dev/null') != 0) {
+ skip 'accf_http not loaded: kldload accf_http', 1;
+ }
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ my $x = getsockopt($sock, SOL_SOCKET, $var);
+ like($x, qr/\Ahttpready\0+\z/, 'got httpready accf for HTTP');
+};
done_testing();
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index ef683cab..427d370f 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -4,7 +4,7 @@ use strict;
use warnings;
use Test::More;
use File::Temp qw(tempdir);
-use Socket qw(SOCK_STREAM IPPROTO_TCP);
+use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET);
# IO::Poll and Net::NNTP are part of the standard library, but
# distros may split them off...
foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) {
@@ -190,6 +190,18 @@ for my $args (
defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die;
is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP');
};
+ SKIP: {
+ skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd';
+ if (system('kldstat -m accf_data >/dev/null')) {
+ skip 'accf_data not loaded? kldload accf_data', 2;
+ }
+ require PublicInbox::Daemon;
+ my $var = PublicInbox::Daemon::SO_ACCEPTFILTER();
+ my $x = getsockopt($nntps, SOL_SOCKET, $var);
+ like($x, qr/\Adataready\0+\z/, 'got dataready accf for NNTPS');
+ $x = getsockopt($starttls, IPPROTO_TCP, $var);
+ is($x, undef, 'no BSD accept filter for plain NNTP');
+ };
$c = undef;
kill('TERM', $pid);
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 54/57] ds: split out IO::KQueue-specific code
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (52 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 53/57] daemon: use FreeBSD accept filters on non-NNTP Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 5:24 ` Eric Wong
2019-06-24 2:52 ` [PATCH 55/57] ds: reimplement IO::Poll support to look like epoll Eric Wong
` (4 subsequent siblings)
58 siblings, 1 reply; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We don't need to code multiple event loops or have branches in
watch() if we can easily make the IO::KQueue-based interface
look like our lower-level epoll_* API.
---
MANIFEST | 1 +
lib/PublicInbox/DS.pm | 121 ++++++++-----------------------------
lib/PublicInbox/DSKQXS.pm | 73 ++++++++++++++++++++++
lib/PublicInbox/Syscall.pm | 9 +--
4 files changed, 99 insertions(+), 105 deletions(-)
create mode 100644 lib/PublicInbox/DSKQXS.pm
diff --git a/MANIFEST b/MANIFEST
index 26ff0d0d..52c4790e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -77,6 +77,7 @@ lib/PublicInbox/Cgit.pm
lib/PublicInbox/Config.pm
lib/PublicInbox/ContentId.pm
lib/PublicInbox/DS.pm
+lib/PublicInbox/DSKQXS.pm
lib/PublicInbox/Daemon.pm
lib/PublicInbox/Emergency.pm
lib/PublicInbox/EvCleanup.pm
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index d38e2d20..d6ef0b8d 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -36,14 +36,9 @@ use Errno qw(EAGAIN EINVAL);
use Carp qw(croak confess carp);
use File::Temp qw(tempfile);
-our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 };
-
our (
- $HaveEpoll, # Flag -- is epoll available? initially undefined.
- $HaveKQueue,
%DescriptorMap, # fd (num) -> PublicInbox::DS object
- $Epoll, # Global epoll fd (for epoll mode only)
- $KQueue, # Global kqueue fd ref (for kqueue mode only)
+ $Epoll, # Global epoll fd (or DSKQXS ref)
$_io, # IO::Handle for Epoll
@ToClose, # sockets to close when event loop is done
@@ -74,13 +69,8 @@ sub Reset {
$PostLoopCallback = undef;
$DoneInit = 0;
- # NOTE kqueue is close-on-fork, and we don't account for it, yet
- # OTOH, we (public-inbox) don't need this sub outside of tests...
- POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0;
- $KQueue = undef;
-
- $_io = undef; # close $Epoll
- $Epoll = undef;
+ $_io = undef; # closes real $Epoll FD
+ $Epoll = undef; # may call DSKQXS::DESTROY
*EventLoop = *FirstTimeEventLoop;
}
@@ -152,21 +142,17 @@ sub _InitPoller
return if $DoneInit;
$DoneInit = 1;
- if ($HAVE_KQUEUE) {
- $KQueue = IO::KQueue->new();
- $HaveKQueue = defined $KQueue;
- if ($HaveKQueue) {
- *EventLoop = *KQueueEventLoop;
- }
- }
- elsif (PublicInbox::Syscall::epoll_defined()) {
- $Epoll = eval { epoll_create(1024); };
- $HaveEpoll = defined $Epoll && $Epoll >= 0;
- if ($HaveEpoll) {
- set_cloexec($Epoll);
- *EventLoop = *EpollEventLoop;
- }
+ if (!PublicInbox::Syscall::epoll_defined()) {
+ $Epoll = eval {
+ require PublicInbox::DSKQXS;
+ PublicInbox::DSKQXS->import;
+ PublicInbox::DSKQXS->new;
+ };
+ } else {
+ $Epoll = epoll_create();
+ set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
}
+ *EventLoop = *EpollEventLoop;
}
=head2 C<< CLASS->EventLoop() >>
@@ -180,11 +166,7 @@ sub FirstTimeEventLoop {
_InitPoller();
- if ($HaveEpoll) {
- EpollEventLoop($class);
- } elsif ($HaveKQueue) {
- KQueueEventLoop($class);
- }
+ EventLoop($class);
}
sub now () { clock_gettime(CLOCK_MONOTONIC) }
@@ -218,11 +200,7 @@ sub RunTimers {
return $timeout;
}
-### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads
-### okay.
sub EpollEventLoop {
- my $class = shift;
-
while (1) {
my @events;
my $i;
@@ -241,30 +219,6 @@ sub EpollEventLoop {
}
}
-### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works
-### okay.
-sub KQueueEventLoop {
- my $class = shift;
-
- while (1) {
- my $timeout = RunTimers();
- my @ret = eval { $KQueue->kevent($timeout) };
- if (my $err = $@) {
- # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
- if ($err =~ /Interrupted system call/) {
- @ret = ();
- } else {
- die $err;
- }
- }
-
- foreach my $kev (@ret) {
- $DescriptorMap{$kev->[0]}->event_step;
- }
- return unless PostEventLoop();
- }
-}
-
=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
Sets post loop callback function. Pass a subref and it will be
@@ -314,17 +268,6 @@ sub PostEventLoop {
return $keep_running;
}
-# map EPOLL* bits to kqueue EV_* flags for EV_SET
-sub kq_flag ($$) {
- my ($bit, $ev) = @_;
- if ($ev & $bit) {
- my $fl = EV_ADD() | EV_ENABLE();
- ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl;
- } else {
- EV_ADD() | EV_DISABLE();
- }
-}
-
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
@@ -353,21 +296,13 @@ sub new {
_InitPoller();
- if ($HaveEpoll) {
-retry:
- if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
- if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $ev &= ~EPOLLEXCLUSIVE;
- goto retry;
- }
- die "couldn't add epoll watch for $fd: $!\n";
+ if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) {
+ if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
+ $ev &= ~EPOLLEXCLUSIVE;
+ goto retry;
}
+ die "couldn't add epoll watch for $fd: $!\n";
}
- elsif ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
- $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
- }
-
Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})")
if $DescriptorMap{$fd};
@@ -396,11 +331,9 @@ sub close {
# if we're using epoll, we have to remove this from our epoll fd so we stop getting
# notifications about it
- if ($HaveEpoll) {
- my $fd = fileno($sock);
- epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
- confess("EPOLL_CTL_DEL: $!");
- }
+ my $fd = fileno($sock);
+ epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and
+ confess("EPOLL_CTL_DEL: $!");
# we explicitly don't delete from DescriptorMap here until we
# actually close the socket, as we might be in the middle of
@@ -596,14 +529,8 @@ sub msg_more ($$) {
sub watch ($$) {
my ($self, $ev) = @_;
my $sock = $self->{sock} or return;
- my $fd = fileno($sock);
- if ($HaveEpoll) {
- epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and
- confess("EPOLL_CTL_MOD $!");
- } elsif ($HaveKQueue) {
- $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev));
- $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev));
- }
+ epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
+ confess("EPOLL_CTL_MOD $!");
0;
}
diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm
new file mode 100644
index 00000000..38e13446
--- /dev/null
+++ b/lib/PublicInbox/DSKQXS.pm
@@ -0,0 +1,73 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+#
+# kqueue support via IO::KQueue XS module. This makes kqueue look
+# like epoll to simplify the code in DS.pm. This is NOT meant to be
+# an all encompassing emulation of epoll via IO::KQueue, but just to
+# support cases public-inbox-nntpd/httpd care about.
+# A pure-Perl version using syscall() is planned, and it should be
+# faster due to the lack of syscall overhead.
+package PublicInbox::DSKQXS;
+use strict;
+use warnings;
+use parent qw(IO::KQueue);
+use parent qw(Exporter);
+use IO::KQueue;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
+
+# map EPOLL* bits to kqueue EV_* flags for EV_SET
+sub kq_flag ($$) {
+ my ($bit, $ev) = @_;
+ if ($ev & $bit) {
+ my $fl = EV_ADD | EV_ENABLE;
+ ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
+ } else {
+ EV_ADD | EV_DISABLE;
+ }
+}
+
+sub new {
+ my ($class) = @_;
+ die 'non-singleton use not supported' if $owner_pid == $$;
+ $owner_pid = $$;
+ $class->SUPER::new;
+}
+
+sub epoll_ctl {
+ my ($self, $op, $fd, $ev) = @_;
+ if ($op != EPOLL_CTL_DEL) {
+ $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev));
+ $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev));
+ }
+ 0;
+}
+
+sub epoll_wait {
+ my ($self, $maxevents, $timeout_msec, $events) = @_;
+ @$events = eval { $self->kevent($timeout_msec) };
+ if (my $err = $@) {
+ # workaround https://rt.cpan.org/Ticket/Display.html?id=116615
+ if ($err =~ /Interrupted system call/) {
+ @$events = ();
+ } else {
+ die $err;
+ }
+ }
+ # caller only cares for $events[$i]->[0]
+ scalar(@$events);
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ if ($owner_pid == $$) {
+ POSIX::close($$self);
+ $owner_pid = -1;
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index f1988e61..f53f3c82 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -248,14 +248,7 @@ sub sendfile_freebsd {
sub epoll_defined { return $SYS_epoll_create ? 1 : 0; }
-# ARGS: (size) -- but in modern Linux 2.6, the
-# size doesn't even matter (radix tree now, not hash)
-sub epoll_create {
- return -1 unless defined $SYS_epoll_create;
- my $epfd = eval { syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0) };
- return -1 if $@;
- return $epfd;
-}
+sub epoll_create { syscall($SYS_epoll_create, 0) }
# epoll_ctl wrapper
# ARGS: (epfd, op, fd, events_mask)
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* Re: [PATCH 54/57] ds: split out IO::KQueue-specific code
2019-06-24 2:52 ` [PATCH 54/57] ds: split out IO::KQueue-specific code Eric Wong
@ 2019-06-24 5:24 ` Eric Wong
0 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 5:24 UTC (permalink / raw)
To: meta
Eric Wong <e@80x24.org> wrote:
> diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
> index f1988e61..f53f3c82 100644
> --- a/lib/PublicInbox/Syscall.pm
> +++ b/lib/PublicInbox/Syscall.pm
> @@ -248,14 +248,7 @@ sub sendfile_freebsd {
>
> sub epoll_defined { return $SYS_epoll_create ? 1 : 0; }
>
> -# ARGS: (size) -- but in modern Linux 2.6, the
> -# size doesn't even matter (radix tree now, not hash)
> -sub epoll_create {
> - return -1 unless defined $SYS_epoll_create;
> - my $epfd = eval { syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0) };
> - return -1 if $@;
> - return $epfd;
> -}
> +sub epoll_create { syscall($SYS_epoll_create, 0) }
Oops, that wasn't actually tested on Linux :x I got so
focused on FreeBSD-related improvements that I forgot to test
on the OS I mainly use :x
Will squash this before pushing:
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index f53f3c8..500efa6 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -248,7 +248,9 @@ sub sendfile_freebsd {
sub epoll_defined { return $SYS_epoll_create ? 1 : 0; }
-sub epoll_create { syscall($SYS_epoll_create, 0) }
+sub epoll_create {
+ syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0);
+}
# epoll_ctl wrapper
# ARGS: (epfd, op, fd, events_mask)
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 55/57] ds: reimplement IO::Poll support to look like epoll
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (53 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 54/57] ds: split out IO::KQueue-specific code Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 56/57] Revert "ci: require IO::KQueue on FreeBSD, for now" Eric Wong
` (3 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
At least the subset of epoll we use. EPOLLET might be
difficult to emulate if we end up using it.
---
MANIFEST | 2 ++
lib/PublicInbox/DS.pm | 16 ++++++-----
lib/PublicInbox/DSPoll.pm | 58 +++++++++++++++++++++++++++++++++++++++
t/ds-poll.t | 58 +++++++++++++++++++++++++++++++++++++++
4 files changed, 127 insertions(+), 7 deletions(-)
create mode 100644 lib/PublicInbox/DSPoll.pm
create mode 100644 t/ds-poll.t
diff --git a/MANIFEST b/MANIFEST
index 52c4790e..29920953 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -78,6 +78,7 @@ lib/PublicInbox/Config.pm
lib/PublicInbox/ContentId.pm
lib/PublicInbox/DS.pm
lib/PublicInbox/DSKQXS.pm
+lib/PublicInbox/DSPoll.pm
lib/PublicInbox/Daemon.pm
lib/PublicInbox/Emergency.pm
lib/PublicInbox/EvCleanup.pm
@@ -191,6 +192,7 @@ t/content_id.t
t/convert-compact.t
t/data/0001.patch
t/ds-leak.t
+t/ds-poll.t
t/edit.t
t/emergency.t
t/fail-bin/spamc
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index d6ef0b8d..e3479e66 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -142,15 +142,17 @@ sub _InitPoller
return if $DoneInit;
$DoneInit = 1;
- if (!PublicInbox::Syscall::epoll_defined()) {
- $Epoll = eval {
- require PublicInbox::DSKQXS;
- PublicInbox::DSKQXS->import;
- PublicInbox::DSKQXS->new;
- };
- } else {
+ if (PublicInbox::Syscall::epoll_defined()) {
$Epoll = epoll_create();
set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0);
+ } else {
+ my $cls;
+ for (qw(DSKQXS DSPoll)) {
+ $cls = "PublicInbox::$_";
+ last if eval "require $cls";
+ }
+ $cls->import;
+ $Epoll = $cls->new;
}
*EventLoop = *EpollEventLoop;
}
diff --git a/lib/PublicInbox/DSPoll.pm b/lib/PublicInbox/DSPoll.pm
new file mode 100644
index 00000000..e65640a8
--- /dev/null
+++ b/lib/PublicInbox/DSPoll.pm
@@ -0,0 +1,58 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+#
+# poll(2) via IO::Poll core module. This makes poll look
+# like epoll to simplify the code in DS.pm. This is NOT meant to be
+# an all encompassing emulation of epoll via IO::Poll, but just to
+# support cases public-inbox-nntpd/httpd care about.
+package PublicInbox::DSPoll;
+use strict;
+use warnings;
+use parent qw(Exporter);
+use IO::Poll;
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+our @EXPORT = qw(epoll_ctl epoll_wait);
+
+sub new { bless {}, $_[0] } # fd => events
+
+sub epoll_ctl {
+ my ($self, $op, $fd, $ev) = @_;
+
+ # not wasting time on error checking
+ if ($op != EPOLL_CTL_DEL) {
+ $self->{$fd} = $ev;
+ } else {
+ delete $self->{$fd};
+ }
+ 0;
+}
+
+sub epoll_wait {
+ my ($self, $maxevents, $timeout_msec, $events) = @_;
+ my @pset;
+ while (my ($fd, $events) = each %$self) {
+ my $pevents = $events & EPOLLIN ? POLLIN : 0;
+ $pevents |= $events & EPOLLOUT ? POLLOUT : 0;
+ push(@pset, $fd, $pevents);
+ }
+ @$events = ();
+ my $n = IO::Poll::_poll($timeout_msec, @pset);
+ if ($n >= 0) {
+ for (my $i = 0; $i < @pset; ) {
+ my $fd = $pset[$i++];
+ my $revents = $pset[$i++] or next;
+ delete($self->{$fd}) if $self->{$fd} & EPOLLONESHOT;
+ push @$events, [ $fd ];
+ }
+ my $nevents = scalar @$events;
+ if ($n != $nevents) {
+ warn "BUG? poll() returned $n, but got $nevents";
+ }
+ }
+ $n;
+}
+
+1;
diff --git a/t/ds-poll.t b/t/ds-poll.t
new file mode 100644
index 00000000..a397ee06
--- /dev/null
+++ b/t/ds-poll.t
@@ -0,0 +1,58 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# Licensed the same as Danga::Socket (and Perl5)
+# License: GPL-1.0+ or Artistic-1.0-Perl
+# <https://www.gnu.org/licenses/gpl-1.0.txt>
+# <https://dev.perl.org/licenses/artistic.html>
+use strict;
+use warnings;
+use Test::More;
+use PublicInbox::Syscall qw(:epoll);
+my $cls = 'PublicInbox::DSPoll';
+use_ok $cls;
+my $p = $cls->new;
+
+my ($r, $w, $x, $y);
+pipe($r, $w) or die;
+pipe($x, $y) or die;
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($r), EPOLLIN), 0, 'add EPOLLIN');
+my $events = [];
+my $n = epoll_wait($p, 9, 0, $events);
+is_deeply($events, [], 'no events set');
+is($n, 0, 'nothing ready, yet');
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($w), EPOLLOUT|EPOLLONESHOT), 0,
+ 'add EPOLLOUT|EPOLLONESHOT');
+$n = epoll_wait($p, 9, -1, $events);
+is($n, 1, 'got POLLOUT event');
+is($events->[0]->[0], fileno($w), '$w ready');
+
+$n = epoll_wait($p, 9, 0, $events);
+is($n, 0, 'nothing ready after oneshot');
+is_deeply($events, [], 'no events set after oneshot');
+
+syswrite($w, '1') == 1 or die;
+for my $t (0..1) {
+ $n = epoll_wait($p, 9, $t, $events);
+ is($events->[0]->[0], fileno($r), "level-trigger POLLIN ready #$t");
+ is($n, 1, "only event ready #$t");
+}
+syswrite($y, '1') == 1 or die;
+is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($x), EPOLLIN|EPOLLONESHOT), 0,
+ 'EPOLLIN|EPOLLONESHOT add');
+is(epoll_wait($p, 9, -1, $events), 2, 'epoll_wait has 2 ready');
+my @fds = sort(map { $_->[0] } @$events);
+my @exp = sort((fileno($r), fileno($x)));
+is_deeply(\@fds, \@exp, 'got both ready FDs');
+
+# EPOLL_CTL_DEL doesn't matter for kqueue; we do it in native epoll
+# to avoid a kernel-wide lock, but it's not needed for native kqueue
+# paths so DSKQXS makes it a noop (as did Danga::Socket::close).
+SKIP: {
+ if ($cls ne 'PublicInbox::DSPoll') {
+ skip "$cls doesn't handle EPOLL_CTL_DEL", 2;
+ }
+ is(epoll_ctl($p, EPOLL_CTL_DEL, fileno($r), 0), 0, 'EPOLL_CTL_DEL OK');
+ $n = epoll_wait($p, 9, 0, $events);
+ is($n, 0, 'nothing ready after EPOLL_CTL_DEL');
+};
+
+done_testing;
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 56/57] Revert "ci: require IO::KQueue on FreeBSD, for now"
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (54 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 55/57] ds: reimplement IO::Poll support to look like epoll Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 2:52 ` [PATCH 57/57] ds: reduce overhead of tempfile creation Eric Wong
` (2 subsequent siblings)
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
Now that we support IO::Poll once again, we can remove
the IO::KQueue requirement.
---
ci/deps.perl | 5 ++++-
ci/profiles.sh | 3 ++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/ci/deps.perl b/ci/deps.perl
index ad2c11d2..62870c1f 100755
--- a/ci/deps.perl
+++ b/ci/deps.perl
@@ -60,7 +60,7 @@ my $profiles = {
# account for granularity differences between package systems and OSes
my @precious;
if ($^O eq 'freebsd') {
- @precious = qw(perl curl Socket6 IO::Compress::Gzip IO::KQueue);
+ @precious = qw(perl curl Socket6 IO::Compress::Gzip);
} elsif ($pkg_fmt eq 'rpm') {
@precious = qw(perl curl);
}
@@ -149,6 +149,9 @@ my (@pkg_install, @pkg_remove, %all);
for my $ary (values %$profiles) {
$all{$_} = \@pkg_remove for @$ary;
}
+if ($^O eq 'freebsd') {
+ $all{'IO::KQueue'} = \@pkg_remove;
+}
$profiles->{all} = [ keys %all ]; # pseudo-profile for all packages
# parse the profile list from the command-line
diff --git a/ci/profiles.sh b/ci/profiles.sh
index 1ddf7891..d559ec5f 100755
--- a/ci/profiles.sh
+++ b/ci/profiles.sh
@@ -54,7 +54,8 @@ esac
case $ID-$VERSION_ID in
freebsd-11|freebsd-12) sed "s/^/$PKG_FMT /" <<EOF
all devtest-
-all devtest
+all devtest IO::KQueue-
+all devtest IO::KQueue
v2essential
essential
essential devtest-
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 57/57] ds: reduce overhead of tempfile creation
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (55 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 56/57] Revert "ci: require IO::KQueue on FreeBSD, for now" Eric Wong
@ 2019-06-24 2:52 ` Eric Wong
2019-06-24 5:25 ` [PATCH 58/57] Makefile: skip DSKQXS in global syntax check Eric Wong
2019-06-24 18:28 ` [PATCH 59/57] ds: ->write must not clobber empty wbuf array Eric Wong
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 2:52 UTC (permalink / raw)
To: meta
We end up buffering giant things to the FS sometimes, and open()
is not a cheap syscall; so being forced to do it twice to get a
file description with O_APPEND is gross when we can just use
O_EXCL ourselves and loop on EEXIST.
---
lib/PublicInbox/DS.pm | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e3479e66..4947192f 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -18,7 +18,7 @@ use strict;
use bytes;
use POSIX ();
use IO::Handle qw();
-use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET);
+use Fcntl qw(SEEK_SET :DEFAULT);
use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
use parent qw(Exporter);
our @EXPORT_OK = qw(now msg_more);
@@ -32,9 +32,9 @@ use fields ('sock', # underlying socket
'wbuf_off', # offset into first element of wbuf to start writing at
);
-use Errno qw(EAGAIN EINVAL);
+use Errno qw(EAGAIN EINVAL EEXIST);
use Carp qw(croak confess carp);
-use File::Temp qw(tempfile);
+require File::Spec;
our (
%DescriptorMap, # fd (num) -> PublicInbox::DS object
@@ -440,12 +440,16 @@ sub drop {
# PerlIO::mmap or PerlIO::scalar if needed
sub tmpio ($$$) {
my ($self, $bref, $off) = @_;
- # open(my $fh, '+>>', undef) doesn't set O_APPEND
- my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) };
- $fh or return drop($self, "tempfile: $@");
- open($fh, '+>>', $path) or return drop($self, "open: $!");
+ my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND
+ do {
+ my $fn = File::Spec->tmpdir . '/wbuf-' . rand;
+ if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely
+ unlink($fn) or return drop($self, "unlink($fn) $!");
+ } elsif ($! != EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM
+ return drop($self, "open: $!");
+ }
+ } until (defined $fh);
$fh->autoflush(1);
- unlink($path) or return drop($self, "unlink: $!");
my $len = bytes::length($$bref) - $off;
$fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!");
$fh
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 58/57] Makefile: skip DSKQXS in global syntax check
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (56 preceding siblings ...)
2019-06-24 2:52 ` [PATCH 57/57] ds: reduce overhead of tempfile creation Eric Wong
@ 2019-06-24 5:25 ` Eric Wong
2019-06-24 18:28 ` [PATCH 59/57] ds: ->write must not clobber empty wbuf array Eric Wong
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 5:25 UTC (permalink / raw)
To: meta
IO::KQueue isn't easily installable on Linux systems.
---
Makefile.PL | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/Makefile.PL b/Makefile.PL
index 2382207..adcf91e 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -77,7 +77,7 @@ changed = \$(shell git ls-files -m)
%.syntax ::
@\$(PERL) -w -I lib -c \$(subst .syntax,,\$@)
-syntax:: \$(my_syntax)
+syntax:: \$(filter-out lib/PublicInbox/DSKQXS.pm.syntax,\$(my_syntax))
dsyn :: \$(addsuffix .syntax, \$(filter \$(changed), \$(syn_files)))
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread
* [PATCH 59/57] ds: ->write must not clobber empty wbuf array
2019-06-24 2:52 [PATCH 00/57] ds: shrink, TLS support, buffer writes to FS Eric Wong
` (57 preceding siblings ...)
2019-06-24 5:25 ` [PATCH 58/57] Makefile: skip DSKQXS in global syntax check Eric Wong
@ 2019-06-24 18:28 ` Eric Wong
58 siblings, 0 replies; 61+ messages in thread
From: Eric Wong @ 2019-06-24 18:28 UTC (permalink / raw)
To: meta
We need to account for ->write(CODE) calls doing ->write(SCALARREF);
otherwise, flush_write may see the wrong ->{wbuf} field.
---
lib/PublicInbox/DS.pm | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 4947192..08f4e9e 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -505,7 +505,10 @@ sub write {
return $self->close;
}
my $tmpio = tmpio($self, $bref, $written) or return 0;
- $self->{wbuf} = [ $tmpio ];
+
+ # wbuf may be an empty array if we're being called inside
+ # ->flush_write via CODE bref:
+ push @{$self->{wbuf} ||= []}, $tmpio;
watch($self, EPOLLOUT|EPOLLONESHOT);
return 0;
}
--
EW
^ permalink raw reply related [flat|nested] 61+ messages in thread