From 6c860e0036a8742f4cc35e3e6a8e595e42804c89 Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Sat, 16 Dec 2017 15:18:30 +0530 Subject: [PATCH] BZ-58451 BZ-58833 Give StreamPumper a chance to finish cleanly before interrupting its thread, to prevent truncated output --- WHATSNEW | 4 + .../taskdefs/exec/exec-with-redirector.xml | 122 +++++++++++++++++ .../tools/ant/taskdefs/PumpStreamHandler.java | 16 ++- .../tools/ant/taskdefs/StreamPumper.java | 124 +++++++++++++----- .../taskdefs/ExecStreamRedirectorTest.java | 91 +++++++++++++ 5 files changed, 322 insertions(+), 35 deletions(-) create mode 100644 src/etc/testcases/taskdefs/exec/exec-with-redirector.xml create mode 100644 src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java diff --git a/WHATSNEW b/WHATSNEW index 2b383da3e..235e7236b 100644 --- a/WHATSNEW +++ b/WHATSNEW @@ -23,6 +23,10 @@ Fixed bugs: suggesting the fix. Bugzilla Report 19516 + * Fixed an issue where the content redirected from output/error + streams of a process, could end up being truncated. + Bugzilla Report 58833, 58451 + Other changes: -------------- diff --git a/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml b/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml new file mode 100644 index 000000000..27b680d1c --- /dev/null +++ b/src/etc/testcases/taskdefs/exec/exec-with-redirector.xml @@ -0,0 +1,122 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java b/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java index 9a2394721..31671fc13 100644 --- a/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java +++ b/src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java @@ -21,6 +21,7 @@ package org.apache.tools.ant.taskdefs; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.TimeUnit; import org.apache.tools.ant.util.FileUtils; @@ -183,12 +184,21 @@ public class PumpStreamHandler implements ExecuteStreamHandler { if (!t.isAlive()) { return; } - + StreamPumper.PostStopHandle postStopHandle = null; if (s != null && !s.isFinished()) { - s.stop(); + postStopHandle = s.stop(); + } + if (postStopHandle != null && postStopHandle.isInPostStopTasks()) { + // the stream pumper is in post stop tasks (like flushing output), which + // indicates that the stream pumper has respected the stop request and + // is cleaning up before finishing. Give it some time to finish this + // post stop activity, before trying to force interrupt the underlying thread + // of the stream pumper + postStopHandle.awaitPostStopCompletion(2, TimeUnit.SECONDS); } - t.join(JOIN_TIMEOUT); while ((s == null || !s.isFinished()) && t.isAlive()) { + // we waited for the thread/stream pumper to finish, but it hasn't yet. + // so we interrupt it t.interrupt(); t.join(JOIN_TIMEOUT); } diff --git a/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java b/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java index 676b5fb52..c7afc4d20 100644 --- a/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java +++ b/src/main/org/apache/tools/ant/taskdefs/StreamPumper.java @@ -17,11 +17,13 @@ */ package org.apache.tools.ant.taskdefs; +import org.apache.tools.ant.util.FileUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; - -import org.apache.tools.ant.util.FileUtils; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * Copies all data from an input stream to an output stream. @@ -34,7 +36,7 @@ public class StreamPumper implements Runnable { private final InputStream is; private final OutputStream os; - private volatile boolean finish; + private volatile boolean askedToStop; private volatile boolean finished; private final boolean closeWhenExhausted; private boolean autoflush = false; @@ -42,6 +44,7 @@ public class StreamPumper implements Runnable { private int bufferSize = SMALL_BUFFER_SIZE; private boolean started = false; private final boolean useAvailable; + private PostStopHandle postStopHandle; /** * Create a new StreamPumper. @@ -58,7 +61,6 @@ public class StreamPumper implements Runnable { /** * Create a new StreamPumper. - * *

Note: If you set useAvailable to true, you must * explicitly invoke {@link #stop stop} or interrupt the * corresponding Thread when you are done or the run method will @@ -120,41 +122,29 @@ public class StreamPumper implements Runnable { final byte[] buf = new byte[bufferSize]; - int length; try { - while (true) { + int length; + while (!this.askedToStop && !Thread.interrupted()) { waitForInput(is); - if (finish || Thread.interrupted()) { + if (askedToStop || Thread.interrupted()) { break; } length = is.read(buf); - if (length <= 0 || Thread.interrupted()) { + if (length < 0) { + // EOF break; } - os.write(buf, 0, length); - if (autoflush) { - os.flush(); - } - if (finish) { //NOSONAR - break; - } - } - // On completion, drain any available data (which might be the first data available for quick executions) - if (finish) { - while ((length = is.available()) > 0) { - if (Thread.interrupted()) { - break; - } - length = is.read(buf, 0, Math.min(length, buf.length)); - if (length <= 0) { - break; - } + if (length > 0) { + // we did read something, so write it out os.write(buf, 0, length); + if (autoflush) { + os.flush(); + } } } - os.flush(); + this.doPostStop(); } catch (InterruptedException ie) { // likely PumpStreamHandler trying to stop us } catch (Exception e) { @@ -166,7 +156,7 @@ public class StreamPumper implements Runnable { FileUtils.close(os); } finished = true; - finish = false; + askedToStop = false; synchronized (this) { notifyAll(); } @@ -206,6 +196,7 @@ public class StreamPumper implements Runnable { /** * Get the size in bytes of the read buffer. + * * @return the int size of the read buffer. */ public synchronized int getBufferSize() { @@ -225,19 +216,26 @@ public class StreamPumper implements Runnable { * Note that it may continue to block on the input stream * but it will really stop the thread as soon as it gets EOF * or any byte, and it will be marked as finished. + * @return Returns a {@link PostStopHandle} for the callers to + * know if the status of post-stop activities, that happen, before this + * {@link StreamPumper} is actually finished * @since Ant 1.6.3 + * @since Ant 10.2.0 this method returns a {@link PostStopHandle} */ - /*package*/ synchronized void stop() { - finish = true; + /*package*/ + synchronized PostStopHandle stop() { + askedToStop = true; + postStopHandle = new PostStopHandle(); notifyAll(); + return postStopHandle; } private static final long POLL_INTERVAL = 100; private void waitForInput(InputStream is) - throws IOException, InterruptedException { + throws IOException, InterruptedException { if (useAvailable) { - while (!finish && is.available() == 0) { + while (!askedToStop && is.available() == 0) { if (Thread.interrupted()) { throw new InterruptedException(); } @@ -249,4 +247,66 @@ public class StreamPumper implements Runnable { } } + private void doPostStop() throws IOException { + try { + final byte[] buf = new byte[bufferSize]; + int length; + // We were asked to stop, the contract allows us to do any non-blocking + // final bits of reads, before actually finishing. So we try and drain any (non-blocking) available + // data. We *don't* check the thread interrupt status, anymore, once we start draining this non-blocking + // available data, to allow us to cleanly write out any available data. + if (askedToStop) { + int bytesReadableWithoutBlocking; + while ((bytesReadableWithoutBlocking = is.available()) > 0) { + length = is.read(buf, 0, Math.min(bytesReadableWithoutBlocking, buf.length)); + if (length <= 0) { + break; + } + os.write(buf, 0, length); + } + } + // this can potentially be blocking, but that's OK since our post stop activity is allowed to + // cleanup/flush any data and the PostStopHandle let's the caller control over how long they want + // this to go, before actually interrupting the thread + os.flush(); + } finally { + if (this.postStopHandle != null) { + this.postStopHandle.latch.countDown(); + this.postStopHandle.inPostStopTasks = false; + } + } + } + + /** + * A handle that can be used after {@link #stop()} has been invoked to check if the + * {@link StreamPumper} is in the process of do some post-stop tasks (like flushing + * of streams), before finishing. + */ + final class PostStopHandle { + private boolean inPostStopTasks = true; + private final CountDownLatch latch = new CountDownLatch(1); + + /** + * Returns true if the {@link StreamPumper} is doing post-stop tasks (like flushing of streams). + * Else returns false. + * @return + */ + boolean isInPostStopTasks() { + return inPostStopTasks; + } + + /** + * Waits for a maximum of {@code timeout} time for the post-stop activities to complete. + * + * @param timeout The maximum amount of time to wait for the post-stop activities to complete + * @param timeUnit The unit of {@code timeout} + * @return Returns true if the post-stop activities completed within the specified {@code timeout}. + * Else returns false + * @throws InterruptedException If the current thread was interrupted while waiting + */ + boolean awaitPostStopCompletion(final long timeout, final TimeUnit timeUnit) throws InterruptedException { + return this.latch.await(timeout, timeUnit); + } + } + } diff --git a/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java b/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java new file mode 100644 index 000000000..ba39fd5e3 --- /dev/null +++ b/src/tests/junit/org/apache/tools/ant/taskdefs/ExecStreamRedirectorTest.java @@ -0,0 +1,91 @@ +package org.apache.tools.ant.taskdefs; + +import org.apache.tools.ant.Project; +import org.apache.tools.ant.ProjectHelper; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@code exec} task which uses a {@code redirector} to redirect its output and error streams + */ +public class ExecStreamRedirectorTest { + + private Project project; + + @Before + public void setUp() throws Exception { + project = new Project(); + project.init(); + final File antFile = new File(System.getProperty("root"), "src/etc/testcases/taskdefs/exec/exec-with-redirector.xml"); + project.setUserProperty("ant.file", antFile.getAbsolutePath()); + final File outputDir = this.createTmpDir(); + project.setUserProperty("output", outputDir.toString()); + ProjectHelper.configureProject(project, antFile); + project.executeTarget("setUp"); + } + + /** + * Tests that the redirected streams of the exec'ed process aren't truncated. + * + * @throws Exception + * @see bz-58451 and + * bz-58833 for more details + */ + @Test + public void testRedirection() throws Exception { + final String dirToList = project.getProperty("dir.to.ls"); + assertNotNull("Directory to list isn't available", dirToList); + assertTrue(dirToList + " is not a directory", new File(dirToList).isDirectory()); + + project.executeTarget("list-dir"); + + // verify the redirected output + final String outputDirPath = project.getProperty("output"); + byte[] dirListingOutput = null; + for (int i = 1; i <= 16; i++) { + final File redirectedOutputFile = new File(outputDirPath, "ls" + i + ".txt"); + assertTrue(redirectedOutputFile + " is missing or not a regular file", redirectedOutputFile.isFile()); + final byte[] redirectedOutput = readAllBytes(redirectedOutputFile); + assertNotNull("No content was redirected to " + redirectedOutputFile, redirectedOutput); + assertFalse("Content in redirected file " + redirectedOutputFile + " was empty", redirectedOutput.length == 0); + if (dirListingOutput != null) { + // compare the directory listing that was redirected to these files. all files should have the same content + assertTrue("Redirected output in file " + redirectedOutputFile + + " doesn't match content in other redirected output file(s)", Arrays.equals(dirListingOutput, redirectedOutput)); + } + dirListingOutput = redirectedOutput; + } + } + + private File createTmpDir() { + final File tmpDir = new File(System.getProperty("java.io.tmpdir"), String.valueOf("temp-" + System.nanoTime())); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + return tmpDir; + } + + private static byte[] readAllBytes(final File file) throws IOException { + final FileInputStream fis = new FileInputStream(file); + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + final byte[] dataChunk = new byte[1024]; + int numRead = -1; + while ((numRead = fis.read(dataChunk)) > 0) { + bos.write(dataChunk, 0, numRead); + } + } finally { + fis.close(); + } + return bos.toByteArray(); + } +}