Dubbo异步转同步
Dubbo是一款开源的RPC中间件框架,底层数据传输默认使用的Netty,那么请求的处理理论上是异步的,为什么我们在使用的时候是同步的呢?肯定是Dubbo框架,做了异步转同步的处理。
首先我们来梳理下,异步转同步,我们的需求是怎样的?
1、调用方请求远程服务之后,需要等待结果,此刻,请求线程应该阻塞
2、远程服务返回结果后,唤醒请求线程,调用方得到结果
Dubbo异步转同步,核心类是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。
DefaultFuture构造函数:
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
// 每次请求都会生成一个DefaultFuture对象,然后保存到FUTURES中,请求返回结果时,根据id从FUTURES中找到对应的DefaultFuture对象,并删除
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
// AtomicLong从0开始递增,创建Request对象时生成的id
private final long id;
private final Channel channel;
// 请求对象
private final Request request;
// 超时的设置
private final int timeout;
// 这里使用Lock和Condition实现等待通知机制
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private final long start = System.currentTimeMillis();
private volatile long sent;
// 请求的返回结果
private volatile Response response;
private volatile ResponseCallback callback;
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}get():
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
// isDone()方法就是判断Response是否有值(即是否有返回结果)
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 超时等待
done.await(timeout, TimeUnit.MILLISECONDS);
// 如果有返回结果了,或者,超时了,就退出循环
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 如果是超时了,就抛出异常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
// 远程服务正常返回结果,则返回给调用方
return returnFromResponse();
}received(Channel channel, Response response):
public static void received(Channel channel, Response response) {
try {
// 根据请求id从FUTURES中获取DefaultFuture,并删除
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
// CHANNELS也删除
CHANNELS.remove(response.getId());
}
}private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
// 唤醒阻塞的线程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}总结:Dubbo异步转同步的原理,其实就是利用Lock和Condition实现了等待通知机制。请求与返回结果进行匹配,则是通过传递以及接收请求id实现的。