From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: 
X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net
X-Spam-Level: 
X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2
Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 07D7A1FA18 for ; Thu, 31 Dec 2020 13:51:55 +0000 (UTC)
From: Eric Wong 
To: meta@public-inbox.org
Subject: [PATCH 05/36] sharedkv: fork()-friendly key-value store
Date: Thu, 31 Dec 2020 13:51:23 +0000
Message-Id: <20201231135154.6070-6-e@80x24.org>
In-Reply-To: <20201231135154.6070-1-e@80x24.org>
References: <20201231135154.6070-1-e@80x24.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id: 

This is intended for maintaining Maildir states, mbox message
deduplication, but may be useful for other purposes...
---
 MANIFEST                    |   2 +
 lib/PublicInbox/Lock.pm     |  10 ++-
 lib/PublicInbox/SharedKV.pm | 171 ++++++++++++++++++++++++++++++++++++
 t/shared_kv.t               |  63 +++++++++++++
 4 files changed, 245 insertions(+), 1 deletion(-)
 create mode 100644 lib/PublicInbox/SharedKV.pm
 create mode 100644 t/shared_kv.t

diff --git a/MANIFEST b/MANIFEST
index 12b67e95..d32f064e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -200,6 +200,7 @@ lib/PublicInbox/SearchIdxShard.pm
 lib/PublicInbox/SearchQuery.pm
 lib/PublicInbox/SearchThread.pm
 lib/PublicInbox/SearchView.pm
+lib/PublicInbox/SharedKV.pm
 lib/PublicInbox/Sigfd.pm
 lib/PublicInbox/Smsg.pm
 lib/PublicInbox/SolverGit.pm
@@ -377,6 +378,7 @@ t/run.perl
 t/search-amsg.eml
 t/search-thr-index.t
 t/search.t
+t/shared_kv.t
 t/sigfd.t
 t/solve/0001-simple-mod.patch
 t/solve/0002-rename-with-modifications.patch
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index b2c8227f..7fd17745 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -4,9 +4,10 @@
 # Base class for per-inbox locking
 package PublicInbox::Lock;
 use strict;
-use warnings;
+use v5.10.1;
 use Fcntl qw(:flock :DEFAULT);
 use Carp qw(croak);
+use PublicInbox::OnDestroy;
 
 # we only acquire the flock if creating or reindexing;
 # PublicInbox::Import already has the lock on its own.
@@ -32,4 +33,11 @@ sub lock_release {
 	close $lockfh or croak "close $lock_path failed: $!\n";
 }
 
+# caller must use return value
+sub lock_for_scope {
+	my ($self) = @_;
+	$self->lock_acquire;
+	PublicInbox::OnDestroy->new(\&lock_release, $self);
+}
+
 1;
diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
new file mode 100644
index 00000000..52a7424e
--- /dev/null
+++ b/lib/PublicInbox/SharedKV.pm
@@ -0,0 +1,171 @@
+# Copyright (C) 2020 all contributors
+# License: AGPL-3.0+
+
+# fork()-friendly key-value store.  Will be used to make
+# augmenting Maildirs and mboxes less expensive, maybe.
+# We use flock(2) to avoid SQLite lock problems (busy timeouts, backoff)
+#
+# Keys and values are stored as opaque binary strings.  Methods other
+# than ->new and ->dbh use $self->{dbh} directly, so ->dbh must be
+# called before any of them.
+package PublicInbox::SharedKV;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::Lock);
+use File::Temp 0.19 (); # 0.19 for ->newdir
+use DBI ();
+use PublicInbox::Spawn;
+
+# Returns the cached DBI handle, connecting (and creating the kv table
+# and the index on its values) on the first call.  The flock is held
+# while connecting unless the caller passes in a $lock it already holds.
+sub dbh {
+	my ($self, $lock) = @_;
+	$self->{dbh} //= do {
+		my $f = $self->{filename};
+		$lock //= $self->lock_for_scope;
+		my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
+			AutoCommit => 1,
+			RaiseError => 1,
+			PrintError => 0,
+			sqlite_use_immediate_transaction => 1,
+			# no sqlite_unicode here, this is for binary data
+		});
+		my $opt = $self->{opt} // {};
+		$dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
+		$dbh->do('PRAGMA cache_size = '.($opt->{cache_size} || 80000));
+		$dbh->do('PRAGMA journal_mode = '.
+				($opt->{journal_mode} // 'WAL'));
+		$dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS kv (
+	k VARBINARY PRIMARY KEY NOT NULL,
+	v VARBINARY NOT NULL,
+	UNIQUE (k)
+)
+
+		# idx_v backs the "WHERE v = ?" queries below
+		$dbh->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
+		$dbh;
+	}
+}
+
+# $dir defaults to a new temporary directory which is removed along
+# with the object; $base defaults to ''.  Creates $dir and an empty
+# "$base.sqlite3" file if missing, but does not connect: see ->dbh.
+sub new {
+	my ($cls, $dir, $base, $opt) = @_;
+	my $self = bless { opt => $opt }, $cls;
+	unless (defined $dir) {
+		$self->{tmp} = File::Temp->newdir('kv-XXXXXX', TMPDIR => 1);
+		$dir = $self->{tmp}->dirname;
+	}
+	# EEXIST: another process may create $dir between -d and mkdir
+	-d $dir or mkdir($dir) or $!{EEXIST} or die "mkdir($dir): $!";
+	$base //= '';
+	my $f = $self->{filename} = "$dir/$base.sqlite3";
+	$self->{lock_path} = $opt->{lock_path} // "$dir/$base.flock";
+	unless (-f $f) {
+		open my $fh, '+>>', $f or die "failed to open $f: $!";
+		PublicInbox::Spawn::nodatacow_fd(fileno($fh));
+	}
+	$self;
+}
+
+# Inserts $key => $val under the flock unless $key already exists.
+# Returns the number of rows inserted, or undef if nothing was inserted.
+sub set_maybe {
+	my ($self, $key, $val, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
+INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
+
+	$e == 0 ? undef : $e;
+}
+
+# Returns an executed statement handle over every (k, v) row;
+# caller calls sth->fetchrow_array
+sub each_kv_iter {
+	my ($self) = @_;
+	my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
+SELECT k,v FROM kv
+
+	$sth->execute;
+	$sth
+}
+
+# Deletes every row whose value is $val under the flock;
+# returns the number of rows deleted.
+sub delete_by_val {
+	my ($self, $val, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	$self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
+DELETE FROM kv WHERE v = ?
+
+}
+
+# Sets every value equal to $oldval to $newval under the flock;
+# returns the number of rows changed.
+sub replace_values {
+	my ($self, $oldval, $newval, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	$self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
+UPDATE kv SET v = ? WHERE v = ?
+
+}
+
+# Stores $key => $val, replacing any old value; an undef $val deletes
+# $key instead.  Unlike the other writers this does NOT take the flock
+# (xchg calls it while already holding the lock).
+sub set {
+	my ($self, $key, $val) = @_;
+	if (defined $val) {
+		my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
+INSERT OR REPLACE INTO kv (k,v) VALUES (?,?)
+
+		$e == 0 ? undef : $e;
+	} else {
+		$self->{dbh}->prepare_cached(<<'')->execute($key);
+DELETE FROM kv WHERE k = ?
+
+	}
+}
+
+# Returns the value stored for $key, or undef if there is none.
+sub get {
+	my ($self, $key) = @_;
+	my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
+SELECT v FROM kv WHERE k = ?
+
+	$sth->execute($key);
+	$sth->fetchrow_array;
+}
+
+# Under the flock, replaces $key's value with $newval (undef deletes
+# $key) and returns the previous value, or undef if there was none.
+sub xchg {
+	my ($self, $key, $newval, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	my $oldval = get($self, $key);
+	if (defined $newval) {
+		set($self, $key, $newval);
+	} else {
+		$self->{dbh}->prepare_cached(<<'')->execute($key);
+DELETE FROM kv WHERE k = ?
+
+	}
+	$oldval;
+}
+
+# Returns the number of keys.  Like get, pass if_active=1 to
+# prepare_cached: fetching the single row leaves the handle Active,
+# so reusing it without if_active would warn on every later call.
+sub count {
+	my ($self) = @_;
+	my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
+SELECT COUNT(k) FROM kv
+
+	$sth->execute;
+	$sth->fetchrow_array;
+}
+
+1;
diff --git a/t/shared_kv.t b/t/shared_kv.t
new file mode 100644
index 00000000..4b727462
--- /dev/null
+++ b/t/shared_kv.t
@@ -0,0 +1,63 @@
+#!perl -w
+# Copyright (C) 2020 all contributors
+# License: AGPL-3.0+
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use_ok 'PublicInbox::SharedKV';
+my ($tmpdir, $for_destroy) = tmpdir();
+local $ENV{TMPDIR} = $tmpdir;
+my $skv = PublicInbox::SharedKV->new;
+opendir(my $dh, $tmpdir) or BAIL_OUT $!;
+my @ent = grep(!/\A\.\.?\z/, readdir($dh));
+is(scalar(@ent), 1, 'created a temporary dir');
+$skv->dbh;
+my $dead = "\xde\xad";
+my $beef = "\xbe\xef";
+my $cafe = "\xca\xfe";
+ok($skv->set($dead, $beef), 'set');
+is($skv->get($dead), $beef, 'get');
+ok($skv->set($dead, $beef), 'set idempotent');
+ok(!$skv->set_maybe($dead, $cafe), 'set_maybe ignores');
+ok($skv->set_maybe($cafe, $dead), 'set_maybe sets');
+is($skv->xchg($dead, $cafe), $beef, 'xchg');
+is($skv->get($dead), $cafe, 'get after xchg');
+is($skv->xchg($dead, undef), $cafe, 'xchg to undef');
+is($skv->get($dead), undef, 'get after xchg to undef');
+is($skv->get($cafe), $dead, 'get after set_maybe');
+is($skv->replace_values($dead, $cafe), 1, 'replaced one by value');
+is($skv->get($cafe), $cafe, 'value updated');
+is($skv->replace_values($dead, $cafe), 0, 'replaced none by value');
+is($skv->xchg($dead, $cafe), undef, 'xchg from undef');
+is($skv->count, 2, 'count works');
+{
+	my @warn;
+	local $SIG{__WARN__} = sub { push @warn, @_ };
+	is($skv->count, 2, 'count works again');
+	is_deeply(\@warn, [], 'reusing the cached count handle does not warn');
+}
+
+my %seen;
+my $sth = $skv->each_kv_iter;
+while (my ($k, $v) = $sth->fetchrow_array) {
+	$seen{$k} = $v;
+}
+is($seen{$dead}, $cafe, '$dead has expected value');
+is($seen{$cafe}, $cafe, '$cafe has expected value');
+is(scalar keys %seen, 2, 'iterated through all');
+
+is($skv->replace_values($cafe, $dead), 2, 'replaced 2 by value');
+is($skv->delete_by_val('bogus'), 0, 'delete_by_val misses');
+is($skv->delete_by_val($dead), 2, 'delete_by_val hits');
+is($skv->delete_by_val($dead), 0, 'delete_by_val misses again');
+
+undef $skv;
+rewinddir($dh);
+@ent = grep(!/\A\.\.?\z/, readdir($dh));
+is(scalar(@ent), 0, 'temporary dir gone');
+undef $dh;
+$skv = PublicInbox::SharedKV->new("$tmpdir/dir", 'base');
+ok(-e "$tmpdir/dir/base.sqlite3", 'file created');
+
+done_testing;