From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 03E531F487 for ; Wed, 5 Apr 2023 11:26:02 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1680693963; bh=/TXYfKrl0Gpzan79uzXaVwi+wPViCQKDvxl4yFX+4FY=; h=From:To:Subject:Date:In-Reply-To:References:From; b=IsPiQixxRfRqvXR5s0CrmSXcZargSW0iBGwnvWS8wkOe8nTQbdrCrdG2SQoMEvS12 Fk0eK8uV0CIVWLnVSS0+6EY/4MmiGsxvisMZJSCHUyojEowrOUkyf8i26HFwfrH1U9 qZEO9JSzfSFMNiYfJuZhFVFp/fuO2CN6N8zRgGHA= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/7] cindex: share PktOp across indexing workers Date: Wed, 5 Apr 2023 11:26:55 +0000 Message-Id: <20230405112658.90216-5-e@80x24.org> In-Reply-To: <20230405112658.90216-1-e@80x24.org> References: <20230405112658.90216-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Using fewer sockets simplifies completion checks, too. --- lib/PublicInbox/CodeSearchIdx.pm | 54 ++++++++++++++++---------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 14342683..05007afd 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -504,36 +504,35 @@ sub partition_refs ($$$) { } sub shard_commit { # via wq_io_do - my ($self, $n) = @_; + my ($self) = @_; my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; $self->commit_txn_lazy; - send($op_p, "shard_done $n", MSG_EOR); + send($op_p, "shard_done $self->{shard}", MSG_EOR); } -sub consumers_open { # post_loop_do - my (undef, $consumers) = @_; - return if $DO_QUIT; - scalar(grep { $_->{sock} } values %$consumers); +sub consumer_open { # post_loop_do + my (undef, $c) = @_; # $c is PublicInbox::PktOp + $DO_QUIT ? undef : defined($c->{sock}); } -sub wait_consumers ($$$) { - my ($self, $git, $consumers) = @_; - local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers); +sub wait_active ($$$$) { + my ($self, $git, $active, $c) = @_; + local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c); PublicInbox::DS::event_loop($MY_SIG, $SIGSET); - my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers; + my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active; die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT; } -sub commit_used_shards ($$$) { - my ($self, $git, $consumers) = @_; +sub commit_active_shards ($$$) { + my ($self, $git, $active) = @_; local $self->{-shard_ok} = {}; - for my $n (keys %$consumers) { - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; - $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n); - $consumers->{$n} = $c; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; + for my $n (keys %$active) { + $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]); } - wait_consumers($self, $git, $consumers); + undef $p; + wait_active($self, $git, $active, $c); } sub index_repo { # cidx_await cb @@ -547,30 +546,31 @@ sub index_repo { # cidx_await cb $repo->{roots} = \@roots; local $self->{current_info} = $git->{git_dir}; my @shard_in = partition_refs($self, $git, delete($repo->{refs})); - local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1 - my $consumers = {}; + local $self->{-shard_ok} = {}; + my $active = {}; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; for my $n (0..$#shard_in) { -s $shard_in[$n] or next; last if $DO_QUIT; - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; $IDX_SHARDS[$n]->wq_io_do('shard_index', [ $shard_in[$n], $p->{op_p} ], $git, \@roots); - $consumers->{$n} = $c; + $active->{$n} = undef; } + undef $p; @shard_in = (); - wait_consumers($self, $git, $consumers); + wait_active($self, $git, $active, $c); if ($DO_QUIT) { - commit_used_shards($self, $git, $consumers); + commit_active_shards($self, $git, $active); progress($self, "$git->{git_dir}: done"); return; } $repo->{git_dir} = $git->{git_dir}; my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo); if ($id > 0) { - $consumers->{$repo->{shard_n}} = undef; - commit_used_shards($self, $git, $consumers); + $active->{$repo->{shard_n}} = undef; + commit_active_shards($self, $git, $active); progress($self, "$git->{git_dir}: done"); return run_deferred(); }