A Look at elasticsearch's NodesFaultDetection
Preface
This article takes a look at elasticsearch's NodesFaultDetection.
NodesFaultDetection
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
    public class NodesFaultDetection extends AbstractComponent {

        public static interface Listener {
            void onNodeFailure(DiscoveryNode node, String reason);
        }

        private final ThreadPool threadPool;
        private final TransportService transportService;
        private final boolean connectOnNetworkDisconnect;
        private final TimeValue pingInterval;
        private final TimeValue pingRetryTimeout;
        private final int pingRetryCount;
        // used mainly for testing, should always be true
        private final boolean registerConnectionListener;
        private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
        private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
        private final FDConnectionListener connectionListener;
        private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
        private volatile boolean running = false;

        public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
            super(settings);
            this.threadPool = threadPool;
            this.transportService = transportService;
            this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
            this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
            this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
            this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
            this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
            logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
            transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());
            this.connectionListener = new FDConnectionListener();
            if (registerConnectionListener) {
                transportService.addConnectionListener(connectionListener);
            }
        }

        public NodesFaultDetection start() {
            if (running) {
                return this;
            }
            running = true;
            return this;
        }

        public NodesFaultDetection stop() {
            if (!running) {
                return this;
            }
            running = false;
            return this;
        }

        public void close() {
            stop();
            transportService.removeHandler(PingRequestHandler.ACTION);
            transportService.removeConnectionListener(connectionListener);
        }

        //......
    }
- NodesFaultDetection extends AbstractComponent. It defines a CopyOnWriteArrayList named listeners, a ConcurrentMap named nodesFD, and fields such as connectionListener, latestNodes, and running.
- Its constructor reads the settings connect_on_network_disconnect (default true), ping_interval (default 1s), ping_timeout (default 30s), ping_retries (default 3), and register_connection_listener (default true). It then registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and, when registerConnectionListener is true, adds an FDConnectionListener.
- The start method sets running to true and stop sets it to false. The close method first calls stop, then removes the PingRequestHandler.ACTION handler from transportService and removes connectionListener.
PingRequestHandler
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
    class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {

        public static final String ACTION = "discovery/zen/fd/ping";

        @Override
        public PingRequest newInstance() {
            return new PingRequest();
        }

        @Override
        public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
            // if we are not the node we are supposed to be pinged, send an exception
            // this can happen when a kill -9 is sent, and another node is started using the same port
            if (!latestNodes.localNodeId().equals(request.nodeId)) {
                throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
            }
            channel.sendResponse(new PingResponse());
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    }

    static class PingRequest extends TransportRequest {

        // the (assumed) node id we are pinging
        private String nodeId;

        PingRequest() {
        }

        PingRequest(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            nodeId = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(nodeId);
        }
    }

    private static class PingResponse extends TransportResponse {

        private PingResponse() {
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
        }
    }
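- PingRequestHandler's newInstance method creates a PingRequest, which carries a nodeId field identifying the node the ping is addressed to. The messageReceived method answers a PingRequest: it first checks whether the requested nodeId matches localNodeId and, if it does, returns a PingResponse; otherwise it throws an ElasticSearchIllegalStateException.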
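The node ID check can be illustrated in isolation. The following is a minimal sketch, not the elasticsearch code: PingTarget and PingIdCheckDemo are hypothetical names, and java.util.UUID stands in for the random node ID that ZenDiscovery generates on each start. It shows why a process restarted on the same port rejects pings addressed to its predecessor.

```java
import java.util.UUID;

// Hypothetical stand-in for a node that answers fault-detection pings.
class PingTarget {
    private final String localNodeId;

    PingTarget(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    // Mirrors the check in messageReceived: reject pings addressed to another node ID.
    String handlePing(String requestedNodeId) {
        if (!localNodeId.equals(requestedNodeId)) {
            throw new IllegalStateException("Got pinged as node [" + requestedNodeId
                    + "], but I am node [" + localNodeId + "]");
        }
        return "pong";
    }
}

class PingIdCheckDemo {
    // Returns true if the target accepted the ping, false if it rejected it.
    static boolean ping(PingTarget target, String nodeId) {
        try {
            target.handlePing(nodeId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String oldId = UUID.randomUUID().toString();
        PingTarget original = new PingTarget(oldId);
        System.out.println("original accepts ping: " + ping(original, oldId));

        // Simulate kill -9 followed by a restart on the same port: new process, new random ID.
        PingTarget restarted = new PingTarget(UUID.randomUUID().toString());
        System.out.println("restarted accepts ping for old id: " + ping(restarted, oldId));
    }
}
```

Because the sender treats the resulting exception as a failed ping, the stale node entry eventually goes through the normal retry and failure path instead of being silently kept alive by the new process.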
FDConnectionListener
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }

        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

    private void handleTransportDisconnect(DiscoveryNode node) {
        if (!latestNodes.nodeExists(node.id())) {
            return;
        }
        NodeFD nodeFD = nodesFD.remove(node);
        if (nodeFD == null) {
            return;
        }
        if (!running) {
            return;
        }
        nodeFD.running = false;
        if (connectOnNetworkDisconnect) {
            try {
                transportService.connectToNode(node);
                nodesFD.put(node, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
            } catch (Exception e) {
                logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
                notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            logger.trace("[node ] [{}] transport disconnected", node);
            notifyNodeFailure(node, "transport disconnected");
        }
    }

    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    }
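- FDConnectionListener calls handleTransportDisconnect from onNodeDisconnected. That method returns early if the node is not in latestNodes. Otherwise it removes the node from nodesFD and, provided a NodeFD existed and fault detection is running, sets that NodeFD's running flag to false.
- If connectOnNetworkDisconnect is true, it tries to connect to the node once. On success it puts a new NodeFD into nodesFD and schedules a SendPingRequest task for the node, delayed by pingInterval. If the connect throws, or if connectOnNetworkDisconnect is false, it calls notifyNodeFailure.
- notifyNodeFailure invokes the NodesFaultDetection.Listener.onNodeFailure callback on the generic thread pool; here that is the onNodeFailure method of ZenDiscovery's NodeFailureListener.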
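The disconnect handling described above boils down to a small decision tree. Here is a simplified sketch of it, under stated assumptions: DisconnectSketch and its Connector interface are hypothetical, node identities are plain strings, and failures and scheduled pings are recorded in lists rather than dispatched to listeners or a thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DisconnectSketch {
    // Hypothetical stand-in for transportService.connectToNode, which throws on failure.
    interface Connector {
        void connect(String node) throws Exception;
    }

    final Set<String> monitored = ConcurrentHashMap.newKeySet(); // plays the role of nodesFD
    final List<String> failures = new ArrayList<>();       // recorded instead of notifying listeners
    final List<String> scheduledPings = new ArrayList<>(); // recorded instead of scheduling a task
    private final boolean connectOnNetworkDisconnect;
    private final Connector connector;

    DisconnectSketch(boolean connectOnNetworkDisconnect, Connector connector) {
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.connector = connector;
    }

    void onNodeDisconnected(String node) {
        if (!monitored.remove(node)) {
            return; // the node was not being monitored, nothing to do
        }
        if (connectOnNetworkDisconnect) {
            try {
                connector.connect(node);  // a single verification attempt
                monitored.add(node);      // reachable again: resume monitoring
                scheduledPings.add(node);
            } catch (Exception e) {
                failures.add(node + ": transport disconnected (with verified connect)");
            }
        } else {
            failures.add(node + ": transport disconnected");
        }
    }
}
```

For example, `new DisconnectSketch(true, node -> {})` models a reconnect that always succeeds, so a disconnected node ends up back in `monitored` with no failure recorded.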
ZenDiscovery
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
    public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {

        //......

        @Inject
        public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                            TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
                            DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
            super(settings);
            this.clusterName = clusterName;
            this.threadPool = threadPool;
            this.clusterService = clusterService;
            this.transportService = transportService;
            this.discoveryNodeService = discoveryNodeService;
            this.pingService = pingService;

            // also support direct discovery.zen settings, for cases when it gets extended
            this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
            this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);

            this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
            this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);

            logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);

            this.electMaster = new ElectMasterService(settings);
            nodeSettingsService.addListener(new ApplySettings());

            this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
            this.masterFD.addListener(new MasterNodeFailureListener());

            this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
            this.nodesFD.addListener(new NodeFailureListener());

            this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
            this.pingService.setNodesProvider(this);
            this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

            transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
        }

        protected void doStart() throws ElasticSearchException {
            Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
            // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
            String nodeId = UUID.randomBase64UUID();
            localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
            latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
            nodesFD.updateNodes(latestDiscoNodes);
            pingService.start();

            // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
            asyncJoinCluster();
        }

        public void publish(ClusterState clusterState) {
            if (!master) {
                throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
            }
            latestDiscoNodes = clusterState.nodes();
            nodesFD.updateNodes(clusterState.nodes());
            publishClusterState.publish(clusterState);
        }

        private class NodeFailureListener implements NodesFaultDetection.Listener {
            @Override
            public void onNodeFailure(DiscoveryNode node, String reason) {
                handleNodeFailure(node, reason);
            }
        }

        private void handleNodeFailure(final DiscoveryNode node, String reason) {
            if (lifecycleState() != Lifecycle.State.STARTED) {
                // not started, ignore a node failure
                return;
            }
            if (!master) {
                // nothing to do here...
                return;
            }
            clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) {
                    DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
                            .putAll(currentState.nodes())
                            .remove(node.id());
                    latestDiscoNodes = builder.build();
                    currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
                    // check if we have enough master nodes, if not, we need to move into joining the cluster again
                    if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
                        return rejoin(currentState, "not enough master nodes");
                    }
                    // eagerly run reroute to remove dead nodes from routing table
                    RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
                    return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
                }

                @Override
                public void clusterStateProcessed(ClusterState clusterState) {
                    sendInitialStateEventIfNeeded();
                }
            });
        }

        //......
    }
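- ZenDiscovery's constructor creates a NodesFaultDetection and adds a NodeFailureListener to it. The listener implements the NodesFaultDetection.Listener interface, and its onNodeFailure callback calls handleNodeFailure. That method does nothing unless the component is in the STARTED state and the local node is the master. Otherwise it submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and then checks whether the number of master nodes still satisfies minimumMasterNodes: if not, it calls rejoin; if so, it calls allocationService.reroute.
- Its doStart method builds localNode from the node name in the settings and a freshly generated random node ID, puts it into latestDiscoNodes, calls nodesFD.updateNodes(latestDiscoNodes), and then calls pingService.start() and asyncJoinCluster().
- Its publish method, which throws if the local node is not the master, updates the local latestDiscoNodes from the nodes of clusterState, calls nodesFD.updateNodes(clusterState.nodes()), and finally calls publishClusterState.publish(clusterState).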
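The core decision inside handleNodeFailure can be sketched without any cluster-state machinery. This is an illustration only: NodeFailureSketch, the map from node ID to a master-eligible flag, and the simple count against minimumMasterNodes are assumptions made for the example, not the ElectMasterService implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NodeFailureSketch {
    enum Outcome { REJOIN, REROUTE }

    // nodes maps node ID -> master-eligible flag; this shape is a simplification
    // of DiscoveryNodes chosen for illustration.
    static Outcome handleNodeFailure(Map<String, Boolean> nodes, String failedNodeId, int minimumMasterNodes) {
        nodes.remove(failedNodeId); // drop the failed node from the cluster view
        long masterEligible = nodes.values().stream().filter(Boolean::booleanValue).count();
        if (masterEligible < minimumMasterNodes) {
            return Outcome.REJOIN;  // not enough master nodes left: go back to joining
        }
        return Outcome.REROUTE;     // enough masters: reroute shards away from the dead node
    }

    public static void main(String[] args) {
        Map<String, Boolean> nodes = new LinkedHashMap<>();
        nodes.put("master-1", true);
        nodes.put("master-2", true);
        nodes.put("data-1", false);
        System.out.println(handleNodeFailure(nodes, "data-1", 2));
        System.out.println(handleNodeFailure(nodes, "master-2", 2));
    }
}
```

Losing a data-only node leaves the master count untouched, so only a reroute is needed; losing a master-eligible node can push the count below the configured minimum and force a rejoin.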
NodesFaultDetection.updateNodes
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
    public class NodesFaultDetection extends AbstractComponent {

        //......

        public void updateNodes(DiscoveryNodes nodes) {
            DiscoveryNodes prevNodes = latestNodes;
            this.latestNodes = nodes;
            if (!running) {
                return;
            }
            DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
            for (DiscoveryNode newNode : delta.addedNodes()) {
                if (newNode.id().equals(nodes.localNodeId())) {
                    // no need to monitor the local node
                    continue;
                }
                if (!nodesFD.containsKey(newNode)) {
                    nodesFD.put(newNode, new NodeFD());
                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
                }
            }
            for (DiscoveryNode removedNode : delta.removedNodes()) {
                nodesFD.remove(removedNode);
            }
        }

        //......
    }
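- NodesFaultDetection provides the updateNodes method to update its own latestNodes. The method calls nodes.delta(prevNodes) to compute a DiscoveryNodes.Delta, whose addedNodes() method returns the newly added nodes and whose removedNodes() method returns the removed ones.
- For each newNode other than the local node, it checks whether the node is already in nodesFD. If not, it adds the node to nodesFD and schedules a SendPingRequest task, delayed by pingInterval.
- Each removedNode is removed from nodesFD. handleTransportDisconnect also removes a disconnected node from nodesFD and puts it back if its single reconnect attempt succeeds.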
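The added/removed bookkeeping can be pictured with plain sets. The sketch below is a simplified model, assuming node identities are strings; DeltaSketch and updateMonitored are hypothetical names, and scheduling the first ping is reduced to a comment.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

class DeltaSketch {
    final Set<String> added = new LinkedHashSet<>();
    final Set<String> removed = new LinkedHashSet<>();

    // added = current minus previous, removed = previous minus current
    DeltaSketch(Set<String> previous, Set<String> current) {
        added.addAll(current);
        added.removeAll(previous);
        removed.addAll(previous);
        removed.removeAll(current);
    }

    // Applies a delta to the monitored set the way updateNodes applies it to nodesFD.
    static void updateMonitored(Set<String> monitored, Set<String> previous, Set<String> current, String localNodeId) {
        DeltaSketch delta = new DeltaSketch(previous, current);
        for (String node : delta.added) {
            if (node.equals(localNodeId)) {
                continue; // no need to monitor the local node
            }
            monitored.add(node); // the real code also schedules the first ping here
        }
        monitored.removeAll(delta.removed);
    }

    public static void main(String[] args) {
        Set<String> monitored = new LinkedHashSet<>(Arrays.asList("b"));
        Set<String> previous = new LinkedHashSet<>(Arrays.asList("local", "b"));
        Set<String> current = new LinkedHashSet<>(Arrays.asList("local", "c"));
        updateMonitored(monitored, previous, current, "local");
        System.out.println(monitored);
    }
}
```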
SendPingRequest
elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
    private class SendPingRequest implements Runnable {

        private final DiscoveryNode node;

        private SendPingRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void run() {
            if (!running) {
                return;
            }
            transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
                    new BaseTransportResponseHandler<PingResponse>() {
                        @Override
                        public PingResponse newInstance() {
                            return new PingResponse();
                        }

                        @Override
                        public void handleResponse(PingResponse response) {
                            if (!running) {
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                nodeFD.retryCount = 0;
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            // check if the master node did not get switched on us...
                            if (!running) {
                                return;
                            }
                            if (exp instanceof ConnectTransportException) {
                                // ignore this one, we already handle it by registering a connection listener
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                int retryCount = ++nodeFD.retryCount;
                                logger.trace("[node ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
                                if (retryCount >= pingRetryCount) {
                                    logger.debug("[node ] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
                                    // not good, failure
                                    if (nodesFD.remove(node) != null) {
                                        notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                                    }
                                } else {
                                    // resend the request, not reschedule, rely on send timeout
                                    transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
                                            options().withHighType().withTimeout(pingRetryTimeout), this);
                                }
                            }
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        }
    }
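- SendPingRequest is a Runnable that sends a PingRequest to the target node with a timeout of pingRetryTimeout. Its handleResponse method checks whether the node is still in nodesFD: if it has been removed, the response is ignored, and it is also ignored if that NodeFD's running flag is false. Otherwise it resets retryCount and schedules the SendPingRequest task again, delayed by pingInterval.
- If the request fails with a TransportException, it checks whether the exception is a ConnectTransportException. If so, it is ignored, because that case is already handled by onNodeDisconnected of the FDConnectionListener registered with transportService.
- For any other exception it increments nodeFD.retryCount. Once retryCount is greater than or equal to the configured pingRetryCount, it removes the node from nodesFD and calls notifyNodeFailure, which ends up in ZenDiscovery's handleNodeFailure method. If the limit has not been reached, it retries by resending the PingRequest immediately.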
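The retry counting can be modelled as a tiny state machine. The following is a sketch under assumptions: RetrySketch is a hypothetical class, and worstCaseSeconds is a rough estimate that assumes every attempt runs into the full timeout and ignores scheduling and network overhead.

```java
class RetrySketch {
    private final int pingRetryCount;
    private int retryCount = 0;
    private boolean failed = false;

    RetrySketch(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    // A successful ping resets the counter, as handleResponse does.
    void onResponse() {
        retryCount = 0;
    }

    // Returns true when the caller should resend immediately,
    // false when the retry limit is reached and the node is declared failed.
    boolean onException() {
        retryCount++;
        if (retryCount >= pingRetryCount) {
            failed = true;
            return false;
        }
        return true;
    }

    boolean isFailed() {
        return failed;
    }

    // Rough upper bound, in seconds, between the last successful response and the
    // failure notification: one pingInterval wait plus pingRetryCount full timeouts.
    static long worstCaseSeconds(long pingIntervalSeconds, long pingTimeoutSeconds, int pingRetryCount) {
        return pingIntervalSeconds + pingTimeoutSeconds * pingRetryCount;
    }

    public static void main(String[] args) {
        // Defaults from the constructor: ping_interval 1s, ping_timeout 30s, ping_retries 3.
        System.out.println(worstCaseSeconds(1, 30, 3) + "s");
    }
}
```

With the default settings this estimate comes to 1 + 3 × 30 = 91 seconds for a node that stays connected but never answers, which is why an unresponsive node can take far longer to be removed than one whose connection drops outright.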
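Summary
- NodesFaultDetection registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and adds an FDConnectionListener. PingRequestHandler answers PingRequest messages with a PingResponse; FDConnectionListener handles transport disconnects, which is why SendPingRequest ignores ConnectTransportException.
- FDConnectionListener's onNodeDisconnected method removes the node from nodesFD and sets that NodeFD's running flag to false. If connectOnNetworkDisconnect is true, it retries once: it connects to the node and, on success, puts it back into nodesFD and schedules a SendPingRequest task for it, delayed by pingInterval. If the connect throws, or if connectOnNetworkDisconnect is false, it calls notifyNodeFailure, which invokes the NodesFaultDetection.Listener.onNodeFailure callback; here that is the onNodeFailure method of ZenDiscovery's NodeFailureListener.
- ZenDiscovery's doStart and publish methods both call NodesFaultDetection's updateNodes method to update latestNodes; for each new node a delayed SendPingRequest task is scheduled.
- When a SendPingRequest succeeds, it resets retryCount and schedules the next SendPingRequest task. On any exception other than ConnectTransportException it retries; once the number of retries reaches pingRetryCount, it triggers notifyNodeFailure, which invokes NodesFaultDetection.Listener.onNodeFailure, here the onNodeFailure method of ZenDiscovery's NodeFailureListener.
- ZenDiscovery's NodeFailureListener implements the NodesFaultDetection.Listener interface, and its onNodeFailure callback calls handleNodeFailure. That method submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and then checks whether the number of master nodes still satisfies minimumMasterNodes: if not, it calls rejoin; if so, it calls allocationService.reroute.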