A Look at RocketMQ's AccessChannel
Preface
This post takes a look at AccessChannel in the RocketMQ client.
AccessChannel
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java
public enum AccessChannel {
    /**
     * Means connect to private IDC cluster.
     */
    LOCAL,

    /**
     * Means connect to Cloud service.
     */
    CLOUD,
}
- AccessChannel defines two enum values: LOCAL and CLOUD
TraceDispatcher
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java
public interface TraceDispatcher {

    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}
- The start method of TraceDispatcher takes a parameter of type AccessChannel
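To make the lifecycle implied by this interface concrete, here is a minimal in-memory sketch. It is not RocketMQ code: `Channel`, `SimpleTraceDispatcher`, and `InMemoryTraceDispatcher` are hypothetical names invented for illustration, and the checked exceptions declared by the real interface (`MQClientException`, `IOException`) are dropped so that the sketch compiles with the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AccessChannel, so the sketch needs no RocketMQ jar.
enum Channel { LOCAL, CLOUD }

// Hypothetical, simplified mirror of TraceDispatcher (no checked exceptions).
interface SimpleTraceDispatcher {
    void start(String nameSrvAddr, Channel channel);
    boolean append(Object ctx);
    void flush();
    void shutdown();
}

// Toy implementation: buffers contexts in memory instead of sending them anywhere.
final class InMemoryTraceDispatcher implements SimpleTraceDispatcher {
    private final List<Object> buffer = new ArrayList<>();
    private int flushedCount = 0;
    private boolean running = false;
    private Channel channel = Channel.LOCAL; // same default as the real dispatcher's field

    @Override
    public synchronized void start(String nameSrvAddr, Channel channel) {
        // Remember which channel the caller selected; nameSrvAddr is unused in this toy.
        this.channel = channel;
        this.running = true;
    }

    @Override
    public synchronized boolean append(Object ctx) {
        // Reject data before start(), after shutdown(), or when ctx is null.
        if (!running || ctx == null) {
            return false;
        }
        return buffer.add(ctx);
    }

    @Override
    public synchronized void flush() {
        // "Sending" here just means counting and clearing the buffered contexts.
        flushedCount += buffer.size();
        buffer.clear();
    }

    @Override
    public synchronized void shutdown() {
        flush();
        running = false;
    }

    synchronized int flushedCount() { return flushedCount; }

    synchronized Channel channel() { return channel; }
}
```

The point of the sketch is only the call order: start selects the channel, append enqueues data, flush drains it, and shutdown stops accepting new data.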
AsyncTraceDispatcher
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
public class AsyncTraceDispatcher implements TraceDispatcher {

    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;

    //......

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }

    //......

    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;

        public AsyncAppenderRequest(final List<TraceContext> contextList) {
            if (contextList != null) {
                this.contextList = contextList;
            } else {
                this.contextList = new ArrayList<TraceContext>(1);
            }
        }

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {
                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
            }
            final Message message = new Message(traceTopic, data.getBytes());

            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);
            //......
        }

        //......
    }

    //......
}
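- In sendTraceDataByMQ, a method of AsyncTraceDispatcher's inner class AsyncAppenderRequest, when accessChannel is AccessChannel.CLOUD the trace topic becomes TraceConstants.TRACE_TOPIC_PREFIX with regionId appended; otherwise the configured traceTopicName is used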
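The topic selection in sendTraceDataByMQ can be isolated as a small pure function. The sketch below is a standalone illustration, not the library's code: the enum is a local stand-in for org.apache.rocketmq.client.AccessChannel, `TraceTopicSketch` and `resolveTraceTopic` are hypothetical names, and the two string constants are assumed values that should be checked against TraceConstants and MixAll in the client version you actually use.

```java
// Local stand-in for org.apache.rocketmq.client.AccessChannel,
// declared here only so the sketch compiles without the RocketMQ jar.
enum AccessChannel { LOCAL, CLOUD }

final class TraceTopicSketch {
    // Assumed values for illustration only; verify against your client version.
    static final String TRACE_TOPIC_PREFIX = "rmq_sys_TRACE_DATA_";
    static final String DEFAULT_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    // Mirrors the branch in sendTraceDataByMQ: CLOUD derives the topic from the
    // region id, while any other channel keeps the configured topic name.
    static String resolveTraceTopic(AccessChannel accessChannel, String traceTopicName, String regionId) {
        if (AccessChannel.CLOUD == accessChannel) {
            return TRACE_TOPIC_PREFIX + regionId;
        }
        return traceTopicName;
    }

    public static void main(String[] args) {
        // "region-a" is a made-up region id.
        System.out.println(resolveTraceTopic(AccessChannel.LOCAL, DEFAULT_TRACE_TOPIC, "region-a"));
        System.out.println(resolveTraceTopic(AccessChannel.CLOUD, DEFAULT_TRACE_TOPIC, "region-a"));
    }
}
```

In other words, with LOCAL all trace data goes to the single configured topic, while with CLOUD each region gets its own trace topic.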
Summary
AccessChannel defines two enum values, LOCAL and CLOUD. The start method of TraceDispatcher takes a parameter of type AccessChannel. In sendTraceDataByMQ, a method of AsyncTraceDispatcher's inner class AsyncAppenderRequest, when accessChannel is AccessChannel.CLOUD the trace topic becomes TraceConstants.TRACE_TOPIC_PREFIX with regionId appended.