【详解】Java多线程之worker设计模式
分析
Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker
Master负责接收和分配任务
Worker负责处理子任务
- 任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client。
基本框架
- 主要需要设计一个
队列
,开启多个线程 - 然后任务就
按照队列的先后顺序放在上面
- 多个线程也按照
先后顺序执行
public class Channel {
private final static int MAX_REQUEST = 100;
//流水线
private final Request[] requests;
private int head;
private int tail;
private int count;
//工人
private final WorkerThread[] workerPool;
public Channel(int workers) {
this.requests = new Request[MAX_REQUEST];
this.head = 0;
this.tail = 0;
this.count = 0;
this.workerPool = new WorkerThread[workers];
this.init();
}
private void init() {
for (int i = 0; i < workerPool.length; i++) {
workerPool[i] = new WorkerThread("worker" + i, this);
}
}
/** * 相当于启动流水线 */
public void startWorker() {
Arrays.asList(workerPool).forEach(WorkerThread::start);
}
/** * 放任务 * @param request * @throws InterruptedException */
public synchronized void put(Request request) throws InterruptedException {
while (count >= requests.length) {
this.wait();
}
this.requests[tail] = request;
this.tail = (tail + 1) % requests.length;
this.count++;
this.notifyAll();
}
/** * 拿任务 * @return * @throws InterruptedException */
public synchronized Request take() throws InterruptedException {
while (count<=0){
this.wait();
}
Request request = this.requests[head];
this.head = (this.head+1) % this.requests.length;
this.count--;
this.notifyAll();
return request;
}
}
测试
/** * 负责放任务 */
public class TransprotThread extends Thread{
private final Channel channel;
private int i=0;
private static final Random random = new Random(System.currentTimeMillis());
public TransprotThread(String name,Channel channel) {
super(name);
this.channel = channel;
}
@Override
public void run() {
while (!this.isInterrupted())
try {
Request request = new Request(getName(), i);
this.channel.put(request);
Thread.sleep(random.nextInt(1000));
i++;
} catch (InterruptedException e) {
System.out.println(getName() + " closing");
break;
}
}
}
public class WorkerThread extends Thread{
private final Channel channel;
private static final Random RANDOM = new Random(System.currentTimeMillis());
public WorkerThread(String s, Channel channel) {
super(s);
this.channel = channel;
}
@Override
public void run() {
while (!this.isInterrupted()){
try {
channel.take().execute();
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
System.out.println(getName() + " closing");
break;
}
}
}
}
public class Request {
private final String name;
private final int num;
public Request(String name, int num) {
this.name = name;
this.num = num;
}
public void execute(){
System.out.println(Thread.currentThread().getName()+ " execute " + this);
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
", num=" + num +
'}';
}
}
public class Client {
public static void main(String[] args) throws InterruptedException {
final Channel channel = new Channel(5);
channel.startWorker();
new TransprotThread("Alex",channel).start();
new TransprotThread("Jack",channel).start();
new TransprotThread("William",channel).start();
Thread.sleep(10000);
Thread.currentThread().getThreadGroup().interrupt();
}
}