* [PATCH 0/2] process IO fixes
@ 2023-10-08 22:11 Eric Wong
2023-10-08 22:11 ` [PATCH 1/2] process_io: fix binmode and use it in lei_xsearch Eric Wong
2023-10-08 22:11 ` [PATCH 2/2] introduce ProcessIONBF for multiplexed non-blocking IO Eric Wong
0 siblings, 2 replies; 3+ messages in thread
From: Eric Wong @ 2023-10-08 22:11 UTC (permalink / raw)
To: meta
Eric Wong (2):
process_io: fix binmode and use it in lei_xsearch
introduce ProcessIONBF for multiplexed non-blocking IO
MANIFEST | 1 +
lib/PublicInbox/Git.pm | 9 ++++-----
lib/PublicInbox/HTTPD/Async.pm | 4 ++--
lib/PublicInbox/LeiXSearch.pm | 23 ++++++++---------------
lib/PublicInbox/ProcessIO.pm | 4 ++--
lib/PublicInbox/ProcessIONBF.pm | 25 +++++++++++++++++++++++++
t/lei-q-remote-import.t | 33 ++++++++++++++++++++-------------
7 files changed, 62 insertions(+), 37 deletions(-)
create mode 100644 lib/PublicInbox/ProcessIONBF.pm
^ permalink raw reply [flat|nested] 3+ messages in thread
* [PATCH 1/2] process_io: fix binmode and use it in lei_xsearch
2023-10-08 22:11 [PATCH 0/2] process IO fixes Eric Wong
@ 2023-10-08 22:11 ` Eric Wong
2023-10-08 22:11 ` [PATCH 2/2] introduce ProcessIONBF for multiplexed non-blocking IO Eric Wong
1 sibling, 0 replies; 3+ messages in thread
From: Eric Wong @ 2023-10-08 22:11 UTC (permalink / raw)
To: meta
The `binmode' perlop can only take two scalars, so passing
`@_' blindly won't work since prototypes are checked. Fixing
this means we can get IO::Uncompress::Gunzip working properly
with ProcessIO and use it for curl.
We'll also just autodie (instead of warn) on FS errors when
dealing with curl stderr, since the process will likely be
in bigger trouble soon anyway.
---
lib/PublicInbox/LeiXSearch.pm | 23 ++++++++---------------
lib/PublicInbox/ProcessIO.pm | 4 ++--
t/lei-q-remote-import.t | 33 ++++++++++++++++++++-------------
3 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 4077191f..fbafa324 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -21,6 +21,7 @@ use PublicInbox::LEI;
use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
use PublicInbox::ContentHash qw(git_sha);
use POSIX qw(strftime);
+use autodie qw(read seek truncate);
sub new {
my ($class) = @_;
@@ -353,29 +354,21 @@ sub query_remote_mboxrd {
$uri->query_form(@qform, q => $q);
my $cmd = $curl->for_uri($lei, $uri);
$lei->qerr("# $cmd");
- my ($fh, $pid) = popen_rd($cmd, undef, $rdr);
- my $reap_curl = PublicInbox::AutoReap->new($pid);
- $fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
+ my $cfh = popen_rd($cmd, undef, $rdr);
+ my $fh = IO::Uncompress::Gunzip->new($cfh, MultiStream => 1);
PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
$lei, $each_smsg);
$lei->sto_done_request if delete($self->{-sto_imported});
- $reap_curl->join;
my $nr = delete $lei->{-nr_remote_eml} // 0;
- if ($? == 0) {
- # don't update if no results, maybe MTA is down
+ close $cfh;
+ if ($? == 0) { # don't update if no results, maybe MTA is down
$lei->{lss}->cfg_set($key, $start) if $key && $nr;
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
- my $err;
- if (-s $cerr) {
- seek($cerr, 0, SEEK_SET) //
- warn "seek($cmd stderr): $!";
- $err = do { local $/; <$cerr> } //
- warn "read($cmd stderr): $!";
- truncate($cerr, 0) // warn "truncate($cmd stderr): $!";
- }
- $err //= '';
+ seek($cerr, 0, SEEK_SET);
+ read($cerr, my $err, -s $cerr);
+ truncate($cerr, 0);
next if (($? >> 8) == 22 && $err =~ /\b404\b/);
$uri->query_form(q => $qstr);
$lei->child_error($?, "E: <$uri> $err");
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
index 5a81e3a6..f120edd0 100644
--- a/lib/PublicInbox/ProcessIO.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -33,8 +33,8 @@ sub TIEHANDLE {
# for IO::Uncompress::Gunzip
sub BINMODE {
- my $self = shift;
- binmode($self->{fh}, @_);
+ return binmode($_[0]->{fh}) if @_ == 1;
+ binmode $_[0]->{fh}, $_[1];
}
sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
diff --git a/t/lei-q-remote-import.t b/t/lei-q-remote-import.t
index 92d8c9b6..885fa3e1 100644
--- a/t/lei-q-remote-import.t
+++ b/t/lei-q-remote-import.t
@@ -1,7 +1,8 @@
#!perl -w
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict; use v5.10.1; use PublicInbox::TestCommon;
+use v5.12; use PublicInbox::TestCommon;
+use autodie qw(open close unlink);
require_mods(qw(lei -httpd));
require_cmd 'curl';
use PublicInbox::MboxReader;
@@ -16,7 +17,7 @@ my $url = "http://$host_port/t2/";
my $exp1 = [ eml_load('t/plack-qp.eml') ];
my $exp2 = [ eml_load('t/iso-2202-jp.eml') ];
my $slurp_emls = sub {
- open my $fh, '<', $_[0] or BAIL_OUT "open: $!";
+ open my $fh, '<', $_[0];
my @eml;
PublicInbox::MboxReader->mboxrd($fh, sub {
my $eml = shift;
@@ -31,33 +32,33 @@ test_lei({ tmpdir => $tmpdir }, sub {
my @cmd = ('q', '-o', "mboxrd:$o", 'm:qp@example.com');
lei_ok(@cmd);
ok(-f $o && !-s _, 'output exists but is empty');
- unlink $o or BAIL_OUT $!;
+ unlink $o;
lei_ok(@cmd, '-I', $url);
is_deeply($slurp_emls->($o), $exp1, 'got results after remote search');
- unlink $o or BAIL_OUT $!;
+ unlink $o;
lei_ok(@cmd);
ok(-f $o && -s _, 'output exists after import but is not empty') or
diag $lei_err;
is_deeply($slurp_emls->($o), $exp1, 'got results w/o remote search');
- unlink $o or BAIL_OUT $!;
+ unlink $o;
$cmd[-1] = 'm:199707281508.AAA24167@hoyogw.example';
lei_ok(@cmd, '-I', $url, '--no-import-remote');
is_deeply($slurp_emls->($o), $exp2, 'got another after remote search');
- unlink $o or BAIL_OUT $!;
+ unlink $o;
lei_ok(@cmd);
ok(-f $o && !-s _, '--no-import-remote did not memoize');
open my $fh, '>', "$o.lock";
$cmd[-1] = 'm:qp@example.com';
- unlink $o or xbail("unlink $o $! cwd=".Cwd::getcwd());
+ unlink $o;
lei_ok(@cmd, '--lock=none');
ok(-f $o && -s _, '--lock=none respected') or diag $lei_err;
- unlink $o or xbail("unlink $o $! cwd=".Cwd::getcwd());
+ unlink $o;
ok(!lei(@cmd, '--lock=dotlock,timeout=0.000001'), 'dotlock fails');
like($lei_err, qr/dotlock timeout/, 'timeout noted');
ok(-f $o && !-s _, 'nothing output on lock failure');
- unlink "$o.lock" or BAIL_OUT $!;
+ unlink "$o.lock";
lei_ok(@cmd, '--lock=dotlock,timeout=0.000001',
\'succeeds after lock removal');
@@ -76,8 +77,8 @@ test_lei({ tmpdir => $tmpdir }, sub {
'm:testmessage@example.com');
is($lei_out, '', 'message not imported when in local external');
- open $fh, '>', $o or BAIL_OUT;
- print $fh <<'EOF' or BAIL_OUT;
+ open $fh, '>', $o;
+ print $fh <<'EOF';
From a@z Mon Sep 17 00:00:00 2001
From: nobody@localhost
Date: Sat, 13 Mar 2021 18:23:01 +0600
@@ -86,7 +87,7 @@ Status: OR
whatever
EOF
- close $fh or BAIL_OUT;
+ close $fh;
lei_ok(qw(q -o), "mboxrd:$o", 'm:testmessage@example.com');
is_deeply($slurp_emls->($o), [$exp],
'got expected result after clobber') or diag $lei_err;
@@ -103,5 +104,11 @@ EOF
lei_ok([qw(edit-search), "$ENV{HOME}/md"], $edit_env);
like($lei_out, qr/^\Q[external "$url"]\E\n\s*lastresult = \d+/sm,
'lastresult set');
+
+ unlink $o;
+ lei_ok qw(q --no-save -q m:never2exist@example.com -o), "mboxrd:$o",
+ '--only', $url,
+ \'404 curl exit (22) does not influence lei(1)';
+ is(-s $o, 0, 'empty result');
});
done_testing;
^ permalink raw reply related [flat|nested] 3+ messages in thread
* [PATCH 2/2] introduce ProcessIONBF for multiplexed non-blocking IO
2023-10-08 22:11 [PATCH 0/2] process IO fixes Eric Wong
2023-10-08 22:11 ` [PATCH 1/2] process_io: fix binmode and use it in lei_xsearch Eric Wong
@ 2023-10-08 22:11 ` Eric Wong
1 sibling, 0 replies; 3+ messages in thread
From: Eric Wong @ 2023-10-08 22:11 UTC (permalink / raw)
To: meta
Unbuffered, non-blocking reads are required for reliable
epoll/kevent/poll/select wakeup notifications, since we have
no visibility into the buffer states used internally by Perl.
We can safely use sysread here since we use neither the :utf8
nor any :encoding Perl IO layers for readable pipes.
I suspect this fixes occasional failures from t/solver_git.t
when retrieving the WwwCoderepo summary.
---
MANIFEST | 1 +
lib/PublicInbox/Git.pm | 9 ++++-----
lib/PublicInbox/HTTPD/Async.pm | 4 ++--
lib/PublicInbox/ProcessIONBF.pm | 25 +++++++++++++++++++++++++
4 files changed, 32 insertions(+), 7 deletions(-)
create mode 100644 lib/PublicInbox/ProcessIONBF.pm
diff --git a/MANIFEST b/MANIFEST
index c972818f..791d91a7 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -319,6 +319,7 @@ lib/PublicInbox/POP3.pm
lib/PublicInbox/POP3D.pm
lib/PublicInbox/PktOp.pm
lib/PublicInbox/ProcessIO.pm
+lib/PublicInbox/ProcessIONBF.pm
lib/PublicInbox/Qspawn.pm
lib/PublicInbox/Reply.pm
lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 94d5dcee..448cfaf7 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -12,7 +12,6 @@ use v5.10.1;
use parent qw(Exporter PublicInbox::DS);
use autodie qw(socketpair);
use POSIX ();
-use IO::Handle; # ->blocking
use Socket qw(AF_UNIX SOCK_STREAM);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
use Errno qw(EINTR EAGAIN);
@@ -20,6 +19,7 @@ use File::Glob qw(bsd_glob GLOB_NOSORT);
use File::Spec ();
use Time::HiRes qw(stat);
use PublicInbox::Spawn qw(spawn popen_rd which);
+use PublicInbox::ProcessIONBF;
use PublicInbox::Tmpfile;
use IO::Poll qw(POLLIN);
use Carp qw(croak carp);
@@ -146,7 +146,6 @@ sub _sock_cmd {
my ($self, $batch, $err_c) = @_;
$self->{sock} and Carp::confess('BUG: {sock} exists');
socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
- $s1->blocking(0);
my $opt = { pgid => 0, 0 => $s2, 1 => $s2 };
my $gd = $self->{git_dir};
if ($gd =~ s!/([^/]+/[^/]+)\z!/!) {
@@ -165,7 +164,7 @@ sub _sock_cmd {
$self->fail("tmpfile($id): $!");
}
my $pid = spawn(\@cmd, undef, $opt);
- $self->{sock} = PublicInbox::ProcessIO->maybe_new($pid, $s1);
+ $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1);
}
sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -626,8 +625,8 @@ sub cleanup_if_unlinked {
my $ret = 0;
for my $obj ($self, ($self->{ck} // ())) {
my $sock = $obj->{sock} // next;
- my PublicInbox::ProcessIO $pp = tied *$sock; # ProcessIO
- my $pid = $pp->{pid} // next;
+ my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF
+ my $pid = $p->{pid} // next;
open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
while (<$fh>) {
# n.b. we do not restart for unlinked multi-pack-index
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b9d2159c..b73d0c4b 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -18,6 +18,7 @@ use v5.12;
use parent qw(PublicInbox::DS);
use Errno qw(EAGAIN);
use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::ProcessIONBF;
# This is called via: $env->{'pi-httpd.async'}->()
# $io is a read-only pipe ($rpipe) for now, but may be a
@@ -37,8 +38,7 @@ sub new {
arg => $arg, # arg for $cb
end_obj => $end_obj, # like END{}, can ->event_step
}, $class;
- my $pp = tied *$io; # ProcessIO
- $pp->{fh}->blocking(0) // die "$io->blocking(0): $!";
+ PublicInbox::ProcessIONBF->replace($io);
$self->SUPER::new($io, EPOLLIN);
}
diff --git a/lib/PublicInbox/ProcessIONBF.pm b/lib/PublicInbox/ProcessIONBF.pm
new file mode 100644
index 00000000..490e200a
--- /dev/null
+++ b/lib/PublicInbox/ProcessIONBF.pm
@@ -0,0 +1,25 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# used to support unbuffered partial reads
+package PublicInbox::ProcessIONBF;
+use v5.12;
+use parent qw(PublicInbox::ProcessIO);
+use IO::Handle; # ->blocking
+
+sub new {
+ my ($cls, $pid, $fh, @cb_arg) = @_;
+ $fh->blocking(0) // die "$fh->blocking(0): $!";
+ my $io = $cls->SUPER::maybe_new($pid, $fh, @cb_arg);
+}
+
+sub replace {
+ my ($cls, $orig) = @_;
+ my $pio = tied *$orig; # ProcessIO
+ $pio->{fh}->blocking(0) // die "$pio->{fh}->blocking(0): $!";
+ bless $pio, $cls;
+}
+
+sub READ { sysread($_[0]->{fh}, $_[1], $_[2], $_[3] // 0) }
+
+1;
^ permalink raw reply related [flat|nested] 3+ messages in thread
end of thread, other threads:[~2023-10-08 22:11 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-08 22:11 [PATCH 0/2] process IO fixes Eric Wong
2023-10-08 22:11 ` [PATCH 1/2] process_io: fix binmode and use it in lei_xsearch Eric Wong
2023-10-08 22:11 ` [PATCH 2/2] introduce ProcessIONBF for multiplexed non-blocking IO Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).