Browse Source

BZ-58451 BZ-58833 Give StreamPumper a chance to finish cleanly before interrupting its thread, to prevent truncated output

master
Jaikiran Pai 7 years ago
parent
commit
6c860e0036
5 changed files with 322 additions and 35 deletions
  1. +4
    -0
      WHATSNEW
  2. +122
    -0
      src/etc/testcases/taskdefs/exec/exec-with-redirector.xml
  3. +13
    -3
      src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java
  4. +92
    -32
      src/main/org/apache/tools/ant/taskdefs/StreamPumper.java
  5. +91
    -0
      src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java

+ 4
- 0
WHATSNEW View File

@@ -23,6 +23,10 @@ Fixed bugs:
suggesting the fix.
Bugzilla Report 19516

* Fixed an issue where the content redirected from output/error
streams of a process, could end up being truncated.
Bugzilla Report 58833, 58451

Other changes:
--------------



+ 122
- 0
src/etc/testcases/taskdefs/exec/exec-with-redirector.xml View File

@@ -0,0 +1,122 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project name="exec-redirector-test" basedir=".">

<target name="setUp">
<!-- This "output" property is set on the project in the Java test case (ExecStreamRedirectorTest) -->
<mkdir dir="${output}"/>
<condition property="dir.listing.command" value="ls" else="cmd.exe">
<os family="unix"/>
</condition>
<condition property="dir.to.ls" value="/usr/bin" else="${user.dir}">
<os family="unix"/>
</condition>
<condition property="dir.listing.command.arg" value="-l" else="dir">
<os family="unix"/>
</condition>
<property name="dir.to.ls" value="/usr/bin"/>
</target>

<target name="list-dir">
<!-- Just do listing of the same directory and redirect the output to different files -->
<parallel>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls1.txt" error="${output}/ls1.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls2.txt" error="${output}/ls2.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls3.txt" error="${output}/ls3.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>

<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls4.txt" error="${output}/ls4.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls5.txt" error="${output}/ls5.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls6.txt" error="${output}/ls6.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls7.txt" error="${output}/ls7.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls8.txt" error="${output}/ls8.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls9.txt" error="${output}/ls9.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls10.txt" error="${output}/ls10.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls11.txt" error="${output}/ls11.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls12.txt" error="${output}/ls12.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls13.txt" error="${output}/ls13.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls14.txt" error="${output}/ls14.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls15.txt" error="${output}/ls15.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
<exec executable="${dir.listing.command}" dir="${basedir}" failonerror="true" logerror="true">
<redirector output="${output}/ls16.txt" error="${output}/ls16.err" alwayslog="true"/>
<arg value="${dir.listing.command.arg}"/>
<arg value="${dir.to.ls}"/>
</exec>
</parallel>
</target>

</project>

+ 13
- 3
src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java View File

@@ -21,6 +21,7 @@ package org.apache.tools.ant.taskdefs;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

import org.apache.tools.ant.util.FileUtils;

@@ -183,12 +184,21 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
if (!t.isAlive()) {
return;
}
StreamPumper.PostStopHandle postStopHandle = null;
if (s != null && !s.isFinished()) {
s.stop();
postStopHandle = s.stop();
}
if (postStopHandle != null && postStopHandle.isInPostStopTasks()) {
// the stream pumper is in post stop tasks (like flushing output), which
// indicates that the stream pumper has respected the stop request and
// is cleaning up before finishing. Give it some time to finish this
// post stop activity, before trying to force interrupt the underlying thread
// of the stream pumper
postStopHandle.awaitPostStopCompletion(2, TimeUnit.SECONDS);
}
t.join(JOIN_TIMEOUT);
while ((s == null || !s.isFinished()) && t.isAlive()) {
// we waited for the thread/stream pumper to finish, but it hasn't yet.
// so we interrupt it
t.interrupt();
t.join(JOIN_TIMEOUT);
}


+ 92
- 32
src/main/org/apache/tools/ant/taskdefs/StreamPumper.java View File

@@ -17,11 +17,13 @@
*/
package org.apache.tools.ant.taskdefs;

