From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id BBC931F8C7 for ; Sun, 7 Feb 2021 08:52:01 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 03/19] lei add-external: handle interrupts with --mirror Date: Sun, 7 Feb 2021 08:51:45 +0000 Message-Id: <20210207085201.13871-4-e@80x24.org> In-Reply-To: <20210207085201.13871-1-e@80x24.org> References: <20210207085201.13871-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This also updates lei_xsearch to follow the same pattern for stopping curl(1) and tail(1) processes it spawns. --- lib/PublicInbox/IPC.pm | 5 +-- lib/PublicInbox/LEI.pm | 6 ++++ lib/PublicInbox/LeiMirror.pm | 66 +++++++++++++++++++++++------------ lib/PublicInbox/LeiXSearch.pm | 21 +++++------ lib/PublicInbox/OnDestroy.pm | 2 +- t/lei-mirror.t | 12 +++++++ 6 files changed, 74 insertions(+), 38 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 0dee2a92..b936c27a 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -150,9 +150,10 @@ sub ipc_worker_reap { # dwaitpid callback } sub wq_wait_old { - my ($self, $args) = @_; + my ($self, @args) = @_; + my $cb = ref($args[0]) eq 'CODE' ? shift(@args) : \&ipc_worker_reap; my $pids = delete $self->{"-wq_old_pids.$$"} or return; - dwaitpid($_, \&ipc_worker_reap, [$self, $args]) for @$pids; + dwaitpid($_, $cb, [$self, @args]) for @$pids; } # for base class, override in sub classes diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 3098ade7..515bc2a3 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -370,6 +370,12 @@ sub sigpipe_handler { # handles SIGPIPE from @WQ_KEYS workers fail_handler($_[0], 13, delete $_[0]->{1}); } +# PublicInbox::OnDestroy callback for SIGINT to take out the entire pgid +sub sigint_reap { + my ($pgid) = @_; + dwaitpid($pgid) if kill('-INT', $pgid); +} + sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; err($self, $buf) if defined $buf; diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index bb172e6a..13795a58 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -10,13 +10,19 @@ use IO::Uncompress::Gunzip qw(gunzip $GunzipError); use PublicInbox::Spawn qw(popen_rd spawn); use PublicInbox::PktOp; +sub do_finish_mirror { # dwaitpid callback + my ($arg, $pid) = @_; + my ($mrr, $lei) = @$arg; + if ($? == 0 && unlink("$mrr->{dst}/mirror.done")) { + $lei->add_external_finish($mrr->{dst}); + } + $lei->dclose; +} + sub mirror_done { # EOF callback for main daemon my ($lei) = @_; - my $mrr = delete $lei->{mrr}; - $mrr->wq_wait_old($lei) if $mrr; - # FIXME: check $? before finish - $lei->add_external_finish($mrr->{dst}); - $lei->dclose; + my $mrr = delete $lei->{mrr} or return; + $mrr->wq_wait_old(\&do_finish_mirror, $lei); } # for old installations without manifest.js.gz @@ -59,8 +65,9 @@ E: confused by scraping <$uri>, got ambiguous results: } sub clone_cmd { - my ($lei) = @_; + my ($lei, $opt) = @_; my @cmd = qw(git); + $opt->{$_} = $lei->{$_} for (0..2); # we support "-c $key=$val" for arbitrary git config options # e.g.: git -c http.proxy=socks5h://127.0.0.1:9050 push(@cmd, '-c', $_) for @{$lei->{opt}->{c} // []}; @@ -92,14 +99,12 @@ sub _try_config { my $f = "$ce-$$.tmp"; open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)"); my $opt = { 0 => $lei->{0}, 1 => $fh, 2 => $lei->{2} }; - $lei->qerr("# @$cmd"); - my $pid = spawn($cmd, $lei->{env}, $opt); - waitpid($pid, 0) == $pid or return $lei->err("waitpid @$cmd: $!"); - if (($? >> 8) == 22) { # 404 missing + my $cerr = run_reap($lei, $cmd, $opt) // return; + if (($cerr >> 8) == 22) { # 404 missing unlink($f) if -s $fh == 0; return; } - return $lei->err("# @$cmd failed (non-fatal)") if $?; + return $lei->err("# @$cmd failed (non-fatal)") if $cerr; rename($f, $ce) or return $lei->err("link($f, $ce): $! (non-fatal)"); my $cfg = PublicInbox::Config::git_config_dump($f); my $ibx = $self->{ibx} = {}; @@ -132,6 +137,18 @@ sub index_cloned_inbox { local %ENV = (%ENV, %$env) if $env; PublicInbox::Admin::progress_prepare($opt, $lei->{2}); PublicInbox::Admin::index_inbox($ibx, undef, $opt); + open my $x, '>', "$self->{dst}/mirror.done"; # for do_finish_mirror +} + +sub run_reap { + my ($lei, $cmd, $opt) = @_; + $lei->qerr("# @$cmd"); + $opt->{pgid} = 0; + my $pid = spawn($cmd, $lei->{env}, $opt); + my $reap = PublicInbox::OnDestroy->new($lei->can('sigint_reap'), $pid); + my $err = waitpid($pid, 0) == $pid ? undef : "waitpid @$cmd: $!"; + @$reap = (); # cancel reap + $err ? $lei->err($err) : $? } sub clone_v1 { @@ -140,11 +157,10 @@ sub clone_v1 { my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return; my $uri = URI->new($self->{src}); my $pfx = $curl->torsocks($lei, $uri) or return; - my $cmd = [ @$pfx, clone_cmd($lei), $uri->as_string, $self->{dst} ]; - $lei->qerr("# @$cmd"); - my $pid = spawn($cmd, $lei->{env}, $lei); - waitpid($pid, 0) == $pid or die "BUG: waitpid @$cmd: $!"; - $? == 0 or return $lei->child_error($?, "@$cmd failed"); + my $cmd = [ @$pfx, clone_cmd($lei, my $opt = {}), + $uri->as_string, $self->{dst} ]; + my $cerr = run_reap($lei, $cmd, $opt) // return; + return $lei->child_error($cerr, "@$cmd failed") if $cerr; _try_config($self); index_cloned_inbox($self, 1); } @@ -170,13 +186,11 @@ failed to extract epoch number from $src my $lk = bless { lock_path => "$dst/inbox.lock" }, 'PublicInbox::Lock'; _try_config($self); my $on_destroy = $lk->lock_for_scope($$); - my @cmd = clone_cmd($lei); + my @cmd = clone_cmd($lei, my $opt = {}); while (my $pair = shift(@src_edst)) { my $cmd = [ @$pfx, @cmd, @$pair ]; - $lei->qerr("# @$cmd"); - my $pid = spawn($cmd, $lei->{env}, $lei); - waitpid($pid, 0) == $pid or die "BUG: waitpid @$cmd: $!"; - $? == 0 or return $lei->child_error($?, "@$cmd failed"); + my $cerr = run_reap($lei, $cmd, $opt) // return; + return $lei->child_error($cerr, "@$cmd failed") if $cerr; } undef $on_destroy; # unlock index_cloned_inbox($self, 2); @@ -193,9 +207,14 @@ sub try_manifest { my $cmd = $curl->for_uri($lei, $uri); $lei->qerr("# @$cmd"); my $opt = { 0 => $lei->{0}, 2 => $lei->{2} }; - my $fh = popen_rd($cmd, $lei->{env}, $opt); + my ($fh, $pid) = popen_rd($cmd, $lei->{env}, $opt); + my $reap = PublicInbox::OnDestroy->new($lei->can('sigint_reap'), $pid); my $gz = do { local $/; <$fh> } // die "read(curl $uri): $!"; - unless (close $fh) { + close $fh; + my $err = waitpid($pid, 0) == $pid ? undef : "waitpid @$cmd: $!"; + @$reap = (); + return $lei->err($err) if $err; + if ($?) { return try_scrape($self) if ($? >> 8) == 22; # 404 missing return $lei->child_error($?, "@$cmd failed"); } @@ -282,6 +301,7 @@ sub start { sub ipc_atfork_child { my ($self) = @_; $self->{lei}->lei_atfork_child; + $SIG{TERM} = sub { exit(128 + 15) }; # trigger OnDestroy $reap $self->SUPER::ipc_atfork_child; } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 1e5d7ca6..6a1b107b 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -197,13 +197,6 @@ sub each_eml { # callback for MboxReader->mboxrd $each_smsg->($smsg, undef, $eml); } -# PublicInbox::OnDestroy callback -sub kill_reap { - my ($pid) = @_; - kill('KILL', $pid); # spawn() blocks other signals - waitpid($pid, 0); -} - sub query_remote_mboxrd { my ($self, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; @@ -213,18 +206,19 @@ sub query_remote_mboxrd { my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm'); push(@qform, t => 1) if $opt->{thread}; my $verbose = $opt->{verbose}; - my $reap; + my ($reap_tail, $reap_curl); my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1); fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!"; - my $rdr = { 2 => $cerr }; + my $rdr = { 2 => $cerr, pgid => 0 }; my $coff = 0; + my $sigint_reap = $lei->can('sigint_reap'); if ($verbose) { # spawn a process to force line-buffering, otherwise curl # will write 1 character at-a-time and parallel outputs # mmmaaayyy llloookkk llliiikkkeee ttthhhiiisss - my $o = { 1 => $lei->{2}, 2 => $lei->{2} }; + my $o = { 1 => $lei->{2}, 2 => $lei->{2}, pgid => 0 }; my $pid = spawn(['tail', '-f', $cerr->filename], undef, $o); - $reap = PublicInbox::OnDestroy->new(\&kill_reap, $pid); + $reap_tail = PublicInbox::OnDestroy->new($sigint_reap, $pid); } my $curl = PublicInbox::LeiCurl->new($lei, $self->{curl}) or return; push @$curl, '-s', '-d', ''; @@ -236,10 +230,13 @@ sub query_remote_mboxrd { my $cmd = $curl->for_uri($lei, $uri); $lei->err("# @$cmd") if $verbose; my ($fh, $pid) = popen_rd($cmd, $env, $rdr); + $reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid); $fh = IO::Uncompress::Gunzip->new($fh); PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self, $lei, $each_smsg); - waitpid($pid, 0) == $pid or die "BUG: waitpid (curl): $!"; + my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!"; + @$reap_curl = (); # cancel OnDestroy + die $err if $err; if ($? == 0) { my $nr = $lei->{-nr_remote_eml}; mset_progress($lei, $lei->{-current_url}, $nr, $nr); diff --git a/lib/PublicInbox/OnDestroy.pm b/lib/PublicInbox/OnDestroy.pm index 0ae4c4c9..615bc450 100644 --- a/lib/PublicInbox/OnDestroy.pm +++ b/lib/PublicInbox/OnDestroy.pm @@ -10,7 +10,7 @@ sub new { sub DESTROY { my ($cb, @args) = @{$_[0]}; - if (!ref($cb)) { + if (!ref($cb) && $cb) { my $pid = $cb; return if $pid != $$; $cb = shift @args; diff --git a/t/lei-mirror.t b/t/lei-mirror.t index 6af49678..2373b370 100644 --- a/t/lei-mirror.t +++ b/t/lei-mirror.t @@ -13,15 +13,27 @@ test_lei({ tmpdir => $tmpdir }, sub { my $t1 = "$home/t1-mirror"; ok($lei->('add-external', $t1, '--mirror', "$http/t1/"), '--mirror v1'); ok(-f "$t1/public-inbox/msgmap.sqlite3", 't1-mirror indexed'); + + ok($lei->('ls-external'), 'ls-external'); + like($lei_out, qr!\Q$t1\E!, 't1 added to ls-externals'); + my $t2 = "$home/t2-mirror"; ok($lei->('add-external', $t2, '--mirror', "$http/t2/"), '--mirror v2'); ok(-f "$t2/msgmap.sqlite3", 't2-mirror indexed'); + ok($lei->('ls-external'), 'ls-external'); + like($lei_out, qr!\Q$t2\E!, 't2 added to ls-externals'); + ok(!$lei->('add-external', $t2, '--mirror', "$http/t2/"), '--mirror fails if reused'); + ok($lei->('ls-external'), 'ls-external'); + like($lei_out, qr!\Q$t2\E!, 'still in ls-externals'); + ok(!$lei->('add-external', "$t2-fail", '-Lmedium'), '--mirror v2'); ok(!-d "$t2-fail", 'destination not created on failure'); + ok($lei->('ls-external'), 'ls-external'); + unlike($lei_out, qr!\Q$t2-fail\E!, 'not added to ls-external'); }); ok($td->kill, 'killed -httpd');