I noticed -imapd on public-inbox.org was stuck reading from the Gcf2 pipe, so patch 1/2 should allow us to recover within 60s if it happens again. Patch 2/2 makes the code easier to follow/modify in case we do hit failures, since we need to ensure we call DS::close in the daemons. I'm not sure what caused that read(2) to get stuck; I reread the partial writev calculations in lib/PublicInbox/gcf2_libgit2.h and they all seem to make sense to me. I've also written and tested similar code for writev/sendmmsg many times over the years in other projects... Eric Wong (2): git: set non-blocking flag in case of other bugs git: ensure subclassed ->fail gets called lib/PublicInbox/Git.pm | 52 +++++++++++++++++++++------------- lib/PublicInbox/GitAsyncCat.pm | 6 +--- 2 files changed, 34 insertions(+), 24 deletions(-)
This makes GitAsyncCat more resilient to bugs in Gcf2 or even git-cat-file itself. I noticed -imapd stuck on read(2) from the Gcf2 pipe, so there may be a bug somewhere in Gcf2 or PublicInbox::Git. This should make us more resilient to them and hopefully help us notice and fix them. --- lib/PublicInbox/Git.pm | 28 +++++++++++++++++++++------- lib/PublicInbox/GitAsyncCat.pm | 6 +----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 917fa4a1..d53427d7 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -12,17 +12,19 @@ use v5.10.1; use parent qw(Exporter); use POSIX (); use IO::Handle; # ->autoflush -use Errno qw(EINTR); +use Errno qw(EINTR EAGAIN); use File::Glob qw(bsd_glob GLOB_NOSORT); use File::Spec (); use Time::HiRes qw(stat); use PublicInbox::Spawn qw(popen_rd); use PublicInbox::Tmpfile; +use IO::Poll qw(POLLIN); use Carp qw(croak); use Digest::SHA (); our @EXPORT_OK = qw(git_unquote git_quote); our $PIPE_BUFSIZ = 65536; # Linux default our $in_cleanup; +our $RDTIMEO = 60_000; # milliseconds use constant MAX_INFLIGHT => (($^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF()) * 3) @@ -132,6 +134,8 @@ sub _bidi_pipe { $self->{$in} = $in_r; } +sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) } + sub my_read ($$$) { my ($fh, $rbuf, $len) = @_; my $left = $len - length($$rbuf); @@ -140,9 +144,12 @@ sub my_read ($$$) { $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf)); if ($r) { $left -= $r; + } elsif (defined($r)) { # EOF + return 0; } else { - next if (!defined($r) && $! == EINTR); - return $r; + next if ($! == EAGAIN and poll_in($fh)); + next if $! 
== EINTR; # may be set by sysread or poll_in + return; # unrecoverable error } } \substr($$rbuf, 0, $len, ''); @@ -154,9 +161,15 @@ sub my_readline ($$) { if ((my $n = index($$rbuf, "\n")) >= 0) { return substr($$rbuf, 0, $n + 1, ''); } - my $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf)); - next if $r || (!defined($r) && $! == EINTR); - return defined($r) ? '' : undef; # EOF or error + my $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf)) + and next; + + # return whatever's left on EOF + return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r); + + next if ($! == EAGAIN and poll_in($fh)); + next if $! == EINTR; # may be set by sysread or poll_in + return; # unrecoverable error } } @@ -204,7 +217,8 @@ sub cat_async_step ($$) { $type = 'missing'; $oid = ref($req) ? $$req : $req if $oid eq ''; } else { - $self->fail("Unexpected result from async git cat-file: $head"); + my $err = $! ? " ($!)" : ''; + $self->fail("bad result from async cat-file: $head$err"); } $self->{cat_rbuf} = $rbuf if $$rbuf ne ''; eval { $cb->($bref, $oid, $type, $size, $arg) }; diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index be51f673..dc97af16 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -3,11 +3,6 @@ # # internal class used by PublicInbox::Git + PublicInbox::DS # This parses the output pipe of "git cat-file --batch" -# -# Note: this does NOT set the non-blocking flag, we expect `git cat-file' -# to be a local process, and git won't start writing a blob until it's -# fully read. So minimize context switching and read as much as possible -# and avoid holding a buffer in our heap any longer than it has to live. 
package PublicInbox::GitAsyncCat; use strict; use parent qw(PublicInbox::DS Exporter); @@ -69,6 +64,7 @@ sub git_async_cat ($$$$) { $gitish->{async_cat} //= do { # read-only end of pipe (Gcf2Client is write-only end) my $self = bless { gitish => $gitish }, __PACKAGE__; + $gitish->{in}->blocking(0); $self->SUPER::new($gitish->{in}, EPOLLIN|EPOLLET); \undef; # this is a true ref() };
Some of these changes may not be strictly necessary, but it makes code easier to maintain and change. Hackers using/modifying this code will no longer wonder if a particular callsite needs to care about subclasses or not. --- lib/PublicInbox/Git.pm | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index d53427d7..08406925 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -96,9 +96,9 @@ sub alternates_changed { sub last_check_err { my ($self) = @_; my $fh = $self->{err_c} or return; - sysseek($fh, 0, 0) or fail($self, "sysseek failed: $!"); + sysseek($fh, 0, 0) or $self->fail("sysseek failed: $!"); defined(sysread($fh, my $buf, -s $fh)) or - fail($self, "sysread failed: $!"); + $self->fail("sysread failed: $!"); $buf; } @@ -107,19 +107,19 @@ sub _bidi_pipe { if ($self->{$pid}) { if (defined $err) { # "err_c" my $fh = $self->{$err}; - sysseek($fh, 0, 0) or fail($self, "sysseek failed: $!"); - truncate($fh, 0) or fail($self, "truncate failed: $!"); + sysseek($fh, 0, 0) or $self->fail("sysseek failed: $!"); + truncate($fh, 0) or $self->fail("truncate failed: $!"); } return; } my ($out_r, $out_w); - pipe($out_r, $out_w) or fail($self, "pipe failed: $!"); + pipe($out_r, $out_w) or $self->fail("pipe failed: $!"); my @cmd = (qw(git), "--git-dir=$self->{git_dir}", qw(-c core.abbrev=40 cat-file), $batch); my $redir = { 0 => $out_r }; if ($err) { my $id = "git.$self->{git_dir}$batch.err"; - my $fh = tmpfile($id) or fail($self, "tmpfile($id): $!"); + my $fh = tmpfile($id) or $self->fail("tmpfile($id): $!"); $self->{$err} = $fh; $redir->{2} = $fh; } @@ -187,7 +187,7 @@ sub cat_async_retry ($$$$$) { for (my $i = 0; $i < @$inflight; $i += 3) { $buf .= "$inflight->[$i]\n"; } - print { $self->{out} } $buf or fail($self, "write error: $!"); + print { $self->{out} } $buf or $self->fail("write error: $!"); unshift(@$inflight, \$req, $cb, $arg); # \$ref to indicate retried 
cat_async_step($self, $inflight); # take one step @@ -265,7 +265,7 @@ sub check_async_step ($$) { # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/ if ($hex eq 'dangling' || $hex eq 'notdir' || $hex eq 'loop') { my $ret = my_read($self->{in_c}, $rbuf, $type + 1); - fail($self, defined($ret) ? 'read EOF' : "read: $!") if !$ret; + $self->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret; } $self->{chk_rbuf} = $rbuf if $$rbuf ne ''; eval { $cb->($hex, $type, $size, $arg, $self) }; @@ -294,7 +294,7 @@ sub check_async ($$$$) { while (scalar(@$inflight_c) >= MAX_INFLIGHT) { check_async_step($self, $inflight_c); } - print { $self->{out_c} } $oid, "\n" or fail($self, "write error: $!"); + print { $self->{out_c} } $oid, "\n" or $self->fail("write error: $!"); push(@$inflight_c, $oid, $cb, $arg); } @@ -347,7 +347,7 @@ sub cat_async_abort ($) { cleanup($self); } -sub fail { +sub fail { # may be augmented in subclasses my ($self, $msg) = @_; cat_async_abort($self); croak(ref($self) . ' ' . ($self->{git_dir} // '') . ": $msg"); @@ -449,7 +449,7 @@ sub cat_async ($$$;$) { while (scalar(@$inflight) >= MAX_INFLIGHT) { cat_async_step($self, $inflight); } - print { $self->{out} } $oid, "\n" or fail($self, "write error: $!"); + print { $self->{out} } $oid, "\n" or $self->fail("write error: $!"); push(@$inflight, $oid, $cb, $arg); } @@ -460,7 +460,7 @@ sub async_prefetch { # but lets not allow one client to monopolize a git process if (scalar(@$inflight) < int(MAX_INFLIGHT/2)) { print { $self->{out} } $oid, "\n" or - fail($self, "write error: $!"); + $self->fail("write error: $!"); return push(@$inflight, $oid, $cb, $arg); } }