From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EA5AF1F4D0 for ; Wed, 11 Dec 2024 08:10:47 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1733904648; bh=bOWp7EU2cb3vYpQS9tjkQ5I0bqGZepJWUwFX9qvy8L8=; h=From:To:Subject:Date:In-Reply-To:References:From; b=sQdR8mzdjPWpHoj+QoFzNA2oH+dyK8C2JZ/Gthm8SzmmE3BBVuPKZvGAJhvLD0KHK R1sw0Nh3rSkNy9UzmHX3/MqUafzFXA/o/1Nlzdd1kHAPvYaVVH0RIBq5nagHfDBLT1 DD4DMcdRJG1aisuNC0bVc/rh0ElvQuSyHHV7Swls= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/4] (ext)index: use time-based commits to avoid busy timeout Date: Wed, 11 Dec 2024 08:10:44 +0000 Message-ID: <20241211081047.1267062-2-e@80x24.org> In-Reply-To: <20241211081047.1267062-1-e@80x24.org> References: <20241211081047.1267062-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: With public-facing read-only daemons typically run as a different user than writers, we cannot rely on SQLite WAL (write-ahead log) for parallelism since all readers need write permissions on read-write FSes to read from WAL DBs. Since we can't force or even encourage WAL use for public-facing inboxes, we need to ensure long-running --reindex jobs can commit occasionally to prevent read-only daemons from hitting the default 30s busy_timeout set by DBD::SQLite (not SQLite itself). This mainly affects --reindex users, but can also affect newly-cloned inboxes which are being served by read-only daemons while they're being indexed.
This change only benefits read-only processes, and is likely to penalize writer performance and storage efficiency due to increased write frequency. We still maintain and respect --batch-size for memory size-based commits in addition to time-based commits, but the new time-based commit interval is necessary in case the batch size is too large or the system is too slow to index a large batch. While Xapian doesn't need time-based commits for read parallelism, we commit to Xapian anyway since we want to minimize consistency problems on interrupted indexing jobs. Followup-to: 807abf67e14d (lei/store: auto-commit for long-running imports, 2024-11-15) --- lib/PublicInbox/ExtSearchIdx.pm | 14 ++++++-------- lib/PublicInbox/SearchIdx.pm | 29 +++++++++++++++++++++-------- lib/PublicInbox/V2Writable.pm | 22 ++++++++++------------ 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index d8db7d4b..fe2f5d2e 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -26,7 +26,8 @@ use PublicInbox::Isearch; use PublicInbox::MultiGit; use PublicInbox::Spawn (); use PublicInbox::Search; -use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob); +use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob + update_checkpoint); use PublicInbox::OverIdx; use PublicInbox::MiscIdx; use PublicInbox::MID qw(mids); @@ -123,12 +124,9 @@ sub attach_config { sub check_batch_limit ($) { my ($req) = @_; - my $self = $req->{self}; - my $new_smsg = $req->{new_smsg}; - my $n = $self->{transact_bytes} += $new_smsg->{bytes}; - # set flag for PublicInbox::V2Writable::index_todo: - ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes}; + update_checkpoint $req->{self}, $req->{new_smsg} and + ${$req->{need_checkpoint}} = 1; } sub bad_ibx_id ($$;$) { @@ -535,7 +533,7 @@ sub eidx_gc { $self->{cfg} or die "E: GC requires ->attach_config\n"; $opt->{-idx_gc} = 1; my $sync = 
{ - need_checkpoint => \(my $need_checkpoint = 0), + need_checkpoint => \(my $need_checkpoint), check_intvl => 10, next_check => now() + 10, checkpoint_unlocks => 1, @@ -1136,7 +1134,7 @@ sub eidx_sync { # main entry point $self->idx_init($opt); # acquire lock via V2Writable::_idx_init $self->{oidx}->rethread_prepare($opt); my $sync = { - need_checkpoint => \(my $need_checkpoint = 0), + need_checkpoint => \(my $need_checkpoint), check_intvl => 10, next_check => now() + 10, -opt => $opt, diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 48ba806a..3a85f552 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -12,6 +12,7 @@ use v5.10.1; use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask Exporter); use PublicInbox::Eml; +use PublicInbox::DS qw(now); use PublicInbox::Search qw(xap_terms); use PublicInbox::InboxWritable; use PublicInbox::MID qw(mids_for_index mids); @@ -28,11 +29,12 @@ use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); use PublicInbox::Address; use Config; our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack - index_text term_generator add_val is_bad_blob); + index_text term_generator add_val is_bad_blob update_checkpoint); my $X = \%PublicInbox::Search::X; our ($DB_CREATE_OR_OPEN, $DB_OPEN); our $DB_NO_SYNC = 0; our $DB_DANGEROUS = 0; +our $CHECKPOINT_INTVL = 5; # seconds our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff : # assume a typical 64-bit system has 8x more RAM than a # typical 32-bit system: @@ -785,15 +787,23 @@ sub is_bad_blob ($$$$) { $size == 0 ? 
1 : 0; # size == 0 means purged } +sub update_checkpoint ($$) { + my ($self, $smsg) = @_; + ($self->{transact_bytes} += $smsg->{bytes}) >= $self->{batch_bytes} and + return 1; + my $now = now; + my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL; + $now > $next; +} + sub index_both { # git->cat_async callback my ($bref, $oid, $type, $size, $sync) = @_; return if is_bad_blob($oid, $type, $size, $sync->{oid}); - my ($nr, $max) = @$sync{qw(nr max)}; - ++$$nr; - $$max -= $size; + ++${$sync->{nr}}; my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg'; $smsg->set_bytes($$bref, $size); my $self = $sync->{sidx}; + ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg; local $self->{current_info} = "$self->{current_info}: $oid"; my $eml = PublicInbox::Eml->new($bref); $smsg->{num} = index_mm($self, $eml, $oid, $sync) or @@ -850,6 +860,7 @@ sub check_size { # check_async cb for -index --max-size=... sub v1_checkpoint ($$;$) { my ($self, $sync, $stk) = @_; $self->{ibx}->git->async_wait_all; + ${$sync->{need_checkpoint}} = undef; # $newest may be undef my $newest = $stk ? 
$stk->{latest_cmt} : ${$sync->{latest_cmt}}; @@ -859,7 +870,6 @@ sub v1_checkpoint ($$;$) { $self->{mm}->last_commit($newest); } } - ${$sync->{max}} = $self->{batch_bytes}; $self->{mm}->mm_commit; my $xdb = $self->{xdb}; if ($newest && $xdb) { @@ -889,17 +899,18 @@ sub v1_checkpoint ($$;$) { begin_txn_lazy($self); $self->{mm}->{dbh}->begin_work; } + $self->{transact_bytes} = 0; + delete $self->{next_checkpoint}; } # only for v1 sub process_stack { my ($self, $sync, $stk) = @_; my $git = $sync->{ibx}->git; - my $max = $self->{batch_bytes}; my $nr = 0; $sync->{nr} = \$nr; - $sync->{max} = \$max; $sync->{sidx} = $self; + $sync->{need_checkpoint} = \(my $need_ckpt); $sync->{latest_cmt} = \(my $latest_cmt); $self->{mm}->{dbh}->begin_work; @@ -926,10 +937,11 @@ sub process_stack { } else { $git->cat_async($oid, \&index_both, $arg); } - v1_checkpoint($self, $sync) if $max <= 0; } elsif ($f eq 'd') { $git->cat_async($oid, \&unindex_both, $arg); } + ${$sync->{need_checkpoint}} and + v1_checkpoint $self, $sync; } v1_checkpoint($self, $sync, $sync->{quit} ? 
undef : $stk); } @@ -1072,6 +1084,7 @@ sub _index_sync { $ibx->git->cat_file($tip); $ibx->git->check($tip); } + local $self->{transact_bytes} = 0; my $pr = $opt->{-progress}; my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx }; my $quit = quit_cb($sync); diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index af9aaef3..5f3bfde5 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -20,7 +20,8 @@ use PublicInbox::OverIdx; use PublicInbox::Msgmap; use PublicInbox::Spawn qw(spawn popen_rd run_die); use PublicInbox::Search; -use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob); +use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob + update_checkpoint); use PublicInbox::DS qw(now); use IO::Handle; # ->autoflush use POSIX (); @@ -106,8 +107,7 @@ sub do_idx ($$$) { my $idx = idx_shard($self, $smsg->{num}); $idx->index_eml($eml, $smsg); } - my $n = $self->{transact_bytes} += $smsg->{bytes}; - $n >= $self->{batch_bytes}; + update_checkpoint $self, $smsg; } # returns undef on duplicate or spam @@ -136,10 +136,7 @@ sub add { my $cmt = $im->add($mime, undef, $smsg); # sets $smsg->{ds|ts|blob} $cmt = $im->get_mark($cmt); $self->{last_commit}->[$self->{epoch_max}] = $cmt; - - if (do_idx($self, $mime, $smsg)) { - $self->checkpoint; - } + $self->checkpoint if do_idx $self, $mime, $smsg; $cmt; } @@ -564,6 +561,7 @@ shard[$i] bad echo:$echo != $i waiting for txn commit $dbh->begin_work; } } + delete $self->{next_checkpoint}; $self->{total_bytes} += $self->{transact_bytes}; $self->{transact_bytes} = 0; } @@ -819,9 +817,7 @@ sub index_oid { # cat_async callback }, 'PublicInbox::Smsg'; $smsg->populate($eml, $arg); $smsg->set_bytes($$bref, $size); - if (do_idx($self, $eml, $smsg)) { - ${$arg->{need_checkpoint}} = 1; - } + ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg; index_finalize($arg, 1); } @@ -1103,7 +1099,6 @@ sub index_xap_only { # git->cat_async 
callback my $self = delete $smsg->{self}; my $idx = idx_shard($self, $smsg->{num}); $idx->index_eml(PublicInbox::Eml->new($bref), $smsg); - $self->{transact_bytes} += $smsg->{bytes}; } sub index_xap_step ($$$;$) { @@ -1122,7 +1117,10 @@ sub index_xap_step ($$$;$) { my $smsg = $ibx->over->get_art($num) or next; $smsg->{self} = $self; $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg); - if ($self->{transact_bytes} >= $self->{batch_bytes}) { + # n.b. ignore CHECKPOINT_INTVL for Xapian-only, Xapian doesn't + # have timeout problems like SQLite + my $n = $self->{transact_bytes} += $smsg->{bytes}; + if ($n >= $self->{batch_bytes}) { ${$sync->{nr}} = $num; reindex_checkpoint($self, $sync); }