You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

PumpStreamHandler.java 9.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements. See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License. You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   *
   */
  18. package org.apache.tools.ant.taskdefs;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.OutputStream;
  22. /**
  23. * Copies standard output and error of subprocesses to standard output and
  24. * error of the parent process.
  25. *
  26. * @since Ant 1.2
  27. */
  28. public class PumpStreamHandler implements ExecuteStreamHandler {
  29. private Thread outputThread;
  30. private Thread errorThread;
  31. private Thread inputThread;
  32. private OutputStream out;
  33. private OutputStream err;
  34. private InputStream input;
  35. private final boolean nonBlockingRead;
  36. /**
  37. * Construct a new <code>PumpStreamHandler</code>.
  38. * @param out the output <code>OutputStream</code>.
  39. * @param err the error <code>OutputStream</code>.
  40. * @param input the input <code>InputStream</code>.
  41. * @param nonBlockingRead set it to <code>true</code> if the input should be
  42. * read with simulated non blocking IO.
  43. */
  44. public PumpStreamHandler(OutputStream out, OutputStream err,
  45. InputStream input, boolean nonBlockingRead) {
  46. this.out = out;
  47. this.err = err;
  48. this.input = input;
  49. this.nonBlockingRead = nonBlockingRead;
  50. }
  51. /**
  52. * Construct a new <code>PumpStreamHandler</code>.
  53. * @param out the output <code>OutputStream</code>.
  54. * @param err the error <code>OutputStream</code>.
  55. * @param input the input <code>InputStream</code>.
  56. */
  57. public PumpStreamHandler(OutputStream out, OutputStream err,
  58. InputStream input) {
  59. this(out, err, input, false);
  60. }
  61. /**
  62. * Construct a new <code>PumpStreamHandler</code>.
  63. * @param out the output <code>OutputStream</code>.
  64. * @param err the error <code>OutputStream</code>.
  65. */
  66. public PumpStreamHandler(OutputStream out, OutputStream err) {
  67. this(out, err, null);
  68. }
  69. /**
  70. * Construct a new <code>PumpStreamHandler</code>.
  71. * @param outAndErr the output/error <code>OutputStream</code>.
  72. */
  73. public PumpStreamHandler(OutputStream outAndErr) {
  74. this(outAndErr, outAndErr);
  75. }
  76. /**
  77. * Construct a new <code>PumpStreamHandler</code>.
  78. */
  79. public PumpStreamHandler() {
  80. this(System.out, System.err);
  81. }
  82. /**
  83. * Set the <code>InputStream</code> from which to read the
  84. * standard output of the process.
  85. * @param is the <code>InputStream</code>.
  86. */
  87. public void setProcessOutputStream(InputStream is) {
  88. createProcessOutputPump(is, out);
  89. }
  90. /**
  91. * Set the <code>InputStream</code> from which to read the
  92. * standard error of the process.
  93. * @param is the <code>InputStream</code>.
  94. */
  95. public void setProcessErrorStream(InputStream is) {
  96. if (err != null) {
  97. createProcessErrorPump(is, err);
  98. }
  99. }
  100. /**
  101. * Set the <code>OutputStream</code> by means of which
  102. * input can be sent to the process.
  103. * @param os the <code>OutputStream</code>.
  104. */
  105. public void setProcessInputStream(OutputStream os) {
  106. if (input != null) {
  107. inputThread = createPump(input, os, true, nonBlockingRead);
  108. } else {
  109. try {
  110. os.close();
  111. } catch (IOException e) {
  112. //ignore
  113. }
  114. }
  115. }
  116. /**
  117. * Start the <code>Thread</code>s.
  118. */
  119. public void start() {
  120. outputThread.start();
  121. errorThread.start();
  122. if (inputThread != null) {
  123. inputThread.start();
  124. }
  125. }
  126. /**
  127. * Stop pumping the streams.
  128. */
  129. public void stop() {
  130. finish(inputThread);
  131. try {
  132. err.flush();
  133. } catch (IOException e) {
  134. // ignore
  135. }
  136. try {
  137. out.flush();
  138. } catch (IOException e) {
  139. // ignore
  140. }
  141. finish(outputThread);
  142. finish(errorThread);
  143. }
  144. private static final long JOIN_TIMEOUT = 200;
  145. /**
  146. * Waits for a thread to finish while trying to make it finish
  147. * quicker by stopping the pumper (if the thread is a {@link
  148. * ThreadWithPumper ThreadWithPumper} instance) or interrupting
  149. * the thread.
  150. *
  151. * @since Ant 1.8.0
  152. */
  153. protected final void finish(Thread t) {
  154. if (t == null) {
  155. // nothing to terminate
  156. return;
  157. }
  158. try {
  159. StreamPumper s = null;
  160. if (t instanceof ThreadWithPumper) {
  161. s = ((ThreadWithPumper) t).getPumper();
  162. }
  163. if (s != null && s.isFinished()) {
  164. return;
  165. }
  166. if (!t.isAlive()) {
  167. return;
  168. }
  169. t.join(JOIN_TIMEOUT);
  170. if (s != null && !s.isFinished()) {
  171. s.stop();
  172. }
  173. while ((s == null || !s.isFinished()) && t.isAlive()) {
  174. t.interrupt();
  175. t.join(JOIN_TIMEOUT);
  176. }
  177. } catch (InterruptedException e) {
  178. // ignore
  179. }
  180. }
  181. /**
  182. * Get the error stream.
  183. * @return <code>OutputStream</code>.
  184. */
  185. protected OutputStream getErr() {
  186. return err;
  187. }
  188. /**
  189. * Get the output stream.
  190. * @return <code>OutputStream</code>.
  191. */
  192. protected OutputStream getOut() {
  193. return out;
  194. }
  195. /**
  196. * Create the pump to handle process output.
  197. * @param is the <code>InputStream</code>.
  198. * @param os the <code>OutputStream</code>.
  199. */
  200. protected void createProcessOutputPump(InputStream is, OutputStream os) {
  201. outputThread = createPump(is, os);
  202. }
  203. /**
  204. * Create the pump to handle error output.
  205. * @param is the input stream to copy from.
  206. * @param os the output stream to copy to.
  207. */
  208. protected void createProcessErrorPump(InputStream is, OutputStream os) {
  209. errorThread = createPump(is, os);
  210. }
  211. /**
  212. * Creates a stream pumper to copy the given input stream to the
  213. * given output stream.
  214. * @param is the input stream to copy from.
  215. * @param os the output stream to copy to.
  216. * @return a thread object that does the pumping.
  217. */
  218. protected Thread createPump(InputStream is, OutputStream os) {
  219. return createPump(is, os, false);
  220. }
  221. /**
  222. * Creates a stream pumper to copy the given input stream to the
  223. * given output stream.
  224. * @param is the input stream to copy from.
  225. * @param os the output stream to copy to.
  226. * @param closeWhenExhausted if true close the inputstream.
  227. * @return a thread object that does the pumping, subclasses
  228. * should return an instance of {@link ThreadWithPumper
  229. * ThreadWithPumper}.
  230. */
  231. protected Thread createPump(InputStream is, OutputStream os,
  232. boolean closeWhenExhausted) {
  233. return createPump(is, os, closeWhenExhausted, true);
  234. }
  235. /**
  236. * Creates a stream pumper to copy the given input stream to the
  237. * given output stream.
  238. * @param is the input stream to copy from.
  239. * @param os the output stream to copy to.
  240. * @param closeWhenExhausted if true close the inputstream.
  241. * @param nonBlockingIO set it to <code>true</code> to use simulated non
  242. * blocking IO.
  243. * @return a thread object that does the pumping, subclasses
  244. * should return an instance of {@link ThreadWithPumper
  245. * ThreadWithPumper}.
  246. * @since Ant 1.8.2
  247. */
  248. protected Thread createPump(InputStream is, OutputStream os,
  249. boolean closeWhenExhausted, boolean nonBlockingIO) {
  250. StreamPumper pumper = new StreamPumper(is, os, closeWhenExhausted, nonBlockingIO);
  251. pumper.setAutoflush(true);
  252. final Thread result = new ThreadWithPumper(pumper);
  253. result.setDaemon(true);
  254. return result;
  255. }
  256. /**
  257. * Specialized subclass that allows access to the running StreamPumper.
  258. *
  259. * @since Ant 1.8.0
  260. */
  261. protected static class ThreadWithPumper extends Thread {
  262. private final StreamPumper pumper;
  263. public ThreadWithPumper(StreamPumper p) {
  264. super(p);
  265. pumper = p;
  266. }
  267. protected StreamPumper getPumper() {
  268. return pumper;
  269. }
  270. }
  271. }