From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EAF311FA0B for ; Wed, 10 Jun 2020 07:06:23 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 25/82] imap: use git-cat-file asynchronously Date: Wed, 10 Jun 2020 07:04:22 +0000 Message-Id: <20200610070519.18252-26-e@yhbt.net> In-Reply-To: <20200610070519.18252-1-e@yhbt.net> References: <20200610070519.18252-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This ought to improve overall performance with multiple clients. Single-client performance suffers a tiny bit due to extra syscall overhead from epoll. This also makes the existing async interface easier to use, since calling cat_async_begin is no longer required. 
--- MANIFEST | 1 + lib/PublicInbox/Git.pm | 10 +++---- lib/PublicInbox/GitAsyncCat.pm | 49 ++++++++++++++++++++++++++++++++++ lib/PublicInbox/IMAP.pm | 49 ++++++++++++++++++---------------- lib/PublicInbox/Inbox.pm | 16 ++++++++--- t/git.t | 1 - xt/cmp-msgstr.t | 1 - xt/cmp-msgview.t | 1 - xt/eml_check_limits.t | 1 - xt/git_async_cmp.t | 1 - xt/msgtime_cmp.t | 1 - xt/perf-msgview.t | 1 - 12 files changed, 93 insertions(+), 39 deletions(-) create mode 100644 lib/PublicInbox/GitAsyncCat.pm diff --git a/MANIFEST b/MANIFEST index 2e4ec915412..82f047f3623 100644 --- a/MANIFEST +++ b/MANIFEST @@ -119,6 +119,7 @@ lib/PublicInbox/Filter/SubjectTag.pm lib/PublicInbox/Filter/Vger.pm lib/PublicInbox/GetlineBody.pm lib/PublicInbox/Git.pm +lib/PublicInbox/GitAsyncCat.pm lib/PublicInbox/GitHTTPBackend.pm lib/PublicInbox/GzipFilter.pm lib/PublicInbox/HTTP.pm diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 54c163e8c2f..c5a3fa4642c 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -156,7 +156,7 @@ sub my_readline ($$) { } } -sub _cat_async_step ($$) { +sub cat_async_step ($$) { my ($self, $inflight) = @_; die 'BUG: inflight empty or odd' if scalar(@$inflight) < 2; my ($cb, $arg) = splice(@$inflight, 0, 2); @@ -178,7 +178,7 @@ sub cat_async_wait ($) { my ($self) = @_; my $inflight = delete $self->{inflight} or return; while (scalar(@$inflight)) { - _cat_async_step($self, $inflight); + cat_async_step($self, $inflight); } } @@ -277,6 +277,7 @@ sub qx { # returns true if there are pending "git cat-file" processes sub cleanup { my ($self) = @_; + cat_async_wait($self); _destroy($self, qw(--batch in out pid)); _destroy($self, qw(--batch-check in_c out_c pid_c err_c)); !!($self->{pid} || $self->{pid_c}); @@ -334,9 +335,9 @@ sub cat_async_begin { sub cat_async ($$$;$) { my ($self, $oid, $cb, $arg) = @_; - my $inflight = $self->{inflight} or die 'BUG: not in async'; + my $inflight = $self->{inflight} // cat_async_begin($self); if (scalar(@$inflight) >= 
MAX_INFLIGHT) { - _cat_async_step($self, $inflight); + cat_async_step($self, $inflight); } print { $self->{out} } $oid, "\n" or fail($self, "write error: $!"); @@ -358,7 +359,6 @@ sub modified ($) { my ($self) = @_; my $modified = 0; my $fh = popen($self, qw(rev-parse --branches)); - cat_async_begin($self); local $/ = "\n"; while (my $oid = <$fh>) { chomp $oid; diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm new file mode 100644 index 00000000000..f168169f68d --- /dev/null +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -0,0 +1,49 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ +# +# internal class used by PublicInbox::Git + Danga::Socket +# This parses the output pipe of "git cat-file --batch" +# +# Note: this does NOT set the non-blocking flag, we expect `git cat-file' +# to be a local process, and git won't start writing a blob until it's +# fully read. So minimize context switching and read as much as possible +# and avoid holding a buffer in our heap any longer than it has to live. 
+package PublicInbox::GitAsyncCat; +use strict; +use parent qw(PublicInbox::DS Exporter); +use fields qw(git); +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); +our @EXPORT = qw(git_async_msg); + +sub new { + my ($class, $git) = @_; + my $self = fields::new($class); + $git->batch_prepare; + $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); + $self->{git} = $git; + $self; +} + +sub event_step { + my ($self) = @_; + my $git = $self->{git} or return; # ->close-ed + my $inflight = $git->{inflight}; + if (@$inflight) { + $git->cat_async_step($inflight); + $self->requeue if @$inflight || length(${$git->{'--batch'}}); + } +} + +sub close { + my ($self) = @_; + delete $self->{git}; + $self->SUPER::close; # PublicInbox::DS::close +} + +sub git_async_msg ($$$$) { + my ($ibx, $smsg, $cb, $arg) = @_; + $ibx->git->cat_async($smsg->{blob}, $cb, $arg); + $ibx->{async_cat} //= new(__PACKAGE__, $ibx->{git}); +} + +1; diff --git a/lib/PublicInbox/IMAP.pm b/lib/PublicInbox/IMAP.pm index 39667199080..2282e3ce78c 100644 --- a/lib/PublicInbox/IMAP.pm +++ b/lib/PublicInbox/IMAP.pm @@ -21,8 +21,10 @@ use PublicInbox::Eml; use PublicInbox::EmlContentFoo qw(parse_content_disposition); use PublicInbox::DS qw(now); use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); +use PublicInbox::GitAsyncCat; use Text::ParseWords qw(parse_line); use Errno qw(EAGAIN); + my $Address; for my $mod (qw(Email::Address::XS Mail::Address)) { eval "require $mod" or next; @@ -367,14 +369,30 @@ EOF \$ret; } -sub uid_fetch_cb { # called by git->cat_async +sub requeue_once ($) { + my ($self) = @_; + # COMPRESS users all share the same DEFLATE context. + # Flush it here to ensure clients don't see + # each other's data + $self->zflush; + + # no recursion, schedule another call ASAP, + # but only after all pending writes are done. 
+ # autovivify wbuf: + my $new_size = push(@{$self->{wbuf}}, \&long_step); + + # wbuf may be populated by $cb, no need to rearm if so: + $self->requeue if $new_size == 1; +} + +sub uid_fetch_cb { # called by git->cat_async via git_async_msg my ($bref, $oid, $type, $size, $fetch_m_arg) = @_; my ($self, undef, $ibx, undef, undef, $msgs, $want) = @$fetch_m_arg; my $smsg = shift @$msgs or die 'BUG: no smsg'; if (!defined($oid)) { # it's possible to have TOCTOU if an admin runs # public-inbox-(edit|purge), just move onto the next message - return unless defined $want->{-seqno}; + return requeue_once($self) unless defined $want->{-seqno}; $bref = dummy_message($smsg->{num}, $ibx); } else { $smsg->{blob} eq $oid or die "BUG: $smsg->{blob} != $oid"; @@ -420,6 +438,7 @@ sub uid_fetch_cb { # called by git->cat_async partial_emit($self, $partial, $eml); } $self->msg_more(")\r\n"); + requeue_once($self); } sub uid_fetch_m { # long_response @@ -432,11 +451,7 @@ sub uid_fetch_m { # long_response } $$beg = $msgs->[-1]->{num} + 1; } - my $git = $ibx->git; - $git->cat_async_begin; # TODO: actually make async - $git->cat_async($msgs->[0]->{blob}, \&uid_fetch_cb, \@_); - $git->cat_async_wait; - 1; + git_async_msg($ibx, $msgs->[0], \&uid_fetch_cb, \@_); } sub cmd_status ($$$;@) { @@ -677,20 +692,17 @@ sub seq_fetch_m { # long_response my $seq = $want->{-seqno}++; my $cur_num = $msgs->[0]->{num}; if ($cur_num == $seq) { # as expected - my $git = $ibx->git; - $git->cat_async_begin; # TODO: actually make async - $git->cat_async($msgs->[0]->{blob}, \&uid_fetch_cb, \@_); - $git->cat_async_wait; + git_async_msg($ibx, $msgs->[0], \&uid_fetch_cb, \@_); } elsif ($cur_num > $seq) { # send dummy messages until $seq catches up to $cur_num my $smsg = bless { num => $seq, ts => 0 }, 'PublicInbox::Smsg'; unshift @$msgs, $smsg; my $bref = dummy_message($seq, $ibx); uid_fetch_cb($bref, undef, undef, undef, \@_); + $smsg; # blessed response since uid_fetch_cb requeues } else { # should not happen die 
"BUG: cur_num=$cur_num < seq=$seq"; } - 1; # more messages on the way } sub cmd_fetch ($$$;@) { @@ -810,17 +822,8 @@ sub long_step { } elsif ($more) { # $self->{wbuf}: $self->update_idle_time; - # COMPRESS users all share the same DEFLATE context. - # Flush it here to ensure clients don't see - # each other's data - $self->zflush; - - # no recursion, schedule another call ASAP, but only after - # all pending writes are done. autovivify wbuf: - my $new_size = push(@{$self->{wbuf}}, \&long_step); - - # wbuf may be populated by $cb, no need to rearm if so: - $self->requeue if $new_size == 1; + # control passed to $more may be a GitAsyncCat object + requeue_once($self) if !ref($more); } else { # all done! delete $self->{long_cb}; my $elapsed = now() - $t0; diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm index b250bef33d3..407751c3063 100644 --- a/lib/PublicInbox/Inbox.pm +++ b/lib/PublicInbox/Inbox.pm @@ -18,6 +18,16 @@ my $cleanup_timer; my $cleanup_avail = -1; # 0, or 1 my $have_devel_peek; my $CLEANUP = {}; # string(inbox) -> inbox + +sub git_cleanup ($) { + my ($self) = @_; + my $git = $self->{git} or return; + if (my $async_cat = delete $self->{async_cat}) { + $async_cat->close; + } + $git->cleanup; +} + sub cleanup_task () { $cleanup_timer = undef; my $next = {}; @@ -32,9 +42,7 @@ sub cleanup_task () { # refcnt is zero when tmp is out-of-scope } } - if (my $git = $ibx->{git}) { - $again = $git->cleanup; - } + git_cleanup($ibx); if (my $gits = $ibx->{-repo_objs}) { foreach my $git (@$gits) { $again = 1 if $git->cleanup; @@ -157,7 +165,7 @@ sub max_git_epoch { my $cur = $self->{-max_git_epoch}; my $changed = git($self)->alternates_changed; if (!defined($cur) || $changed) { - $self->git->cleanup if $changed; + git_cleanup($self) if $changed; my $gits = "$self->{inboxdir}/git"; if (opendir my $dh, $gits) { my $max = -1; diff --git a/t/git.t b/t/git.t index b05ac123bec..0b2089ba701 100644 --- a/t/git.t +++ b/t/git.t @@ -40,7 +40,6 @@ use_ok 
'PublicInbox::Git'; my $arg = { 'foo' => 'bar' }; my $res = []; my $missing = []; - $gcf->cat_async_begin; $gcf->cat_async($oid, sub { my ($bref, $oid_hex, $type, $size, $arg) = @_; $res = [ @_ ]; diff --git a/xt/cmp-msgstr.t b/xt/cmp-msgstr.t index 6bae0f66e83..0276f8454b0 100644 --- a/xt/cmp-msgstr.t +++ b/xt/cmp-msgstr.t @@ -93,7 +93,6 @@ my $git_cb = sub { diag xqx([qw(git diff), "$tmpdir/mime", "$tmpdir/eml"]); } }; -$git->cat_async_begin; my $t = timeit(1, sub { while (<$fh>) { my ($oid, $type) = split / /; diff --git a/xt/cmp-msgview.t b/xt/cmp-msgview.t index 66fb467eb1a..5bd7aa174df 100644 --- a/xt/cmp-msgview.t +++ b/xt/cmp-msgview.t @@ -77,7 +77,6 @@ my $git_cb = sub { is_deeply($eml_cmp, $mime_cmp, "$inboxdir $oid match"); } }; -$git->cat_async_begin; my $t = timeit(1, sub { while (<$fh>) { my ($oid, $type) = split / /; diff --git a/xt/eml_check_limits.t b/xt/eml_check_limits.t index 39de047645b..2d632799956 100644 --- a/xt/eml_check_limits.t +++ b/xt/eml_check_limits.t @@ -55,7 +55,6 @@ my $cat_cb = sub { }; my $t = timeit(1, sub { - $git->cat_async_begin; my ($blob, $type); while (<$fh>) { ($blob, $type) = split / /; diff --git a/xt/git_async_cmp.t b/xt/git_async_cmp.t index 46d27b26bed..8f8d1cf4d01 100644 --- a/xt/git_async_cmp.t +++ b/xt/git_async_cmp.t @@ -25,7 +25,6 @@ my $async = timeit($nr, sub { $dig->add($$bref); }; my $cat = $git->popen(@cat); - $git->cat_async_begin; while (<$cat>) { my ($oid, undef, undef) = split(/ /); diff --git a/xt/msgtime_cmp.t b/xt/msgtime_cmp.t index 95d7c64bec6..0ce3c042217 100644 --- a/xt/msgtime_cmp.t +++ b/xt/msgtime_cmp.t @@ -59,7 +59,6 @@ sub compare { } my $fh = $git->popen(@cat); -$git->cat_async_begin; while (<$fh>) { my ($oid, $type) = split / /; next if $type ne 'blob'; diff --git a/xt/perf-msgview.t b/xt/perf-msgview.t index 30fc07dcbc8..d99101a30de 100644 --- a/xt/perf-msgview.t +++ b/xt/perf-msgview.t @@ -44,7 +44,6 @@ my $cb = sub { $obuf = ''; }; -$git->cat_async_begin; my $t = timeit(1, sub { 
$ctx->{obuf} = \$obuf; $ctx->{mhref} = '../';