@@ -32,23 +32,38 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
private Thread outputThread;
private Thread outputThread;
private Thread errorThread;
private Thread errorThread;
private StreamPumper inputPump ;
private Thread inputThread ;
private OutputStream out;
private OutputStream out;
private OutputStream err;
private OutputStream err;
private InputStream input;
private InputStream input;
private final boolean nonBlockingRead;
/**
/**
* Construct a new <code>PumpStreamHandler</code>.
* Construct a new <code>PumpStreamHandler</code>.
* @param out the output <code>OutputStream</code>.
* @param out the output <code>OutputStream</code>.
* @param err the error <code>OutputStream</code>.
* @param err the error <code>OutputStream</code>.
* @param input the input <code>InputStream</code>.
* @param input the input <code>InputStream</code>.
* @param nonBlockingRead set it to <code>true</code> if the input should be
* read with simulated non blocking IO.
*/
*/
public PumpStreamHandler(OutputStream out, OutputStream err,
public PumpStreamHandler(OutputStream out, OutputStream err,
InputStream input) {
InputStream input, boolean nonBlockingRead ) {
this.out = out;
this.out = out;
this.err = err;
this.err = err;
this.input = input;
this.input = input;
this.nonBlockingRead = nonBlockingRead;
}
/**
* Construct a new <code>PumpStreamHandler</code>.
* @param out the output <code>OutputStream</code>.
* @param err the error <code>OutputStream</code>.
* @param input the input <code>InputStream</code>.
*/
public PumpStreamHandler(OutputStream out, OutputStream err,
InputStream input) {
this(out, err, input, false);
}
}
/**
/**
@@ -102,7 +117,7 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
*/
*/
public void setProcessInputStream(OutputStream os) {
public void setProcessInputStream(OutputStream os) {
if (input != null) {
if (input != null) {
inputPump = createInputPump(input, os, true );
inputThread = createPump(input, os, true, nonBlockingRead );
} else {
} else {
try {
try {
os.close();
os.close();
@@ -118,9 +133,7 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
public void start() {
public void start() {
outputThread.start();
outputThread.start();
errorThread.start();
errorThread.start();
if (inputPump != null) {
Thread inputThread = new Thread(inputPump);
inputThread.setDaemon(true);
if (inputThread != null) {
inputThread.start();
inputThread.start();
}
}
}
}
@@ -129,10 +142,7 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
* Stop pumping the streams.
* Stop pumping the streams.
*/
*/
public void stop() {
public void stop() {
if (inputPump != null) {
inputPump.stop();
}
finish(inputThread);
try {
try {
err.flush();
err.flush();
@@ -159,6 +169,10 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
* @since Ant 1.8.0
* @since Ant 1.8.0
*/
*/
protected final void finish(Thread t) {
protected final void finish(Thread t) {
if (t == null) {
// nothing to terminate
return;
}
try {
try {
StreamPumper s = null;
StreamPumper s = null;
if (t instanceof ThreadWithPumper) {
if (t instanceof ThreadWithPumper) {
@@ -241,25 +255,30 @@ public class PumpStreamHandler implements ExecuteStreamHandler {
*/
*/
protected Thread createPump(InputStream is, OutputStream os,
protected Thread createPump(InputStream is, OutputStream os,
boolean closeWhenExhausted) {
boolean closeWhenExhausted) {
final Thread result
= new ThreadWithPumper(new StreamPumper(is, os,
closeWhenExhausted,
true));
result.setDaemon(true);
return result;
return createPump(is, os, closeWhenExhausted, true);
}
}
/**
/**
* Creates a stream pumper to copy the given input stream to the
* Creates a stream pumper to copy the given input stream to the
* given output stream. Used for standard input.
* @since Ant 1.6.3
* given output stream.
* @param is the input stream to copy from.
* @param os the output stream to copy to.
* @param closeWhenExhausted if true close the inputstream.
* @param useAvailable set it to <code>true</code> to use simulated non
* blocking IO.
* @return a thread object that does the pumping, subclasses
* should return an instance of {@link ThreadWithPumper
* ThreadWithPumper}.
* @since Ant 1.8.2
*/
*/
/*protected*/ StreamPumper createInputPump(InputStream is, OutputStream os,
boolean closeWhenExhausted) {
StreamPumper pumper = new StreamPumper(is, os, closeWhenExhausted,
false);
pumper.setAutoflush(true);
return pumper;
protected Thread createPump(InputStream is, OutputStream os,
boolean closeWhenExhausted, boolean nonBlockingIO) {
final Thread result
= new ThreadWithPumper(new StreamPumper(is, os,
closeWhenExhausted,
nonBlockingIO));
result.setDaemon(true);
return result;
}
}
/**
/**