5个并发处理技巧

2017 年 9 月 7 日 CSDN大数据
↑ 点击上方蓝字关注我们,和小伙伴一起聊技术!


在本文中,作者总结出了5个关于处理并发性程序的技巧,并给出代码示例,让读者更好地理解和使用这5种方法。


以下为译文:


1. 捕获InterruptedException错误


请检查下面的代码片段:


public class Task implements Runnable {

  private final BlockingQueue queue = ...;

  @Override

  public void run() {

    while (!Thread.currentThread().isInterrupted()) {

      String result = getOrDefault(() -> queue.poll(1L, TimeUnit.MINUTES), "default");

      //do smth with the result

    }

  }

   T getOrDefault(Callable supplier, T defaultValue) {

    try {

      return supplier.call();

    } catch (Exception e) {

      logger.error("Got exception while retrieving value.", e);

      return defaultValue;

    }

  }

}


代码的问题是,在等待队列中的新元素时,是不可能终止线程的,因为中断的标志永远不会被恢复:


  • 运行代码的线程被中断。

  • BlockingQueue # poll()方法抛出InterruptedException异常,并清除了中断的标志。

  • while中的循环条件 (!Thread.currentThread().isInterrupted())的判断是true,因为标记已被清除。


为了防止这种行为,当一个方法被显式抛出(通过声明抛出InterruptedException)或隐式抛出(通过声明/抛出一个原始异常)时,总是捕获InterruptedException异常,并恢复中断的标志。


 T getOrDefault(Callable supplier, T defaultValue) {

  try {

    return supplier.call();

  } catch (InterruptedException e) {

    logger.error("Got interrupted while retrieving value.", e);

    Thread.currentThread().interrupt();

    return defaultValue;

  } catch (Exception e) {

    logger.error("Got exception while retrieving value.", e);

    return defaultValue;

  }

}


2.使用特定的执行程序来阻止操作


因为一个缓慢的操作而使整个服务器变得无响应,这通常不是开发人员想要的。不幸的是,对于RPC,响应时间通常是不可预测的。


假设服务器有100个工作线程,有一个端点,称为100 RPS。在内部,它发出一个RPC调用,通常需要10毫秒。在某个时间点,此RPC的响应时间变为2秒,在峰值期间服务器能够做的惟一的一件事就是等待这些调用,而其他端点则无法访问。


@GET

@Path("/genre/{name}")

@Produces(MediaType.APPLICATION_JSON)

public Response getGenre(@PathParam("name") String genreName) {

  Genre genre = potentiallyVerySlowSynchronousCall(genreName);

  return Response.ok(genre).build();

}


解决这个问题最简单的方法是提交代码,它将阻塞调用变成一个线程池:


@GET

@Path("/genre/{name}")

@Produces(MediaType.APPLICATION_JSON)

public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) {

  response.setTimeout(1L, TimeUnit.SECONDS);

  executorService.submit(() -> {

    Genre genre = potentiallyVerySlowSynchronousCall(genreName);

    return response.resume(Response.ok(genre).build());

  });

}


3. 传MDC的值


MDC(Mapped Diagnostic Context)通常用于存储单个任务的特定值。例如,在web应用程序中,它可能为每个请求存储一个请求id和一个用户id,因此MDC查找与单个请求或整个用户活动相关的日志记录变得更加容易。


2017-08-27 14:38:30,893 INFO [server-thread-0] [requestId=060d8c7f, userId=2928ea66] c.g.s.web.Controller - Message.


可是如果代码的某些部分是在专用线程池中执行的,则线程(提交任务的线程)中MDC就不会被继续传值。在下面的示例中,第7行的日志中包含“requestId”,而第9行的日志则没有:


@GET

@Path("/genre/{name}")

@Produces(MediaType.APPLICATION_JSON)

public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) {

  try (MDC.MDCCloseable ignored = MDC.putCloseable("requestId", UUID.randomUUID().toString())) {

    String genreId = getGenreIdbyName(genreName); //Sync call

    logger.trace("Submitting task to find genre with id '{}'.", genreId); //'requestId' is logged

    executorService.submit(() -> {

      logger.trace("Starting task to find genre with id '{}'.", genreId); //'requestId' is not logged

      Response result = getGenre(genreId) //Async call

          .map(artist -> Response.ok(artist).build())

          .orElseGet(() -> Response.status(Response.Status.NOT_FOUND).build());

      response.resume(result);

    });

  }

}


这可以通过MDC#getCopyOfContextMap()方法来解决:


...

public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) {

  try (MDC.MDCCloseable ignored = MDC.putCloseable("requestId", UUID.randomUUID().toString())) {

    ...

    logger.trace("Submitting task to find genre with id '{}'.", genreId); //'requestId' is logged

    withCopyingMdc(executorService, () -> {

      logger.trace("Starting task to find genre with id '{}'.", genreId); //'requestId' is logged

      ...

    });

  }

}

private void withCopyingMdc(ExecutorService executorService, Runnable function) {

  Map


4.更改线程名称


为了简化日志读取和线程转储,可以自定义线程的名称。这可以通过创建ExecutorService时用一个ThreadFactory来完成。在流行的实用程序库中有许多ThreadFactory接口的实现:


    com.google.common.util.concurrent.ThreadFactoryBuilde+r in Guava.

    org.springframework.scheduling.concurrent.CustomizableThreadFactory in Spring.

    org.apache.commons.lang3.concurrent.BasicThreadFactory in Apache Commons Lang 3.


ThreadFactory threadFactory = new BasicThreadFactory.Builder()

  .namingPattern("computation-thread-%d")

  .build();

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, threadFactory);


