From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EB7571F92B for ; Fri, 24 Jul 2020 05:56:10 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 19/20] searchidx: $batch_cb => v1_checkpoint Date: Fri, 24 Jul 2020 05:56:05 +0000 Message-Id: <20200724055606.27332-20-e@yhbt.net> In-Reply-To: <20200724055606.27332-1-e@yhbt.net> References: <20200724055606.27332-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Another closure gone, and we may be able to share more code with v2 in upcoming commits. --- lib/PublicInbox/SearchIdx.pm | 90 ++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 39dc1f874..fe089c8e8 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -573,9 +573,48 @@ sub ck_size { # check_async cb for -index --max-size=... } } +sub v1_checkpoint ($$;$) { + my ($self, $sync, $stk) = @_; + $self->{ibx}->git->check_async_wait; + $self->{ibx}->git->cat_async_wait; + + # latest_cmt may be undef + my $newest = $stk ? $stk->{latest_cmt} : undef; + if ($newest) { + my $cur = $self->{mm}->last_commit || ''; + if (need_update($self, $cur, $newest)) { + $self->{mm}->last_commit($newest); + } + } else { + ${$sync->{max}} = $BATCH_BYTES; + } + + $self->{mm}->{dbh}->commit; + if ($newest && need_xapian($self)) { + my $cur = $self->{xdb}->get_metadata('last_commit'); + if (need_update($self, $cur, $newest)) { + $self->{xdb}->set_metadata('last_commit', $newest); + } + } + + $self->{over}->rethread_done($sync->{-opt}) if $newest; # all done + commit_txn_lazy($self); + $self->{ibx}->git->cleanup; + my $nr = ${$sync->{nr}}; + idx_release($self, $nr); + # let another process do some work... + if (my $pr = $sync->{-opt}->{-progress}) { + $pr->("indexed $nr/$sync->{ntodo}\n") if $nr; + } + if (!$stk) { # more to come + begin_txn_lazy($self); + $self->{mm}->{dbh}->begin_work; + } +} + # only for v1 sub process_stack { - my ($self, $stk, $sync, $batch_cb) = @_; + my ($self, $sync, $stk) = @_; my $git = $self->{ibx}->git; my $max = $BATCH_BYTES; my $nr = 0; @@ -583,6 +622,7 @@ sub process_stack { $sync->{max} = \$max; $sync->{sidx} = $self; + $self->{mm}->{dbh}->begin_work; if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); for my $oid (@leftovers) { @@ -599,19 +639,12 @@ sub process_stack { } else { $git->cat_async($oid, \&index_both, $arg); } - if ($max <= 0) { - $git->check_async_wait; - $git->cat_async_wait; - $max = $BATCH_BYTES; - $batch_cb->($nr); - } + v1_checkpoint($self, $sync) if $max <= 0; } elsif ($f eq 'd') { $git->cat_async($oid, \&unindex_both, $self); } } - $git->check_async_wait; - $git->cat_async_wait; - $batch_cb->($nr, $stk); + v1_checkpoint($self, $sync, $stk); } sub log2stack ($$$$) { @@ -729,7 +762,7 @@ sub _index_sync { my $git = $self->{ibx}->git; $git->batch_prepare; my $pr = $opts->{-progress}; - my $sync = { reindex => $opts->{reindex} }; + my $sync = { reindex => $opts->{reindex}, -opt => $opts }; my $xdb = $self->begin_txn_lazy; $self->{over}->rethread_prepare($opts); my $mm = _msgmap_init($self); @@ -750,40 +783,7 @@ sub _index_sync { my $stk = prepare_stack($self, $sync, $range); $sync->{ntodo} = $stk ? $stk->num_records : 0; $pr->("$sync->{ntodo}\n") if $pr; # continue previous line - - my $dbh = $mm->{dbh}; - my $batch_cb = sub { - my ($nr, $stk) = @_; - # latest_cmt may be undef - my $newest = $stk ? $stk->{latest_cmt} : undef; - if ($newest) { - my $cur = $mm->last_commit || ''; - if (need_update($self, $cur, $newest)) { - $mm->last_commit($newest); - } - } - $dbh->commit; - if ($newest && need_xapian($self)) { - my $cur = $xdb->get_metadata('last_commit'); - if (need_update($self, $cur, $newest)) { - $xdb->set_metadata('last_commit', $newest); - } - } - - $self->{over}->rethread_done($opts) if $newest; # all done - $self->commit_txn_lazy; - $git->cleanup; - $xdb = idx_release($self, $nr); - # let another process do some work... - $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr; - if (!$stk) { # more to come - $xdb = $self->begin_txn_lazy; - $dbh->begin_work; - } - }; - - $dbh->begin_work; - process_stack($self, $stk, $sync, $batch_cb); + process_stack($self, $sync, $stk); } sub DESTROY {