@@ -56,6 +56,8 @@ package org.apache.tools.ant.taskdefs;
import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Vector;
import java.util.List;
import java.util.ArrayList;
import org.apache.tools.ant.BuildException;
import org.apache.tools.ant.Location;
import org.apache.tools.ant.Task;
@@ -86,6 +88,21 @@ import org.apache.tools.ant.util.StringUtils;
public class Parallel extends Task
implements TaskContainer {
/** Class which holds a list of tasks to execute */
public static class TaskList implements TaskContainer {
/** Collection holding the nested tasks */
private List tasks = new ArrayList();
/**
* Add a nested task to execute parallel (asynchron).
* <p>
* @param nestedTask Nested task to be executed in parallel
*/
public void addTask(Task nestedTask) throws BuildException {
tasks.add(nestedTask);
}
}
/** Collection holding the nested tasks */
private Vector nestedTasks = new Vector();
@@ -98,12 +115,13 @@ public class Parallel extends Task
/** Total number of threads per processor to run. */
private int numThreadsPerProcessor = 0;
/** The timeout period in milliseconds */
private long timeout;
/** Indicates threads are still running and new threads can be issued */
private volatile boolean stillRunning;
/** IN dicates that the execution timedout */
/** In dicates that the execution timedout */
private boolean timedOut;
/**
@@ -112,6 +130,31 @@ public class Parallel extends Task
*/
private boolean failOnAny;
/** The dameon task list if any */
private TaskList daemonTasks;
/** Accumulation of exceptions messages from all nested tasks */
private StringBuffer exceptionMessage;
/** Number of exceptions from nested tasks */
private int numExceptions = 0;
/** The first exception encountered */
private Throwable firstException;
/** The location of the first exception */
private Location firstLocation;
/**
* Add a group of daemon threads
*/
public void addDaemons(TaskList daemonTasks) {
if (this.daemonTasks != null) {
throw new BuildException("Only one daemon group is supported");
}
this.daemonTasks = daemonTasks;
}
/**
* Interval to poll for completed threads when threadCount or
* threadsPerProcessor is specified. Integer in milliseconds.; optional
@@ -211,6 +254,27 @@ public class Parallel extends Task
}
}
private void processExceptions(TaskRunnable[] runnables) {
if (runnables == null) {
return;
}
for (int i = 0; i < runnables.length; ++i) {
Throwable t = runnables[i].getException();
if (t != null) {
numExceptions++;
if (firstException == null) {
firstException = t;
}
if (t instanceof BuildException
&& firstLocation == Location.UNKNOWN_LOCATION) {
firstLocation = ((BuildException) t).getLocation();
}
exceptionMessage.append(StringUtils.LINE_SEP);
exceptionMessage.append(t.getMessage());
}
}
}
/**
* Spin up required threads with a maximum number active at any given time.
*
@@ -227,7 +291,7 @@ public class Parallel extends Task
threadNumber++) {
Task nestedTask = (Task) e.nextElement();
runnables[threadNumber]
= new TaskRunnable(threadNumber, nestedTask);
= new TaskRunnable(nestedTask);
}
final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
@@ -236,37 +300,52 @@ public class Parallel extends Task
threadNumber = 0;
ThreadGroup group = new ThreadGroup("parallel");
// now run them in limited numbers...
// start initial batch of threads
for (int i = 0; i < maxRunning; ++i) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
TaskRunnable[] daemons = null;
if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
daemons = new TaskRunnable[daemonTasks.tasks.size()];
}
if (timeout != 0) {
// start the timeout thread
Thread timeoutThread = new Thread() {
public synchronized void run() {
try {
wait(timeout);
synchronized (semaphore) {
stillRunning = false;
timedOut = true;
semaphore.notifyAll();
synchronized (semaphore) {
// start any daemon threads
if (daemons != null) {
for (int i = 0; i < daemons.length; ++i) {
daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
Thread daemonThread = new Thread(group, daemons[i]);
daemonThread.setDaemon(true);
daemonThread.start();
}
}
// now run main threads in limited numbers...
// start initial batch of threads
for (int i = 0; i < maxRunning; ++i) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
}
if (timeout != 0) {
// start the timeout thread
Thread timeoutThread = new Thread() {
public synchronized void run() {
try {
wait(timeout);
synchronized (semaphore) {
stillRunning = false;
timedOut = true;
semaphore.notifyAll();
}
} catch (InterruptedException e) {
// ignore
}
} catch (InterruptedException e) {
// ignore
}
}
};
timeoutThread.start();
}
};
timeoutThread.start();
}
// now find available running slots for the remaining threads
outer:
while (threadNumber < numTasks && stillRunning) {
synchronized (semaphore) {
// now find available running slots for the remaining threads
outer:
while (threadNumber < numTasks && stillRunning) {
for (int i = 0; i < maxRunning; i++) {
if (running[i] == null || running[i].finished) {
running[i] = runnables[threadNumber++];
@@ -288,9 +367,7 @@ public class Parallel extends Task
// sheesh!
}
}
}
synchronized (semaphore) {
// are all threads finished
outer2:
while (stillRunning) {
@@ -315,25 +392,12 @@ public class Parallel extends Task
}
// now did any of the threads throw an exception
StringBuffer exceptionMessage = new StringBuffer();
int numExceptions = 0;
Throwable firstException = null;
Location firstLocation = Location.UNKNOWN_LOCATION;
for (int i = 0; i < numTasks; ++i) {
Throwable t = runnables[i].getException();
if (t != null) {
numExceptions++;
if (firstException == null) {
firstException = t;
}
if (t instanceof BuildException
&& firstLocation == Location.UNKNOWN_LOCATION) {
firstLocation = ((BuildException) t).getLocation();
}
exceptionMessage.append(StringUtils.LINE_SEP);
exceptionMessage.append(t.getMessage());
}
}
exceptionMessage = new StringBuffer();
numExceptions = 0;
firstException = null;
firstLocation = Location.UNKNOWN_LOCATION;
processExceptions(daemons);
processExceptions(runnables);
if (numExceptions == 1) {
if (firstException instanceof BuildException) {
@@ -373,7 +437,6 @@ public class Parallel extends Task
private class TaskRunnable implements Runnable {
private Throwable exception;
private Task task;
private int taskNumber;
boolean finished;
/**
@@ -381,9 +444,8 @@ public class Parallel extends Task
*
* @param task the Task to be executed in a seperate thread
*/
TaskRunnable(int taskNumber, Task task) {
TaskRunnable(Task task) {
this.task = task;
this.taskNumber = taskNumber;
}
/**