尽管ForkJoinPool不使用ThreadFactory接口,但也支持对线程的重命名:


ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinThreadFactory = pool -> {  

  ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);  

  thread.setName("computation-thread-" + thread.getPoolIndex());  

  return thread;

};

ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads, forkJoinThreadFactory, null, false);


将线程转储与默认命名进行比较:


"pool-1-thread-3" #14 prio=5 os_prio=31 tid=0x00007fc06b19f000 nid=0x5703 runnable [0x0000700001ff9000]

   java.lang.Thread.State: RUNNABLE

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)

...

"pool-2-thread-3" #15 prio=5 os_prio=31 tid=0x00007fc06aa10800 nid=0x5903 runnable [0x00007000020fc000]

   java.lang.Thread.State: RUNNABLE

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21)

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9)

...

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fc06aa10000 nid=0x5303 runnable [0x0000700001df3000]

   java.lang.Thread.State: RUNNABLE

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)

    ...


与自定义命名进行比较:


"task-handler-thread-1" #14 prio=5 os_prio=31 tid=0x00007fb49c9df000 nid=0x5703 runnable [0x000070000334a000]

   java.lang.Thread.State: RUNNABLE

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)

...

"authentication-service-ping-thread-0" #15 prio=5 os_prio=31 tid=0x00007fb49c9de000 nid=0x5903 runnable [0x0000700003247000]

   java.lang.Thread.State: RUNNABLE

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21)

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9)

...

"task-handler-thread-0" #12 prio=5 os_prio=31 tid=0x00007fb49b9b5000 nid=0x5303 runnable [0x0000700003144000]

   java.lang.Thread.State: RUNNABLE

at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16)

    ...


想象一下,可能会不止3个线程。


5. 使用LongAdder计数器


在高竞争的情况下,会采用java.util.concurrent.atomic.LongAdder进行计数,而不会采用AtomicLong/AtomicInteger。LongAdder可以跨越多个单元间仍保持值不变,但是如果需要的话,也可以增加它们的值,但与父类AtomicXX比较,这会导致更高的吞吐量,也会增加内存消耗。


LongAdder counter = new LongAdder();

counter.increment();

...

long currentValue = counter.sum();


长按识别二维码享更多精彩

登录查看更多
1

相关内容

【2020新书】实战R语言4,323页pdf
专知会员服务
100+阅读 · 2020年7月1日
Python导论,476页pdf,现代Python计算
专知会员服务
260+阅读 · 2020年5月17日
干净的数据:数据清洗入门与实践,204页pdf
专知会员服务
161+阅读 · 2020年5月14日
TensorFlow Lite指南实战《TensorFlow Lite A primer》,附48页PPT
专知会员服务
69+阅读 · 2020年1月17日
在K8S上运行Kafka合适吗?会遇到哪些陷阱?
DBAplus社群
9+阅读 · 2019年9月4日
Python奇淫技巧,5个数据可视化工具
机器学习算法与Python学习
7+阅读 · 2019年4月12日
已删除
AI科技评论
4+阅读 · 2018年8月12日
开发、调试计算机视觉代码有哪些技巧?
AI研习社
3+阅读 · 2018年7月9日
实战 | 用Python做图像处理(三)
七月在线实验室
15+阅读 · 2018年5月29日
干货 | Python 爬虫的工具列表大全
机器学习算法与Python学习
10+阅读 · 2018年4月13日
文本情感分析的预处理
Datartisan数据工匠
17+阅读 · 2018年3月8日
深度学习中的「卷积层」如何深入理解?
深度学习世界
6+阅读 · 2017年11月30日
python pandas 数据处理
Python技术博文
4+阅读 · 2017年8月30日
A survey on deep hashing for image retrieval
Arxiv
14+阅读 · 2020年6月10日
Arxiv
11+阅读 · 2018年5月13日
Arxiv
5+阅读 · 2018年5月1日
Arxiv
6+阅读 · 2018年1月14日
VIP会员
相关VIP内容
【2020新书】实战R语言4,323页pdf
专知会员服务
100+阅读 · 2020年7月1日
Python导论,476页pdf,现代Python计算
专知会员服务
260+阅读 · 2020年5月17日
干净的数据:数据清洗入门与实践,204页pdf
专知会员服务
161+阅读 · 2020年5月14日
TensorFlow Lite指南实战《TensorFlow Lite A primer》,附48页PPT
专知会员服务
69+阅读 · 2020年1月17日
相关资讯
在K8S上运行Kafka合适吗?会遇到哪些陷阱?
DBAplus社群
9+阅读 · 2019年9月4日
Python奇淫技巧,5个数据可视化工具
机器学习算法与Python学习
7+阅读 · 2019年4月12日
已删除
AI科技评论
4+阅读 · 2018年8月12日
开发、调试计算机视觉代码有哪些技巧?
AI研习社
3+阅读 · 2018年7月9日
实战 | 用Python做图像处理(三)
七月在线实验室
15+阅读 · 2018年5月29日
干货 | Python 爬虫的工具列表大全
机器学习算法与Python学习
10+阅读 · 2018年4月13日
文本情感分析的预处理
Datartisan数据工匠
17+阅读 · 2018年3月8日
深度学习中的「卷积层」如何深入理解?
深度学习世界
6+阅读 · 2017年11月30日
python pandas 数据处理
Python技术博文
4+阅读 · 2017年8月30日
Top
微信扫码咨询专知VIP会员