SearchIdxShard was too big, and adding the new extindex stuff made things worse. Since I intend to use IPC in more places, I figured it'd be good to prove it works well by dropping it into the old v2 mix. The diffstat below is nice:

Eric Wong (7):
  ipc: some documentation comments
  searchidxshard: use PublicInbox::IPC to kill lots of code
  searchidxshard: IPC conversion, part 2
  searchidxshard: replace index_raw with index_eml
  use Eml (or MIME) objects for all indexing paths
  ipc: switch to one-way pipes
  searchidxshard: use add_xapian directly for v2

 lib/PublicInbox/ExtSearchIdx.pm | 38 +++--
 lib/PublicInbox/IPC.pm | 127 +++++++++------
 lib/PublicInbox/Import.pm | 4 +-
 lib/PublicInbox/LeiStore.pm | 18 +--
 lib/PublicInbox/SearchIdx.pm | 35 +---
 lib/PublicInbox/SearchIdxShard.pm | 257 +++++------------------------
 lib/PublicInbox/Smsg.pm | 13 ++
 lib/PublicInbox/V2Writable.pm | 102 +++++-------
 t/import.t | 12 +-
 t/search.t | 2 +-
 10 files changed, 206 insertions(+), 402 deletions(-)
Fix some comments and add some short summary descriptions to hopefully make things easier-to-follow. --- lib/PublicInbox/IPC.pm | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 288a8c94..79cd34fe 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -10,7 +10,7 @@ use Socket qw(AF_UNIX SOCK_STREAM); use Carp qw(confess croak); use PublicInbox::Sigfd; my ($enc, $dec); -# ->imports at BEGIN turns serial_*_with_object into custom ops on 5.14+ +# ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+ # and eliminate method call overhead BEGIN { eval { @@ -71,9 +71,10 @@ sub ipc_worker_loop ($$) { } } +# starts a worker if Sereal or Storable is installed sub ipc_worker_spawn { my ($self, $ident, $oldset) = @_; - return unless $enc; + return unless $enc; # no Sereal or Storable my $pid = $self->{-ipc_worker_pid}; confess "BUG: already spawned PID:$pid" if $pid; confess "BUG: already have worker socket" if $self->{-ipc_sock}; @@ -108,15 +109,17 @@ sub ipc_worker_reap { # dwaitpid callback warn "PID:$pid died with \$?=$?\n" if $?; } -# for base class, override in superclasses +# for base class, override in sub classes sub ipc_atfork_parent {} sub ipc_atfork_child {} +# should only be called inside the worker process sub ipc_worker_exit { my (undef, $code) = @_; exit($code); } +# idempotent, can be called regardless of whether worker is active or not sub ipc_worker_stop { my ($self) = @_; my $pid; @@ -147,6 +150,7 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } +# call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; if (my $s1 = $self->{-ipc_sock}) {
It's nice to prove the new code works by swapping it into the current V2Writable / SearchIdxShard packages. This is only the first step for the core bits, and we'll be able to delete more code in a subsequent patch. --- lib/PublicInbox/IPC.pm | 2 +- lib/PublicInbox/SearchIdx.pm | 9 +- lib/PublicInbox/SearchIdxShard.pm | 237 +++++++----------------------- lib/PublicInbox/V2Writable.pm | 63 +++----- 4 files changed, 74 insertions(+), 237 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 79cd34fe..6b7b3c7a 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -92,7 +92,7 @@ sub ipc_worker_spawn { $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; PublicInbox::Sigfd::sig_setmask($oldset); - $self->ipc_atfork_child; + my $on_destroy = $self->ipc_atfork_child; eval { ipc_worker_loop($self, $s2) }; die "worker $ident PID:$$ died: $@\n" if $@; exit; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 88349faa..d83fd4ca 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -414,6 +414,7 @@ sub _msgmap_init ($) { sub add_message { # mime = PublicInbox::Eml or Email::MIME object my ($self, $mime, $smsg, $sync) = @_; + begin_txn_lazy($self); my $mids = mids_for_index($mime); $smsg //= bless { blob => '' }, 'PublicInbox::Smsg'; # test-only compat $smsg->{mid} //= $mids->[0]; # v1 compatibility @@ -1002,14 +1003,6 @@ sub commit_txn_lazy { $self->with_umask(\&_commit_txn, $self); } -sub worker_done { - my ($self) = @_; - if (need_xapian($self)) { - die "$$ $0 xdb not released\n" if $self->{xdb}; - } - die "$$ $0 still in transaction\n" if $self->{txn}; -} - sub eidx_shard_new { my ($class, $eidx, $shard) = @_; my $self = bless { diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 45240e07..68644bc0 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -6,11 +6,8 @@ package 
PublicInbox::SearchIdxShard; use strict; use v5.10.1; -use parent qw(PublicInbox::SearchIdx); -use bytes qw(length); -use IO::Handle (); # autoflush -use PublicInbox::Eml; -use PublicInbox::Sigfd; +use parent qw(PublicInbox::SearchIdx PublicInbox::IPC); +use PublicInbox::OnDestroy; sub new { my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx @@ -21,238 +18,108 @@ sub new { $self->idx_acquire; $self->set_metadata_once; $self->idx_release; - $self->spawn_worker($v2w, $shard) if $v2w->{parallel}; + if ($v2w->{parallel}) { + local $self->{-v2w_afc} = $v2w; + $self->ipc_worker_spawn("shard[$shard]"); + } $self; } -sub spawn_worker { - my ($self, $v2w, $shard) = @_; - my ($r, $w); - pipe($r, $w) or die "pipe failed: $!\n"; - $w->autoflush(1); - my $oldset = PublicInbox::Sigfd::block_signals(); - my $pid = fork; - defined $pid or die "fork failed: $!\n"; - if ($pid == 0) { - eval { PublicInbox::DS->Reset }; - # these signals are localized in parent - $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); - PublicInbox::Sigfd::sig_setmask($oldset); - my $bnote = $v2w->atfork_child; - close $w or die "failed to close: $!"; - - # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here - # speeds V2Writable batch imports across 8 cores by nearly 20% - fcntl($r, 1031, 1048576) if $^O eq 'linux'; - - eval { shard_worker_loop($self, $v2w, $r, $shard, $bnote) }; - die "worker $shard died: $@\n" if $@; - die "unexpected MM $self->{mm}" if $self->{mm}; - exit; +sub _worker_done { + my ($self) = @_; + if ($self->need_xapian) { + die "$$ $0 xdb not released\n" if $self->{xdb}; } - PublicInbox::Sigfd::sig_setmask($oldset); - $self->{pid} = $pid; - $self->{w} = $w; - close $r or die "failed to close: $!"; -} - -sub eml ($$) { - my ($r, $len) = @_; - return if $len == 0; - my $n = read($r, my $bref, $len) or die "read: $!\n"; - $n == $len or die "short read: $n != $len\n"; - PublicInbox::Eml->new(\$bref); + die "$$ $0 still in transaction\n" if $self->{txn}; } -# this reads all the 
writes to $self->{w} from the parent process -sub shard_worker_loop ($$$$$) { - my ($self, $v2w, $r, $shard, $bnote) = @_; - $0 = "shard[$shard]"; +sub ipc_atfork_child { # called automatically before ipc_worker_loop + my ($self) = @_; + my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing'; + $v2w->atfork_child; # calls shard_atfork_child on our siblings + $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__} $self->begin_txn_lazy; - while (my $line = readline($r)) { - chomp $line; - $v2w->{current_info} = "[$shard] $line"; - if ($line eq 'commit') { - $self->commit_txn_lazy; - } elsif ($line eq 'close') { - $self->idx_release; - } elsif ($line eq 'barrier') { - $self->commit_txn_lazy; - # no need to lock < 512 bytes is atomic under POSIX - print $bnote "barrier $shard\n" or - die "write failed for barrier $!\n"; - } elsif ($line =~ /\AD ([0-9]+)\z/s) { - $self->remove_by_docid($1 + 0); - } elsif ($line =~ s/\A\+X //) { - my ($len, $docid, $eidx_key) = split(/ /, $line, 3); - $self->add_eidx_info($docid, $eidx_key, eml($r, $len)); - } elsif ($line =~ s/\A-X //) { - my ($len, $docid, $eidx_key) = split(/ /, $line, 3); - $self->remove_eidx_info($docid, $eidx_key, - eml($r, $len)); - } elsif ($line =~ s/\A=K (\d+) //) { - $self->set_keywords($1 + 0, split(/ /, $line)); - } elsif ($line =~ s/\A-K (\d+) //) { - $self->remove_keywords($1 + 0, split(/ /, $line)); - } elsif ($line =~ s/\A\+K (\d+) //) { - $self->add_keywords($1 + 0, split(/ /, $line)); - } elsif ($line =~ s/\AO ([^\n]+)//) { - my $over_fn = $1; - $over_fn =~ tr/\0/\n/; - $self->over_check(PublicInbox::Over->new($over_fn)); - } else { - my $eidx_key; - if ($line =~ s/\AX=(.+)\0//) { - $eidx_key = $1; - $v2w->{current_info} =~ s/\0/\\0 /; - } - # n.b. $mid may contain spaces(!) 
- my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid) - = split(/ /, $line, 8); - $self->begin_txn_lazy; - my $smsg = bless { - bytes => $bytes, - num => $num + 0, - blob => $oid, - mid => $mid, - tid => $tid, - ds => $ds, - ts => $ts, - }, 'PublicInbox::Smsg'; - $smsg->{eidx_key} = $eidx_key if defined($eidx_key); - $self->add_message(eml($r, $len), $smsg); - } - } - $self->worker_done; + # caller must capture this: + PublicInbox::OnDestroy->new($$, \&_worker_done, $self); } sub index_raw { my ($self, $msgref, $eml, $smsg, $eidx_key) = @_; - if (my $w = $self->{w}) { - my @ekey = defined($eidx_key) ? ("X=$eidx_key\0") : (); - $msgref //= \($eml->as_string); - $smsg->{raw_bytes} //= length($$msgref); - # mid must be last, it can contain spaces (but not LF) - print $w @ekey, join(' ', @$smsg{qw(raw_bytes bytes - num blob ds ts tid mid)}), - "\n", $$msgref or die "failed to write shard $!\n"; - } else { - if ($eml) { - undef($$msgref) if $msgref; - } else { # --xapian-only + --sequential-shard: - $eml = PublicInbox::Eml->new($msgref); - } - $self->begin_txn_lazy; - $smsg->{eidx_key} = $eidx_key if defined $eidx_key; - $self->add_message($eml, $smsg); + if ($eml) { + undef($$msgref) if $msgref; + } else { # --xapian-only + --sequential-shard: + $eml = PublicInbox::Eml->new($msgref); } + $smsg->{eidx_key} = $eidx_key if defined $eidx_key; + $self->ipc_do('add_message', $eml, $smsg); } sub shard_add_eidx_info { my ($self, $docid, $eidx_key, $eml) = @_; - if (my $w = $self->{w}) { - my $hdr = $eml->header_obj->as_string; - my $len = length($hdr); - print $w "+X $len $docid $eidx_key\n", $hdr or - die "failed to write shard: $!"; - } else { - $self->add_eidx_info($docid, $eidx_key, $eml); - } + $self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); } sub shard_remove_eidx_info { my ($self, $docid, $eidx_key, $eml) = @_; - if (my $w = $self->{w}) { - my $hdr = $eml ? 
$eml->header_obj->as_string : ''; - my $len = length($hdr); - print $w "-X $len $docid $eidx_key\n", $hdr or - die "failed to write shard: $!"; - } else { - $self->remove_eidx_info($docid, $eidx_key, $eml); - } + $self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml); } -sub atfork_child { - close $_[0]->{w} or die "failed to close write pipe: $!\n"; +# needed when there's multiple IPC workers and the parent forking +# causes newer siblings to inherit older siblings sockets +sub shard_atfork_child { + my ($self) = @_; + my $pid = delete($self->{-ipc_worker_pid}) or + die "BUG: $$ no -ipc_worker_pid"; + my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock"; + $pid == $$ and die "BUG: $$ shard_atfork_child called on itself"; + close($s1) or die "close -ipc_sock: $!"; } -sub shard_barrier { - my ($self) = @_; - if (my $w = $self->{w}) { - print $w "barrier\n" or die "failed to print: $!"; - } else { - $self->commit_txn_lazy; - } +# wait for return to determine when ipc_do('commit_txn_lazy') is done +sub echo { + shift; + "@_"; } -sub shard_commit { +sub idx_close { my ($self) = @_; - if (my $w = $self->{w}) { - print $w "commit\n" or die "failed to write commit: $!"; - } else { - $self->commit_txn_lazy; - } + die "transaction in progress $self\n" if $self->{txn}; + $self->idx_release if $self->{xdb}; } sub shard_close { my ($self) = @_; - if (my $w = delete $self->{w}) { - my $pid = delete $self->{pid} or die "no process to wait on\n"; - print $w "close\n" or die "failed to write to pid:$pid: $!\n"; - close $w or die "failed to close pipe for pid:$pid: $!\n"; - waitpid($pid, 0) == $pid or die "remote process did not finish"; - $? == 0 or die ref($self)." 
pid:$pid exited with: $?"; - } else { - die "transaction in progress $self\n" if $self->{txn}; - $self->idx_release if $self->{xdb}; - } + $self->ipc_do('idx_close'); + $self->ipc_worker_stop; } sub shard_remove { my ($self, $num) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "D $num\n" or die "failed to write remove $!"; - } else { # same process - $self->remove_by_docid($num); - } + $self->ipc_do('remove_by_docid', $num); } sub shard_set_keywords { my ($self, $docid, @kw) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "=K $docid @kw\n" or die "failed to write: $!"; - } else { # same process - $self->set_keywords($docid, @kw); - } + $self->ipc_do('set_keywords', $docid, @kw); } sub shard_remove_keywords { my ($self, $docid, @kw) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "-K $docid @kw\n" or die "failed to write: $!"; - } else { # same process - $self->remove_keywords($docid, @kw); - } + $self->ipc_do('remove_keywords', $docid, @kw); } sub shard_add_keywords { my ($self, $docid, @kw) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - print $w "+K $docid @kw\n" or die "failed to write: $!"; - } else { # same process - $self->add_keywords($docid, @kw); - } + $self->ipc_do('add_keywords', $docid, @kw); } sub shard_over_check { my ($self, $over) = @_; - if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child - my ($over_fn) = $over->{dbh}->sqlite_db_filename; - $over_fn =~ tr/\n/\0/; - print $w "O $over_fn\n" or die "failed to write over $!"; - } else { - $self->over_check($over); + if ($self->{-ipc_sock} && $over->{dbh}) { + # can't send DB handles over IPC + $over = ref($over)->new($over->{dbh}->sqlite_db_filename); } + $self->ipc_do('over_check', $over); } 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 724fa804..cad559c5 100644 --- 
a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback sub parallel_init ($$) { my ($self, $indexlevel) = @_; - if (($indexlevel // 'full') eq 'basic') { - $self->{parallel} = 0; - } else { - pipe(my ($r, $w)) or die "pipe failed: $!"; - # pipe for barrier notifications doesn't need to be big, - # 1031: F_SETPIPE_SZ - fcntl($w, 1031, 4096) if $^O eq 'linux'; - $self->{bnote} = [ $r, $w ]; - $w->autoflush(1); - } + $self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic'; } # idempotent @@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx } } -sub barrier_init { - my ($self, $n) = @_; - $self->{bnote} or return; - --$n; - my $barrier = { map { $_ => 1 } (0..$n) }; -} - -sub barrier_wait { - my ($self, $barrier) = @_; - my $bnote = $self->{bnote} or return; - my $r = $bnote->[0]; - while (scalar keys %$barrier) { - defined(my $l = readline($r)) or die "EOF on barrier_wait: $!"; - $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l"; - delete $barrier->{$1} or die "bad shard[$1] on barrier wait"; - } -} - # public sub checkpoint ($;$) { my ($self, $wait) = @_; @@ -615,16 +588,23 @@ sub checkpoint ($;$) { $self->{oidx}->commit_lazy; # Now deal with Xapian - if ($wait) { - my $barrier = barrier_init($self, scalar @$shards); - - # each shard needs to issue a barrier command - $_->shard_barrier for @$shards; - # wait for each Xapian shard - barrier_wait($self, $barrier); - } else { - $_->shard_commit for @$shards; + # start commit_txn_lazy asynchronously on all parallel shards + # (non-parallel waits here) + $_->ipc_do('commit_txn_lazy') for @$shards; + + # transactions started on parallel shards, + # wait for them by issuing an echo command (echo can only + # run after commit_txn_lazy is done) + if ($wait && $self->{parallel}) { + my $i = 0; + for my $shard (@$shards) { + my $echo = $shard->ipc_do('echo', $i); + $echo == $i or die <<""; +shard[$i] bad echo:$echo 
!= $i waiting for txn commit + + ++$i; + } } my $midx = $self->{midx}; # misc index @@ -679,7 +659,6 @@ sub done { eval { $self->{oidx}->dbh_close }; $err .= "over close: $@\n" if $@; delete $self->{midx}; - delete $self->{bnote}; my $nbytes = $self->{total_bytes}; $self->{total_bytes} = 0; $self->lock_release(!!$nbytes) if $shards; @@ -846,15 +825,13 @@ sub content_exists ($$$) { sub atfork_child { my ($self) = @_; - if (my $shards = $self->{idx_shards}) { - $_->atfork_child foreach @$shards; + if (my $older_siblings = $self->{idx_shards}) { + $_->shard_atfork_child for @$older_siblings; } if (my $im = $self->{im}) { $im->atfork_child; } - die "unexpected mm" if $self->{mm}; - close $self->{bnote}->[0] or die "close bnote[0]: $!\n"; - $self->{bnote}->[1]; + die "BUG: unexpected mm" if $self->{mm}; } sub reindex_checkpoint ($$) {
We can remove some now-pointless wrapper functions by using ->ipc_do in even more places. --- lib/PublicInbox/ExtSearchIdx.pm | 23 ++++++++++++----------- lib/PublicInbox/LeiStore.pm | 13 +++++++------ lib/PublicInbox/SearchIdx.pm | 8 +------- lib/PublicInbox/SearchIdxShard.pm | 30 ------------------------------ lib/PublicInbox/V2Writable.pm | 3 +-- 5 files changed, 21 insertions(+), 56 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index c3581628..064d9939 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -102,7 +102,7 @@ sub do_xpost ($$) { if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message my $xnum = $req->{xnum}; $self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key); - $idx->shard_add_eidx_info($docid, $eidx_key, $eml); + $idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); check_batch_limit($req); } else { # 'd' my $rm_eidx_info; @@ -110,9 +110,10 @@ sub do_xpost ($$) { \$rm_eidx_info); if ($nr == 0) { $self->{oidx}->eidxq_del($docid); - $idx->shard_remove($docid); + $idx->ipc_do('xdb_remove', $docid); } elsif ($rm_eidx_info) { - $idx->shard_remove_eidx_info($docid, $eidx_key, $eml); + $idx->ipc_do('remove_eidx_info', + $docid, $eidx_key, $eml); $self->{oidx}->eidxq_add($docid); # yes, add } } @@ -327,7 +328,7 @@ DELETE FROM xref3 WHERE docid = ? AND ibx_id = ? 
} } else { warn "I: remove #$docid $eidx_key @oid\n"; - $self->idx_shard($docid)->shard_remove($docid); + $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); } } @@ -440,7 +441,7 @@ sub _reindex_finalize ($$$) { for my $x (reverse @$stable) { $ibx = _ibx_for($self, $sync, $x); my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}'; - $idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr); + $idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr); } return if $nr == 1; # likely, all good @@ -483,12 +484,12 @@ sub _reindex_oid { # git->cat_async callback my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid); if ($remain == 0) { warn "W: #$docid gone or corrupted\n"; - $self->idx_shard($docid)->shard_remove($docid); + $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { $self->git->cat_async($next_oid, \&_reindex_oid, $req); } else { warn "BUG: #$docid gone (UNEXPECTED)\n"; - $self->idx_shard($docid)->shard_remove($docid); + $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); } return; } @@ -522,7 +523,7 @@ sub _reindex_smsg ($$$) { BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex $self->{oidx}->delete_by_num($docid); - $self->idx_shard($docid)->shard_remove($docid); + $self->idx_shard($docid)->ipc_do('xdb_remove', $docid); return; } @@ -799,10 +800,10 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ? 
if (scalar(@$xr3) == 0) { # all gone $self->{oidx}->delete_by_num($docid); $self->{oidx}->eidxq_del($docid); - $idx->shard_remove($docid); + $idx->ipc_do('xdb_remove', $docid); } else { # enqueue for reindex of remaining messages - $idx->shard_remove_eidx_info($docid, - $ibx->eidx_key); + $idx->ipc_do('remove_eidx_info', + $docid, $ibx->eidx_key); $self->{oidx}->eidxq_add($docid); # yes, add } } diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 07a3198a..d686e95a 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -146,7 +146,7 @@ sub set_eml_keywords { my $eidx = eidx_init($self); my @docids = _docids_for($self, $eml); for my $docid (@docids) { - $eidx->idx_shard($docid)->shard_set_keywords($docid, @kw); + $eidx->idx_shard($docid)->ipc_do('set_keywords', $docid, @kw); } \@docids; } @@ -156,7 +156,7 @@ sub add_eml_keywords { my $eidx = eidx_init($self); my @docids = _docids_for($self, $eml); for my $docid (@docids) { - $eidx->idx_shard($docid)->shard_add_keywords($docid, @kw); + $eidx->idx_shard($docid)->ipc_do('add_keywords', $docid, @kw); } \@docids; } @@ -166,7 +166,7 @@ sub remove_eml_keywords { my $eidx = eidx_init($self); my @docids = _docids_for($self, $eml); for my $docid (@docids) { - $eidx->idx_shard($docid)->shard_remove_keywords($docid, @kw); + $eidx->idx_shard($docid)->ipc_do('remove_keywords', $docid, @kw) } \@docids; } @@ -205,8 +205,9 @@ sub add_eml { for my $docid (@docids) { my $idx = $eidx->idx_shard($docid); $oidx->add_xref3($docid, -1, $smsg->{blob}, '.'); - $idx->shard_add_eidx_info($docid, '.', $eml); # List-Id - $idx->shard_add_keywords($docid, @kw) if @kw; + # add_eidx_info for List-Id + $idx->ipc_do('add_eidx_info', $docid, '.', $eml); + $idx->ipc_do('add_keywords', $docid, @kw) if @kw; } \@docids; } else { @@ -215,7 +216,7 @@ sub add_eml { $oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.'); my $idx = $eidx->idx_shard($smsg->{num}); $idx->index_raw($msgref, $eml, $smsg); - 
$idx->shard_add_keywords($smsg->{num}, @kw) if @kw; + $idx->ipc_do('add_keywords', $smsg->{num}, @kw) if @kw; $smsg; } } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index d83fd4ca..da3ac2e3 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -552,6 +552,7 @@ sub smsg_from_doc ($) { sub xdb_remove { my ($self, @docids) = @_; + $self->begin_txn_lazy; my $xdb = $self->{xdb} or return; for my $docid (@docids) { eval { $xdb->delete_document($docid) }; @@ -559,13 +560,6 @@ sub xdb_remove { } } -sub remove_by_docid { - my ($self, $num) = @_; - die "BUG: remove_by_docid is v2-only\n" if $self->{oidx}; - $self->begin_txn_lazy; - xdb_remove($self, $num) if need_xapian($self); -} - sub index_git_blob_id { my ($doc, $pfx, $objid) = @_; diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 68644bc0..43dad959 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -54,16 +54,6 @@ sub index_raw { $self->ipc_do('add_message', $eml, $smsg); } -sub shard_add_eidx_info { - my ($self, $docid, $eidx_key, $eml) = @_; - $self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml); -} - -sub shard_remove_eidx_info { - my ($self, $docid, $eidx_key, $eml) = @_; - $self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml); -} - # needed when there's multiple IPC workers and the parent forking # causes newer siblings to inherit older siblings sockets sub shard_atfork_child { @@ -93,26 +83,6 @@ sub shard_close { $self->ipc_worker_stop; } -sub shard_remove { - my ($self, $num) = @_; - $self->ipc_do('remove_by_docid', $num); -} - -sub shard_set_keywords { - my ($self, $docid, @kw) = @_; - $self->ipc_do('set_keywords', $docid, @kw); -} - -sub shard_remove_keywords { - my ($self, $docid, @kw) = @_; - $self->ipc_do('remove_keywords', $docid, @kw); -} - -sub shard_add_keywords { - my ($self, $docid, @kw) = @_; - $self->ipc_do('add_keywords', $docid, @kw); -} - sub shard_over_check 
{ my ($self, $over) = @_; if ($self->{-ipc_sock} && $over->{dbh}) { diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index cad559c5..885edbe9 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1133,8 +1133,7 @@ sub unindex_oid_aux ($$$) { my ($self, $oid, $mid) = @_; my @removed = $self->{oidx}->remove_oid($oid, $mid); for my $num (@removed) { - my $idx = idx_shard($self, $num); - $idx->shard_remove($num); + idx_shard($self, $num)->ipc_do('xdb_remove', $num); } }
Since Storable and Sereal are designed for lossless serialization, we'll just pass $eml objects to whatever process is running SearchIdx. --- lib/PublicInbox/ExtSearchIdx.pm | 4 ++-- lib/PublicInbox/LeiStore.pm | 3 ++- lib/PublicInbox/SearchIdxShard.pm | 9 ++------- lib/PublicInbox/V2Writable.pm | 11 +++++------ 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 064d9939..d55d3db9 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -135,7 +135,7 @@ sub index_unseen ($) { my $oid = $new_smsg->{blob}; my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset'; $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key); - $idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key); + $idx->index_eml($eml, $new_smsg, $ibx->eidx_key); check_batch_limit($req); } @@ -437,7 +437,7 @@ sub _reindex_finalize ($$$) { my $top_smsg = pop @$stable; $top_smsg == $smsg or die 'BUG: top_smsg != smsg'; my $ibx = _ibx_for($self, $sync, $smsg); - $idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key); + $idx->index_eml($eml, $smsg, $ibx->eidx_key); for my $x (reverse @$stable) { $ibx = _ibx_for($self, $sync, $x); my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}'; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index d686e95a..4f77e8fa 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -199,6 +199,7 @@ sub add_eml { $im->add($eml, undef, $smsg) or return; # duplicate returns undef my $msgref = delete $smsg->{-raw_email}; $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref); + undef $msgref; local $self->{current_info} = $smsg->{blob}; if (my @docids = _docids_for($self, $eml)) { @@ -215,7 +216,7 @@ sub add_eml { $oidx->add_overview($eml, $smsg); $oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.'); my $idx = $eidx->idx_shard($smsg->{num}); - $idx->index_raw($msgref, $eml, $smsg); + $idx->index_eml($eml, $smsg); 
$idx->ipc_do('add_keywords', $smsg->{num}, @kw) if @kw; $smsg; } diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 43dad959..83cbbb25 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -43,13 +43,8 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop PublicInbox::OnDestroy->new($$, \&_worker_done, $self); } -sub index_raw { - my ($self, $msgref, $eml, $smsg, $eidx_key) = @_; - if ($eml) { - undef($$msgref) if $msgref; - } else { # --xapian-only + --sequential-shard: - $eml = PublicInbox::Eml->new($msgref); - } +sub index_eml { + my ($self, $eml, $smsg, $eidx_key) = @_; $smsg->{eidx_key} = $eidx_key if defined $eidx_key; $self->ipc_do('add_message', $eml, $smsg); } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 885edbe9..7b6b93a0 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -140,11 +140,11 @@ sub idx_shard ($$) { # indexes a message, returns true if checkpointing is needed sub do_idx ($$$$) { - my ($self, $msgref, $mime, $smsg) = @_; + my ($self, $msgref, $eml, $smsg) = @_; $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref); - $self->{oidx}->add_overview($mime, $smsg); + $self->{oidx}->add_overview($eml, $smsg); my $idx = idx_shard($self, $smsg->{num}); - $idx->index_raw($msgref, $mime, $smsg); + $idx->index_eml($eml, $smsg); my $n = $self->{transact_bytes} += $smsg->{raw_bytes}; $n >= $self->{batch_bytes}; } @@ -173,8 +173,7 @@ sub _add { $cmt = $im->get_mark($cmt); $self->{last_commit}->[$self->{epoch_max}] = $cmt; - my $msgref = delete $smsg->{-raw_email}; - if (do_idx($self, $msgref, $mime, $smsg)) { + if (do_idx($self, delete $smsg->{-raw_email}, $mime, $smsg)) { $self->checkpoint; } @@ -1219,7 +1218,7 @@ sub index_xap_only { # git->cat_async callback my $self = $smsg->{self}; my $idx = idx_shard($self, $smsg->{num}); $smsg->{raw_bytes} = $size; - $idx->index_raw($bref, undef, $smsg); + 
$idx->index_eml(PublicInbox::Eml->new($bref), $smsg); $self->{transact_bytes} += $size; }
We don't need to be keeping the raw message around after it hits git. Shard work now relies on Storable (or Sereal) and all of the indexing code relies on the Email::MIME-like API of Eml to access interesting parts of the message. Similarly, smsg->{raw_bytes} is no longer carried around and we do the CRLF adjustment when setting smsg->{bytes}. There's also a small simplification to t/import.t while we're in the area to use xqx instead of spawn/popen_rd. --- lib/PublicInbox/ExtSearchIdx.pm | 11 ++++------- lib/PublicInbox/Import.pm | 4 +--- lib/PublicInbox/LeiStore.pm | 4 ---- lib/PublicInbox/SearchIdx.pm | 17 +++-------------- lib/PublicInbox/Smsg.pm | 13 +++++++++++++ lib/PublicInbox/V2Writable.pm | 23 ++++++++++------------- t/import.t | 12 ++---------- t/search.t | 2 +- 8 files changed, 34 insertions(+), 52 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index d55d3db9..e6c21866 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -21,8 +21,7 @@ use Carp qw(croak carp); use Sys::Hostname qw(hostname); use POSIX qw(strftime); use PublicInbox::Search; -use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor - is_bad_blob); +use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob); use PublicInbox::OverIdx; use PublicInbox::MiscIdx; use PublicInbox::MID qw(mids); @@ -82,8 +81,6 @@ sub check_batch_limit ($) { my ($req) = @_; my $self = $req->{self}; my $new_smsg = $req->{new_smsg}; - - # {raw_bytes} may be unset, so just use {bytes} my $n = $self->{transact_bytes} += $new_smsg->{bytes}; # set flag for PublicInbox::V2Writable::index_todo: @@ -239,7 +236,7 @@ sub index_oid { # git->cat_async callback for 'm' my $new_smsg = $req->{new_smsg} = bless { blob => $oid, }, 'PublicInbox::Smsg'; - $new_smsg->{bytes} = $size + crlf_adjust($$bref); + $new_smsg->set_bytes($$bref, $size); defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return; ++${$req->{nr}}; do_step($req); 
@@ -496,7 +493,7 @@ sub _reindex_oid { # git->cat_async callback my $ci = $self->{current_info}; local $self->{current_info} = "$ci #$docid $oid"; my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg'; - $re_smsg->{bytes} = $size + crlf_adjust($$bref); + $re_smsg->set_bytes($$bref, $size); my $eml = PublicInbox::Eml->new($bref); $re_smsg->populate($eml, { autime => $orig_smsg->{ds}, cotime => $orig_smsg->{ts} }); @@ -676,7 +673,7 @@ sub _reindex_unseen { # git->cat_async callback my $self = $req->{self} // die 'BUG: {self} unset'; local $self->{current_info} = "$self->{current_info} $oid"; my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg'; - $new_smsg->{bytes} = $size + crlf_adjust($$bref); + $new_smsg->set_bytes($$bref, $size); my $eml = $req->{eml} = PublicInbox::Eml->new($bref); $req->{new_smsg} = $new_smsg; $req->{chash} = content_hash($eml); diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 052b145b..8a06a661 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -412,12 +412,10 @@ sub add { # v2: we need this for Xapian if ($smsg) { $smsg->{blob} = $self->get_mark(":$blob"); - $smsg->{raw_bytes} = $n; + $smsg->set_bytes($raw_email, $n); if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore return if $oidx->blob_exists($smsg->{blob}); } - # XXX do we need this? 
it's in git at this point - $smsg->{-raw_email} = \$raw_email; } my $ref = $self->{ref}; my $commit = $self->{mark}++; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 4f77e8fa..7cda7e44 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -10,7 +10,6 @@ package PublicInbox::LeiStore; use strict; use v5.10.1; use parent qw(PublicInbox::Lock PublicInbox::IPC); -use PublicInbox::SearchIdx qw(crlf_adjust); use PublicInbox::ExtSearchIdx; use PublicInbox::Import; use PublicInbox::InboxWritable; @@ -197,9 +196,6 @@ sub add_eml { my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg'; my $im = $self->importer; $im->add($eml, undef, $smsg) or return; # duplicate returns undef - my $msgref = delete $smsg->{-raw_email}; - $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref); - undef $msgref; local $self->{current_info} = $smsg->{blob}; if (my @docids = _docids_for($self, $eml)) { diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index da3ac2e3..a7005051 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -22,7 +22,7 @@ use PublicInbox::OverIdx; use PublicInbox::Spawn qw(spawn nodatacow_dir); use PublicInbox::Git qw(git_unquote); use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); -our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size prepare_stack +our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack index_text term_generator add_val is_bad_blob); my $X = \%PublicInbox::Search::X; our ($DB_CREATE_OR_OPEN, $DB_OPEN); @@ -613,17 +613,6 @@ sub index_mm { } } -# returns the number of bytes to add if given a non-CRLF arg -sub crlf_adjust ($) { - if (index($_[0], "\r\n") < 0) { - # common case is LF-only, every \n needs an \r; - # so favor a cheap tr// over an expensive m//g - $_[0] =~ tr/\n/\n/; - } else { # count number of '\n' w/o '\r', expensive: - scalar(my @n = ($_[0] =~ m/(?<!\r)\n/g)); - } -} - sub is_bad_blob ($$$$) { my ($oid, 
$type, $size, $expect_oid) = @_; if ($type ne 'blob') { @@ -640,8 +629,8 @@ sub index_both { # git->cat_async callback my ($nr, $max) = @$sync{qw(nr max)}; ++$$nr; $$max -= $size; - $size += crlf_adjust($$bref); - my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg'; + my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg'; + $smsg->set_bytes($$bref, $size); my $self = $sync->{sidx}; local $self->{current_info} = "$self->{current_info}: $oid"; my $eml = PublicInbox::Eml->new($bref); diff --git a/lib/PublicInbox/Smsg.pm b/lib/PublicInbox/Smsg.pm index 571cbb6f..c6ff7f52 100644 --- a/lib/PublicInbox/Smsg.pm +++ b/lib/PublicInbox/Smsg.pm @@ -135,4 +135,17 @@ sub subject_normalized ($) { $subj; } +# returns the number of bytes to add if given a non-CRLF arg +sub crlf_adjust ($) { + if (index($_[0], "\r\n") < 0) { + # common case is LF-only, every \n needs an \r; + # so favor a cheap tr// over an expensive m//g + $_[0] =~ tr/\n/\n/; + } else { # count number of '\n' w/o '\r', expensive: + scalar(my @n = ($_[0] =~ m/(?<!\r)\n/g)); + } +} + +sub set_bytes { $_[0]->{bytes} = $_[2] + crlf_adjust($_[1]) } + 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 7b6b93a0..c4efbdd2 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -17,8 +17,7 @@ use PublicInbox::InboxWritable; use PublicInbox::OverIdx; use PublicInbox::Msgmap; use PublicInbox::Spawn qw(spawn popen_rd run_die); -use PublicInbox::SearchIdx qw(log2stack crlf_adjust is_ancestor check_size - is_bad_blob); +use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob); use IO::Handle; # ->autoflush use File::Temp (); @@ -139,13 +138,12 @@ sub idx_shard ($$) { } # indexes a message, returns true if checkpointing is needed -sub do_idx ($$$$) { - my ($self, $msgref, $eml, $smsg) = @_; - $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref); +sub do_idx ($$$) { + my ($self, $eml, $smsg) = @_; 
$self->{oidx}->add_overview($eml, $smsg); my $idx = idx_shard($self, $smsg->{num}); $idx->index_eml($eml, $smsg); - my $n = $self->{transact_bytes} += $smsg->{raw_bytes}; + my $n = $self->{transact_bytes} += $smsg->{bytes}; $n >= $self->{batch_bytes}; } @@ -173,7 +171,7 @@ sub _add { $cmt = $im->get_mark($cmt); $self->{last_commit}->[$self->{epoch_max}] = $cmt; - if (do_idx($self, delete $smsg->{-raw_email}, $mime, $smsg)) { + if (do_idx($self, $mime, $smsg)) { $self->checkpoint; } @@ -536,13 +534,13 @@ W: $list for my $smsg (@$need_reindex) { my $new_smsg = bless { blob => $blob, - raw_bytes => $bytes, num => $smsg->{num}, mid => $smsg->{mid}, }, 'PublicInbox::Smsg'; my $sync = { autime => $smsg->{ds}, cotime => $smsg->{ts} }; $new_smsg->populate($new_mime, $sync); - do_idx($self, \$raw, $new_mime, $new_smsg); + $new_smsg->set_bytes($raw, $bytes); + do_idx($self, $new_mime, $new_smsg); } $rewritten->{rewrites}; } @@ -937,13 +935,13 @@ sub index_oid { # cat_async callback } ++${$arg->{nr}}; my $smsg = bless { - raw_bytes => $size, num => $num, blob => $oid, mid => $mid0, }, 'PublicInbox::Smsg'; $smsg->populate($eml, $arg); - if (do_idx($self, $bref, $eml, $smsg)) { + $smsg->set_bytes($$bref, $size); + if (do_idx($self, $eml, $smsg)) { ${$arg->{need_checkpoint}} = 1; } index_finalize($arg, 1); @@ -1217,9 +1215,8 @@ sub index_xap_only { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; my $self = $smsg->{self}; my $idx = idx_shard($self, $smsg->{num}); - $smsg->{raw_bytes} = $size; $idx->index_eml(PublicInbox::Eml->new($bref), $smsg); - $self->{transact_bytes} += $size; + $self->{transact_bytes} += $smsg->{bytes}; } sub index_xap_step ($$$;$) { diff --git a/t/import.t b/t/import.t index 855b5d7c..ae76858b 100644 --- a/t/import.t +++ b/t/import.t @@ -7,7 +7,6 @@ use PublicInbox::Eml; use PublicInbox::Smsg; use PublicInbox::Git; use PublicInbox::Import; -use PublicInbox::Spawn qw(spawn); use Fcntl qw(:DEFAULT SEEK_SET); use PublicInbox::TestCommon; 
use MIME::Base64 3.05; # Perl 5.10.0 / 5.9.2 @@ -32,20 +31,13 @@ like($im->add($mime, undef, $smsg), qr/\A:[0-9]+\z/, 'added one message'); if ($v2) { like($smsg->{blob}, qr/\A[a-f0-9]{40}\z/, 'got last object_id'); - my $raw_email = $smsg->{-raw_email}; - is($mime->as_string, $$raw_email, 'string matches'); - is($smsg->{raw_bytes}, length($$raw_email), 'length matches'); my @cmd = ('git', "--git-dir=$git->{git_dir}", qw(hash-object --stdin)); open my $in, '+<', undef or BAIL_OUT "open(+<): $!"; print $in $mime->as_string or die "write failed: $!"; $in->flush or die "flush failed: $!"; - seek($in, 0, SEEK_SET); - open my $out, '+<', undef or BAIL_OUT "open(+<): $!"; - my $pid = spawn(\@cmd, {}, { 0 => $in, 1 => $out }); - is(waitpid($pid, 0), $pid, 'waitpid succeeds on hash-object'); + seek($in, 0, SEEK_SET) or die "seek: $!"; + chomp(my $hashed_obj = xqx(\@cmd, undef, { 0 => $in })); is($?, 0, 'hash-object'); - seek($out, 0, SEEK_SET); - chomp(my $hashed_obj = <$out>); is($hashed_obj, $smsg->{blob}, "blob object_id matches exp"); } diff --git a/t/search.t b/t/search.t index 7495233e..b2958c00 100644 --- a/t/search.t +++ b/t/search.t @@ -60,7 +60,7 @@ sub oct_is ($$$) { } { - my $crlf_adjust = \&PublicInbox::SearchIdx::crlf_adjust; + my $crlf_adjust = \&PublicInbox::Smsg::crlf_adjust; is($crlf_adjust->("hi\r\nworld\r\n"), 0, 'no adjustment needed'); is($crlf_adjust->("hi\nworld\n"), 2, 'LF-only counts two CR'); is($crlf_adjust->("hi\r\nworld\n"), 1, 'CRLF/LF-mix 1 counts 1 CR');
This fixes a performance regression in multi-process v2 indexing due to the switch to PublicInbox::IPC. While Unix sockets are fewer FDs to manage, pipes allow unprivileged processes to use larger buffers (up to 1M) on out-of-the-box Linux instances. A larger buffer via F_SETPIPE_SZ afforded by pipes was proven valuable during v2 development in 2018 and continues to be valuable when we get significant amounts of one-way traffic from the producer parent to worker children. Compression may be an option for systems without F_SETPIPE_SZ; but it increases CPU usage with no memory bandwidth savings on hosts where larger buffers are available. --- lib/PublicInbox/IPC.pm | 115 +++++++++++++++++------------- lib/PublicInbox/SearchIdxShard.pm | 23 +++--- lib/PublicInbox/V2Writable.pm | 2 +- 3 files changed, 78 insertions(+), 62 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 6b7b3c7a..c1f6f920 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -6,7 +6,6 @@ package PublicInbox::IPC; use strict; use v5.10.1; -use Socket qw(AF_UNIX SOCK_STREAM); use Carp qw(confess croak); use PublicInbox::Sigfd; my ($enc, $dec); @@ -34,39 +33,42 @@ if ($enc && $dec) { # should be custom ops } sub _get_rec ($) { - my ($sock) = @_; - local $/ = "\n"; - defined(my $len = <$sock>) or return; + my ($r) = @_; + defined(my $len = <$r>) or return; chop($len) eq "\n" or croak "no LF byte in $len"; - defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!"; - $r == $len or croak "short read: $r != $len"; + defined(my $n = read($r, my $buf, $len)) or croak "read error: $!"; + $n == $len or croak "short read: $n != $len"; thaw($buf); } sub _send_rec ($$) { - my ($sock, $ref) = @_; + my ($w, $ref) = @_; my $buf = freeze($ref); - print $sock length($buf), "\n", $buf or croak "print: $!"; + print $w length($buf), "\n", $buf or croak "print: $!"; } sub ipc_return ($$$) { - my ($s2, $ret, $exc) = @_; - _send_rec($s2, $exc ? 
bless(\$exc, 'PublicInbox::IPC::Die') : $ret); + my ($w, $ret, $exc) = @_; + _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret); } -sub ipc_worker_loop ($$) { - my ($self, $s2) = @_; - while (my $rec = _get_rec($s2)) { - my ($wantarray, $sub, @args) = @$rec; - if (!defined($wantarray)) { # no waiting if client doesn't care +sub ipc_worker_loop ($$$) { + my ($self, $r_req, $w_res) = @_; + my ($rec, $wantarray, $sub, @args); + local $/ = "\n"; + while ($rec = _get_rec($r_req)) { + ($wantarray, $sub, @args) = @$rec; + # no waiting if client doesn't care, + # this is the overwhelmingly likely case + if (!defined($wantarray)) { eval { $self->$sub(@args) }; - eval { warn "die: $@ (from nowait $sub)\n" } if $@; + eval { warn "$$ die: $@ (from nowait $sub)\n" } if $@; } elsif ($wantarray) { my @ret = eval { $self->$sub(@args) }; - ipc_return($s2, \@ret, $@); - } else { + ipc_return($w_res, \@ret, $@); + } else { # '' => wantscalar my $ret = eval { $self->$sub(@args) }; - ipc_return($s2, \$ret, $@); + ipc_return($w_res, \$ret, $@); } } } @@ -75,33 +77,34 @@ sub ipc_worker_loop ($$) { sub ipc_worker_spawn { my ($self, $ident, $oldset) = @_; return unless $enc; # no Sereal or Storable - my $pid = $self->{-ipc_worker_pid}; - confess "BUG: already spawned PID:$pid" if $pid; - confess "BUG: already have worker socket" if $self->{-ipc_sock}; - my ($s1, $s2); - socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!"; + return if ($self->{-ipc_ppid} // -1) == $$; # idempotent + delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)}); + pipe(my ($r_req, $w_req)) or die "pipe: $!"; + pipe(my ($r_res, $w_res)) or die "pipe: $!"; my $sigset = $oldset // PublicInbox::Sigfd::block_signals(); my $parent = $$; $self->ipc_atfork_parent; - defined($pid = fork) or die "fork: $!"; + defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; $self->{-ipc_parent_pid} = $parent; - close $s1 or die "close(\$s1): $!"; - 
$s2->autoflush(1); + $w_req = $r_res = undef; + $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; PublicInbox::Sigfd::sig_setmask($oldset); my $on_destroy = $self->ipc_atfork_child; - eval { ipc_worker_loop($self, $s2) }; + eval { ipc_worker_loop($self, $r_req, $w_res) }; die "worker $ident PID:$$ died: $@\n" if $@; exit; } PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset; - close $s2 or die "close(\$s2): $!"; - $s1->autoflush(1); - $self->{-ipc_sock} = $s1; - $self->{-ipc_worker_pid} = $pid; + $r_req = $w_res = undef; + $w_req->autoflush(1); + $self->{-ipc_req} = $w_req; + $self->{-ipc_res} = $r_res; + $self->{-ipc_ppid} = $$; + $self->{-ipc_pid} = $pid; } sub ipc_worker_reap { # dwaitpid callback @@ -122,15 +125,18 @@ sub ipc_worker_exit { # idempotent, can be called regardless of whether worker is active or not sub ipc_worker_stop { my ($self) = @_; - my $pid; - my $s1 = delete $self->{-ipc_sock} or do { - $pid = delete $self->{-ipc_worker_pid} and - die "unexpected PID:$pid without ipc_sock"; - return; - }; - $pid = delete $self->{-ipc_worker_pid} or die "no PID?"; - _send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]); - shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid"; + my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)}); + my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)}); + if (!$w_req && !$r_res) { + die "unexpected PID:$pid without IPC pipes" if $pid; + return; # idempotent + } + die 'no PID with IPC pipes' unless $pid; + _send_rec($w_req, [ undef, 'ipc_worker_exit', 0 ]); + $w_req = $r_res = undef; + + # allow any sibling to send ipc_worker_exit, but siblings can't wait + return if $$ != $ppid; eval { my $reap = $self->can('ipc_worker_reap'); PublicInbox::DS::dwaitpid($pid, $reap, $self); @@ -153,17 +159,30 @@ sub ipc_lock_init { # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; - if (my $s1 = $self->{-ipc_sock}) { + if (my $w_req = 
$self->{-ipc_req}) { # run in worker my $ipc_lock = $self->{-ipc_lock}; my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef; - _send_rec($s1, [ wantarray, $sub, @args ]); - return unless defined(wantarray); - my $ret = _get_rec($s1) // die "no response on $sub"; - die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; - wantarray ? @$ret : $$ret; - } else { + if (defined(wantarray)) { + my $r_res = $self->{-ipc_res} or die 'no ipc_res'; + _send_rec($w_req, [ wantarray, $sub, @args ]); + my $ret = _get_rec($r_res) // die "no response on $sub"; + die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; + wantarray ? @$ret : $$ret; + } else { # likely, fire-and-forget into pipe + _send_rec($w_req, [ undef , $sub, @args ]); + } + } else { # run locally $self->$sub(@args); } } +# needed when there's multiple IPC workers and the parent forking +# causes newer siblings to inherit older siblings sockets +sub ipc_sibling_atfork_child { + my ($self) = @_; + my ($pid, undef) = delete(@$self{qw(-ipc_pid -ipc_ppid)}); + delete(@$self{qw(-ipc_req -ipc_res)}); + $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; +} + 1; diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 83cbbb25..0051df93 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -21,6 +21,14 @@ sub new { if ($v2w->{parallel}) { local $self->{-v2w_afc} = $v2w; $self->ipc_worker_spawn("shard[$shard]"); + # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size for + # inputs speeds V2Writable batch imports across 8 cores by + # nearly 20%. 
Since any of our responses are small, make + # the response pipe as small as possible + if ($^O eq 'linux') { + fcntl($self->{-ipc_req}, 1031, 1048576); + fcntl($self->{-ipc_res}, 1031, 4096); + } } $self; } @@ -36,7 +44,7 @@ sub _worker_done { sub ipc_atfork_child { # called automatically before ipc_worker_loop my ($self) = @_; my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing'; - $v2w->atfork_child; # calls shard_atfork_child on our siblings + $v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings $v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__} $self->begin_txn_lazy; # caller must capture this: @@ -49,17 +57,6 @@ sub index_eml { $self->ipc_do('add_message', $eml, $smsg); } -# needed when there's multiple IPC workers and the parent forking -# causes newer siblings to inherit older siblings sockets -sub shard_atfork_child { - my ($self) = @_; - my $pid = delete($self->{-ipc_worker_pid}) or - die "BUG: $$ no -ipc_worker_pid"; - my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock"; - $pid == $$ and die "BUG: $$ shard_atfork_child called on itself"; - close($s1) or die "close -ipc_sock: $!"; -} - # wait for return to determine when ipc_do('commit_txn_lazy') is done sub echo { shift; @@ -80,7 +77,7 @@ sub shard_close { sub shard_over_check { my ($self, $over) = @_; - if ($self->{-ipc_sock} && $over->{dbh}) { + if ($self->{-ipc_req} && $over->{dbh}) { # can't send DB handles over IPC $over = ref($over)->new($over->{dbh}->sqlite_db_filename); } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index c4efbdd2..6be95979 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -823,7 +823,7 @@ sub content_exists ($$$) { sub atfork_child { my ($self) = @_; if (my $older_siblings = $self->{idx_shards}) { - $_->shard_atfork_child for @$older_siblings; + $_->ipc_sibling_atfork_child for @$older_siblings; } if (my $im = $self->{im}) { $im->atfork_child;
This way, we can more clearly distinguish between v1 and v2-only code paths, and may be able to save a few cycles. --- lib/PublicInbox/SearchIdx.pm | 1 + lib/PublicInbox/SearchIdxShard.pm | 2 +- lib/PublicInbox/V2Writable.pm | 8 ++++++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index a7005051..adced076 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -397,6 +397,7 @@ sub eml2doc ($$$;$) { sub add_xapian ($$$$) { my ($self, $eml, $smsg, $mids) = @_; + begin_txn_lazy($self); my $doc = eml2doc($self, $eml, $smsg, $mids); $self->{xdb}->replace_document($smsg->{num}, $doc); } diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index 0051df93..1598faeb 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -54,7 +54,7 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop sub index_eml { my ($self, $eml, $smsg, $eidx_key) = @_; $smsg->{eidx_key} = $eidx_key if defined $eidx_key; - $self->ipc_do('add_message', $eml, $smsg); + $self->ipc_do('add_xapian', $eml, $smsg); } # wait for return to determine when ipc_do('commit_txn_lazy') is done diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 6be95979..459c7e86 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -141,8 +141,10 @@ sub idx_shard ($$) { sub do_idx ($$$) { my ($self, $eml, $smsg) = @_; $self->{oidx}->add_overview($eml, $smsg); - my $idx = idx_shard($self, $smsg->{num}); - $idx->index_eml($eml, $smsg); + if ($self->{-need_xapian}) { + my $idx = idx_shard($self, $smsg->{num}); + $idx->index_eml($eml, $smsg); + } my $n = $self->{transact_bytes} += $smsg->{bytes}; $n >= $self->{batch_bytes}; } @@ -267,6 +269,7 @@ sub _idx_init { # with_umask callback my $max = $self->{shards} - 1; my $idx = $self->{idx_shards} = []; push @$idx, 
PublicInbox::SearchIdxShard->new($self, $_) for (0..$max); + $self->{-need_xapian} = $idx->[0]->need_xapian; # SearchIdxShard may do their own flushing, so don't scale # until after forking @@ -1129,6 +1132,7 @@ sub sync_prepare ($$) { sub unindex_oid_aux ($$$) { my ($self, $oid, $mid) = @_; my @removed = $self->{oidx}->remove_oid($oid, $mid); + return unless $self->{-need_xapian}; for my $num (@removed) { idx_shard($self, $num)->ipc_do('xdb_remove', $num); }