From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 807EE1F4B9 for ; Mon, 24 Jun 2019 02:54:16 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 13/57] ds: switch write buffering to use a tempfile Date: Mon, 24 Jun 2019 02:52:14 +0000 Message-Id: <20190624025258.25592-14-e@80x24.org> In-Reply-To: <20190624025258.25592-1-e@80x24.org> References: <20190624025258.25592-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Data which can't fit into a generously-sized socket buffer, has no business being stored in heap. --- lib/PublicInbox/DS.pm | 110 ++++++++++++++++++++++++++++++---------- lib/PublicInbox/HTTP.pm | 20 ++------ 2 files changed, 85 insertions(+), 45 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 3e8b0b1a..eb468f57 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,22 +18,23 @@ use strict; use bytes; use POSIX (); use IO::Handle qw(); -use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); +use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); -our @EXPORT_OK = qw(now msg_more); +our @EXPORT_OK = qw(now msg_more write_in_full); use warnings; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket - 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write + 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) 
); use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); +use File::Temp qw(tempfile); use constant POLLIN => 1; use constant POLLOUT => 4; @@ -478,32 +479,51 @@ sub close { return 0; } +# portable, non-thread-safe sendfile emulation (no pread, yet) +sub psendfile ($$$) { + my ($sock, $fh, $off) = @_; + + sysseek($fh, $$off, SEEK_SET) or return; + defined(my $to_write = sysread($fh, my $buf, 16384)) or return; + my $written = 0; + while ($to_write > 0) { + if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { + $written += $w; + $to_write -= $w; + } else { + return if $written == 0; + last; + } + } + $$off += $written; + $written; +} + # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; my $wbuf = $self->{wbuf} or return 1; my $sock = $self->{sock} or return 1; +next_buf: while (my $bref = $wbuf->[0]) { - my $ref = ref($bref); - if ($ref eq 'SCALAR') { - my $len = bytes::length($$bref); - my $off = $self->{wbuf_off} || 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - if (defined $written) { - if ($written == $to_write) { - shift @$wbuf; + if (ref($bref) ne 'CODE') { + my $off = delete($self->{wbuf_off}) // 0; + while (1) { + my $w = psendfile($sock, $bref, \$off); + if (defined $w) { + if ($w == 0) { + shift @$wbuf; + goto next_buf; + } + } elsif ($! == EAGAIN) { + $self->{wbuf_off} = $off; + watch_write($self, 1); + return 0; } else { - $self->{wbuf_off} = $off + $written; + return $self->close; } - next; # keep going until EAGAIN - } elsif ($! == EAGAIN) { - $self->watch_write(1); - } else { - $self->close; } - return 0; } else { #($ref eq 'CODE') { shift @$wbuf; $bref->(); @@ -515,6 +535,31 @@ sub flush_write ($) { 1; # all done } +sub write_in_full ($$$$) { + my ($fh, $bref, $len, $off) = @_; + my $rv = 0; + while ($len > 0) { + my $w = syswrite($fh, $$bref, $len, $off); + return ($rv ? 
$rv : $w) unless $w; # undef or 0 + $rv += $w; + $len -= $w; + $off += $w; + } + $rv +} + +sub tmpbuf ($$) { + my ($bref, $off) = @_; + # open(my $fh, '+>>', undef) doesn't set O_APPEND + my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1); + open $fh, '+>>', $path or die "open: $!"; + unlink $path; + my $to_write = bytes::length($$bref) - $off; + my $w = write_in_full($fh, $bref, $to_write, $off); + die "write_in_full ($to_write): $!" unless defined $w; + $w == $to_write ? $fh : die("short write $w < $to_write"); +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -537,7 +582,16 @@ sub write { my $ref = ref $data; my $bref = $ref ? $data : \$data; if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more... - push @$wbuf, $bref; + if ($ref eq 'CODE') { + push @$wbuf, $bref; + } else { + my $last = $wbuf->[-1]; + if (ref($last) eq 'GLOB') { # append to tmp file buffer + write_in_full($last, $bref, bytes::length($$bref), 0); + } else { + push @$wbuf, tmpbuf($bref, 0); + } + } return 0; } elsif ($ref eq 'CODE') { $bref->(); @@ -548,15 +602,13 @@ sub write { if (defined $written) { return 1 if $written == $to_write; - $self->{wbuf_off} = $written; - $self->{wbuf} = [ $bref ]; - return flush_write($self); # try until EAGAIN } elsif ($! == EAGAIN) { - $self->{wbuf} = [ $bref ]; - $self->watch_write(1); + $written = 0; } else { - $self->close; + return $self->close; } + $self->{wbuf} = [ tmpbuf($bref, $written) ]; + watch_write($self, 1); return 0; } } @@ -573,8 +625,10 @@ sub msg_more ($$) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! 
- # PublicInbox::DS::write queues the unwritten substring: - return $self->write(substr($_[1], $n, $nlen)); + # queue up the unwritten substring: + $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; + watch_write($self, 1); + return 0; } } $self->write(\($_[1])); diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index a669eb6e..fcb5eb6c 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -19,7 +19,7 @@ use HTTP::Status qw(status_message); use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; -PublicInbox::DS->import('msg_more'); +PublicInbox::DS->import(qw(msg_more write_in_full)); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n CHUNK_END => -2, # \r\n @@ -125,7 +125,7 @@ sub read_input ($) { while ($len > 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len); + my $w = write_in_full($input, $rbuf, $len, 0); return write_err($self, $len) unless $w; $len -= $w; die "BUG: $len < 0 (w=$w)" if $len < 0; @@ -367,20 +367,6 @@ sub recv_err { quit($self, 500); } -sub write_in_full { - my ($fh, $rbuf, $len) = @_; - my $rv = 0; - my $off = 0; - while ($len > 0) { - my $w = syswrite($fh, $$rbuf, $len, $off); - return ($rv ? $rv : $w) unless $w; # undef or 0 - $rv += $w; - $off += $w; - $len -= $w; - } - $rv -} - sub read_input_chunked { # unlikely... my ($self) = @_; my $input = $self->{env}->{'psgi.input'}; @@ -425,7 +411,7 @@ sub read_input_chunked { # unlikely... # drain the current chunk until ($len <= 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len); + my $w = write_in_full($input, $rbuf, $len, 0); return write_err($self, "$len chunk") if !$w; $len -= $w; if ($len == 0) { -- EW