From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 539981F4A4 for ; Wed, 11 Oct 2023 07:20:58 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1697008858; bh=bBJmgjROWxgoCH/9G8c2bl2FKW0AnXiRD5M19lOwVKA=; h=From:To:Subject:Date:In-Reply-To:References:From; b=2PliZyjEUVU6kJZNlSYdFcvj5ASCDq2bF2nXYuqnEd+/UaYw5Yo82rBE+td6S6roB MhSRu7cUKNZmJ3+MVM1i6KdGLC19eg1nI6PAr5xYwSSteXFa0uqftN+DaLYimtGSnp pz9z37qRyfXXnpK5Q43oO7n3vL+kTmBcxY4fKOI0= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 6/9] import: switch to Unix stream socket for fast-import Date: Wed, 11 Oct 2023 07:20:54 +0000 Message-ID: <20231011072057.758022-7-e@80x24.org> In-Reply-To: <20231011072057.758022-1-e@80x24.org> References: <20231011072057.758022-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We use fewer file descriptors and fewer lines of code this way. I'm not aware of any place we rely on POSIX pipe semantics with `git fast-import', and sockets have bigger buffers by default in most cases (even if Linux allows larger pipe buffers). --- lib/PublicInbox/Import.pm | 132 +++++++++++++++++------------------- script/public-inbox-convert | 18 ++--- 2 files changed, 71 insertions(+), 79 deletions(-) diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index cd03da05..894ba818 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -8,7 +8,7 @@ package PublicInbox::Import; use v5.12; use parent qw(PublicInbox::Lock); -use PublicInbox::Spawn qw(run_die popen_rd); +use PublicInbox::Spawn qw(run_die popen_rd spawn); use PublicInbox::MID qw(mids mid2path); use PublicInbox::Address; use PublicInbox::Smsg; @@ -16,9 +16,11 @@ use PublicInbox::MsgTime qw(msg_datestamp); use PublicInbox::ContentHash qw(content_digest); use PublicInbox::MDA; use PublicInbox::Eml; +use PublicInbox::ProcessIO; use POSIX qw(strftime); -use autodie qw(read close); +use autodie qw(read close socketpair); use Carp qw(croak); +use Socket qw(AF_UNIX SOCK_STREAM); sub default_branch () { state $default_branch = do { @@ -56,11 +58,10 @@ sub new { # idempotent start function sub gfi_start { my ($self) = @_; - - return ($self->{in}, $self->{out}) if $self->{in}; - - my ($in_r, $out_r, $out_w); - pipe($out_r, $out_w) or die "pipe failed: $!"; + my $io = $self->{io}; + return $io if $io; + socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0); + $io->autoflush(1); $self->lock_acquire; eval { @@ -73,18 +74,17 @@ sub gfi_start { die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?; $self->{-tree} = { map { $_ => 1 } split(/\0/, $t) }; } - $in_r = $self->{in} = $git->popen(qw(fast-import - --quiet --done --date-format=raw), - undef, { 0 => $out_r }); - $out_w->autoflush(1); - $self->{out} = $out_w; + my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import + --quiet --done --date-format=raw) ]; + my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 }); $self->{nchg} = 0; + $self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io); }; if ($@) { $self->lock_release; die $@; } - ($in_r, $out_w); + $self->{io}; } sub wfail () { die "write to fast-import failed: $!" } @@ -99,22 +99,22 @@ sub norm_body ($) { } # only used for v1 (ssoma) inboxes -sub _check_path ($$$$) { - my ($r, $w, $tip, $path) = @_; +sub _check_path ($$$) { + my ($io, $tip, $path) = @_; return if $tip eq ''; - print $w "ls $tip $path\n" or wfail; + print $io "ls $tip $path\n" or wfail; local $/ = "\n"; - my $info = <$r> // die "EOF from fast-import: $!"; + my $info = <$io> // die "EOF from fast-import: $!"; $info =~ /\Amissing / ? undef : $info; } -sub _cat_blob ($$$) { - my ($r, $w, $oid) = @_; - print $w "cat-blob $oid\n" or wfail; +sub _cat_blob ($$) { + my ($io, $oid) = @_; + print $io "cat-blob $oid\n" or wfail; local $/ = "\n"; - my $info = <$r> // die "EOF from fast-import / cat-blob: $!"; + my $info = <$io> // die "EOF from fast-import / cat-blob: $!"; $info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return; - my $n = read($r, my $buf, my $len = $1 + 1); + my $n = read($io, my $buf, my $len = $1 + 1); $n == $len or croak "cat-blob: short read: $n < $len"; my $lf = chop $buf; croak "bad read on final byte: <$lf>" if $lf ne "\n"; @@ -123,17 +123,16 @@ sub _cat_blob ($$$) { sub cat_blob { my ($self, $oid) = @_; - my ($r, $w) = $self->gfi_start; - _cat_blob($r, $w, $oid); + _cat_blob(gfi_start($self), $oid); } sub check_remove_v1 { - my ($r, $w, $tip, $path, $mime) = @_; + my ($io, $tip, $path, $mime) = @_; - my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef); + my $info = _check_path($io, $tip, $path) or return ('MISSING',undef); $info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info"; my $oid = $1; - my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed"; + my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed"; PublicInbox::Eml::strip_from($$bref); my $cur = PublicInbox::Eml->new($bref); my $cur_s = $cur->header('Subject') // ''; @@ -146,16 +145,15 @@ sub check_remove_v1 { sub checkpoint { my ($self) = @_; - return unless $self->{in}; - print { $self->{out} } "checkpoint\n" or wfail; + print { $self->{io} // return } "checkpoint\n" or wfail; undef; } sub progress { my ($self, $msg) = @_; - return unless $self->{in}; - print { $self->{out} } "progress $msg\n" or wfail; - readline($self->{in}) eq "progress $msg\n" or die + my $io = $self->{io} or return; + print $io "progress $msg\n" or wfail; + readline($io) eq "progress $msg\n" or die "progress $msg not received\n"; undef; } @@ -205,10 +203,9 @@ sub barrier { # used for v2 sub get_mark { my ($self, $mark) = @_; - die "not active\n" unless $self->{in}; - my ($r, $w) = $self->gfi_start; - print $w "get-mark $mark\n" or wfail; - my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n"; + my $io = $self->{io} or croak "not active\n"; + print $io "get-mark $mark\n" or wfail; + my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n"; chomp($oid); $oid; } @@ -225,11 +222,11 @@ sub remove { my $path_type = $self->{path_type}; my ($path, $err, $cur, $blob); - my ($r, $w) = $self->gfi_start; + my $io = gfi_start($self); my $tip = $self->{tip}; if ($path_type eq '2/38') { $path = mid2path(v1_mid0($mime)); - ($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime); + ($err, $cur) = check_remove_v1($io, $tip, $path, $mime); return ($err, $cur) if $err; } else { my $sref; @@ -241,7 +238,7 @@ sub remove { } my $len = length($$sref); $blob = $self->{mark}++; - print $w "blob\nmark :$blob\ndata $len\n", + print $io "blob\nmark :$blob\ndata $len\n", $$sref, "\n" or wfail; } @@ -249,22 +246,22 @@ sub remove { my $commit = $self->{mark}++; my $parent = $tip =~ /\A:/ ? $tip : undef; unless ($parent) { - print $w "reset $ref\n" or wfail; + print $io "reset $ref\n" or wfail; } my $ident = $self->{ident}; my $now = now_raw(); $msg //= 'rm'; my $len = length($msg) + 1; - print $w "commit $ref\nmark :$commit\n", + print $io "commit $ref\nmark :$commit\n", "author $ident $now\n", "committer $ident $now\n", "data $len\n$msg\n\n", 'from ', ($parent ? $parent : $tip), "\n" or wfail; if (defined $path) { - print $w "D $path\n\n" or wfail; + print $io "D $path\n\n" or wfail; } else { - clean_tree_v2($self, $w, 'd'); - print $w "M 100644 :$blob d\n\n" or wfail; + clean_tree_v2($self, $io, 'd'); + print $io "M 100644 :$blob d\n\n" or wfail; } $self->{nchg}++; (($self->{tip} = ":$commit"), $cur); @@ -354,11 +351,11 @@ sub v1_mid0 ($) { $mids->[0]; } sub clean_tree_v2 ($$$) { - my ($self, $w, $keep) = @_; + my ($self, $io, $keep) = @_; my $tree = $self->{-tree} or return; #v2 only delete $tree->{$keep}; foreach (keys %$tree) { - print $w "D $_\n" or wfail; + print $io "D $_\n" or wfail; } %$tree = ($keep => 1); } @@ -377,10 +374,10 @@ sub add { $path = 'm'; } - my ($r, $w) = $self->gfi_start; + my $io = gfi_start($self); my $tip = $self->{tip}; if ($path_type eq '2/38') { - _check_path($r, $w, $tip, $path) and return; + _check_path($io, $tip, $path) and return; } drop_unwanted_headers($mime); @@ -394,8 +391,7 @@ sub add { my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string; my $n = length($raw_email); $self->{bytes_added} += $n; - print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail; - print $w $raw_email, "\n" or wfail; + print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail; # v2: we need this for Xapian if ($smsg) { @@ -422,19 +418,19 @@ sub add { my $parent = $tip =~ /\A:/ ? $tip : undef; unless ($parent) { - print $w "reset $ref\n" or wfail; + print $io "reset $ref\n" or wfail; } - print $w "commit $ref\nmark :$commit\n", + print $io "commit $ref\nmark :$commit\n", "author $author $at\n", - "committer $self->{ident} $ct\n" or wfail; - print $w "data ", (length($subject) + 1), "\n", + "committer $self->{ident} $ct\n", + "data ", (length($subject) + 1), "\n", $subject, "\n\n" or wfail; if ($tip ne '') { - print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail; + print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail; } - clean_tree_v2($self, $w, $path); - print $w "M 100644 :$blob $path\n\n" or wfail; + clean_tree_v2($self, $io, $path); + print $io "M 100644 :$blob $path\n\n" or wfail; $self->{nchg}++; $self->{tip} = ":$commit"; } @@ -475,15 +471,14 @@ EOM } # true if locked and active -sub active { !!$_[0]->{out} } +sub active { !!$_[0]->{io} } sub done { my ($self) = @_; - my $w = delete $self->{out} or return; + my $io = delete $self->{io} or return; eval { - my $r = delete $self->{in} or die 'BUG: missing {in} when done'; - print $w "done\n" or wfail; - close $r; + print $io "done\n" or wfail; + close $io; # reaps and dies on error }; my $wait_err = $@; my $nchg = delete $self->{nchg}; @@ -496,10 +491,7 @@ sub done { die $wait_err if $wait_err; } -sub atfork_child { - my ($self) = @_; - close($_) for (grep defined, delete(@$self{qw(in out)})); -} +sub atfork_child { close(delete($_[0]->{io}) // return) } sub digest2mid ($$;$) { my ($dig, $hdr, $fallback_time) = @_; @@ -552,7 +544,7 @@ sub replace_oids { my $git = $self->{git}; my @export = (qw(fast-export --no-data --use-done-feature), $old); my $rd = $git->popen(@export); - my ($r, $w) = $self->gfi_start; + my $io = gfi_start($self); my @buf; my $nreplace = 0; my @oids; @@ -563,7 +555,7 @@ sub replace_oids { push @buf, "reset $tmp\n"; } elsif (/^commit (?:.+)/) { if (@buf) { - print $w @buf or wfail; + print $io @buf or wfail; @buf = (); } push @buf, "commit $tmp\n"; @@ -599,7 +591,7 @@ sub replace_oids { rewrite_commit($self, \@oids, \@buf, $mime); $nreplace++; } - print $w @buf, "\n" or wfail; + print $io @buf, "\n" or wfail; @buf = (); } elsif ($_ eq "done\n") { $done = 1; @@ -612,7 +604,7 @@ sub replace_oids { } close $rd; if (@buf) { - print $w @buf or wfail; + print $io @buf or wfail; } die 'done\n not seen from fast-export' unless $done; chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace; diff --git a/script/public-inbox-convert b/script/public-inbox-convert index 780f7194..0cc52777 100755 --- a/script/public-inbox-convert +++ b/script/public-inbox-convert @@ -120,7 +120,7 @@ my $head = $old->{ref_head} || 'HEAD'; my $rd = $old->git->popen(qw(fast-export --use-done-feature), $head); $v2w->idx_init($opt); my $im = $v2w->importer; -my ($r, $w) = $im->gfi_start; +my $io = $im->gfi_start; my $h = '[0-9a-f]'; my %D; my $last; @@ -131,12 +131,12 @@ while (<$rd>) { $state = 'commit'; } elsif (/^data ([0-9]+)/) { my $len = $1; - print $w $_ or $im->wfail; + print $io $_ or $im->wfail; while ($len) { my $n = read($rd, my $tmp, $len) or die "read: $!"; warn "$n != $len\n" if $n != $len; $len -= $n; - print $w $tmp or $im->wfail; + print $io $tmp or $im->wfail; } next; } elsif ($state eq 'commit') { @@ -144,9 +144,9 @@ while (<$rd>) { my ($mark, $path) = ($1, $2); $D{$path} = $mark; if ($last && $last ne 'm') { - print $w "D $last\n" or $im->wfail; + print $io "D $last\n" or $im->wfail; } - print $w "M 100644 :$mark m\n" or $im->wfail; + print $io "M 100644 :$mark m\n" or $im->wfail; $last = 'm'; next; } @@ -154,18 +154,18 @@ while (<$rd>) { my $mark = delete $D{$1}; defined $mark or die "undeleted path: $1\n"; if ($last && $last ne 'd') { - print $w "D $last\n" or $im->wfail; + print $io "D $last\n" or $im->wfail; } - print $w "M 100644 :$mark d\n" or $im->wfail; + print $io "M 100644 :$mark d\n" or $im->wfail; $last = 'd'; next; } } last if $_ eq "done\n"; - print $w $_ or $im->wfail; + print $io $_ or $im->wfail; } close $rd or die "fast-export: \$?=$? \$!=$!\n"; -$r = $w = undef; # v2w->done does the actual close and error checking +$io = undef; $v2w->done; if (my $old_mm = $old->mm) { $old->cleanup;