From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D16E81F5AE for ; Thu, 3 Jun 2021 09:52:06 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] pkt_op: make pkt_do an OO method Date: Thu, 3 Jun 2021 09:52:06 +0000 Message-Id: <20210603095206.8786-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This will make it easier to use for internal use such as managing Maildir and IMAP IDLE watches. --- lib/PublicInbox/LEI.pm | 23 ++++++++++++----------- lib/PublicInbox/LeiAuth.pm | 5 ++--- lib/PublicInbox/LeiToMail.pm | 8 +++----- lib/PublicInbox/LeiXSearch.pm | 3 +-- lib/PublicInbox/PktOp.pm | 11 +++++------ lib/PublicInbox/V2Writable.pm | 3 ++- 6 files changed, 25 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 7bda9408..2dd21fc6 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -441,8 +441,10 @@ sub x_it ($$) { # make sure client sees stdout before exit $self->{1}->autoflush(1) if $self->{1}; dump_and_clear_log(); - if (my $s = $self->{pkt_op_p} // $self->{sock}) { - send($s, "x_it $code", MSG_EOR); + if ($self->{pkt_op_p}) { # to top lei-daemon + $self->{pkt_op_p}->pkt_do('x_it', $code); + } elsif ($self->{sock}) { # to lei(1) client + send($self->{sock}, "x_it $code", MSG_EOR); } # else ignore if client disconnected } @@ -480,7 +482,7 @@ sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; err($self, $buf) if defined $buf; # calls fail_handler: - send($self->{pkt_op_p}, '!', MSG_EOR) if $self->{pkt_op_p}; + $self->{pkt_op_p}->pkt_do('!') if $self->{pkt_op_p}; x_it($self, ($exit_code // 1) << 8); undef; } @@ -499,18 +501,17 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) } sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error, $msg) = @_; # child_error is $? $self->err($msg) if $msg; - if (my $s = $self->{pkt_op_p} // $self->{sock}) { - # send to the parent lei-daemon or to lei(1) client - send($s, "child_error $child_error", MSG_EOR); - } elsif (!$PublicInbox::DS::in_loop) { - $self->{child_error} = $child_error; + if ($self->{pkt_op_p}) { # to top lei-daemon + $self->{pkt_op_p}->pkt_do('child_error', $child_error); + } elsif ($self->{sock}) { # to lei(1) client + send($self->{sock}, "child_error $child_error", MSG_EOR); } # else noop if client disconnected } sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; close(delete($self->{$fd})); # explicit close silences Perl warning - send($self->{pkt_op_p}, '|', MSG_EOR) if $self->{pkt_op_p}; + $self->{pkt_op_p}->pkt_do('|') if $self->{pkt_op_p}; x_it($self, 13); } @@ -550,8 +551,8 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die if (my $op = delete $self->{pkt_op_c}) { # in case of die $op->close; # PublicInbox::PktOp::close } - my $unclosed_after_die = delete($self->{pkt_op_p}) or return; - close $unclosed_after_die; + my $pkt_op_p = delete($self->{pkt_op_p}) or return; + close $pkt_op_p->{op_p}; } sub pkt_op_pair { diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 877ae6a3..bbd713f0 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -6,7 +6,6 @@ package PublicInbox::LeiAuth; use strict; use v5.10.1; -use PublicInbox::PktOp qw(pkt_do); sub do_auth_atfork { # used by IPC WQ workers my ($self, $wq) = @_; @@ -16,7 +15,7 @@ sub do_auth_atfork { # used by IPC WQ workers eval { my $mics = $net->imap_common_init($lei); my $nn = $net->nntp_common_init($lei); - pkt_do($lei->{pkt_op_p}, 'net_merge_continue', $net) or + $lei->{pkt_op_p}->pkt_do('net_merge_continue', $net) or die "pkt_do net_merge_continue: $!"; $net->{mics_cached} = $mics if $mics; $net->{nn_cached} = $nn if $nn; @@ -34,7 +33,7 @@ sub net_merge_all { # called in wq worker via wq_broadcast my ($wq, $net_new) = @_; my $net = $wq->{lei}->{net}; %$net = (%$net, %$net_new); - pkt_do($wq->{lei}->{pkt_op_p}, 'net_merge_done1') or + $wq->{lei}->{pkt_op_p}->pkt_do('net_merge_done1') or die "pkt_op_do net_merge_done1: $!"; } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 5b5caee7..b9405c0c 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -9,7 +9,6 @@ use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::ProcessPipe; use PublicInbox::Spawn qw(spawn); -use PublicInbox::PktOp qw(pkt_do); use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); @@ -679,7 +678,7 @@ sub do_post_auth { my ($self) = @_; my $lei = $self->{lei}; # lei_xsearch can start as soon as all l2m workers get here - pkt_do($lei->{pkt_op_p}, 'incr_start_query') or + $lei->{pkt_op_p}->pkt_do('incr_start_query') or die "incr_start_query: $!"; my $aug; if (lock_free($self)) { # all workers do_augment @@ -698,7 +697,7 @@ sub do_post_auth { local $0 = 'do_augment'; eval { do_augment($self, $lei) }; $lei->fail($@) if $@; - pkt_do($lei->{pkt_op_p}, $aug) == 1 or + $lei->{pkt_op_p}->pkt_do($aug) == 1 or die "do_post_augment trigger: $!"; } # done augmenting, connect the compressor pipe for each worker @@ -758,8 +757,7 @@ sub wq_atexit_child { $lei->{ale}->git->async_wait_all; my $nr = delete($lei->{-nr_write}) or return; return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p}; - require PublicInbox::PktOp; - PublicInbox::PktOp::pkt_do($lei->{pkt_op_p}, 'l2m_progress', $nr); + $lei->{pkt_op_p}->pkt_do('l2m_progress', $nr); } # called in top-level lei-daemon when LeiAuth is done diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index f7c1e559..75e55d47 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -9,7 +9,6 @@ use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); use PublicInbox::DS qw(now); -use PublicInbox::PktOp qw(pkt_do); use File::Temp 0.19 (); # 0.19 for ->newdir use File::Spec (); use PublicInbox::Search qw(xap_terms); @@ -142,7 +141,7 @@ sub mset_progress { my $lei = shift; return if $lei->{early_mua} || !$lei->{-progress}; if ($lei->{pkt_op_p}) { - pkt_do($lei->{pkt_op_p}, 'mset_progress', @_); + $lei->{pkt_op_p}->pkt_do('mset_progress', @_); } else { # single lei-daemon consumer my ($desc, $mset_size, $mset_total_est) = @_; $lei->{-mset_total} += $mset_size if $mset_total_est ne '?'; diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index 639a4f62..ca098d3c 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -8,12 +8,11 @@ package PublicInbox::PktOp; use strict; use v5.10.1; -use parent qw(PublicInbox::DS Exporter); +use parent qw(PublicInbox::DS); use Errno qw(EAGAIN EINTR); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET); use PublicInbox::IPC qw(ipc_freeze ipc_thaw); -our @EXPORT_OK = qw(pkt_do); sub new { my ($cls, $r) = @_; @@ -22,17 +21,17 @@ sub new { $self->SUPER::new($r, EPOLLIN|EPOLLET); } -# returns a blessed object as the consumer, and a GLOB/IO for the producer +# returns a blessed objects as the consumer and producer sub pair { my ($cls) = @_; my ($c, $p); socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; - (new($cls, $c), $p); + (new($cls, $c), bless { op_p => $p }, $cls); } sub pkt_do { # for the producer to trigger event_step in consumer - my ($producer, $cmd, @args) = @_; - send($producer, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR); + my ($self, $cmd, @args) = @_; + send($self->{op_p}, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR) } sub event_step { diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 573d9c6f..689d27c8 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1435,7 +1435,8 @@ sub ipc_atfork_child { my ($self) = @_; if (my $lei = delete $self->{lei}) { $lei->_lei_atfork_child; - close(delete $lei->{pkt_op_p}); + my $pkt_op_p = delete $lei->{pkt_op_p}; + close($pkt_op_p->{op_p}); } $self->SUPER::ipc_atfork_child; }