From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 2/5] ds: update indentation to match rest of source
Date: Wed, 19 Jun 2024 23:41:01 +0000 [thread overview]
Message-ID: <20240619234104.80183-3-e@80x24.org> (raw)
In-Reply-To: <20240619234104.80183-1-e@80x24.org>
Our changes aren't compatible with Danga::Socket at all at this
point. While we're at it, depend more on subroutine prototypes
to get some compile-time checking.
---
lib/PublicInbox/DS.pm | 357 +++++++++++++++++++++---------------------
1 file changed, 178 insertions(+), 179 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index ae0525dc..17a8a1df 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -18,7 +18,7 @@
# sock: underlying socket
# rbuf: scalarref, usually undef
# wbuf: arrayref of coderefs or tmpio (autovivified))
-# (tmpio = [ GLOB, offset, [ length ] ])
+# (tmpio = [ GLOB, offset, [ length ] ])
package PublicInbox::DS;
use strict;
use v5.10.1;
@@ -42,16 +42,14 @@ my $reap_armed;
my @active; # FDs (or objs) returned by epoll/kqueue
our (%AWAIT_PIDS, # pid => [ $callback, @args ]
$cur_runq, # only set inside next_tick
- @FD_MAP, # fd (num) -> PublicInbox::DS object
- $Poller, # global Select, Epoll, DSPoll, or DSKQXS ref
-
- @post_loop_do, # subref + args to call at the end of each loop
-
- $loop_timeout, # timeout of event loop in milliseconds
- @Timers, # timers
- %UniqTimer,
- $in_loop,
- );
+ @FD_MAP, # fd (num) -> PublicInbox::DS object
+ $Poller, # global Select, Epoll, DSPoll, or DSKQXS ref
+ @post_loop_do, # subref + args to call at the end of each loop
+ $loop_timeout, # timeout of event loop in milliseconds
+ @Timers,
+ %UniqTimer,
+ $in_loop,
+);
Reset();
@@ -318,23 +316,22 @@ This is normally (always?) called from your subclass via:
=cut
sub new {
- my ($self, $sock, $ev) = @_;
- $self->{sock} = $sock;
- my $fd = fileno($sock);
-
- $Poller //= _InitPoller();
+ my ($self, $sock, $ev) = @_;
+ $self->{sock} = $sock;
+ my $fd = fileno($sock);
+ $Poller //= _InitPoller();
retry:
- if ($Poller->ep_add($sock, $ev)) {
- if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
- $ev &= ~EPOLLEXCLUSIVE;
- goto retry;
- }
- die "EPOLL_CTL_ADD $self/$sock/$fd: $!";
- }
- defined($FD_MAP[$fd]) and
+ if ($Poller->ep_add($sock, $ev)) {
+ if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) {
+ $ev &= ~EPOLLEXCLUSIVE;
+ goto retry;
+ }
+ die "EPOLL_CTL_ADD $self/$sock/$fd: $!";
+ }
+ defined($FD_MAP[$fd]) and
croak("BUG: FD:$fd in use by $FD_MAP[$fd] (for $self/$sock)");
- $FD_MAP[$fd] = $self;
+ $FD_MAP[$fd] = $self;
}
# for IMAP, NNTP, and POP3 which greet clients upon connect
@@ -374,75 +371,80 @@ sub close {
# portable, non-thread-safe sendfile emulation (no pread, yet)
sub send_tmpio ($$) {
- my ($sock, $tmpio) = @_;
-
- sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
- my $n = $tmpio->[2] // 65536;
- $n = 65536 if $n > 65536;
- defined(my $to_write = sysread($tmpio->[0], my $buf, $n)) or return;
- my $written = 0;
- while ($to_write > 0) {
- if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) {
- $written += $w;
- $to_write -= $w;
- } else {
- return if $written == 0;
- last;
- }
- }
- $tmpio->[1] += $written; # offset
- $tmpio->[2] -= $written if defined($tmpio->[2]); # length
- $written;
+ my ($sock, $tmpio) = @_;
+
+ sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
+ my $n = $tmpio->[2] // 65536;
+ $n = 65536 if $n > 65536;
+ my $to_write = sysread($tmpio->[0], my $buf, $n) // return;
+ my $total = 0;
+ while ($to_write > 0) {
+ if (defined(my $w = syswrite($sock, $buf, $to_write, $total))) {
+ $total += $w;
+ $to_write -= $w;
+ } else {
+ $total ? last : return;
+ }
+ }
+ $tmpio->[1] += $total; # offset
+ $tmpio->[2] -= $total if defined($tmpio->[2]); # length
+ $total;
}
sub epbit ($$) { # (sock, default)
$_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1];
}
+sub epwait ($$) {
+ my ($io, $ev) = @_;
+ $Poller->ep_mod($io, $ev) and croak("EPOLL_CTL_MOD($io): $!");
+}
+
# returns 1 if done, 0 if incomplete
-sub flush_write ($) {
- my ($self) = @_;
- my $sock = $self->{sock} or return;
- my $wbuf = $self->{wbuf} or return 1;
+sub flush_write {
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ my $wbuf = $self->{wbuf} or return 1;
next_buf:
- while (my $bref = $wbuf->[0]) {
- if (ref($bref) ne 'CODE') {
- while ($sock) {
- my $w = send_tmpio($sock, $bref); # bref is tmpio
- if (defined $w) {
- if ($w == 0) {
- shift @$wbuf;
- goto next_buf;
- }
- } elsif ($! == EAGAIN && (my $ev = epbit($sock, EPOLLOUT))) {
- epwait($sock, $ev | EPOLLONESHOT);
- return 0;
- } else {
- return $self->close;
- }
- }
- } else { #(ref($bref) eq 'CODE') {
- shift @$wbuf;
- my $before = scalar(@$wbuf);
- $bref->($self);
-
- # bref may be enqueueing more CODE to call (see accept_tls_step)
- return 0 if (scalar(@$wbuf) > $before);
- }
- } # while @$wbuf
-
- delete $self->{wbuf};
- 1; # all done
+ while (my $bref = $wbuf->[0]) {
+ if (ref($bref) ne 'CODE') { # bref is tmpio
+ while ($sock) {
+ my $w = send_tmpio $sock, $bref;
+ if (defined $w) {
+ if ($w == 0) {
+ shift @$wbuf;
+ goto next_buf;
+ }
+ } elsif ($! == EAGAIN && (my $ev = epbit
+ $sock, EPOLLOUT)) {
+ epwait $sock, $ev | EPOLLONESHOT;
+ return 0;
+ } else {
+ return $self->close;
+ }
+ }
+ } else { #(ref($bref) eq 'CODE') {
+ shift @$wbuf;
+ my $before = scalar(@$wbuf);
+ $bref->($self);
+ # bref may be enqueueing more CODE to call
+ # (see accept_tls_step)
+ return 0 if (scalar(@$wbuf) > $before);
+ }
+ } # while @$wbuf
+
+ delete $self->{wbuf};
+ 1; # all done
}
sub rbuf_idle ($$) {
- my ($self, $rbuf) = @_;
- if ($$rbuf eq '') { # who knows how long till we can read again
- delete $self->{rbuf};
- } else {
- $self->{rbuf} = $rbuf;
- }
+ my ($self, $rbuf) = @_;
+ if ($$rbuf eq '') { # who knows how long till we can read again
+ delete $self->{rbuf};
+ } else {
+ $self->{rbuf} = $rbuf;
+ }
}
# returns true if bytes are read, false otherwise
@@ -451,8 +453,8 @@ sub do_read ($$$;$) {
my ($ev, $r, $s);
$r = sysread($s = $self->{sock}, $$rbuf, $len, $off // 0) and return $r;
- if (!defined($r) && $! == EAGAIN && ($ev = epbit($s, EPOLLIN))) {
- epwait($s, $ev | EPOLLONESHOT);
+ if (!defined($r) && $! == EAGAIN && ($ev = epbit $s, EPOLLIN)) {
+ epwait $s, $ev | EPOLLONESHOT;
rbuf_idle($self, $rbuf);
} else {
$self->close;
@@ -462,7 +464,7 @@ sub do_read ($$$;$) {
# drop the socket if we hit unrecoverable errors on our system which
# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
-sub drop {
+sub drop ($@) {
my $self = shift;
carp(@_);
$self->close;
@@ -472,12 +474,12 @@ sub drop {
sub tmpio ($$$) {
my ($self, $bref, $off) = @_;
my $fh = tmpfile 'wbuf', $self->{sock}, 1 or
- return drop($self, "tmpfile $!");
+ return drop $self, "tmpfile $!";
$fh->autoflush(1);
my $len = length($$bref) - $off;
my $n = syswrite($fh, $$bref, $len, $off) //
- return drop($self, "write ($len): $!");
- $n == $len or return drop($self, "wrote $n < $len bytes");
+ return drop $self, "write ($len): $!";
+ $n == $len or return drop $self, "wrote $n < $len bytes";
[ $fh, 0 ] # [1] = offset, [2] = length, not set by us
}
@@ -490,112 +492,109 @@ it returns 1, caller should stop waiting for 'writable' events)
=cut
sub write {
- my ($self, $data) = @_;
-
- # nobody should be writing to closed sockets, but caller code can
- # do two writes within an event, have the first fail and
- # disconnect the other side (whose destructor then closes the
- # calling object, but it's still in a method), and then the
- # now-dead object does its second write. that is this case. we
- # just lie and say it worked. it'll be dead soon and won't be
- # hurt by this lie.
- my $sock = $self->{sock} or return 1;
- my $ref = ref $data;
- my $bref = $ref ? $data : \$data;
- my $wbuf = $self->{wbuf};
- if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more...
- if ($ref eq 'CODE') {
- push @$wbuf, $bref;
- } else {
- my $tmpio = $wbuf->[-1];
- if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) {
- # append to tmp file buffer
- $tmpio->[0]->print($$bref) or return drop($self, "print: $!");
- } else {
- my $tmpio = tmpio($self, $bref, 0) or return 0;
- push @$wbuf, $tmpio;
- }
- }
- return 0;
- } elsif ($ref eq 'CODE') {
- $bref->($self);
- return 1;
- } else {
- my $to_write = length($$bref);
- my $written = syswrite($sock, $$bref, $to_write);
-
- if (defined $written) {
- return 1 if $written == $to_write;
- requeue($self); # runs: event_step -> flush_write
- } elsif ($! == EAGAIN) {
- my $ev = epbit($sock, EPOLLOUT) or return $self->close;
- epwait($sock, $ev | EPOLLONESHOT);
- $written = 0;
- } else {
- return $self->close;
- }
-
- # deal with EAGAIN or partial write:
- my $tmpio = tmpio($self, $bref, $written) or return 0;
-
- # wbuf may be an empty array if we're being called inside
- # ->flush_write via CODE bref:
- push @{$self->{wbuf}}, $tmpio; # autovivifies
- return 0;
- }
+ my ($self, $data) = @_;
+
+ # nobody should be writing to closed sockets, but caller code can
+ # do two writes within an event, have the first fail and
+ # disconnect the other side (whose destructor then closes the
+ # calling object, but it's still in a method), and then the
+ # now-dead object does its second write. that is this case. we
+ # just lie and say it worked. it'll be dead soon and won't be
+ # hurt by this lie.
+ my $sock = $self->{sock} or return 1;
+ my $ref = ref $data;
+ my $bref = $ref ? $data : \$data;
+ my $wbuf = $self->{wbuf};
+ if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more...
+ if ($ref eq 'CODE') {
+ push @$wbuf, $bref;
+ } else {
+ my $tmpio = $wbuf->[-1];
+ if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) {
+ # append to tmp file buffer
+ $tmpio->[0]->print($$bref) or
+ return drop($self, "print: $!");
+ } else {
+ $tmpio = tmpio $self, $bref, 0 or return 0;
+ push @$wbuf, $tmpio;
+ }
+ }
+ 0;
+ } elsif ($ref eq 'CODE') {
+ $bref->($self);
+ 1;
+ } else {
+ my $to_write = length $$bref;
+ my $w = syswrite $sock, $$bref, $to_write;
+
+ if (defined $w) {
+ return 1 if $w == $to_write;
+ requeue $self; # runs: event_step -> flush_write
+ } elsif ($! == EAGAIN) {
+ my $ev = epbit $sock, EPOLLOUT or return $self->close;
+ epwait $sock, $ev | EPOLLONESHOT;
+ $w = 0;
+ } else {
+ return $self->close;
+ }
+
+ # deal with EAGAIN or partial write:
+ my $tmpio = tmpio $self, $bref, $w or return 0;
+
+ # wbuf may be an empty array if we're being called inside
+ # ->flush_write via CODE bref:
+ push @{$self->{wbuf}}, $tmpio; # autovivifies
+ 0;
+ }
}
use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
sub msg_more ($$) {
- my $self = $_[0];
- my $sock = $self->{sock} or return 1;
- my $wbuf = $self->{wbuf};
+ my $self = $_[0]; # $_[1] = buf
+ my ($sock, $wbuf, $n, $nlen, $tmpio);
+ $sock = $self->{sock} or return 1;
+ $wbuf = $self->{wbuf};
- if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
+ if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
!$sock->can('stop_SSL')) {
- my $n = send($sock, $_[1], MSG_MORE);
- if (defined $n) {
- my $nlen = length($_[1]) - $n;
- return 1 if $nlen == 0; # all done!
- # queue up the unwritten substring:
- my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
- push @{$self->{wbuf}}, $tmpio; # autovivifies
- epwait($sock, EPOLLOUT|EPOLLONESHOT);
- return 0;
- }
- }
-
- # don't redispatch into NNTPdeflate::write
- PublicInbox::DS::write($self, \($_[1]));
-}
+ $n = send($sock, $_[1], MSG_MORE);
+ if (defined $n) {
+ $nlen = length($_[1]) - $n;
+ return 1 if $nlen == 0; # all done!
+ # queue up the unwritten substring:
+ $tmpio = tmpio($self, \($_[1]), $n) or return 0;
+ push @{$self->{wbuf}}, $tmpio; # autovivifies
+ epwait $sock, EPOLLOUT|EPOLLONESHOT;
+ return 0;
+ }
+ }
-sub epwait ($$) {
- my ($io, $ev) = @_;
- $Poller->ep_mod($io, $ev) and croak("EPOLL_CTL_MOD($io): $!");
+ # don't redispatch into NNTPdeflate::write
+ PublicInbox::DS::write($self, \($_[1]));
}
# return true if complete, false if incomplete (or failure)
sub accept_tls_step ($) {
- my ($self) = @_;
- my $sock = $self->{sock} or return;
- return 1 if $sock->accept_SSL;
- return $self->close if $! != EAGAIN;
- my $ev = PublicInbox::TLS::epollbit() or return $self->close;
- epwait($sock, $ev | EPOLLONESHOT);
- unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies
- 0;
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return 1 if $sock->accept_SSL;
+ return $self->close if $! != EAGAIN;
+ my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+ epwait $sock, $ev | EPOLLONESHOT;
+ unshift @{$self->{wbuf}}, \&accept_tls_step; # autovivifies
+ 0;
}
# return value irrelevant
sub shutdn_tls_step ($) {
- my ($self) = @_;
- my $sock = $self->{sock} or return;
- return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
- return $self->close if $! != EAGAIN;
- my $ev = PublicInbox::TLS::epollbit() or return $self->close;
- epwait($sock, $ev | EPOLLONESHOT);
- unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies
+ my ($self) = @_;
+ my $sock = $self->{sock} or return;
+ return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
+ return $self->close if $! != EAGAIN;
+ my $ev = PublicInbox::TLS::epollbit() or return $self->close;
+ epwait $sock, $ev | EPOLLONESHOT;
+ unshift @{$self->{wbuf}}, \&shutdn_tls_step; # autovivifies
}
# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
@@ -603,7 +602,7 @@ sub shutdn_tls_step ($) {
sub shutdn ($) {
my ($self) = @_;
my $sock = $self->{sock} or return;
- $sock->can('stop_SSL') ? shutdn_tls_step($self) : $self->close;
+ $sock->can('stop_SSL') ? shutdn_tls_step $self : $self->close;
}
sub dflush {} # overridden by DSdeflate
next prev parent reply other threads:[~2024-06-19 23:41 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-06-19 23:40 [PATCH 0/5] use sendmsg|writev to reduce syscalls Eric Wong
2024-06-19 23:41 ` [PATCH 1/5] ds: remove needless O_APPEND import Eric Wong
2024-06-19 23:41 ` Eric Wong [this message]
2024-06-19 23:41 ` [PATCH 3/5] use sendmsg w/ MSG_MORE to reduce syscalls Eric Wong
2024-06-19 23:41 ` [PATCH 4/5] http: set Content-Length for simple array responses Eric Wong
2024-06-19 23:41 ` [PATCH 5/5] http: use writev for known Content-Length responses Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240619234104.80183-3-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).