From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 1BF3ED48B8 for ; Wed, 19 Jun 2024 23:41:25 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1718840485; bh=KW3ELtQ+8+bJjDEHKH3rgR1BSaXMh7cSrfDWC47JOpg=; h=From:To:Subject:Date:In-Reply-To:References:From; b=VHozLgdwxXNnFJWgMQVrsLy7eDb+joTTxSu1Pgs5wlrJ9MOfA4r6UW/Ftsa2Ssh+D LYVEqk3SKgaNTKSKBngOfPUvqg45o5bGVdVyCW4QO+4aY/SX2p1IhfqPcd1lw0MaAM EInf7G0ylOh1tGSNI8lkGsF1on8f/zSYqENrjFVA= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/5] ds: update indentation to match rest of source Date: Wed, 19 Jun 2024 23:41:01 +0000 Message-ID: <20240619234104.80183-3-e@80x24.org> In-Reply-To: <20240619234104.80183-1-e@80x24.org> References: <20240619234104.80183-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Our changes aren't compatible with Danga::Socket at all at this point. While we're at it, depend more on subroutine prototypes to get some compile-time checking. --- lib/PublicInbox/DS.pm | 357 +++++++++++++++++++++--------------------- 1 file changed, 178 insertions(+), 179 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index ae0525dc..17a8a1df 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,7 +18,7 @@ # sock: underlying socket # rbuf: scalarref, usually undef # wbuf: arrayref of coderefs or tmpio (autovivified)) -# (tmpio = [ GLOB, offset, [ length ] ]) +# (tmpio = [ GLOB, offset, [ length ] ]) package PublicInbox::DS; use strict; use v5.10.1; @@ -42,16 +42,14 @@ my $reap_armed; my @active; # FDs (or objs) returned by epoll/kqueue our (%AWAIT_PIDS, # pid => [ $callback, @args ] $cur_runq, # only set inside next_tick - @FD_MAP, # fd (num) -> PublicInbox::DS object - $Poller, # global Select, Epoll, DSPoll, or DSKQXS ref - - @post_loop_do, # subref + args to call at the end of each loop - - $loop_timeout, # timeout of event loop in milliseconds - @Timers, # timers - %UniqTimer, - $in_loop, - ); + @FD_MAP, # fd (num) -> PublicInbox::DS object + $Poller, # global Select, Epoll, DSPoll, or DSKQXS ref + @post_loop_do, # subref + args to call at the end of each loop + $loop_timeout, # timeout of event loop in milliseconds + @Timers, + %UniqTimer, + $in_loop, +); Reset(); @@ -318,23 +316,22 @@ This is normally (always?) called from your subclass via: =cut sub new { - my ($self, $sock, $ev) = @_; - $self->{sock} = $sock; - my $fd = fileno($sock); - - $Poller //= _InitPoller(); + my ($self, $sock, $ev) = @_; + $self->{sock} = $sock; + my $fd = fileno($sock); + $Poller //= _InitPoller(); retry: - if ($Poller->ep_add($sock, $ev)) { - if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { - $ev &= ~EPOLLEXCLUSIVE; - goto retry; - } - die "EPOLL_CTL_ADD $self/$sock/$fd: $!"; - } - defined($FD_MAP[$fd]) and + if ($Poller->ep_add($sock, $ev)) { + if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { + $ev &= ~EPOLLEXCLUSIVE; + goto retry; + } + die "EPOLL_CTL_ADD $self/$sock/$fd: $!"; + } + defined($FD_MAP[$fd]) and croak("BUG: FD:$fd in use by $FD_MAP[$fd] (for $self/$sock)"); - $FD_MAP[$fd] = $self; + $FD_MAP[$fd] = $self; } # for IMAP, NNTP, and POP3 which greet clients upon connect @@ -374,75 +371,80 @@ sub close { # portable, non-thread-safe sendfile emulation (no pread, yet) sub send_tmpio ($$) { - my ($sock, $tmpio) = @_; - - sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return; - my $n = $tmpio->[2] // 65536; - $n = 65536 if $n > 65536; - defined(my $to_write = sysread($tmpio->[0], my $buf, $n)) or return; - my $written = 0; - while ($to_write > 0) { - if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { - $written += $w; - $to_write -= $w; - } else { - return if $written == 0; - last; - } - } - $tmpio->[1] += $written; # offset - $tmpio->[2] -= $written if defined($tmpio->[2]); # length - $written; + my ($sock, $tmpio) = @_; + + sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return; + my $n = $tmpio->[2] // 65536; + $n = 65536 if $n > 65536; + my $to_write = sysread($tmpio->[0], my $buf, $n) // return; + my $total = 0; + while ($to_write > 0) { + if (defined(my $w = syswrite($sock, $buf, $to_write, $total))) { + $total += $w; + $to_write -= $w; + } else { + $total ? last : return; + } + } + $tmpio->[1] += $total; # offset + $tmpio->[2] -= $total if defined($tmpio->[2]); # length + $total; } sub epbit ($$) { # (sock, default) $_[0]->can('stop_SSL') ? PublicInbox::TLS::epollbit() : $_[1]; } +sub epwait ($$) { + my ($io, $ev) = @_; + $Poller->ep_mod($io, $ev) and croak("EPOLL_CTL_MOD($io): $!"); +} + # returns 1 if done, 0 if incomplete -sub flush_write ($) { - my ($self) = @_; - my $sock = $self->{sock} or return; - my $wbuf = $self->{wbuf} or return 1; +sub flush_write { + my ($self) = @_; + my $sock = $self->{sock} or return; + my $wbuf = $self->{wbuf} or return 1; next_buf: - while (my $bref = $wbuf->[0]) { - if (ref($bref) ne 'CODE') { - while ($sock) { - my $w = send_tmpio($sock, $bref); # bref is tmpio - if (defined $w) { - if ($w == 0) { - shift @$wbuf; - goto next_buf; - } - } elsif ($! == EAGAIN && (my $ev = epbit($sock, EPOLLOUT))) { - epwait($sock, $ev | EPOLLONESHOT); - return 0; - } else { - return $self->close; - } - } - } else { #(ref($bref) eq 'CODE') { - shift @$wbuf; - my $before = scalar(@$wbuf); - $bref->($self); - - # bref may be enqueueing more CODE to call (see accept_tls_step) - return 0 if (scalar(@$wbuf) > $before); - } - } # while @$wbuf - - delete $self->{wbuf}; - 1; # all done + while (my $bref = $wbuf->[0]) { + if (ref($bref) ne 'CODE') { # bref is tmpio + while ($sock) { + my $w = send_tmpio $sock, $bref; + if (defined $w) { + if ($w == 0) { + shift @$wbuf; + goto next_buf; + } + } elsif ($! == EAGAIN && (my $ev = epbit + $sock, EPOLLOUT)) { + epwait $sock, $ev | EPOLLONESHOT; + return 0; + } else { + return $self->close; + } + } + } else { #(ref($bref) eq 'CODE') { + shift @$wbuf; + my $before = scalar(@$wbuf); + $bref->($self); + # bref may be enqueueing more CODE to call + # (see accept_tls_step) + return 0 if (scalar(@$wbuf) > $before); + } + } # while @$wbuf + + delete $self->{wbuf}; + 1; # all done } sub rbuf_idle ($$) { - my ($self, $rbuf) = @_; - if ($$rbuf eq '') { # who knows how long till we can read again - delete $self->{rbuf}; - } else { - $self->{rbuf} = $rbuf; - } + my ($self, $rbuf) = @_; + if ($$rbuf eq '') { # who knows how long till we can read again + delete $self->{rbuf}; + } else { + $self->{rbuf} = $rbuf; + } } # returns true if bytes are read, false otherwise @@ -451,8 +453,8 @@ sub do_read ($$$;$) { my ($ev, $r, $s); $r = sysread($s = $self->{sock}, $$rbuf, $len, $off // 0) and return $r; - if (!defined($r) && $! == EAGAIN && ($ev = epbit($s, EPOLLIN))) { - epwait($s, $ev | EPOLLONESHOT); + if (!defined($r) && $! == EAGAIN && ($ev = epbit $s, EPOLLIN)) { + epwait $s, $ev | EPOLLONESHOT; rbuf_idle($self, $rbuf); } else { $self->close; @@ -462,7 +464,7 @@ sub do_read ($$$;$) { # drop the socket if we hit unrecoverable errors on our system which # require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE... -sub drop { +sub drop ($@) { my $self = shift; carp(@_); $self->close; @@ -472,12 +474,12 @@ sub drop { sub tmpio ($$$) { my ($self, $bref, $off) = @_; my $fh = tmpfile 'wbuf', $self->{sock}, 1 or - return drop($self, "tmpfile $!"); + return drop $self, "tmpfile $!"; $fh->autoflush(1); my $len = length($$bref) - $off; my $n = syswrite($fh, $$bref, $len, $off) // - return drop($self, "write ($len): $!"); - $n == $len or return drop($self, "wrote $n < $len bytes"); + return drop $self, "write ($len): $!"; + $n == $len or return drop $self, "wrote $n < $len bytes"; [ $fh, 0 ] # [1] = offset, [2] = length, not set by us } @@ -490,112 +492,109 @@ it returns 1, caller should stop waiting for 'writable' events) =cut sub write { - my ($self, $data) = @_; - - # nobody should be writing to closed sockets, but caller code can - # do two writes within an event, have the first fail and - # disconnect the other side (whose destructor then closes the - # calling object, but it's still in a method), and then the - # now-dead object does its second write. that is this case. we - # just lie and say it worked. it'll be dead soon and won't be - # hurt by this lie. - my $sock = $self->{sock} or return 1; - my $ref = ref $data; - my $bref = $ref ? $data : \$data; - my $wbuf = $self->{wbuf}; - if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more... - if ($ref eq 'CODE') { - push @$wbuf, $bref; - } else { - my $tmpio = $wbuf->[-1]; - if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) { - # append to tmp file buffer - $tmpio->[0]->print($$bref) or return drop($self, "print: $!"); - } else { - my $tmpio = tmpio($self, $bref, 0) or return 0; - push @$wbuf, $tmpio; - } - } - return 0; - } elsif ($ref eq 'CODE') { - $bref->($self); - return 1; - } else { - my $to_write = length($$bref); - my $written = syswrite($sock, $$bref, $to_write); - - if (defined $written) { - return 1 if $written == $to_write; - requeue($self); # runs: event_step -> flush_write - } elsif ($! == EAGAIN) { - my $ev = epbit($sock, EPOLLOUT) or return $self->close; - epwait($sock, $ev | EPOLLONESHOT); - $written = 0; - } else { - return $self->close; - } - - # deal with EAGAIN or partial write: - my $tmpio = tmpio($self, $bref, $written) or return 0; - - # wbuf may be an empty array if we're being called inside - # ->flush_write via CODE bref: - push @{$self->{wbuf}}, $tmpio; # autovivifies - return 0; - } + my ($self, $data) = @_; + + # nobody should be writing to closed sockets, but caller code can + # do two writes within an event, have the first fail and + # disconnect the other side (whose destructor then closes the + # calling object, but it's still in a method), and then the + # now-dead object does its second write. that is this case. we + # just lie and say it worked. it'll be dead soon and won't be + # hurt by this lie. + my $sock = $self->{sock} or return 1; + my $ref = ref $data; + my $bref = $ref ? $data : \$data; + my $wbuf = $self->{wbuf}; + if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more... + if ($ref eq 'CODE') { + push @$wbuf, $bref; + } else { + my $tmpio = $wbuf->[-1]; + if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) { + # append to tmp file buffer + $tmpio->[0]->print($$bref) or + return drop($self, "print: $!"); + } else { + $tmpio = tmpio $self, $bref, 0 or return 0; + push @$wbuf, $tmpio; + } + } + 0; + } elsif ($ref eq 'CODE') { + $bref->($self); + 1; + } else { + my $to_write = length $$bref; + my $w = syswrite $sock, $$bref, $to_write; + + if (defined $w) { + return 1 if $w == $to_write; + requeue $self; # runs: event_step -> flush_write + } elsif ($! == EAGAIN) { + my $ev = epbit $sock, EPOLLOUT or return $self->close; + epwait $sock, $ev | EPOLLONESHOT; + $w = 0; + } else { + return $self->close; + } + + # deal with EAGAIN or partial write: + my $tmpio = tmpio $self, $bref, $w or return 0; + + # wbuf may be an empty array if we're being called inside + # ->flush_write via CODE bref: + push @{$self->{wbuf}}, $tmpio; # autovivifies + 0; + } } use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; sub msg_more ($$) { - my $self = $_[0]; - my $sock = $self->{sock} or return 1; - my $wbuf = $self->{wbuf}; + my $self = $_[0]; # $_[1] = buf + my ($sock, $wbuf, $n, $nlen, $tmpio); + $sock = $self->{sock} or return 1; + $wbuf = $self->{wbuf}; - if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && + if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) && !$sock->can('stop_SSL')) { - my $n = send($sock, $_[1], MSG_MORE); - if (defined $n) { - my $nlen = length($_[1]) - $n; - return 1 if $nlen == 0; # all done! - # queue up the unwritten substring: - my $tmpio = tmpio($self, \($_[1]), $n) or return 0; - push @{$self->{wbuf}}, $tmpio; # autovivifies - epwait($sock, EPOLLOUT|EPOLLONESHOT); - return 0; - } - } - - # don't redispatch into NNTPdeflate::write - PublicInbox::DS::write($self, \($_[1])); -} + $n = send($sock, $_[1], MSG_MORE); + if (defined $n) { + $nlen = length($_[1]) - $n; + return 1 if $nlen == 0; # all done! + # queue up the unwritten substring: + $tmpio = tmpio($self, \($_[1]), $n) or return 0; + push @{$self->{wbuf}}, $tmpio; # autovivifies + epwait $sock, EPOLLOUT|EPOLLONESHOT; + return 0; + } + } -sub epwait ($$) { - my ($io, $ev) = @_; - $Poller->ep_mod($io, $ev) and croak("EPOLL_CTL_MOD($io): $!"); + # don't redispatch into NNTPdeflate::write + PublicInbox::DS::write($self, \($_[1])); } # return true if complete, false if incomplete (or failure) sub accept_tls_step ($) { - my ($self) = @_; - my $sock = $self->{sock} or return; - return 1 if $sock->accept_SSL; - return $self->close if $! != EAGAIN; - my $ev = PublicInbox::TLS::epollbit() or return $self->close; - epwait($sock, $ev | EPOLLONESHOT); - unshift(@{$self->{wbuf}}, \&accept_tls_step); # autovivifies - 0; + my ($self) = @_; + my $sock = $self->{sock} or return; + return 1 if $sock->accept_SSL; + return $self->close if $! != EAGAIN; + my $ev = PublicInbox::TLS::epollbit() or return $self->close; + epwait $sock, $ev | EPOLLONESHOT; + unshift @{$self->{wbuf}}, \&accept_tls_step; # autovivifies + 0; } # return value irrelevant sub shutdn_tls_step ($) { - my ($self) = @_; - my $sock = $self->{sock} or return; - return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); - return $self->close if $! != EAGAIN; - my $ev = PublicInbox::TLS::epollbit() or return $self->close; - epwait($sock, $ev | EPOLLONESHOT); - unshift(@{$self->{wbuf}}, \&shutdn_tls_step); # autovivifies + my ($self) = @_; + my $sock = $self->{sock} or return; + return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); + return $self->close if $! != EAGAIN; + my $ev = PublicInbox::TLS::epollbit() or return $self->close; + epwait $sock, $ev | EPOLLONESHOT; + unshift @{$self->{wbuf}}, \&shutdn_tls_step; # autovivifies } # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC @@ -603,7 +602,7 @@ sub shutdn_tls_step ($) { sub shutdn ($) { my ($self) = @_; my $sock = $self->{sock} or return; - $sock->can('stop_SSL') ? shutdn_tls_step($self) : $self->close; + $sock->can('stop_SSL') ? shutdn_tls_step $self : $self->close; } sub dflush {} # overridden by DSdeflate