【详解】Java多线程之Active Object设计模式
一、分析
Active是主动的意思,因此ActiveObject就是主动对象的意思。所谓主动一般指有自己特有的线程,举例来说,java.lang.Thread类的实例就是一种主动对象。
不过,在Active Object模式中出厂的主动对象可不仅仅有自己特有的线程,它同时还具备可以从外部接收和处理异步消息并根据需要返回处理结果的特征。
Active Object模式中的主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。
在Active Object中,组成主动对象与许多自然人组成法人类似,即使是java语言这样没有异步消息的编程语言,也可以使用Active Object模式组成实际上能够处理异步消息的主动对象。
Active Object模式的主要目的是,
主要作用:
- 接受
异步
消息的主动方法 - 实现方法
调用和执行分离到不同的线程中
,这样可以提高调用方的响应速度 - 这和之前我们说的
生产者-消费者模式
,Worker-Thread模式
,Future模式
非常相似。 - 同时执行方的执行策略对调用方透明,达到双方互不干扰。
用户执行命令,系统会将命令放入到队列中,多个线程去依次执行这些命令
二、实现一个ActiveObject 设计模式
需求:
- 调用两个命令,一个是展示结果,一个是拼装字符串
ActiveObject 核心思想
主要是分为以下几个角色
真实的执行者
:负责真正的执行命令指令接收者
:负责接收用户的指令,将指令放入指令队列工厂
:负责组装所有的零件,包括:执行者,指令队列,指令接收者,指令的分发者,结果集
public interface ActiveObject {
Result makeString(int count,char fillChar) throws InterruptedException;
void displayString(String text) throws InterruptedException;
}
指令接收者
public class ActiveObjectProxy implements ActiveObject{
private final SchedulerThread schedulerThread;
private final Servant servant;
public ActiveObjectProxy(SchedulerThread schedulerThread, Servant servant) {
this.schedulerThread = schedulerThread;
this.servant = servant;
}
@Override
public Result makeString(int count, char fillChar) throws InterruptedException {
FutureResult futureResult = new FutureResult();
schedulerThread.invoke(new MakeStringRequest(servant,futureResult,count,fillChar));
return futureResult;
}
@Override
public void displayString(String text) throws InterruptedException {
schedulerThread.invoke(new DisplayStringRequest(servant,text));
}
}
工厂
public final class ActiveObjectFactory {
private ActiveObjectFactory(){
}
public static ActiveObject createActiveObject(){
Servant servant = new Servant();
ActivationQueue queue = new ActivationQueue();
SchedulerThread schedulerThread = new SchedulerThread(queue);
ActiveObjectProxy proxy = new ActiveObjectProxy(schedulerThread,servant);
schedulerThread.start();
return proxy;
}
}
真实的执行者
/** * 真正做事情的 */
class Servant implements ActiveObject {
@Override
public Result makeString(int count, char fillChar) {
char[] buf = new char[count];
for (int i = 0; i < count; i++) {
buf[i] = fillChar;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return new RealResult(new String(buf));
}
@Override
public void displayString(String text) {
try {
System.out.println("Display: " + text);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
指令的接收与分发
采用的生产者与消费者设计模式,有下面几个角色:
指令队列
:相当于流水线,或者吧台。消费者执行指令,生产者将指令放在上面生产者
:是上面的指令接收者。消费者
:不断的遍历命令,执行命令指令的封装
:将指令和外界调用时传递的参数进行封装,交给执行者执行
封装了展示结果的方法
public class DisplayStringRequest extends MethodRequest {
private final String text;
/** * 没有返回值,不需要Future * * @param servant * @param text */
public DisplayStringRequest(Servant servant, final String text) {
super(servant, null);
this.text = text;
}
@Override
public void execute() {
servant.displayString(text);
}
}
封装了拼接字符串的方法
/** * {@link ActiveObject#makeString(int, char)} */
public class MakeStringRequest extends MethodRequest {
private final int count;
private final char fillChar;
public MakeStringRequest(Servant servant, FutureResult futureResult, int count, char fillChar) {
super(servant, futureResult);
this.count = count;
this.fillChar = fillChar;
}
/** * 执行方法 */
@Override
public void execute() {
Result result = servant.makeString(count, fillChar);
futureResult.setResult(result);
}
}
一个指令封装的标准
/** * 对用ActiveObject的每一个方法 */
public abstract class MethodRequest {
protected final Servant servant;
protected final FutureResult futureResult;
protected MethodRequest(Servant servant, FutureResult futureResult) {
this.servant = servant;
this.futureResult = futureResult;
}
public abstract void execute();
}
指令队列
/** * 类似于生产者与消费者队列 */
public class ActivationQueue {
private final static int MAX_QUEUE_SIZE = 100;
//命令队列
private final LinkedList<MethodRequest> methodQueue ;
public ActivationQueue() {
this.methodQueue = new LinkedList<>();
}
public synchronized void put(MethodRequest request) throws InterruptedException {
while (methodQueue.size()>=MAX_QUEUE_SIZE){
this.wait();
}
this.methodQueue.addLast(request);
notifyAll();
}
public synchronized MethodRequest take() throws InterruptedException {
while (methodQueue.isEmpty()){
this.wait();
}
MethodRequest methodRequest = methodQueue.removeFirst();
this.notifyAll();
return methodRequest;
}
}
消费者
public class SchedulerThread extends Thread {
private final ActivationQueue activationQueue;
public SchedulerThread(ActivationQueue activationQueue) {
this.activationQueue = activationQueue;
}
/** * 不断的调用任务队列里的任务 */
@Override
public void run() {
while (!this.isInterrupted()){
try {
activationQueue.take().execute();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void invoke(MethodRequest request) throws InterruptedException {
this.activationQueue.put(request);
}
}
结果的封装和返回
如果说指令的分发是将用户的参数传递给执行者,结果的封装就是将结果异步的返回给用户,采用的是Future设计模式
下面有几个角色:
Future
:相当于***,用户可以利用这个***,异步的获取里面的结果结果集
:是方法调用完成后,将结果返回设置在Future中
Future设计接口
public interface Result {
Object getResultValue() throws InterruptedException;
}
存储结果的
/** * Future模式 */
public class RealResult implements Result {
private final Object resultValue;//结果
public RealResult(Object resultValue) {
this.resultValue = resultValue;
}
@Override
public Object getResultValue() {
return resultValue;
}
}
一个票据,供其他线程异步取结果
public class FutureResult implements Result{
private Result result;
private boolean ready = false;
public synchronized void setResult(Result result){
this.ready = true;
this.result = result;
this.notifyAll();
}
@Override
public synchronized Object getResultValue() throws InterruptedException {
while (!ready){
this.wait();
}
return this.result.getResultValue();
}
}