From: Eric Wong <e@yhbt.net>
To: meta@public-inbox.org
Subject: [PATCH 4/8] with_umask: pass args to callback
Date: Fri, 17 Jul 2020 06:31:51 +0000 [thread overview]
Message-ID: <20200717063155.3734-5-e@yhbt.net> (raw)
In-Reply-To: <20200717063155.3734-1-e@yhbt.net>
While passing a named sub reference and its arguments makes the code
flow slightly less naturally in some places, it saves us runtime
allocations (no anonymous sub per call) and a level of indentation.
---
lib/PublicInbox/InboxWritable.pm | 42 +++++++------
lib/PublicInbox/SearchIdx.pm | 40 ++++++------
lib/PublicInbox/V2Writable.pm | 102 ++++++++++++++-----------------
lib/PublicInbox/Xapcmd.pm | 35 ++++++-----
4 files changed, 111 insertions(+), 108 deletions(-)
diff --git a/lib/PublicInbox/InboxWritable.pm b/lib/PublicInbox/InboxWritable.pm
index 875dcce2..1f3f6672 100644
--- a/lib/PublicInbox/InboxWritable.pm
+++ b/lib/PublicInbox/InboxWritable.pm
@@ -37,27 +37,33 @@ sub assert_usable_dir {
die "no inboxdir defined for $self->{name}\n";
}
+sub _init_v1 {
+ my ($self, $skip_artnum) = @_;
+ if (defined($self->{indexlevel}) || defined($skip_artnum)) {
+ require PublicInbox::SearchIdx;
+ require PublicInbox::Msgmap;
+ my $sidx = PublicInbox::SearchIdx->new($self, 1); # just create
+ $sidx->begin_txn_lazy;
+ if (defined $skip_artnum) {
+ my $mm = PublicInbox::Msgmap->new($self->{inboxdir}, 1);
+ $mm->{dbh}->begin_work;
+ $mm->skip_artnum($skip_artnum);
+ $mm->{dbh}->commit;
+ }
+ $sidx->commit_txn_lazy;
+ } else {
+ open my $fh, '>>', "$self->{inboxdir}/ssoma.lock" or
+ die "$self->{inboxdir}/ssoma.lock: $!\n";
+ }
+}
+
sub init_inbox {
my ($self, $shards, $skip_epoch, $skip_artnum) = @_;
if ($self->version == 1) {
my $dir = assert_usable_dir($self);
PublicInbox::Import::init_bare($dir);
- if (defined($self->{indexlevel}) || defined($skip_artnum)) {
- require PublicInbox::SearchIdx;
- require PublicInbox::Msgmap;
- my $sidx = PublicInbox::SearchIdx->new($self, 1); # just create
- $sidx->begin_txn_lazy;
- $self->with_umask(sub {
- my $mm = PublicInbox::Msgmap->new($dir, 1);
- $mm->{dbh}->begin_work;
- $mm->skip_artnum($skip_artnum);
- $mm->{dbh}->commit;
- }) if defined($skip_artnum);
- $sidx->commit_txn_lazy;
- } else {
- open my $fh, '>>', "$dir/ssoma.lock" or
- die "$dir/ssoma.lock: $!\n";
- }
+ $self->umask_prepare;
+ $self->with_umask(\&_init_v1, $self, $skip_artnum);
} else {
my $v2w = importer($self);
$v2w->init_inbox($shards, $skip_epoch, $skip_artnum);
@@ -255,9 +261,9 @@ sub _umask_for {
}
sub with_umask {
- my ($self, $cb) = @_;
+ my ($self, $cb, @arg) = @_;
my $old = umask $self->{umask};
- my $rv = eval { $cb->() };
+ my $rv = eval { $cb->(@arg) };
my $err = $@;
umask $old;
die $err if $err;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 4caa66d3..c93c9034 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -585,7 +585,7 @@ sub unindex_both { # git->cat_async callback
sub index_sync {
my ($self, $opts) = @_;
delete $self->{lock_path} if $opts->{-skip_lock};
- $self->{-inbox}->with_umask(sub { $self->_index_sync($opts) })
+ $self->{-inbox}->with_umask(\&_index_sync, $self, $opts);
}
sub too_big ($$$) {
@@ -854,17 +854,18 @@ sub remote_remove {
}
}
-sub begin_txn_lazy {
+sub _begin_txn {
my ($self) = @_;
- return if $self->{txn};
+ my $xdb = $self->{xdb} || $self->_xdb_acquire;
+ $self->{over}->begin_lazy if $self->{over};
+ $xdb->begin_transaction if $xdb;
+ $self->{txn} = 1;
+ $xdb;
+}
- $self->{-inbox}->with_umask(sub {
- my $xdb = $self->{xdb} || $self->_xdb_acquire;
- $self->{over}->begin_lazy if $self->{over};
- $xdb->begin_transaction if $xdb;
- $self->{txn} = 1;
- $xdb;
- });
+sub begin_txn_lazy {
+ my ($self) = @_;
+ $self->{-inbox}->with_umask(\&_begin_txn, $self) if !$self->{txn};
}
# store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard)
@@ -882,16 +883,19 @@ sub set_indexlevel {
}
}
+sub _commit_txn {
+ my ($self) = @_;
+ if (my $xdb = $self->{xdb}) {
+ set_indexlevel($self);
+ $xdb->commit_transaction;
+ }
+ $self->{over}->commit_lazy if $self->{over};
+}
+
sub commit_txn_lazy {
my ($self) = @_;
- delete $self->{txn} or return;
- $self->{-inbox}->with_umask(sub {
- if (my $xdb = $self->{xdb}) {
- set_indexlevel($self);
- $xdb->commit_transaction;
- }
- $self->{over}->commit_lazy if $self->{over};
- });
+ delete($self->{txn}) and
+ $self->{-inbox}->with_umask(\&_commit_txn, $self);
}
sub worker_done {
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 0119ea76..b51c8525 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -147,10 +147,8 @@ sub init_inbox {
# returns undef on duplicate or spam
# mimics Import::add and wraps it for v2
sub add {
- my ($self, $mime, $check_cb) = @_;
- $self->{-inbox}->with_umask(sub {
- _add($self, $mime, $check_cb)
- });
+ my ($self, $eml, $check_cb) = @_;
+ $self->{-inbox}->with_umask(\&_add, $self, $eml, $check_cb);
}
# indexes a message, returns true if checkpointing is needed
@@ -276,6 +274,28 @@ sub idx_shard {
$self->{idx_shards}->[$shard_i];
}
+sub _idx_init { # with_umask callback
+ my ($self, $opt) = @_;
+ $self->lock_acquire unless $opt && $opt->{-skip_lock};
+ $self->{over}->create;
+
+ # xcpdb can change shard count while -watch is idle
+ my $nshards = count_shards($self);
+ $self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
+
+ # need to create all shards before initializing msgmap FD
+ # idx_shards must be visible to all forked processes
+ my $max = $self->{shards} - 1;
+ my $idx = $self->{idx_shards} = [];
+ push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max);
+
+ # Now that all subprocesses are up, we can open the FDs
+ # for SQLite:
+ my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
+ "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1);
+ $mm->{dbh}->begin_work;
+}
+
# idempotent
sub idx_init {
my ($self, $opt) = @_;
@@ -285,13 +305,10 @@ sub idx_init {
# do not leak read-only FDs to child processes, we only have these
# FDs for duplicate detection so they should not be
# frequently activated.
+ # delete @$ibx{qw(git mm search)};
delete $ibx->{$_} foreach (qw(git mm search));
- my $indexlevel = $ibx->{indexlevel};
- if ($indexlevel && $indexlevel eq 'basic') {
- $self->{parallel} = 0;
- }
-
+ $self->{parallel} = 0 if ($ibx->{indexlevel}//'') eq 'basic';
if ($self->{parallel}) {
pipe(my ($r, $w)) or die "pipe failed: $!";
# pipe for barrier notifications doesn't need to be big,
@@ -301,33 +318,8 @@ sub idx_init {
$w->autoflush(1);
}
- my $over = $self->{over};
$ibx->umask_prepare;
- $ibx->with_umask(sub {
- $self->lock_acquire unless ($opt && $opt->{-skip_lock});
- $over->create;
-
- # xcpdb can change shard count while -watch is idle
- my $nshards = count_shards($self);
- if ($nshards && $nshards != $self->{shards}) {
- $self->{shards} = $nshards;
- }
-
- # need to create all shards before initializing msgmap FD
- my $max = $self->{shards} - 1;
-
- # idx_shards must be visible to all forked processes
- my $idx = $self->{idx_shards} = [];
- for my $i (0..$max) {
- push @$idx, PublicInbox::SearchIdxShard->new($self, $i);
- }
-
- # Now that all subprocesses are up, we can open the FDs
- # for SQLite:
- my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
- "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1);
- $mm->{dbh}->begin_work;
- });
+ $ibx->with_umask(\&_idx_init, $self, $opt);
}
# returns an array mapping [ epoch => latest_commit ]
@@ -379,24 +371,24 @@ sub content_matches ($$) {
# used for removing or replacing (purging)
sub rewrite_internal ($$;$$$) {
- my ($self, $old_mime, $cmt_msg, $new_mime, $sref) = @_;
+ my ($self, $old_eml, $cmt_msg, $new_eml, $sref) = @_;
$self->idx_init;
my ($im, $need_reindex, $replace_map);
if ($sref) {
$replace_map = {}; # oid => sref
- $need_reindex = [] if $new_mime;
+ $need_reindex = [] if $new_eml;
} else {
$im = $self->importer;
}
my $over = $self->{over};
- my $chashes = content_hashes($old_mime);
- my @removed;
- my $mids = mids($old_mime->header_obj);
+ my $chashes = content_hashes($old_eml);
+ my $removed = [];
+ my $mids = mids($old_eml->header_obj);
# We avoid introducing new blobs into git since the raw content
# can be slightly different, so we do not need the user-supplied
# message now that we have the mids and content_hash
- $old_mime = undef;
+ $old_eml = undef;
my $mark;
foreach my $mid (@$mids) {
@@ -422,15 +414,15 @@ sub rewrite_internal ($$;$$$) {
}
foreach my $num (keys %gone) {
my ($smsg, $mime, $orig) = @{$gone{$num}};
- # @removed should only be set once assuming
+ # $removed should only be set once assuming
# no bugs in our deduplication code:
- @removed = (undef, $mime, $smsg);
+ $removed = [ undef, $mime, $smsg ];
my $oid = $smsg->{blob};
if ($replace_map) {
$replace_map->{$oid} = $sref;
} else {
($mark, undef) = $im->remove($orig, $cmt_msg);
- $removed[0] = $mark;
+ $removed->[0] = $mark;
}
$orig = undef;
if ($need_reindex) { # ->replace
@@ -447,28 +439,26 @@ sub rewrite_internal ($$;$$$) {
$self->{last_commit}->[$self->{epoch_max}] = $cmt;
}
if ($replace_map && scalar keys %$replace_map) {
- my $rewrites = _replace_oids($self, $new_mime, $replace_map);
+ my $rewrites = _replace_oids($self, $new_eml, $replace_map);
return { rewrites => $rewrites, need_reindex => $need_reindex };
}
- defined($mark) ? @removed : undef;
+ defined($mark) ? $removed : undef;
}
# public (see PublicInbox::Import->remove), but note the 3rd element
# (retval[2]) is not part of the stable API shared with Import->remove
sub remove {
- my ($self, $mime, $cmt_msg) = @_;
- my @ret;
- $self->{-inbox}->with_umask(sub {
- @ret = rewrite_internal($self, $mime, $cmt_msg);
- });
- defined($ret[0]) ? @ret : undef;
+ my ($self, $eml, $cmt_msg) = @_;
+ my $r = $self->{-inbox}->with_umask(\&rewrite_internal,
+ $self, $eml, $cmt_msg);
+ defined($r) && defined($r->[0]) ? @$r: undef;
}
sub _replace ($$;$$) {
- my ($self, $old_mime, $new_mime, $sref) = @_;
- my $rewritten = $self->{-inbox}->with_umask(sub {
- rewrite_internal($self, $old_mime, undef, $new_mime, $sref);
- }) or return;
+ my ($self, $old_eml, $new_eml, $sref) = @_;
+ my $arg = [ $self, $old_eml, undef, $new_eml, $sref ];
+ my $rewritten = $self->{-inbox}->with_umask(\&rewrite_internal,
+ $self, $old_eml, undef, $new_eml, $sref) or return;
my $rewrites = $rewritten->{rewrites};
# ->done is called if there are rewrites since we gc+prune from git
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index a57fa559..c04f935c 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -229,6 +229,24 @@ sub prepare_run {
sub check_compact () { runnable_or_die($XAPIAN_COMPACT) }
+sub _run {
+ my ($ibx, $cb, $opt, $reindex) = @_;
+ my $im = $ibx->importer(0);
+ $im->lock_acquire;
+ my ($tmp, $queue) = prepare_run($ibx, $opt);
+
+ # fine-grained locking if we prepare for reindex
+ if (!$opt->{-coarse_lock}) {
+ prepare_reindex($ibx, $im, $reindex);
+ $im->lock_release;
+ }
+
+ $ibx->cleanup;
+ process_queue($queue, $cb, $opt);
+ $im->lock_acquire if !$opt->{-coarse_lock};
+ commit_changes($ibx, $im, $tmp, $opt);
+}
+
sub run {
my ($ibx, $task, $opt) = @_; # task = 'cpdb' or 'compact'
my $cb = \&${\"PublicInbox::Xapcmd::$task"};
@@ -248,22 +266,7 @@ sub run {
local %SIG = %SIG;
setup_signals();
$ibx->umask_prepare;
- $ibx->with_umask(sub {
- my $im = $ibx->importer(0);
- $im->lock_acquire;
- my ($tmp, $queue) = prepare_run($ibx, $opt);
-
- # fine-grained locking if we prepare for reindex
- if (!$opt->{-coarse_lock}) {
- prepare_reindex($ibx, $im, $reindex);
- $im->lock_release;
- }
-
- $ibx->cleanup;
- process_queue($queue, $cb, $opt);
- $im->lock_acquire if !$opt->{-coarse_lock};
- commit_changes($ibx, $im, $tmp, $opt);
- });
+ $ibx->with_umask(\&_run, $ibx, $cb, $opt, $reindex);
}
sub cpdb_retryable ($$) {
next prev parent reply other threads:[~2020-07-17 6:31 UTC|newest]
Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top
2020-07-17 6:31 [PATCH 0/8] indexing cleanup and code reduction Eric Wong
2020-07-17 6:31 ` [PATCH 1/8] v2: use v5.10.1, parent.pm, drop warnings Eric Wong
2020-07-17 6:31 ` [PATCH 2/8] drop binmode usage Eric Wong
2020-07-17 6:31 ` [PATCH 3/8] import: use common capitalization for filtering headers Eric Wong
2020-07-17 6:31 ` Eric Wong [this message]
2020-07-17 6:31 ` [PATCH 5/8] overidx: each_by_mid: pass self and args to callbacks Eric Wong
2020-07-17 6:31 ` [PATCH 6/8] overidx: favor non-OO sub dispatch for internal subs Eric Wong
2020-07-17 6:31 ` [PATCH 7/8] searchidx: use v5.10.1, parent.pm, drop warnings Eric Wong
2020-07-17 6:31 ` [PATCH 8/8] search: simplify unindexing Eric Wong
2020-07-17 7:25 ` [9/8 PATCH] v2writable: git_hash_raw: avoid $TMPDIR write Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20200717063155.3734-5-e@yhbt.net \
--to=e@yhbt.net \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).