import org.apache.tools.ant.util.FileUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tools.ant.util.FileUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Copies all data from an input stream to an output stream.
@@ -34,7 +36,7 @@ public class StreamPumper implements Runnable {

private final InputStream is;
private final OutputStream os;
private volatile boolean finish;
private volatile boolean askedToStop;
private volatile boolean finished;
private final boolean closeWhenExhausted;
private boolean autoflush = false;
@@ -42,6 +44,7 @@ public class StreamPumper implements Runnable {
private int bufferSize = SMALL_BUFFER_SIZE;
private boolean started = false;
private final boolean useAvailable;
private PostStopHandle postStopHandle;

/**
* Create a new StreamPumper.
@@ -58,7 +61,6 @@ public class StreamPumper implements Runnable {

/**
* Create a new StreamPumper.
*
* <p><b>Note:</b> If you set useAvailable to true, you must
* explicitly invoke {@link #stop stop} or interrupt the
* corresponding Thread when you are done or the run method will
@@ -120,41 +122,29 @@ public class StreamPumper implements Runnable {

final byte[] buf = new byte[bufferSize];

int length;
try {
while (true) {
int length;
while (!this.askedToStop && !Thread.interrupted()) {
waitForInput(is);

if (finish || Thread.interrupted()) {
if (askedToStop || Thread.interrupted()) {
break;
}

length = is.read(buf);
if (length <= 0 || Thread.interrupted()) {
if (length < 0) {
// EOF
break;
}
os.write(buf, 0, length);
if (autoflush) {
os.flush();
}
if (finish) { //NOSONAR
break;
}
}
// On completion, drain any available data (which might be the first data available for quick executions)
if (finish) {
while ((length = is.available()) > 0) {
if (Thread.interrupted()) {
break;
}
length = is.read(buf, 0, Math.min(length, buf.length));
if (length <= 0) {
break;
}
if (length > 0) {
// we did read something, so write it out
os.write(buf, 0, length);
if (autoflush) {
os.flush();
}
}
}
os.flush();
this.doPostStop();
} catch (InterruptedException ie) {
// likely PumpStreamHandler trying to stop us
} catch (Exception e) {
@@ -166,7 +156,7 @@ public class StreamPumper implements Runnable {
FileUtils.close(os);
}
finished = true;
finish = false;
askedToStop = false;
synchronized (this) {
notifyAll();
}
@@ -206,6 +196,7 @@ public class StreamPumper implements Runnable {

/**
* Get the size in bytes of the read buffer.
*
* @return the int size of the read buffer.
*/
public synchronized int getBufferSize() {
@@ -225,19 +216,26 @@ public class StreamPumper implements Runnable {
* Note that it may continue to block on the input stream
* but it will really stop the thread as soon as it gets EOF
* or any byte, and it will be marked as finished.
* @return Returns a {@link PostStopHandle} for the callers to
* know if the status of post-stop activities, that happen, before this
* {@link StreamPumper} is actually finished
* @since Ant 1.6.3
* @since Ant 10.2.0 this method returns a {@link PostStopHandle}
*/
/*package*/ synchronized void stop() {
finish = true;
/*package*/
synchronized PostStopHandle stop() {
askedToStop = true;
postStopHandle = new PostStopHandle();
notifyAll();
return postStopHandle;
}

private static final long POLL_INTERVAL = 100;

private void waitForInput(InputStream is)
throws IOException, InterruptedException {
throws IOException, InterruptedException {
if (useAvailable) {
while (!finish && is.available() == 0) {
while (!askedToStop && is.available() == 0) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
@@ -249,4 +247,66 @@ public class StreamPumper implements Runnable {
}
}

private void doPostStop() throws IOException {
try {
final byte[] buf = new byte[bufferSize];
int length;
// We were asked to stop, the contract allows us to do any non-blocking
// final bits of reads, before actually finishing. So we try and drain any (non-blocking) available
// data. We *don't* check the thread interrupt status, anymore, once we start draining this non-blocking
// available data, to allow us to cleanly write out any available data.
if (askedToStop) {
int bytesReadableWithoutBlocking;
while ((bytesReadableWithoutBlocking = is.available()) > 0) {
length = is.read(buf, 0, Math.min(bytesReadableWithoutBlocking, buf.length));
if (length <= 0) {
break;
}
os.write(buf, 0, length);
}
}
// this can potentially be blocking, but that's OK since our post stop activity is allowed to
// cleanup/flush any data and the PostStopHandle let's the caller control over how long they want
// this to go, before actually interrupting the thread
os.flush();
} finally {
if (this.postStopHandle != null) {
this.postStopHandle.latch.countDown();
this.postStopHandle.inPostStopTasks = false;
}
}
}

/**
* A handle that can be used after {@link #stop()} has been invoked to check if the
* {@link StreamPumper} is in the process of do some post-stop tasks (like flushing
* of streams), before finishing.
*/
final class PostStopHandle {
private boolean inPostStopTasks = true;
private final CountDownLatch latch = new CountDownLatch(1);

/**
* Returns true if the {@link StreamPumper} is doing post-stop tasks (like flushing of streams).
* Else returns false.
* @return
*/
boolean isInPostStopTasks() {
return inPostStopTasks;
}

/**
* Waits for a maximum of {@code timeout} time for the post-stop activities to complete.
*
* @param timeout The maximum amount of time to wait for the post-stop activities to complete
* @param timeUnit The unit of {@code timeout}
* @return Returns true if the post-stop activities completed within the specified {@code timeout}.
* Else returns false
* @throws InterruptedException If the current thread was interrupted while waiting
*/
boolean awaitPostStopCompletion(final long timeout, final TimeUnit timeUnit) throws InterruptedException {
return this.latch.await(timeout, timeUnit);
}
}

}

+ 91
- 0
src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java View File

@@ -0,0 +1,91 @@
package org.apache.tools.ant.taskdefs;

import org.apache.tools.ant.Project;
import org.apache.tools.ant.ProjectHelper;
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
* Tests the {@code exec} task which uses a {@code redirector} to redirect its output and error streams
*/
public class ExecStreamRedirectorTest {

private Project project;

@Before
public void setUp() throws Exception {
project = new Project();
project.init();
final File antFile = new File(System.getProperty("root"), "src/etc/testcases/taskdefs/exec/exec-with-redirector.xml");
project.setUserProperty("ant.file", antFile.getAbsolutePath());
final File outputDir = this.createTmpDir();
project.setUserProperty("output", outputDir.toString());
ProjectHelper.configureProject(project, antFile);
project.executeTarget("setUp");
}

/**
* Tests that the redirected streams of the exec'ed process aren't truncated.
*
* @throws Exception
* @see <a href="https://bz.apache.org/bugzilla/show_bug.cgi?id=58451">bz-58451</a> and
* <a href="https://bz.apache.org/bugzilla/show_bug.cgi?id=58833">bz-58833</a> for more details
*/
@Test
public void testRedirection() throws Exception {
final String dirToList = project.getProperty("dir.to.ls");
assertNotNull("Directory to list isn't available", dirToList);
assertTrue(dirToList + " is not a directory", new File(dirToList).isDirectory());

project.executeTarget("list-dir");

// verify the redirected output
final String outputDirPath = project.getProperty("output");
byte[] dirListingOutput = null;
for (int i = 1; i <= 16; i++) {
final File redirectedOutputFile = new File(outputDirPath, "ls" + i + ".txt");
assertTrue(redirectedOutputFile + " is missing or not a regular file", redirectedOutputFile.isFile());
final byte[] redirectedOutput = readAllBytes(redirectedOutputFile);
assertNotNull("No content was redirected to " + redirectedOutputFile, redirectedOutput);
assertFalse("Content in redirected file " + redirectedOutputFile + " was empty", redirectedOutput.length == 0);
if (dirListingOutput != null) {
// compare the directory listing that was redirected to these files. all files should have the same content
assertTrue("Redirected output in file " + redirectedOutputFile +
" doesn't match content in other redirected output file(s)", Arrays.equals(dirListingOutput, redirectedOutput));
}
dirListingOutput = redirectedOutput;
}
}

private File createTmpDir() {
final File tmpDir = new File(System.getProperty("java.io.tmpdir"), String.valueOf("temp-" + System.nanoTime()));
tmpDir.mkdir();
tmpDir.deleteOnExit();
return tmpDir;
}

private static byte[] readAllBytes(final File file) throws IOException {
final FileInputStream fis = new FileInputStream(file);
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
final byte[] dataChunk = new byte[1024];
int numRead = -1;
while ((numRead = fis.read(dataChunk)) > 0) {
bos.write(dataChunk, 0, numRead);
}
} finally {
fis.close();
}
return bos.toByteArray();
}
}

Loading…
Cancel
Save