RocketMq 在Netty 下是如何进行消息封装传输

前言:

要了解rocketMq 需要知道 数据在 rocketMq 中 是如何进行传输,在底层的结构到底是一个什么亚子,这个需要我们对Netty 对字符编解码有一些了解。

开始:

我们从生产者发送消息,broker 接收消息 为例,来开展底层对消息结构。

1. 如下是一段生产者发送消息的代码,这里我们进去第5步看发送消息的流程。

     //1. 初始化 mq producer
        DefaultMQProducer mqProducer =new DefaultMQProducer("iscys-test");
        //2.设置nameServer 地址
        mqProducer.setNamesrvAddr("localhost:9876");
        //3. 开启mq producer,这一步是必须的,会做一些连接初始化检测工作
        mqProducer.start();
        //4.创建 Message
        Message msg = new Message("test-topis", "iscys-test".getBytes());
        //5.发送消息
        mqProducer.send(msg, new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                //在消息发送成功之后,我们收到broker的响应通知后,会进行回调
                System.out.println("send success");
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("send fail");

            }
        });

  2.消息发送必须经过如下代码,将消息组装成 RemotingCommand 对象,无论是发送还是服务端返回消息,都会封装成这个对象。

public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {     //1. 初始化RemotingCommand 对象
        RemotingCommand request = null;     //2.设置消息头
        if (sendSmartMsg || msg instanceof MessageBatch) {      //多条message
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {      //单条message
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
      // 3.将 message 内容放入request 中
        request.setBody(msg.getBody());
      //4.发送消息
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }

        return null;
    }

  3. 关于 RemotingCommand 远程命令 对象,我们看一下它的组成与结构

   // 业务code  对应的是 RequestCode 这个常量池的code ,基本每一种业务类型都会对应一个code码,接收端通过code 进行做不同的处理
   private int code;
  //Java 语言版本
  private LanguageCode language = LanguageCode.JAVA;
  //version 版本信息
   private int version = 0;
  // 消息唯一id ,这个id 会关联 response
  private int opaque = requestId.getAndIncrement();
  //用来标记这个消息是发送消息的消息还是返回的消息,
    private int flag = 0;
  //备注的信息,比如一些错误注意信息等
    private String remark;
  //附带额外的信息
    private HashMap<String, String> extFields;
  //请求头信息,基本每一种业务类型都会对应一个请求头类
    private transient CommandCustomHeader customHeader;
  //json
  private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
  //msg body 信息
  private transient byte[] body;

从这个消息体中,基本上关于消息的所有信息,都知道了。

4.接下来就是发送消息了:

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
        //1.获取与broker的连接channel ,没有的话则创建
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    //2.hook 前置钩子函数调用
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                //3.发送消息(channel 连接对象,RemotingCommand 对象,超时时间,回调函数)
                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

  5.设置response 对象设置,方便进行发送成功后的回调,进行真实发送

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        //1.获取消息id
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            //2.创建response对象
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            //3.一个消息id 一个response 对象,放入responseTable中
            this.responseTable.put(opaque, responseFuture);
            try {
                //4.Netty API 将消息发送至服务端,并设置发送监听
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

  

相关推荐