Java并发编程二三事
作者:Katsura 发布时间:[ 2016/9/1 11:38:21 ] 推荐标签:并发编程 Java
信号量
用于线程的同步,类似PV操作。当计数减少到0的时候acquire()会堵塞,并且直到有其他线程调用release()线程释放信号量或者线程被中断。
1 /**
2 * Semaphore
3 */
4 public void semaphore() {
5 Semaphore sem = new Semaphore(1);
6 int nThread = 5;
7 for (int i = 0; i < nThread; i++) {
8 Thread t = new Thread() {
9 public void run() {
10 try {
11 sem.acquire();
12 int random = (int) (5 * Math.random());
13 sleep(random);
14 System.out.println(currentThread().getId());
15 sem.release();
16 } catch (InterruptedException e) {
17 e.printStackTrace();
18 }
19 }
20 };
21 t.start();
22 }
23 }
栅栏
与闭锁类似,线程会堵塞在await()方法,直到一定数量的线程到达或者线程被中断。在一定数量的线程到达后,栅栏打开,这批线程可以从堵塞中恢复,然后栅栏再次关闭。
1 /**
2 * 栅栏
3 * @return
4 * @throws InterruptedException
5 */
6 public long cyclicBarrier() throws InterruptedException {
7 int nThreads = 25;
8 final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
9 @Override
10 public void run() {
11 System.out.println("Barrier pass...");
12 }
13 });
14
15 for (int i = 0; i < nThreads; i++) {
16 Thread t = new Thread() {
17 public void run() {
18 try {
19 barrier.await();
20 sleep(1000);
21 } catch (BrokenBarrierException | InterruptedException e) {
22 e.printStackTrace();
23 }
24 }
25 };
26 t.start();
27 }
28 }
稍微高级一点的内容
CompletionService
作为Executor的包装,主要用来在执行多线程的时候获取返回的执行结果进行处理。至于ExecutorService线程池的种类和相关配置请参考JDK文档。
1 /**
2 * CompletionService
3 */
4 public void completionService() {
5 int nThread = 5;
6 ExecutorService executor = Executors.newFixedThreadPool(5);
7 CompletionService completionService = new ExecutorCompletionService(executor);
8
9 for (int i = 0; i < nThread; i++) {
10 completionService.submit(new Callable<String>() {
11 @Override
12 public String call() throws Exception {
13 if (Thread.currentThread().isInterrupted()) {
14 return "interrupted";
15 }
16 int st = (int)(Math.random() * 5000);
17 Thread.sleep(st);
18 return Thread.currentThread().getId() + ":" + st;
19 }
20 });
21 }
22
23 for (int i = 0; i < nThread; i++) {
24 try {
25 Future<String> f = completionService.take();
26 System.out.println("Round" + i);
27 System.out.println(f.get());
28 } catch (InterruptedException | ExecutionException e) {
29 e.printStackTrace();
30 }
31
32 }
33 }
设定执行时间的任务
下面这个例子只是说明可以这样做,但是并不建议这样处理,因为在非调用线程取消一个线程是一个不太合理的处理方式,好是让调用者取消,这样调用者还可以进行下一步的处理。 另外这种方式看起来并不优雅。
1 /**
2 * 取消: 即使任务不响应中断,运行的方法仍能够返回到他的调用者。在任务启动以后偶timedRun执行一个的join方法,
3 * 在join返回后,将检查是否有异常抛出,有的话再次抛出异常,由于Throwable在两个线程之间共享,所以设置为volatile
4 */
5 public void timeRun(final Runnable r) throws Throwable {
6 ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(5);
7 class RethrowableTask implements Runnable {
8 private volatile Throwable t;
9 @Override
10 public void run() {
11 try {
12 r.run();
13 } catch (Throwable t) {
14 this.t = t;
15 }
16 }
17 void rethrow() throws Throwable {
18 if (t != null) {
19 throw t;
20 }
21 }
22 }
23
24 RethrowableTask task = new RethrowableTask();
25 final Thread taskThread = new Thread(task);
26 taskThread.start();
27 cancelExec.schedule(() -> taskThread.interrupt(), 100000, TimeUnit.MILLISECONDS);
28 taskThread.join(10000);
29 task.rethrow();
30 }
下面是通过Future带时间的get()方法实现的,有时限的任务,可以对比一下,下面这个方式显然要优雅很多。
1 public void betterTimeRun(Runnable r) throws Throwable {
2 ExecutorService executorService = Executors.newFixedThreadPool(5);
3 Future<?> task = executorService.submit(r);
4 try {
5 task.get(10000, TimeUnit.MILLISECONDS);
6 } catch (TimeoutException e) {
7 // 在finally中被取消
8 } catch (ExecutionException e) {
9 throw e.getCause();
10 } finally {
11 task.cancel(true);
12 }
13 }
在try…catch…finally中取消任务。
UncaughtExcption的处理
Thread的run方法是不抛出非检查异常的,所以外部的try…catch也无法捕获,有时候这会导致一些问题,比如资源没有被释放。
但是我们可以通过Thread的实例方法setUncaughtExceptionHandler去为任何一个线程设置一个 UncaughtExceptionHandler。当然也可以调用Thread类的静态方法setUncaughtExceptionHandler去 为所有线程设置一个UncaughtExceptionHandler。接口示例如下。
1 class MyHandler implements Thread.UncaughtExceptionHandler {
2 /**
3 * 对于unchecked异常,可以实现UncaughtExceptionHandler接口,当一个线程由于未捕获异常而退出时,JVM会把这个事件告报给应用程序提供的
4 * UncaughtExceptionHandler,如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.error
5 */
6 @Override
7 public void uncaughtException(Thread t, Throwable e) {
8 // Do something...
9 }
10 }
如果没有设置自己的Handler,那么JVM的默认行为是将栈信息输出到System.error。
保存被取消的任务
如果需要在关闭线程池的时候保存被取消的任务,那么可以扩展AbstractExcecutorService,对被取消的任务进行纪录,以便下次继续处理。
1 /**
2 * 当关闭线程池时保存被取消的任务
3 */
4 class TrackingExecutor extends AbstractExecutorService {
5 private final ExecutorService exec;
6 private final Set<Runnable> tasksCancelledAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());
7
8 public TrackingExecutor() {
9 this.exec = Executors.newCachedThreadPool();
10 }
11
12 public List<Runnable> getCancelledTasks() {
13 if (!exec.isTerminated()) {
14 throw new IllegalStateException("...");
15 }
16 return new ArrayList<Runnable>(tasksCancelledAtShutdown);
17 }
18
19 @Override
20 public void execute(Runnable command) {
21 exec.execute(new Runnable() {
22 @Override
23 public void run() {
24 try {
25 command.run();
26 } finally {
27 if (isShutdown() && Thread.currentThread().isInterrupted()) {
28 tasksCancelledAtShutdown.add(command);
29 }
30 }
31 }
32 });
33 }
34 }
可以看到,用一个同步的集合保存被取消的任务,任务的run方法在try的包围中,如果线程被中断或者关闭,那么将相应的任务加入 taskCancelledAtShutdown中,以上代码只覆写了exceute方法,其他方法可以委托给ExecutorService。
本文内容不用于商业目的,如涉及知识产权问题,请权利人联系SPASVO小编(021-61079698-8054),我们将立即处理,马上删除。
相关推荐
Java性能测试有哪些不为众人所知的原则?Java设计模式??装饰者模式谈谈Java中遍历Map的几种方法Java Web入门必知你需要理解的Java反射机制知识总结编写更好的Java单元测试的7个技巧编程常用的几种时间戳转换(java .net 数据库)适合Java开发者学习的Python入门教程Java webdriver如何获取浏览器新窗口中的元素?Java重写与重载(区别与用途)Java变量的分类与初始化JavaScript有这几种测试分类Java有哪四个核心技术?给 Java开发者的10个大数据工具和框架Java中几个常用设计模式汇总java生态圈常用技术框架、开源中间件,系统架构及经典案例等
更新发布
功能测试和接口测试的区别
2023/3/23 14:23:39如何写好测试用例文档
2023/3/22 16:17:39常用的选择回归测试的方式有哪些?
2022/6/14 16:14:27测试流程中需要重点把关几个过程?
2021/10/18 15:37:44性能测试的七种方法
2021/9/17 15:19:29全链路压测优化思路
2021/9/14 15:42:25性能测试流程浅谈
2021/5/28 17:25:47常见的APP性能测试指标
2021/5/8 17:01:11热门文章
常见的移动App Bug??崩溃的测试用例设计如何用Jmeter做压力测试QC使用说明APP压力测试入门教程移动app测试中的主要问题jenkins+testng+ant+webdriver持续集成测试使用JMeter进行HTTP负载测试Selenium 2.0 WebDriver 使用指南