From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 3C50A1FA19 for ; Thu, 14 Oct 2021 13:16:10 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 7/7] lei up --all: send signals to workers, receive errors Date: Thu, 14 Oct 2021 13:16:09 +0000 Message-Id: <20211014131609.829-8-e@80x24.org> In-Reply-To: <20211014131609.829-1-e@80x24.org> References: <20211014131609.829-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The redispatch mechanism wasn't routing signals and messages between redispatched workers and script/lei properly. We now rely on PktOp to do bidirectional message forwarding and carefully avoiding circular references by using PktOp. --- lib/PublicInbox/LEI.pm | 7 ++++++- lib/PublicInbox/LeiUp.pm | 13 ++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index d0905562f616..b6338377328f 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -573,6 +573,7 @@ sub _lei_atfork_child { POSIX::setpgid(0, $$) // die "setpgid(0, $$): $!"; } close($_) for (grep(defined, delete @$self{qw(3 old_1 au_done)})); + delete $self->{-socks}; if (my $op_c = delete $self->{pkt_op_c}) { close(delete $op_c->{sock}); } @@ -1144,7 +1145,9 @@ sub event_step { if ($buf eq '') { _drop_wq($self); # EOF, client disconnected dclose($self); - } elsif ($buf =~ /\A(?:STOP|CONT)\z/) { + $buf = 'TERM'; + } + if ($buf =~ /\A(?:STOP|CONT|TERM)\z/) { my $sig = "-$buf"; for my $wq (grep(defined, @$self{@WQ_KEYS})) { $wq->wq_kill($sig) or $wq->wq_kill_old($sig); @@ -1152,6 +1155,8 @@ sub event_step { } else { die "unrecognized client signal: $buf"; } + my $s = $self->{-socks} // []; # lei up --all + @$s = grep { send($_, $buf, MSG_EOR) } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm index 719736e8597e..df65cb9b8474 100644 --- a/lib/PublicInbox/LeiUp.pm +++ b/lib/PublicInbox/LeiUp.pm @@ -166,7 +166,15 @@ sub event_step { # runs via PublicInbox::DS::requeue push(@m, $o) if !@m || $m[-1] !~ s/\n\z/$o\n/; $cb->(@m); }; - $l->{-up1} = $self; + $l->{-up1} = $self; # for LeiUp1->DESTROY + delete @$l{qw(-socks -event_init_done)}; + my ($op_c, $op_p) = PublicInbox::PktOp->pair; + $self->{unref_on_destroy} = $op_c->{sock}; # to cleanup $lei->{-socks} + $lei->pkt_ops($op_c->{ops} //= {}); # errors from $l -> script/lei + push @{$lei->{-socks}}, $op_c->{sock}; # script/lei signals to $l + $l->{sock} = $op_p->{op_p}; # receive signals from op_c->{sock} + $op_c = $op_p = undef; + eval { $l->dispatch('up', $self->{out}) }; $lei->child_error(0, $@) if $@ || $l->{failed}; # lei->fail() } @@ -175,6 +183,9 @@ sub DESTROY { my ($self) = @_; my $lei = $self->{lei}; # the original, from lei_up return if $lei->{daemon_pid} != $$; + my $sock = delete $self->{unref_on_destroy}; + my $s = $lei->{-socks} // []; + @$s = grep { $_ != $sock } @$s; my $out = shift(@{$lei->{-upq}}) or return; PublicInbox::DS::requeue(nxt($lei, $out, $self->{op_p})); }