Browse Source

emulate async I/O when processing output of forked processes in order to deal with the case where a child of the forked process outlives its parent. PR 5003. Based on a patch by Adam Sotona.

git-svn-id: https://svn.apache.org/repos/asf/ant/core/trunk@711860 13f79535-47bb-0310-9956-ffa450edef68
master
Stefan Bodewig 16 years ago
parent
commit
9770fe02f9
5 changed files with 132 additions and 22 deletions
  1. +1
    -0
      CONTRIBUTORS
  2. +8
    -0
      WHATSNEW
  3. +4
    -0
      contributors.xml
  4. +55
    -13
      src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java
  5. +64
    -9
      src/main/org/apache/tools/ant/taskdefs/StreamPumper.java

+ 1
- 0
CONTRIBUTORS View File

@@ -2,6 +2,7 @@ Amongst other, the following people contributed to ant:

Adam Blinkinsop
Adam Bryzak
Adam Sotona
Aleksandr Ishutin
Alexey Panchenko
Alexey Solofnenko


+ 8
- 0
WHATSNEW View File

@@ -113,6 +113,14 @@ Changes that could break older environments:
http://ant.apache.org/antlibs/dotnet/index.html
instead.

* the logic of closing streams connected to forked processes (read
the input and output of <exec> and friends) has been changed to
deal with cases where child processes of the forked processes live
longer than their parents and keep Ant from exiting.
It is unlikely but possible that the changed logic breaks stream
handling on certain Java VMs.
Bugzilla issue 5003.

Fixed bugs:
-----------



+ 4
- 0
contributors.xml View File

@@ -38,6 +38,10 @@
<first>Adam</first>
<last>Bryzak</last>
</name>
<name>
<first>Adam</first>
<last>Sotona</last>
</name>
<name>
<first>Aleksandr</first>
<last>Ishutin</last>


+ 55
- 13
src/main/org/apache/tools/ant/taskdefs/PumpStreamHandler.java View File

@@ -21,6 +21,7 @@ package org.apache.tools.ant.taskdefs;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tools.ant.taskdefs.condition.Os;

/**
* Copies standard output and error of subprocesses to standard output and
@@ -129,16 +130,8 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
* Stop pumping the streams.
*/
public void stop() {
try {
outputThread.join();
} catch (InterruptedException e) {
// ignore
}
try {
errorThread.join();
} catch (InterruptedException e) {
// ignore
}
finish(outputThread);
finish(errorThread);

if (inputPump != null) {
inputPump.stop();
@@ -156,6 +149,35 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
}
}

private static final long JOIN_TIMEOUT = 500;

/**
* Waits for a thread to finish while trying to make it finish
* quicker by stopping the pumper (if the thread is a {@link
* ThreadWithPumper ThreadWithPumper} instance) or interrupting
* the thread.
*
* @since Ant 1.8.0
*/
protected final void finish(Thread t) {
try {
t.join(JOIN_TIMEOUT);
StreamPumper s = null;
if (t instanceof ThreadWithPumper) {
s = ((ThreadWithPumper) t).getPumper();
}
if (s != null && !s.isFinished()) {
s.stop();
}
while ((s == null || !s.isFinished()) && t.isAlive()) {
t.interrupt();
t.join(JOIN_TIMEOUT);
}
} catch (InterruptedException e) {
// ignore
}
}

/**
* Get the error stream.
* @return <code>OutputStream</code>.
@@ -207,12 +229,16 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
* @param is the input stream to copy from.
* @param os the output stream to copy to.
* @param closeWhenExhausted if true close the inputstream.
* @return a thread object that does the pumping.
* @return a thread object that does the pumping, subclasses
* should return an instance of {@link ThreadWithPumper
* ThreadWithPumper}.
*/
protected Thread createPump(InputStream is, OutputStream os,
boolean closeWhenExhausted) {
final Thread result
= new Thread(new StreamPumper(is, os, closeWhenExhausted));
= new ThreadWithPumper(new StreamPumper(is, os,
closeWhenExhausted,
Os.isFamily("windows")));
result.setDaemon(true);
return result;
}
@@ -224,9 +250,25 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
*/
/*protected*/ StreamPumper createInputPump(InputStream is, OutputStream os,
boolean closeWhenExhausted) {
StreamPumper pumper = new StreamPumper(is, os, closeWhenExhausted);
StreamPumper pumper = new StreamPumper(is, os, closeWhenExhausted,
false);
pumper.setAutoflush(true);
return pumper;
}

