unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 7/7] lei2mail: parallel augment for lock-free stores
Date: Sun, 21 Feb 2021 07:41:34 +0000	[thread overview]
Message-ID: <20210221074134.15084-8-e@80x24.org> (raw)
In-Reply-To: <20210221074134.15084-1-e@80x24.org>

This lets us make use of multiple cores when augmenting IMAP and
Maildir stores backed by SSD (or better) storage.  This benefits
IMAP stores with high network latency, but may still penalize
IMAP servers with rotational storage.
---
 lib/PublicInbox/LeiToMail.pm  | 32 ++++++++++++++++++++++++++++----
 lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++++++----------
 lib/PublicInbox/NetReader.pm  |  9 +++++++--
 3 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b5d560c7..6efd398a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -19,6 +19,7 @@ use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
+use Digest::SHA qw(sha256_hex);
 my ($maildir_each_file);
 
 # struggles with short-lived repos, Gcf2Client makes little sense with lei;
@@ -269,7 +270,15 @@ sub _mbox_write_cb ($$) {
 }
 
 sub _augment_file { # maildir_each_file cb
-	my ($f, $lei) = @_;
+	my ($f, $lei, $mod, $shard) = @_;
+	if ($mod) {
+		# can't get dirent.d_ino w/ pure Perl, so we extract the OID
+		# if it looks like one:
+		my $hex = $f =~ m!\b([a-f0-9]{40,})[^/]*\z! ?
+				$1 : sha256_hex($f);
+		my $recno = hex(substr($hex, 0, 8));
+		return if ($recno % $mod) != $shard;
+	}
 	my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return;
 	_augment($eml, $lei);
 }
@@ -421,7 +430,9 @@ sub _do_augment_maildir {
 	if ($lei->{opt}->{augment}) {
 		my $dedupe = $lei->{dedupe};
 		if ($dedupe && $dedupe->prepare_dedupe) {
-			$maildir_each_file->($dst, \&_augment_file, $lei);
+			my ($mod, $shard) = @{$self->{shard_info} // []};
+			$maildir_each_file->($dst, \&_augment_file,
+						$lei, $mod, $shard);
 			$dedupe->pause_dedupe;
 		}
 	} else { # clobber existing Maildir
@@ -516,11 +527,24 @@ sub ipc_atfork_child {
 	my ($self) = @_;
 	my $lei = delete $self->{lei};
 	$lei->lei_atfork_child;
-	if ($self->{-wq_worker_nr} == 0) {
+	my $aug;
+	if (lock_free($self)) {
+		my $mod = $self->{-wq_nr_workers};
+		my $shard = $self->{-wq_worker_nr};
+		if (my $nwr = $lei->{nwr}) {
+			$nwr->{shard_info} = [ $mod, $shard ];
+		} else { # Maildir (MH?)
+			$self->{shard_info} = [ $mod, $shard ];
+		}
+		$aug = '+'; # incr_post_augment
+	} elsif ($self->{-wq_worker_nr} == 0) {
+		$aug = '.'; # do_post_augment
+	}
+	if ($aug) {
 		local $0 = 'do_augment';
 		eval { do_augment($self, $lei) };
 		$lei->fail($@) if $@;
-		pkt_do($lei->{pkt_op_p}, '.') == 1 or
+		pkt_do($lei->{pkt_op_p}, $aug) == 1 or
 					die "do_post_augment trigger: $!";
 	}
 	if (my $zpipe = delete $lei->{zpipe}) {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 524f4d1c..e982165f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -331,18 +331,16 @@ Error closing $lei->{ovv}->{dst}: $!
 
 sub do_post_augment {
 	my ($lei) = @_;
-	my $l2m = $lei->{l2m};
+	my $l2m = $lei->{l2m} or die 'BUG: unexpected do_post_augment';
 	my $err;
-	if ($l2m) {
-		eval { $l2m->post_augment($lei) };
-		$err = $@;
-		if ($err) {
-			if (my $lxs = delete $lei->{lxs}) {
-				$lxs->wq_kill;
-				$lxs->wq_close(0, undef, $lei);
-			}
-			$lei->fail("$err");
+	eval { $l2m->post_augment($lei) };
+	$err = $@;
+	if ($err) {
+		if (my $lxs = delete $lei->{lxs}) {
+			$lxs->wq_kill;
+			$lxs->wq_close(0, undef, $lei);
 		}
+		$lei->fail("$err");
 	}
 	if (!$err && delete $lei->{early_mua}) { # non-augment case
 		$lei->start_mua;
@@ -350,6 +348,13 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
 }
 
+sub incr_post_augment { # called whenever an l2m shard finishes
+	my ($lei) = @_;
+	my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment';
+	return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers};
+	do_post_augment($lei);
+}
+
 my $MAX_PER_HOST = 4;
 
 sub concurrency {
@@ -392,6 +397,7 @@ sub do_query {
 		'|' => [ $lei->can('sigpipe_handler'), $lei ],
 		'!' => [ $lei->can('fail_handler'), $lei ],
 		'.' => [ \&do_post_augment, $lei ],
+		'+' => [ \&incr_post_augment, $lei ],
 		'' => [ \&query_done, $lei ],
 		'mset_progress' => [ \&mset_progress, $lei ],
 		'x_it' => [ $lei->can('x_it'), $lei ],
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 4c412491..0956d5da 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -363,8 +363,11 @@ sub _imap_fetch_all ($$$) {
 	}
 	return if $l_uid >= $r_uid; # nothing to do
 	$l_uid ||= 1;
-
-	warn "# $uri fetching UID $l_uid:$r_uid\n" unless $self->{quiet};
+	my ($mod, $shard) = @{$self->{shard_info} // []};
+	unless ($self->{quiet}) {
+		my $m = $mod ? " [(UID % $mod) == $shard]" : '';
+		warn "# $uri fetching UID $l_uid:$r_uid$m\n";
+	}
 	$mic->Uid(1); # the default, we hope
 	my $bs = $self->{imap_opt}->{$sec}->{batch_size} // 1;
 	my $req = $mic->imap4rev1 ? 'BODY.PEEK[]' : 'RFC822.PEEK';
@@ -391,6 +394,8 @@ sub _imap_fetch_all ($$$) {
 		$l_uid = $uids->[-1] + 1; # for next search
 		my $last_uid;
 		my $n = $self->{max_batch};
+
+		@$uids = grep { ($_ % $mod) == $shard } @$uids if $mod;
 		while (scalar @$uids) {
 			my @batch = splice(@$uids, 0, $bs);
 			$batch = join(',', @batch);

      parent reply	other threads:[~2021-02-21  7:41 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-21  7:41 [PATCH 0/7] "lei q -o imaps://..." support Eric Wong
2021-02-21  7:41 ` [PATCH 1/7] inbox_writable: require PublicInbox::MdirReader Eric Wong
2021-02-21  7:41 ` [PATCH 2/7] lei q: support IMAP/IMAPS --output destinations Eric Wong
2021-02-21  7:41 ` [PATCH 3/7] ipc: add wq_broadcast Eric Wong
2021-02-21  7:41 ` [PATCH 4/7] lei q: move augment into lei2mail workers Eric Wong
2021-02-21  7:41 ` [PATCH 5/7] ipc: support setting a locked number of WQ workers Eric Wong
2021-02-21  7:41 ` [PATCH 6/7] net_reader: use and accept URIimap objects in more places Eric Wong
2021-02-21  7:41 ` Eric Wong [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210221074134.15084-8-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).