Browse Source

Rework parallel

Remove need for poll interval (only covered race condition between isAlive
and notifyAll calls
Add timeout capability
Add flag to fail if any nested task fails without waiting for other tasks
to complete.


git-svn-id: https://svn.apache.org/repos/asf/ant/core/trunk@274937 13f79535-47bb-0310-9956-ffa450edef68
master
Conor MacNeill 22 years ago
parent
commit
89847cdf7d
2 changed files with 158 additions and 59 deletions
  1. +42
    -29
      docs/manual/CoreTasks/parallel.html
  2. +116
    -30
      src/main/org/apache/tools/ant/taskdefs/Parallel.java

+ 42
- 29
docs/manual/CoreTasks/parallel.html View File

@@ -31,10 +31,20 @@ task within the parallel task will be executed in its own thread. </p>
</tr>
<tr>
<td valign="top">pollInterval</td>
<td valign="top">Maximum number of milliseconds to wait for before checking
when waiting for available threads.</td>
<td valign="top">Currently has no effect</td>
<td align="center" valign="top">No, default is 1000</td>
</tr>
<tr>
<td valign="top">timeout</td>
<td valign="top">Number of milliseconds before execution is terminated</td>
<td align="center" valign="top">No</td>
</tr>
<tr>
<td valign="top">failonany</td>
<td valign="top">If any of the nested tasks fails, execution of the task completes
at that point without waiting for any other tasks to complete.</td>
<td align="center" valign="top">No</td>
</tr>
</table>

<p>Parallel tasks have a number of uses in an Ant build file including:</p>
@@ -45,22 +55,25 @@ harness is run in another thread.</li>
</ul>

<p>Care must be taken when using multithreading to ensure the tasks within the
threads do not interact. For example, two javac compile tasks which write
threads do not interact. For example, two javac compile tasks which write
classes into the same destination directory may interact where one tries to
read a class for dependency information while the other task is writing the
class file. Be sure to avoid these types of interactions within a
read a class for dependency information while the other task is writing the
class file. Be sure to avoid these types of interactions within a
&lt;parallel&gt; task</p>
<p>The parallel task has no attributes and does not support any nested
elements apart from Ant tasks. Any valid Ant task may be embedded within a

<p>Any valid Ant task may be embedded within a
parallel task, including other parallel tasks.</p>

<p>Note that while the tasks within the parallel task are being run, the main
thread will be blocked waiting for all the child threads to complete.</p>
<p>Note that while the tasks within the parallel task are being run, the main
thread will be blocked waiting for all the child threads to complete. If
execution is terminated by a timeout or a nested task failure when the failonany
flag is set, the parallel task will complete without waiting for other nested
tasks to complete in other threads.
</p>

<p>If any of the tasks within the &lt;parallel&gt; task fails, the remaining
tasks in other threads will continue to run until all threads have completed.
In this situation, the parallel task will also fail.</p>
<p>If any of the tasks within the &lt;parallel&gt; task fails and failonany is
not set, the remaining tasks in other threads will continue to run until
all threads have completed. In this situation, the parallel task will also fail.</p>

<p>The parallel task may be combined with the <a href="sequential.html">
sequential</a> task to define sequences of tasks to be executed on each thread
@@ -70,7 +83,7 @@ within the parallel block</p>
threads for the execution. When not present all child tasks will be executed at
once. When present then the maximum number of concurrently executing tasks will
not exceed the number of threads specified. Furthermore, each task will be
started in the order they are given. But no guarantee is made as to the speed
started in the order they are given. But no guarantee is made as to the speed
of execution or the order of completion of the tasks, only that each will be
started before the next.<p>

@@ -80,11 +93,11 @@ processors (there is no affinity to a particular processor however). This will
override the value in threadCount. If threadsPerProcessor is specified using
any version prior to 1.4 then the value in threadCount will be used as is.</p>

<p>When using threadCount and threadsPerProcessor care should be taken to insure
<p>When using threadCount and threadsPerProcessor care should be taken to ensure
that the build does not deadlock. This can be caused by tasks such as waitFor
takeing up all available threads before the tasks that would unlock the waitfor
taking up all available threads before the tasks that would unlock the waitfor
would occur. This is not a repalcement for Java Language level thread
semantics and is best used for "embarasingly parallel" tasks.</p>
semantics and is best used for "embarassingly parallel" tasks.</p>

<h3>Examples</h3>
<pre>
@@ -97,14 +110,14 @@ semantics and is best used for "embarasingly parallel" tasks.</p>
&lt;/sequential&gt;
&lt;/parallel&gt;
</pre>
<p>This example represents a typical pattern for testing a server application.
In one thread the server is started (the wlrun task). The other thread consists
of a three tasks which are performed in sequence. The sleep task is used to
give the server time to come up. Another task which is capable of validating
that the server is available could be used in place of the sleep task. The
test harness is then run. Once the tests are complete, the server is stopped
(using wlstop in this example), allowing both threads to complete. The
parallel task will also complete at this time and the build will then
<p>This example represents a typical pattern for testing a server application.
In one thread the server is started (the wlrun task). The other thread consists
of a three tasks which are performed in sequence. The sleep task is used to
give the server time to come up. Another task which is capable of validating
that the server is available could be used in place of the sleep task. The
test harness is then run. Once the tests are complete, the server is stopped
(using wlstop in this example), allowing both threads to complete. The
parallel task will also complete at this time and the build will then
continue.</p>

<pre>
@@ -114,10 +127,10 @@ continue.</p>
&lt;/parallel&gt;
</pre>

<p>This example shows two independent tasks being run to achieve better
<p>This example shows two independent tasks being run to achieve better
resource utilization during the build. In this instance, some servlets are being
compiled in one thead and a set of JSPs is being precompiled in another. As
noted above, you need to be careful that the two tasks are independent, both in
compiled in one thead and a set of JSPs is being precompiled in another. As
noted above, you need to be careful that the two tasks are independent, both in
terms of their dependencies and in terms of their potential interactions in
Ant's external environment.</p>

@@ -136,7 +149,7 @@ Ant's external environment.</p>
&lt;/parallel&gt;
</pre>

<p>This example represents a typical need for use of the threadCount and
<p>This example represents a typical need for use of the threadCount and
threadsPerProcessor attributes. Spinning up all 40 of those tasks could cripple
the JVM for memory and the CPU for available time. By limiting the number of
concurrent executions you can get the task done in about the same assuming


+ 116
- 30
src/main/org/apache/tools/ant/taskdefs/Parallel.java View File

@@ -98,8 +98,40 @@ public class Parallel extends Task
/** Total number of threads per processor to run. */
private int numThreadsPerProcessor = 0;

/** Interval (in ms) to poll for finished threads. */
private int pollInterval = 1000; // default is once a second
private long timeout;

/** Indicates threads are still running and new threads can be issued */
private volatile boolean stillRunning;

/** INdicates that the execution timedout */
private boolean timedOut;

/**
* Indicates whether failure of any of the nested tasks should end
* execution
*/
private boolean failOnAny;

/**
* Interval to poll for completed threads when threadCount or
* threadsPerProcessor is specified. Integer in milliseconds.; optional
*
* @param pollInterval New value of property pollInterval.
*/
public void setPollInterval(int pollInterval) {
}

/**
* Control whether a failure in a nested task halts execution. Note that
* the task will complete but existing threads will continue to run - they
* are not stopped
*
* @param failOnAny if true any nested task failure causes parallel to
* complete.
*/
public void setFailOnAny(boolean failOnAny) {
this.failOnAny = failOnAny;
}

/**
* Add a nested task to execute in parallel.
@@ -140,15 +172,20 @@ public class Parallel extends Task
}

/**
* Interval to poll for completed threads when threadCount or
* threadsPerProcessor is specified. Integer in milliseconds.; optional
* Sets the timeout on this set of tasks. If the timeout is reached
* before the other threads complete, the execution of this
* task completes with an exception.
*
* @param pollInterval New value of property pollInterval.
* Note that existing threads continue to run.
*
* @param timeout timeout in milliseconds.
*/
public void setPollInterval(int pollInterval) {
this.pollInterval = pollInterval;
public void setTimeout(long timeout) {
this.timeout = timeout;
}



/**
* Execute the parallel tasks
*
@@ -181,58 +218,102 @@ public class Parallel extends Task
*/
private void spinThreads() throws BuildException {
final int numTasks = nestedTasks.size();
Thread[] threads = new Thread[numTasks];
TaskRunnable[] runnables = new TaskRunnable[numTasks];
stillRunning = true;
timedOut = false;

int threadNumber = 0;
for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
threadNumber++) {
Task nestedTask = (Task) e.nextElement();
ThreadGroup group = new ThreadGroup("parallel");
TaskRunnable taskRunnable
runnables[threadNumber]
= new TaskRunnable(threadNumber, nestedTask);
runnables[threadNumber] = taskRunnable;
threads[threadNumber] = new Thread(group, taskRunnable);
}

final int maxRunning = numThreads;
Thread[] running = new Thread[maxRunning];
final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
TaskRunnable[] running = new TaskRunnable[maxRunning];

threadNumber = 0;
ThreadGroup group = new ThreadGroup("parallel");

// now run them in limited numbers...
// start initial batch of threads
for (int i = 0; i < maxRunning; ++i) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
}

if (timeout != 0) {
// start the timeout thread
Thread timeoutThread = new Thread() {
public synchronized void run() {
try {
wait(timeout);
synchronized(semaphore) {
stillRunning = false;
timedOut = true;
semaphore.notifyAll();
}
} catch (InterruptedException e) {
// ignore
}
}
};
timeoutThread.start();
}

// now find available running slots for the remaining threads
outer:
while (threadNumber < numTasks) {
while (threadNumber < numTasks && stillRunning) {
synchronized (semaphore) {
for (int i = 0; i < maxRunning; i++) {
if (running[i] == null || !running[i].isAlive()) {
running[i] = threads[threadNumber++];
running[i].start();
// countinue on outer while loop in case we
// used our last thread
if (running[i] == null || running[i].finished) {
running[i] = runnables[threadNumber++];
Thread thread = new Thread(group, running[i]);
thread.start();
// countinue on outer while loop to get another
// available slot
continue outer;
}
}
// if we got here all are running, so sleep a little

// if we got here all slots in use, so sleep until
// something happens
try {
semaphore.wait(pollInterval);
semaphore.wait();
} catch (InterruptedException ie) {
// dosen't java know interruptions are rude?
// just pretend it didn't happen and go aobut out business.
// just pretend it didn't happen and go about out business.
// sheesh!
}
}
}

// now join to all the threads
for (int i = 0; i < maxRunning; ++i) {
try {
if (running[i] != null) {
running[i].join();
synchronized(semaphore) {
// are all threads finished
outer2:
while (stillRunning) {
for (int i = 0; i < maxRunning; ++i) {
if (running[i] != null && !running[i].finished) {
//System.out.println("Thread " + i + " is still alive ");
// still running - wait for it
try {
semaphore.wait();
} catch (InterruptedException ie) {
// who would interrupt me at a time like this?
}
continue outer2;
}
}
} catch (InterruptedException ie) {
// who would interrupt me at a time like this?
stillRunning = false;
}
}

if (timedOut) {
throw new BuildException("Parallel execution timed out");
}

// now did any of the threads throw an exception
StringBuffer exceptionMessage = new StringBuffer();
int numExceptions = 0;
@@ -293,6 +374,7 @@ public class Parallel extends Task
private Throwable exception;
private Task task;
private int taskNumber;
boolean finished;

/**
* Construct a new TaskRunnable.<p>
@@ -315,6 +397,10 @@ public class Parallel extends Task
exception = t;
} finally {
synchronized (semaphore) {
finished = true;
if (failOnAny) {
stillRunning = false;
}
semaphore.notifyAll();
}
}


Loading…
Cancel
Save