串行任务并行化处理

遇到数返回慢,从程序角度分析优化。

  • 底层数据查询优化(sql)优化,减少内存运算操作。
  • 增加缓存,减少数据查询(如:使用redis)。
  • 分析业务流水,将串行处理的工作并行处理。

本文主要介绍串行任务并行化处理。

前提

一个主任务依赖于不同的子任务结果,而各子任务之间相互隔离(不相关)。这样,可以考虑将子任务并行处理。这就会用到线程相关知识,然后就就是线程池等。
思路演变过程:

  • 对不同的子任务启动线程来处理(通过实现Runnable处理)。

问题1:如何从线程中获取返回值?
思路:构造函数设值(传入的对象要和线程的私有变量指向同一堆内存)。

问题2:如何实现分步结束后,主任务才能执行。
思路:使用CountDownLatch计数器实现线程等待。

  • 使用Callable返回结果,通过Future的get方法获取线程返回结果。

其中,Future的get方法是阻塞式的,这样,主线程就会一直等待,知道所有子线程的返回。
最终,也是选择了此乃方法。

  • 线程池管理

线程的生命周期管理,交给线程池处理。这里使用 java.util.concurrent.ExecutorService处理,线程池使用newCachedThreadPool线程池。

线程池

名称 介绍
newCachedThreadPool 缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中缓存型池子通常用于执行一些生存期很短的异步型任务因此在一些面向连接的daemon型SERVER中用得不多。能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止
newFixedThreadPool -newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程。-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器。-从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同:fixed池线程数固定,并且是0秒IDLE(无IDLE)cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE
ScheduledThreadPool 调度型线程池。这个池子里的线程可以按schedule依次delay执行,或周期执行
SingleThreadExecutor -单例线程,任意时间池中只能有一个线程;用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

代码

辅助类:工人实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.zhq.threadpool;
/**
* 工人
* @author alanzhang211
*
*/
public class Worker {
private Integer age;
private String name;
public Integer getAge() {
return age;
}
public Worker(){};
public Worker(String name,Integer age) {
this.name = name;
this.age = age;
}
@Override
public String toString(){
return this.getName() + “年龄:” + this.getAge();
}
public void setAge(Integer age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

Thread实现

Thread工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.zhq.threadpool;
import java.util.concurrent.CountDownLatch;
/**
* Thread实现
* @author alanzhang211
*
* @param <T>
*/
public class MyThread<T> implements Runnable{
/**
* 计数器
*/
private CountDownLatch latch;
/**
* 操作算子
*/
private String opType;
/**
* 需要返回的数据
*/
private T data;
/**
* 索引
*/
private Integer index;
public MyThread(CountDownLatch latch,String opType,T data,Integer indexParam) {
this.latch = latch;
this.opType = opType;
this.data = data;
this.index = indexParam;
}
@SuppressWarnings(“unchecked”)
@Override
public void run() {
if (opType != null && opType.equals(“Worker”)) {
try {
Thread.sleep(3000);//模拟线程工作
Worker tempWorker = new Worker(“工作后:”,(index+1));
data = (T) tempWorker;
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(data+”工作线程:”+Thread.currentThread().getName());
}
latch.countDown();
}
}

Callable实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.zhq.threadpool;
import java.util.concurrent.Callable;
/**
*
* @author alanzhang211
*
* @param <V>
*/
public class MyCallable<V> implements Callable<V> {
private V data;
public MyCallable(V data) {
super();
this.data = data;
}
@Override
public V call() throws Exception {
Thread.sleep(3000);
System.out.println(data+”工作线程:”+Thread.currentThread().getName());
return data;
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.zhq.threadpool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*
* @author alanzhang211
*
*/
public class ThreadPoolExecutorTest {
/**
* Thread实现
*/
public void threadMethod(){
ExecutorService executorService = Executors.newCachedThreadPool();
int workerNum = 5;
CountDownLatch latch = new CountDownLatch(workerNum);//工作线程
MyThread<?> worker = null;
Object data = null;
for (int i = 0; i < workerNum; i++) {
data = new Worker();
((Worker) data).setAge(i);
((Worker) data).setName(“工人”+i);
System.out.println(“工作前:”+data.toString());
worker = new MyThread<Worker>(latch, “Worker”, (Worker)data,i);
executorService.execute(worker);
System.out.println(“worker返回:”+worker.toString());
try {
Thread.sleep(1000);//模拟主线程业务处理1s
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“=====”+worker.toString()+”结束工作”);
}
}
/**
* callable实现
*/
public void callableMethod(){
ExecutorService executorService = Executors.newCachedThreadPool();
Worker worker = null;
List<Future<?>> resultList = new ArrayList<Future<?>>();
for (int i = 0; i < 5; i++) {
worker = new Worker();
worker.setAge(i);
worker.setName(“工人”+i);
Future<Worker> future = executorService.submit(new MyCallable<Worker>(worker));
resultList.add(future);
System.out.println(“=====”+worker.toString()+”开始工作”);
try {
//get()方法阻塞式,阻塞main 主线程
// System.out.println(future.get());
Thread.sleep(1000);//模拟主线程业务处理1s
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“=====”+worker.toString()+”结束工作”);
}
Future<Integer> futureInt = executorService.submit(new MyCallable<Integer>(0));
resultList.add(futureInt);
//增加Integer包装类
Future<Integer> futureInteger = executorService.submit(new MyCallable<Integer>(new Integer(1)));
resultList.add(futureInteger);
try {
for (Future<?> futuretemp : resultList) {
System.out.println(“get获取线程返回的结果:”+futuretemp.get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
test.callableMethod();
// test.threadMethod();
System.out.println(“耗时:”+(System.currentTimeMillis() – startTime ) + “ms”);
}
}
Alan Zhang wechat
欢迎您扫一扫上面的微信公众号“补愚者说”,订阅我的博客!