From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CCEA91F5AE for ; Wed, 9 Jun 2021 23:27:50 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] lei tag: less confusing warning about unimported messages Date: Wed, 9 Jun 2021 20:27:50 -0300 Message-Id: <20210609232750.10384-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: "unimported" is more meaningful than "missing", here. And instead of having every worker spew about unimported messages, we'll accumulate and only print one warning line. This necessitated alterating ->DESTROY behavior and persisting the client socket within the $lei object itself, not just the PktOp consumer object. --- lib/PublicInbox/LEI.pm | 21 ++++++++++++++++----- lib/PublicInbox/LeiTag.pm | 12 ++++++------ 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index beeb8b48..d34997fd 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -567,6 +567,11 @@ sub pkt_op_pair { $end; } +sub incr { + my ($self, $field, $nr) = @_; + $self->{counters}->{$field} += $nr; +} + sub workers_start { my ($lei, $wq, $jobs, $ops, $flds) = @_; $ops = { @@ -574,6 +579,7 @@ sub workers_start { '|' => [ \&sigpipe_handler, $lei ], 'x_it' => [ \&x_it, $lei ], 'child_error' => [ \&child_error, $lei ], + 'incr' => [ \&incr, $lei ], ($ops ? %$ops : ()), }; $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; @@ -583,8 +589,6 @@ sub workers_start { $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds); delete $lei->{pkt_op_p}; my $op_c = delete $lei->{pkt_op_c}; - # {-lei_sock} persists script/lei process until ops->{''} EOF callback - $op_c->{-lei_sock} = $lei->{sock}; @$end = (); $lei->event_step_init; ($op_c, $ops); @@ -1092,10 +1096,11 @@ sub event_step { sub event_step_init { my ($self) = @_; - return if $self->{-event_init_done}++; - if (my $sock = $self->{sock}) { # using DS->EventLoop + my $sock = $self->{sock} or return; + $self->{-event_init_done} //= do { # persist til $ops done $self->SUPER::new($sock, EPOLLIN|EPOLLET); - } + $sock; + }; } sub noop {} @@ -1246,6 +1251,12 @@ sub busy { 1 } # prevent daemon-shutdown if client is connected # can immediately reread it sub DESTROY { my ($self) = @_; + if (my $counters = delete $self->{counters}) { + for my $k (sort keys %$counters) { + my $nr = $counters->{$k}; + $self->child_error(1 << 8, "$nr $k messages"); + } + } $self->{1}->autoflush(1) if $self->{1}; stop_pager($self); # preserve $? for ->fail or ->x_it code diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index e0532653..463fb921 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -15,7 +15,7 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh $self->{lei}->{sto}->ipc_do('update_xvmd', $xoids, $eml, $self->{vmd_mod}); } else { - ++$self->{missing}; + ++$self->{unimported}; } } @@ -40,7 +40,7 @@ sub lei_tag { # the "lei tag" method my ($lei, @argv) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); - my $self = bless { missing => 0 }, __PACKAGE__; + my $self = bless {}, __PACKAGE__; $lei->ale; # refresh and prepare my $vmd_mod = $self->vmd_mod_extract(\@argv); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; @@ -58,10 +58,10 @@ sub lei_tag { # the "lei tag" method $lei->wait_wq_events($op_c, $ops); } -sub note_missing { +sub note_unimported { my ($self) = @_; - my $n = $self->{missing} or return; - $self->{lei}->child_error(1 << 8, "$n missed messages"); + my $n = $self->{unimported} or return; + $self->{lei}->{pkt_op_p}->pkt_do('incr', 'unimported', $n); } sub ipc_atfork_child { @@ -69,7 +69,7 @@ sub ipc_atfork_child { PublicInbox::LeiInput::input_only_atfork_child($self); $self->{lse} = $self->{lei}->{sto}->search; # this goes out-of-scope at worker process exit: - PublicInbox::OnDestroy->new($$, \¬e_missing, $self); + PublicInbox::OnDestroy->new($$, \¬e_unimported, $self); } # Workaround bash word-splitting s to ['kw', ':', 'keyword' ...]