* [PATCH 1/4] (ext)index: use time-based commits to avoid busy timeout
2024-12-11 8:10 [PATCH 0/4] various indexing updates Eric Wong
@ 2024-12-11 8:10 ` Eric Wong
2024-12-11 8:10 ` [PATCH 2/4] lei/store: use global checkpoint interval Eric Wong
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2024-12-11 8:10 UTC (permalink / raw)
To: meta
With public-facing read-only daemons typically run as a
different user than writers, we cannot rely on SQLite WAL
(write-ahead log) for parallelism since all readers need write
permissions on read-write FSes to read from WAL DBs. Since we
can't force or even encourage WAL use for public-facing inboxes,
we need to ensure long-running --reindex jobs can commit
occasionally to prevent read-only daemons from hitting the
default 30s busy_timeout set by DBD::SQLite (not SQLite itself).
This mainly affects --reindex users, but can also affect
newly-cloned inboxes which are being served by read-only
daemons while they're being indexed.
This change only benefits read-only processes, and is likely to
penalize writer performance and storage efficiency due to
increased write frequency. We still maintain and respect
--batch-size for memory size-based commits in addition to
time-based commits, but the new time-based commit interval is
necessary in case the batch size is too large or the system
is too slow to index a large batch.
While Xapian doesn't need time-based commits for read
parallelism, we commit to Xapian anyways since we want to
minimize consistency problems on interrupted indexing jobs.
Followup-to: 807abf67e14d (lei/store: auto-commit for long-running imports, 2024-11-15)
---
lib/PublicInbox/ExtSearchIdx.pm | 14 ++++++--------
lib/PublicInbox/SearchIdx.pm | 29 +++++++++++++++++++++--------
lib/PublicInbox/V2Writable.pm | 22 ++++++++++------------
3 files changed, 37 insertions(+), 28 deletions(-)
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index d8db7d4b..fe2f5d2e 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -26,7 +26,8 @@ use PublicInbox::Isearch;
use PublicInbox::MultiGit;
use PublicInbox::Spawn ();
use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob
+ update_checkpoint);
use PublicInbox::OverIdx;
use PublicInbox::MiscIdx;
use PublicInbox::MID qw(mids);
@@ -123,12 +124,9 @@ sub attach_config {
sub check_batch_limit ($) {
my ($req) = @_;
- my $self = $req->{self};
- my $new_smsg = $req->{new_smsg};
- my $n = $self->{transact_bytes} += $new_smsg->{bytes};
-
# set flag for PublicInbox::V2Writable::index_todo:
- ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
+ update_checkpoint $req->{self}, $req->{new_smsg} and
+ ${$req->{need_checkpoint}} = 1;
}
sub bad_ibx_id ($$;$) {
@@ -535,7 +533,7 @@ sub eidx_gc {
$self->{cfg} or die "E: GC requires ->attach_config\n";
$opt->{-idx_gc} = 1;
my $sync = {
- need_checkpoint => \(my $need_checkpoint = 0),
+ need_checkpoint => \(my $need_checkpoint),
check_intvl => 10,
next_check => now() + 10,
checkpoint_unlocks => 1,
@@ -1136,7 +1134,7 @@ sub eidx_sync { # main entry point
$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
$self->{oidx}->rethread_prepare($opt);
my $sync = {
- need_checkpoint => \(my $need_checkpoint = 0),
+ need_checkpoint => \(my $need_checkpoint),
check_intvl => 10,
next_check => now() + 10,
-opt => $opt,
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 48ba806a..3a85f552 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -12,6 +12,7 @@ use v5.10.1;
use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask
Exporter);
use PublicInbox::Eml;
+use PublicInbox::DS qw(now);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::InboxWritable;
use PublicInbox::MID qw(mids_for_index mids);
@@ -28,11 +29,12 @@ use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
use PublicInbox::Address;
use Config;
our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
- index_text term_generator add_val is_bad_blob);
+ index_text term_generator add_val is_bad_blob update_checkpoint);
my $X = \%PublicInbox::Search::X;
our ($DB_CREATE_OR_OPEN, $DB_OPEN);
our $DB_NO_SYNC = 0;
our $DB_DANGEROUS = 0;
+our $CHECKPOINT_INTVL = 5; # seconds
our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff :
# assume a typical 64-bit system has 8x more RAM than a
# typical 32-bit system:
@@ -785,15 +787,23 @@ sub is_bad_blob ($$$$) {
$size == 0 ? 1 : 0; # size == 0 means purged
}
+sub update_checkpoint ($$) {
+ my ($self, $smsg) = @_;
+ ($self->{transact_bytes} += $smsg->{bytes}) >= $self->{batch_bytes} and
+ return 1;
+ my $now = now;
+ my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
+ $now > $next;
+}
+
sub index_both { # git->cat_async callback
my ($bref, $oid, $type, $size, $sync) = @_;
return if is_bad_blob($oid, $type, $size, $sync->{oid});
- my ($nr, $max) = @$sync{qw(nr max)};
- ++$$nr;
- $$max -= $size;
+ ++${$sync->{nr}};
my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
$smsg->set_bytes($$bref, $size);
my $self = $sync->{sidx};
+ ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg;
local $self->{current_info} = "$self->{current_info}: $oid";
my $eml = PublicInbox::Eml->new($bref);
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
@@ -850,6 +860,7 @@ sub check_size { # check_async cb for -index --max-size=...
sub v1_checkpoint ($$;$) {
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
+ ${$sync->{need_checkpoint}} = undef;
# $newest may be undef
my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
@@ -859,7 +870,6 @@ sub v1_checkpoint ($$;$) {
$self->{mm}->last_commit($newest);
}
}
- ${$sync->{max}} = $self->{batch_bytes};
$self->{mm}->mm_commit;
my $xdb = $self->{xdb};
if ($newest && $xdb) {
@@ -889,17 +899,18 @@ sub v1_checkpoint ($$;$) {
begin_txn_lazy($self);
$self->{mm}->{dbh}->begin_work;
}
+ $self->{transact_bytes} = 0;
+ delete $self->{next_checkpoint};
}
# only for v1
sub process_stack {
my ($self, $sync, $stk) = @_;
my $git = $sync->{ibx}->git;
- my $max = $self->{batch_bytes};
my $nr = 0;
$sync->{nr} = \$nr;
- $sync->{max} = \$max;
$sync->{sidx} = $self;
+ $sync->{need_checkpoint} = \(my $need_ckpt);
$sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
@@ -926,10 +937,11 @@ sub process_stack {
} else {
$git->cat_async($oid, \&index_both, $arg);
}
- v1_checkpoint($self, $sync) if $max <= 0;
} elsif ($f eq 'd') {
$git->cat_async($oid, \&unindex_both, $arg);
}
+ ${$sync->{need_checkpoint}} and
+ v1_checkpoint $self, $sync;
}
v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
}
@@ -1072,6 +1084,7 @@ sub _index_sync {
$ibx->git->cat_file($tip);
$ibx->git->check($tip);
}
+ local $self->{transact_bytes} = 0;
my $pr = $opt->{-progress};
my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
my $quit = quit_cb($sync);
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index af9aaef3..5f3bfde5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -20,7 +20,8 @@ use PublicInbox::OverIdx;
use PublicInbox::Msgmap;
use PublicInbox::Spawn qw(spawn popen_rd run_die);
use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob);
+use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
+ update_checkpoint);
use PublicInbox::DS qw(now);
use IO::Handle; # ->autoflush
use POSIX ();
@@ -106,8 +107,7 @@ sub do_idx ($$$) {
my $idx = idx_shard($self, $smsg->{num});
$idx->index_eml($eml, $smsg);
}
- my $n = $self->{transact_bytes} += $smsg->{bytes};
- $n >= $self->{batch_bytes};
+ update_checkpoint $self, $smsg;
}
# returns undef on duplicate or spam
@@ -136,10 +136,7 @@ sub add {
my $cmt = $im->add($mime, undef, $smsg); # sets $smsg->{ds|ts|blob}
$cmt = $im->get_mark($cmt);
$self->{last_commit}->[$self->{epoch_max}] = $cmt;
-
- if (do_idx($self, $mime, $smsg)) {
- $self->checkpoint;
- }
+ $self->checkpoint if do_idx $self, $mime, $smsg;
$cmt;
}
@@ -564,6 +561,7 @@ shard[$i] bad echo:$echo != $i waiting for txn commit
$dbh->begin_work;
}
}
+ delete $self->{next_checkpoint};
$self->{total_bytes} += $self->{transact_bytes};
$self->{transact_bytes} = 0;
}
@@ -819,9 +817,7 @@ sub index_oid { # cat_async callback
}, 'PublicInbox::Smsg';
$smsg->populate($eml, $arg);
$smsg->set_bytes($$bref, $size);
- if (do_idx($self, $eml, $smsg)) {
- ${$arg->{need_checkpoint}} = 1;
- }
+ ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
index_finalize($arg, 1);
}
@@ -1103,7 +1099,6 @@ sub index_xap_only { # git->cat_async callback
my $self = delete $smsg->{self};
my $idx = idx_shard($self, $smsg->{num});
$idx->index_eml(PublicInbox::Eml->new($bref), $smsg);
- $self->{transact_bytes} += $smsg->{bytes};
}
sub index_xap_step ($$$;$) {
@@ -1122,7 +1117,10 @@ sub index_xap_step ($$$;$) {
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
- if ($self->{transact_bytes} >= $self->{batch_bytes}) {
+ # n.b. ignore CHECKPOINT_INTVL for Xapian-only, Xapian doesn't
+ # have timeout problems like SQLite
+ my $n = $self->{transact_bytes} += $smsg->{bytes};
+ if ($n >= $self->{batch_bytes}) {
${$sync->{nr}} = $num;
reindex_checkpoint($self, $sync);
}
^ permalink raw reply related [flat|nested] 5+ messages in thread