Java中的支持重试,有限任务数量线程池

threadpooldemo_output

昨天学校网络没有续费没法上网,晚上找事情干,就翻着O’Reilly的《Java线程》(第三版)看了一些内容然后写的实验性质的代码,主要是对于Konachan的线程池代码很不满意而写的。

这些代码主要包括:

  1. 三个线程复用的线程池(利用ThreadPoolExecutor)
  2. 通过判断线程池现有的活动线程数量限制ThreadPoolExecutor执行新任务
  3. 如果执行出错,可以自动重试(这里的“任务”有80%的成功率)
  4. 利用Hashtable<Integer, String>来记录“任务”的运行状态,如果不在Hashtable中,表示任务已经结束
  5. 每个“任务”耗时0到10000毫秒(利用Thread.sleep模拟)

然而这个实现也让人觉得不够优雅,也许自己实现一个RejectedExecutionHandler接口的类,然后绑定为ThreadPoolExecutor的弹出策略才是王道吧。另外,觉得应该还需要自己还要再写一个队列。

完整的代码(Eclipse工程在文末):

Main:

package cn.vifix.concurrentLab;
import java.util.Hashtable;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class ThreadPoolDemo
{
	public static Hashtable<Integer,String> jobs;
 
	public static void main(String[] args)
	{
		new ThreadPoolDemo();
	}
 
	public ThreadPoolDemo()
	{
		jobs = new Hashtable<Integer, String>();
		int taskCount = 30;
		for (int i=0; i<taskCount; i++)
		{
			jobs.put(i, "todo");
		}
		SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
		ThreadPoolExecutor exec = new ThreadPoolExecutor(3, 3, 20, TimeUnit.SECONDS, queue, new ThreadPoolExecutor.DiscardPolicy());
		while(jobs.size() > 0)
		{
			try
			{
				Thread.sleep(500);
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
			if (exec.getActiveCount() == 3)
			{
				System.out.println("Thread count == 3; Skip this loop...");
				continue;
			}
			for (int i = taskCount - 1; i > -1; i--)
			{
				if (jobs.containsKey(i))
				{
					if (jobs.get(i) == "todo")
					{
						jobs.put(i, "doing");
						exec.execute(new ThreadPoolDemoWorker(i));
						break;
					}
				}
			}
			synchronized(System.out)
			{
				System.out.print(jobs.size());
				System.out.print(" Task Left:");
				System.out.print(" List:");
				System.out.println(jobs);
			}
		}
		System.out.println("All done.");
		System.exit(0);
	}
}

Worker:

package cn.vifix.concurrentLab;
 
public class ThreadPoolDemoWorker implements Runnable
{
	private int idx;
 
	@Override
	public void run()
	{
		try
		{
			new ThreadPoolDemoJob(idx).process();
		}
		catch (Exception e)
		{
			System.out.println(e);
		}
	}
 
	public ThreadPoolDemoWorker(int i)
	{
		idx = i;
	}
}

Task:

package cn.vifix.concurrentLab;
import java.util.Random;
 
public class ThreadPoolDemoTask
{
	private int idx;
	private Random random;
 
	public ThreadPoolDemoTask(int i)
	{
		idx = i;
		random = new Random();
	}
	public void process() throws Exception
	{
		try
		{
			Thread.sleep(random.nextInt(10000));
		}
		catch (InterruptedException ex)
		{
			ex.printStackTrace();
		}
		this.doTask();
	}
 
	private void doTask() throws Exception
	{
		if (random.nextFloat() > 0.8)
		{
			ThreadPoolDemo.jobs.put(idx, "todo");
			synchronized(System.out)
			{
				System.out.println(" Task " + idx + " failed.");
			}
			throw new Exception("Task " + Integer.toString(idx) + " failed, retry.");
		}
		else
		{
			synchronized(System.out)
			{
				System.out.println(" Task " + idx + " done.");
			}
			ThreadPoolDemo.jobs.remove(idx);
		}
	}
}

Eclipse工程,欢迎共同研究!

One Response

  • argeric says:

    你好,你卖java线程第三版不,现在买不到了,读英文版的又有点难。不卖借我复印一下也可以,谢了qq:995888399

Leave a Comment

(Necessary)

(Necessary, will not be published)

Please note: Comment moderation is enabled and may delay your comment. There is no need to resubmit your comment.