Tue, Mar 24th, 2009
Java中的支持重试,有限任务数量线程池
昨天学校网络没有续费没法上网,晚上找事情干,就翻着O’Reilly的《Java线程》(第三版)看了一些内容然后写的实验性质的代码,主要是对于Konachan的线程池代码很不满意而写的。
这些代码主要包括:
- 三个线程复用的线程池(利用ThreadPoolExecutor)
- 通过判断线程池现有的活动线程数量限制ThreadPoolExecutor执行新任务
- 如果执行出错,可以自动重试(这里的“任务”有80%的成功率)
- 利用Hashtable<Integer, String>来记录“任务”的运行状态,如果不在Hashtable中,表示任务已经结束
- 每个“任务”耗时0到10000毫秒(利用Thread.sleep模拟)
然而这个实现也让人觉得不够优雅,也许自己实现一个RejectedExecutionHandler接口的类,然后绑定为ThreadPoolExecutor的弹出策略才是王道吧。另外,觉得应该还需要自己还要再写一个队列。
完整的代码(Eclipse工程在文末):
Main:
package cn.vifix.concurrentLab; import java.util.Hashtable; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolDemo { public static Hashtable<Integer,String> jobs; public static void main(String[] args) { new ThreadPoolDemo(); } public ThreadPoolDemo() { jobs = new Hashtable<Integer, String>(); int taskCount = 30; for (int i=0; i<taskCount; i++) { jobs.put(i, "todo"); } SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>(); ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 3, 20, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.DiscardPolicy()); while(jobs.size() > 0) { try { Thread.sleep(500); } catch (Exception e) { e.printStackTrace(); } if (exec.getActiveCount() == 3) { System.out.println("Thread count == 3; Skip this loop..."); continue; } for (int i = taskCount - 1; i > -1; i--) { if (jobs.containsKey(i)) { if (jobs.get(i) == "todo") { jobs.put(i, "doing"); exec.execute(new ThreadPoolDemoWorker(i)); break; } } } synchronized(System.out) { System.out.print(jobs.size()); System.out.print(" Task Left:"); System.out.print(" List:"); System.out.println(jobs); } } System.out.println("All done."); System.exit(0); } }
Worker:
package cn.vifix.concurrentLab; public class ThreadPoolDemoWorker implements Runnable { private int idx; @Override public void run() { try { new ThreadPoolDemoJob(idx).process(); } catch (Exception e) { System.out.println(e); } } public ThreadPoolDemoWorker(int i) { idx = i; } }
Task:
package cn.vifix.concurrentLab; import java.util.Random; public class ThreadPoolDemoTask { private int idx; private Random random; public ThreadPoolDemoTask(int i) { idx = i; random = new Random(); } public void process() throws Exception { try { Thread.sleep(random.nextInt(10000)); } catch (InterruptedException ex) { ex.printStackTrace(); } this.doTask(); } private void doTask() throws Exception { if (random.nextFloat() > 0.8) { ThreadPoolDemo.jobs.put(idx, "todo"); synchronized(System.out) { System.out.println(" Task " + idx + " failed."); } throw new Exception("Task " + Integer.toString(idx) + " failed, retry."); } else { synchronized(System.out) { System.out.println(" Task " + idx + " done."); } ThreadPoolDemo.jobs.remove(idx); } } }
Eclipse工程,欢迎共同研究!

你好,你卖java线程第三版不,现在买不到了,读英文版的又有点难。不卖借我复印一下也可以,谢了qq:995888399