From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id AAE141F518 for ; Sun, 26 Nov 2023 02:11:06 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1700964666; bh=LVcfuC1HMMkqoFSPWmsYQCs6hac1OmxcRY2bSKJuLyY=; h=From:To:Subject:Date:In-Reply-To:References:From; b=4RDjQCuuwmh9vjOlaS6e5xM3uMQ39A1N6CVuOwLywYRiglywWCAIjGzqHjyDXvbfL oBWEERYW4ZFUvsLzzg3vUDBaOaKei+h14zx4f/A/zaVd+A8FAaCaJFh+ySRW4XUer0 aLIk5hWvsxnoqidrL44etqsUm006gCdb3hfBmOAM= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 5/7] git: move rbuf handling to PublicInbox::IO Date: Sun, 26 Nov 2023 02:11:03 +0000 Message-ID: <20231126021105.408573-6-e@80x24.org> In-Reply-To: <20231126021105.408573-1-e@80x24.org> References: <20231126021105.408573-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The long-term plan is to share non-blocking read buffering logic with HTTP/NNTP/IMAP/POP3 and also XapClient. --- lib/PublicInbox/Gcf2Client.pm | 1 - lib/PublicInbox/Git.pm | 59 ++++++----------------------------- lib/PublicInbox/IO.pm | 53 ++++++++++++++++++++++++++++++- 3 files changed, 61 insertions(+), 52 deletions(-) diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index 48d8c5ac..19d77e32 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -21,7 +21,6 @@ use autodie qw(socketpair); # pid.owner => process which spawned {pid} # in => same as {sock}, for compatibility with PublicInbox::Git # inflight => array (see PublicInbox::Git) -# rbuf => scalarref, may be non-existent or empty sub new { my ($opt) = @_; my $self = bless {}, __PACKAGE__; diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index bef524aa..93736cf0 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -14,11 +14,11 @@ use autodie qw(socketpair read); use POSIX (); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -use Errno qw(EINTR EAGAIN); +use Errno qw(EAGAIN); use File::Glob qw(bsd_glob GLOB_NOSORT); use File::Spec (); use PublicInbox::Spawn qw(spawn popen_rd run_qx which); -use PublicInbox::IO qw(poll_in read_all try_cat); +use PublicInbox::IO qw(read_all try_cat); use PublicInbox::Tmpfile; use Carp qw(croak carp); use PublicInbox::SHA qw(sha_all); @@ -166,43 +166,6 @@ sub _sock_cmd { $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid); } -sub my_read ($$$) { - my ($fh, $rbuf, $len) = @_; - my $left = $len - length($$rbuf); - my $r; - while ($left > 0) { - $r = sysread($fh, $$rbuf, $left, length($$rbuf)); - if ($r) { - $left -= $r; - } elsif (defined($r)) { # EOF - return 0; - } else { - next if ($! == EAGAIN and poll_in($fh)); - next if $! == EINTR; # may be set by sysread or poll_in - return; # unrecoverable error - } - } - my $no_pad = substr($$rbuf, 0, $len, ''); - \$no_pad; -} - -sub my_readline ($$) { - my ($fh, $rbuf) = @_; - while (1) { - if ((my $n = index($$rbuf, "\n")) >= 0) { - return substr($$rbuf, 0, $n + 1, ''); - } - my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next; - - # return whatever's left on EOF - return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r); - - next if ($! == EAGAIN and poll_in($fh)); - next if $! == EINTR; # may be set by sysread or poll_in - return; # unrecoverable error - } -} - sub cat_async_retry ($$) { my ($self, $old_inflight) = @_; @@ -234,16 +197,15 @@ sub cat_async_step ($$) { my ($self, $inflight) = @_; die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; - my $rbuf = delete($self->{rbuf}) // \(my $new = ''); my ($bref, $oid, $type, $size); - my $head = my_readline($self->{sock}, $rbuf); + my $head = $self->{sock}->my_readline; my $cmd = ref($req) ? $$req : $req; # ->fail may be called via Gcf2Client.pm my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info '; if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) { ($oid, $type, $size) = ($1, $2, $3 + 0); unless ($info) { # --batch-command - $bref = my_read($self->{sock}, $rbuf, $size + 1) or + $bref = $self->{sock}->my_bufread($size + 1) or $self->fail(defined($bref) ? 'read EOF' : "read: $!"); chop($$bref) eq "\n" or @@ -268,7 +230,6 @@ sub cat_async_step ($$) { my $err = $! ? " ($!)" : ''; $self->fail("bad result from async cat-file: $head$err"); } - $self->{rbuf} = $rbuf if $$rbuf ne ''; splice(@$inflight, 0, 3); # don't retry $cb on ->fail eval { $cb->($bref, $oid, $type, $size, $arg) }; async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@; @@ -312,17 +273,15 @@ sub check_async_step ($$) { my ($ck, $inflight) = @_; die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; - my $rbuf = delete($ck->{rbuf}) // \(my $new = ''); - chomp(my $line = my_readline($ck->{sock}, $rbuf)); + chomp(my $line = $ck->{sock}->my_readline); my ($hex, $type, $size) = split(/ /, $line); # git <2.21 would show `dangling' (2.21+ shows `ambiguous') # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/ if ($hex eq 'dangling') { - my $ret = my_read($ck->{sock}, $rbuf, $type + 1); + my $ret = $ck->{sock}->my_bufread($type + 1); $ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret; } - $ck->{rbuf} = $rbuf if $$rbuf ne ''; splice(@$inflight, 0, 3); # don't retry $cb on ->fail eval { $cb->(undef, $hex, $type, $size, $arg) }; async_err($ck, $req, $hex, $@, 'check') if $@; @@ -643,8 +602,8 @@ sub event_step { $self->cat_async_step($inflight); return $self->close unless $self->{sock}; # don't loop here to keep things fair, but we must requeue - # if there's already-read data in rbuf - $self->requeue if exists($self->{rbuf}); + # if there's already-read data in pi_io_rbuf + $self->requeue if $self->{sock}->has_rbuf; } } @@ -670,7 +629,7 @@ sub close { warn "E: (in abort) $req: $@" if $@; } } - delete @$self{qw(-bc err_c inflight rbuf)}; + delete @$self{qw(-bc err_c inflight)}; delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock}); } diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm index 63ae3ef4..fcebac59 100644 --- a/lib/PublicInbox/IO.pm +++ b/lib/PublicInbox/IO.pm @@ -9,6 +9,7 @@ use PublicInbox::DS qw(awaitpid); our @EXPORT_OK = qw(poll_in read_all try_cat write_file); use Carp qw(croak); use IO::Poll qw(POLLIN); +use Errno qw(EINTR EAGAIN); # don't autodie in top-level for Perl 5.16.3 (and maybe newer versions) # we have our own ->close, so we scope autodie into each sub @@ -18,7 +19,7 @@ sub waitcb { # awaitpid callback $cb->($pid, @args) if $cb; } -sub attach_pid ($$;@) { +sub attach_pid { my ($io, $pid, @cb_arg) = @_; bless $io, __PACKAGE__; # we share $err (and not $self) with awaitpid to avoid a ref cycle @@ -87,4 +88,54 @@ sub try_cat ($) { read_all $fh; } +# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here +sub my_bufread { + my ($io, $len) = @_; + my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); + my $left = $len - length($$rbuf); + my $r; + while ($left > 0) { + $r = sysread($io, $$rbuf, $left, length($$rbuf)); + if ($r) { + $left -= $r; + } elsif (defined($r)) { # EOF + return 0; + } else { + next if ($! == EAGAIN and poll_in($io)); + next if $! == EINTR; # may be set by sysread or poll_in + return; # unrecoverable error + } + } + my $no_pad = substr($$rbuf, 0, $len, ''); + delete(${*$io}{pi_io_rbuf}) if $$rbuf eq ''; + \$no_pad; +} + +# always uses "\n" +sub my_readline { + my ($io) = @_; + my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); + while (1) { + if ((my $n = index($$rbuf, "\n")) >= 0) { + my $ret = substr($$rbuf, 0, $n + 1, ''); + delete(${*$io}{pi_io_rbuf}) if $$rbuf eq ''; + return $ret; + } + my $r = sysread($io, $$rbuf, 65536, length($$rbuf)); + if (!defined($r)) { + next if ($! == EAGAIN and poll_in($io)); + next if $! == EINTR; # may be set by sysread or poll_in + return; # unrecoverable error + } elsif ($r == 0) { # return whatever's left on EOF + delete(${*$io}{pi_io_rbuf}); + return $$rbuf; + } # else { continue + } +} + +sub has_rbuf { + my ($io) = @_; + defined(${*$io}{pi_io_rbuf}); +} + 1;