dubbo之RpcContext

dubbo之RpcContext

RpcContext 是一个 ThreadLocal 的临时状态记录器,当接收到 RPC 请求,或发起 RPC 请求时,RpcContext 的状态都会变化。比如:A 调 B,B 再调 C,则 B 机器上,在 B 调 C 之前,RpcContext 记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext 记录的是 B 调 C 的信息。

使用

消费端

// 远程调用之前,通过attachment传KV给提供方
RpcContext.getContext().setAttachment("userKey", "userValue");
// 远程调用
xxxService.xxx();
// 本端是否为消费端,这里会返回true
boolean isConsumerSide = RpcContext.getContext().isConsumerSide();
// 获取最后一次调用的提供方IP地址
String serverIP = RpcContext.getContext().getRemoteHost();
// 获取当前服务配置信息,所有配置信息都将转换为URL的参数
String application = RpcContext.getContext().getUrl().getParameter("application");
// 注意:每发起RPC调用,上下文状态会变化
yyyService.yyy();
// 此时 RpcContext 的状态已变化 
RpcContext.getContext();

提供方

public class XxxServiceImpl implements XxxService {

    public void xxx() {
        // 通过RpcContext获取用户传参,这里会返回userValue
        String value = RpcContext.getContext().getAttachment("userKey");
        // 本端是否为提供端,这里会返回true
        boolean isProviderSide = RpcContext.getContext().isProviderSide();
        // 获取调用方IP地址
        String clientIP = RpcContext.getContext().getRemoteHost();
        // 获取当前服务配置信息,所有配置信息都将转换为URL的参数
        String application = RpcContext.getContext().getUrl().getParameter("application");
        // 注意:每发起RPC调用,上下文状态会变化
        yyyService.yyy();
        // 此时本端变成消费端,这里会返回false
        boolean isProviderSide = RpcContext.getContext().isProviderSide();
    } 
}

异步调用

//使用RpcContext 来实现异步调用
Future<T> future = RpcContext.getContext().asyncCall(new Callable<T>(){...});

//获取异步调用的结果,通过Future.
Future<T> future = RpcContext.getContext().getFuture();

原理

消费端在执行Rpc调用之前,经过Filter处理, 会将信息写入RpcContext.
ConsumerContextFilter:

RpcContext.getContext()
        setInvoker(invoker)
        .setInvocation(invocation)
        .setLocalAddress(NetUtils.getLocalHost(), 0)
        .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

服务端在执行调用之前,也会经过Filter处理,将信息写入RpcContext. 见ContextFilter

RpcContext.getContext()
        .setInvoker(invoker)
        .setInvocation(invocation)
        .setAttachments(attachments) //attachment 来自于RpcInvocation中的attachment
        .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

provider端的remoteAddress是从tcp连接中获取的,见DubboProtocol类:

RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

因此每发起RPC调用,上下文状态会变化。

消费者在执行调用之前(AbstractInvoker),会将RpcContext中的内容写入到invocation,实现参数传递

Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
    invocation.addAttachmentsIfAbsent(context);  //只新增,不修改
}

在异步实现中,将async参数设置为true, 也会传递到提供者来实现异步调用。

setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());

DubboInvoker在执行调用的时候,会判断ASYNC_KEY是否为true,
如果是,则会向context中写入future对象:

...
else if (isAsync) {
    ResponseFuture future = currentClient.request(inv, timeout) ;
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
}
...

不等待调用结果返回。以实现消费端的异步调用。

判断是否为Provider或者Consumer则是通过remoteAdress和localAddress做对比来实现的。
源码如下:

// address为remoteAddress,在调用之前Filter中写入
String host;
if (address.getAddress() == null) {
    host = address.getHostName();
} else {
    host = address.getAddress().getHostAddress();
}
return url.getPort() == address.getPort() && 
        NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(NetUtils.getIpByHost(host)));

相关推荐