/**
* Specialized subclass that allows access to the running StreamPumper.
*
* @since Ant 1.8.0
*/
protected static class ThreadWithPumper extends Thread {
private final StreamPumper pumper;
public ThreadWithPumper(StreamPumper p) {
super(p);
pumper = p;
}
protected StreamPumper getPumper() {
return pumper;
}
}
}

+ 64
- 9
src/main/org/apache/tools/ant/taskdefs/StreamPumper.java View File

@@ -20,6 +20,7 @@ package org.apache.tools.ant.taskdefs;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.tools.ant.util.FileUtils;

/**
* Copies all data from an input stream to an output stream.
@@ -30,15 +31,16 @@ public class StreamPumper implements Runnable {

private static final int SMALL_BUFFER_SIZE = 128;

private InputStream is;
private OutputStream os;
private final InputStream is;
private final OutputStream os;
private volatile boolean finish;
private volatile boolean finished;
private boolean closeWhenExhausted;
private final boolean closeWhenExhausted;
private boolean autoflush = false;
private Exception exception = null;
private int bufferSize = SMALL_BUFFER_SIZE;
private boolean started = false;
private final boolean useAvailable;

/**
* Create a new StreamPumper.
@@ -49,9 +51,40 @@ public class StreamPumper implements Runnable {
* the input is exhausted.
*/
public StreamPumper(InputStream is, OutputStream os, boolean closeWhenExhausted) {
this(is, os, closeWhenExhausted, false);
}


/**
* Create a new StreamPumper.
*
* <p><b>Note:</b> If you set useAvailable to true, you must
* explicitly invoke {@link #stop stop} or interrupt the
* corresponding Thread when you are done or the run method will
* never finish on some JVMs (namely those where available returns
* 0 on a closed stream). Setting it to true may also impact
* performance negatively. This flag should only be set to true
* if you intend to stop the pumper before the input stream gets
* closed.</p>
*
* @param is input stream to read data from
* @param os output stream to write data to.
* @param closeWhenExhausted if true, the output stream will be closed when
* the input is exhausted.
* @param useAvailable whether the pumper should use {@link
* java.io.InputStream#available available} to determine
* whether input is ready, thus trying to emulate
* non-blocking behavior.
*
* @since Ant 1.8.0
*/
public StreamPumper(InputStream is, OutputStream os,
boolean closeWhenExhausted,
boolean useAvailable) {
this.is = is;
this.os = os;
this.closeWhenExhausted = closeWhenExhausted;
this.useAvailable = useAvailable;
}

/**
@@ -90,8 +123,14 @@ public class StreamPumper implements Runnable {
int length;
try {
while (true) {
waitForInput(is);

if (finish || Thread.interrupted()) {
break;
}

length = is.read(buf);
if ((length <= 0) || finish) {
if (length <= 0 || finish || Thread.interrupted()) {
break;
}
os.write(buf, 0, length);
@@ -100,17 +139,15 @@ public class StreamPumper implements Runnable {
}
}
os.flush();
} catch (InterruptedException ie) {
// likely PumpStreamHandler trying to stop us
} catch (Exception e) {
synchronized (this) {
exception = e;
}
} finally {
if (closeWhenExhausted) {
try {
os.close();
} catch (IOException e) {
// ignore
}
FileUtils.close(os);
}
finished = true;
synchronized (this) {
@@ -177,4 +214,22 @@ public class StreamPumper implements Runnable {
finish = true;
notifyAll();
}

private static final long POLL_INTERVAL = 100;

private void waitForInput(InputStream is)
throws IOException, InterruptedException {
if (useAvailable) {
while (!finish && is.available() == 0) {
if (Thread.interrupted()) {
throw new InterruptedException();
}

synchronized (this) {
this.wait(POLL_INTERVAL);
}
}
}
}

}

Loading…
Cancel
Save