From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.8 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 03F812007F for ; Thu, 4 Feb 2021 09:59:32 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 10/10] lei import: initial implementation Date: Thu, 4 Feb 2021 00:59:30 -0900 Message-Id: <20210204095930.20278-11-e@80x24.org> In-Reply-To: <20210204095930.20278-1-e@80x24.org> References: <20210204095930.20278-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Only tested with .eml files so far, but Maildir + IMAP will be supported. --- MANIFEST | 1 + lib/PublicInbox/IPC.pm | 4 +- lib/PublicInbox/LEI.pm | 48 ++++++++++++--- lib/PublicInbox/LeiImport.pm | 106 ++++++++++++++++++++++++++++++++++ lib/PublicInbox/LeiStore.pm | 18 ++++++ lib/PublicInbox/LeiXSearch.pm | 18 +----- t/lei.t | 15 +++++ 7 files changed, 184 insertions(+), 26 deletions(-) create mode 100644 lib/PublicInbox/LeiImport.pm diff --git a/MANIFEST b/MANIFEST index 6922f9b1..a11d4106 100644 --- a/MANIFEST +++ b/MANIFEST @@ -179,6 +179,7 @@ lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm lib/PublicInbox/LeiDedupe.pm lib/PublicInbox/LeiExternal.pm +lib/PublicInbox/LeiImport.pm lib/PublicInbox/LeiOverview.pm lib/PublicInbox/LeiQuery.pm lib/PublicInbox/LeiSearch.pm diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 7f5a3f6f..a0e6bfee 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -101,7 +101,7 @@ sub ipc_worker_loop ($$$) { # starts a worker if Sereal or Storable is installed sub ipc_worker_spawn { - my ($self, $ident, $oldset) = @_; + my ($self, $ident, $oldset, $fields) = @_; return unless $enc; # no Sereal or Storable return if ($self->{-ipc_ppid} // -1) == $$; # idempotent delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)}); @@ -123,6 +123,8 @@ sub ipc_worker_spawn { # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); eval { + $fields //= {}; + local @$self{keys %$fields} = values(%$fields); my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; ipc_worker_loop($self, $r_req, $w_res); diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 24efb494..682d1bd1 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -160,9 +160,10 @@ our %CMD = ( # sorted in order of importance/use: 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch', qw(prune) ], -'import' => [ 'URL_OR_PATHNAME|--stdin', - 'one-shot import/update from URL or filesystem', - qw(stdin| offset=i recursive|r exclude=s include=s !flags), +'import' => [ 'URLS_OR_PATHNAMES...|--stdin', + 'one-time import/update from URL or filesystem', + qw(stdin| offset=i recursive|r exclude=s include|I=s + format|f=s flags!), ], 'config' => [ '[...]', sub { @@ -194,8 +195,8 @@ our %CMD = ( # sorted in order of importance/use: # $spec => [@ALLOWED_VALUES (default is first), $description], # $spec => $description # "$SUB_COMMAND TAB $spec" => as above -my $stdin_formats = [ 'IN|auto|raw|mboxrd|mboxcl2|mboxcl|mboxo', - 'specify message input format' ]; +my $stdin_formats = [ 'MAIL_FORMAT|eml|mboxrd|mboxcl2|mboxcl|mboxo', + 'specify message input format' ]; my $ls_format = [ 'OUT|plain|json|null', 'listing output format' ]; my %OPTDESC = ( @@ -240,6 +241,8 @@ my %OPTDESC = ( 'q jobs=s' => [ '[SEARCH_JOBS][,WRITER_JOBS]', 'control number of search and writer jobs' ], +'import format|f=s' => $stdin_formats, + 'ls-query format|f=s' => $ls_format, 'ls-external format|f=s' => $ls_format, @@ -319,6 +322,20 @@ sub err ($;@) { sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } +sub fail_handler ($;$$) { + my ($lei, $code, $io) = @_; + for my $f (qw(imp lxs l2m)) { + my $wq = delete $lei->{$f} or next; + $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon + } + close($io) if $io; # needed to avoid warnings on SIGPIPE + $lei->x_it($code // (1 >> 8)); +} + +sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers + fail_handler($_[0], 13, delete $_[0]->{1}); +} + sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; err($self, $buf) if defined $buf; @@ -340,7 +357,8 @@ sub out ($;@) { sub puts ($;@) { out(shift, map { "$_\n" } @_) } sub child_error { # passes non-fatal curl exit codes to user - my ($self, $child_error) = @_; # child_error is $? + my ($self, $child_error, $msg) = @_; # child_error is $? + $self->err($msg) if $msg; if (my $s = $self->{pkt_op_p} // $self->{sock}) { # send to the parent lei-daemon or to lei(1) client send($s, "child_error $child_error", MSG_EOR); @@ -357,9 +375,16 @@ sub note_sigpipe { # triggers sigpipe_handler } sub lei_atfork_child { - my ($self) = @_; + my ($self, $persist) = @_; # we need to explicitly close things which are on stack - delete $self->{0}; + if ($persist) { + my @io = delete @$self{0,1,2}; + unless ($self->{oneshot}) { + close($_) for @io; + } + } else { + delete $self->{0}; + } for (delete @$self{qw(3 sock old_1 au_done)}) { close($_) if defined($_); } @@ -374,7 +399,7 @@ sub lei_atfork_child { %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; - $current_lei = $self; # for SIG{__WARN__} + $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } sub _help ($;$) { @@ -606,6 +631,11 @@ sub lei_config { x_it($self, $?) if $?; } +sub lei_import { + require PublicInbox::LeiImport; + PublicInbox::LeiImport->call(@_); +} + sub lei_init { my ($self, $dir) = @_; my $cfg = _lei_cfg($self, 1); diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm new file mode 100644 index 00000000..4a9af8a7 --- /dev/null +++ b/lib/PublicInbox/LeiImport.pm @@ -0,0 +1,106 @@ +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# front-end for the "lei import" sub-command +package PublicInbox::LeiImport; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC); +use PublicInbox::MboxReader; +use PublicInbox::Eml; + +sub _import_eml { # MboxReader callback + my ($eml, $sto, $set_kw) = @_; + $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ()); +} + +sub import_done { # EOF callback for main daemon + my ($lei) = @_; + my $imp = delete $lei->{imp}; + $imp->wq_wait_old($lei) if $imp; + my $wait = $lei->{sto}->ipc_do('done'); + $lei->dclose; +} + +sub call { # the main "lei import" method + my ($cls, $lei, @argv) = @_; + my $sto = $lei->_lei_store(1); + $sto->write_prepare($lei); + $lei->{opt}->{flags} //= 1; + my $fmt = $lei->{opt}->{'format'}; + my $self = $lei->{imp} = bless {}, $cls; + return $lei->fail('--format unspecified') if !$fmt; + $self->{0} = $lei->{0} if $lei->{opt}->{stdin}; + my $ops = { + '!' => [ $lei->can('fail_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + '' => [ \&import_done, $lei ], + }; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + my $j = $lei->{opt}->{jobs} // scalar(@argv) || 1; + my $nproc = $self->detect_nproc; + $j = $nproc if $j > $nproc; + $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $self->wq_do('import_stdin', []) if $self->{0}; + for my $x (@argv) { + $self->wq_do('import_path_url', [], $x); + } + $self->wq_close(1); + $lei->event_step_init; # wait for shutdowns + if ($lei->{oneshot}) { + while ($op->{sock}) { $op->event_step } + } +} + +sub ipc_atfork_child { + my ($self) = @_; + $self->{lei}->lei_atfork_child; + $self->SUPER::ipc_atfork_child; +} + +sub _import_fh { + my ($lei, $fh, $x) = @_; + my $set_kw = $lei->{opt}->{flags}; + my $fmt = $lei->{opt}->{'format'}; + eval { + if ($fmt eq 'eml') { + my $buf = do { local $/; <$fh> } // + return $lei->child_error(1 >> 8, <<""); + error reading $x: $! + + my $eml = PublicInbox::Eml->new(\$buf); + _import_eml($eml, $lei->{sto}, $set_kw); + } else { # some mbox + my $cb = PublicInbox::MboxReader->can($fmt); + $cb or return $lei->child_error(1 >> 8, <<""); + --format $fmt unsupported for $x + + $cb->(undef, $fh, \&_import_eml, $lei->{sto}, $set_kw); + } + }; + $lei->child_error(1 >> 8, ": $@") if $@; +} + +sub import_path_url { + my ($self, $x) = @_; + my $lei = $self->{lei}; + # TODO auto-detect? + if (-f $x) { + open my $fh, '<', $x or return $lei->child_error(1 >> 8, <<""); +unable to open $x: $! + + _import_fh($lei, $fh, $x); + } else { + $lei->fail("$x unsupported (TODO)"); + } +} + +sub import_stdin { + my ($self) = @_; + _import_fh($self->{lei}, $self->{0}, ''); +} + +1; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index a7d7d953..3a215973 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -17,6 +17,7 @@ use PublicInbox::V2Writable; use PublicInbox::ContentHash qw(content_hash content_digest); use PublicInbox::MID qw(mids mids_in); use PublicInbox::LeiSearch; +use PublicInbox::MDA; use List::Util qw(max); sub new { @@ -237,4 +238,21 @@ sub done { die $err if $err; } +sub ipc_atfork_child { + my ($self) = @_; + my $lei = delete $self->{lei}; + $lei->lei_atfork_child(1) if $lei; + $self->SUPER::ipc_atfork_child; +} + +sub write_prepare { + my ($self, $lei) = @_; + $self->ipc_lock_init; + # Mail we import into lei are private, so headers filtered out + # by -mda for public mail are not appropriate + local @PublicInbox::MDA::BAD_HEADERS = (); + $self->ipc_worker_spawn('lei_store', $lei->oldset, { lei => $lei }); + $lei->{sto} = $self; +} + 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index daf42098..f8068362 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -392,25 +392,11 @@ sub query_prepare { # called by wq_do pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!" } -sub fail_handler ($;$$) { - my ($lei, $code, $io) = @_; - for my $f (qw(lxs l2m)) { - my $wq = delete $lei->{$f} or next; - $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon - } - close($io) if $io; # needed to avoid warnings on SIGPIPE - $lei->x_it($code // (1 >> 8)); -} - -sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers - fail_handler($_[0], 13, delete $_[0]->{1}); -} - sub do_query { my ($self, $lei) = @_; my $ops = { - '|' => [ \&sigpipe_handler, $lei ], - '!' => [ \&fail_handler, $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + '!' => [ $lei->can('fail_handler'), $lei ], '.' => [ \&do_post_augment, $lei ], '' => [ \&query_done, $lei ], 'mset_progress' => [ \&mset_progress, $lei ], diff --git a/t/lei.t b/t/lei.t index a08a6d0d..eb824a30 100644 --- a/t/lei.t +++ b/t/lei.t @@ -389,6 +389,20 @@ SKIP: { }; # /SKIP }; +my $test_import = sub { + $cleanup->(); + ok($lei->(qw(q s:boolean)), 'search miss before import'); + unlike($out, qr/boolean/i, 'no results, yet'); + open my $fh, '<', 't/data/0001.patch' or BAIL_OUT $!; + ok($lei->([qw(import -f eml -)], undef, { %$opt, 0 => $fh }), + 'import single file from stdin'); + close $fh; + ok($lei->(qw(q s:boolean)), 'search hit after import'); + ok($lei->(qw(import -f eml), 't/data/message_embed.eml'), + 'import single file by path'); + $cleanup->(); +}; + my $test_lei_common = sub { $test_help->(); $test_config->(); @@ -396,6 +410,7 @@ my $test_lei_common = sub { $test_external->(); $test_completion->(); $test_fail->(); + $test_import->(); }; if ($ENV{TEST_LEI_ONESHOT}) {