Thrift + Curator实现服务注册与发现功能

文章讲解如何使用Thrift和Curator实现自己的服务注册与发现功能,服务注册中心使用Zookeeper框架。

技术栈

        zookeeper-3.4.13

        thrift-0.11.0

        curator-2.13.0

特性

        1、服务集群部署:同一个服务可以部署到多台服务器上,注册中心维护一个服务的多份payload信息

        2、客户端软负载均衡:暂支持随机和轮询两种方式

        3、服务提供者和服务消费者可选择是否要注册到注册中心

        4、业务服务实现类自动加载和注册

        5、一个端口监听多个业务服务

最新源码: https://github.com/chenjuwen/thrift-microservice

功能主要包含以下关键实现点:

        扫描和加载业务服务实现类

        发布业务服务到thrift处理器

        注册业务服务到注册中心

        创建thrift服务端

        从注册中心查找业务服务

        调用业务服务接口

1、扫描和加载业务服务实现类

        业务服务接口通过Thrift的IDL进行描述,并使用Thrift提供的工具编译生成接口文件。

Common.thrift
namespace java com.seasy.microservice.api

struct Message {
	1: i32 type;
	2: binary data;
}

struct Response {
	1: i32 code;
	2: string message;
}


Hello.thrift
namespace java com.seasy.microservice.api

include "Common.thrift"
		
service Hello{
	string helloString(1:string param)
	Common.Response sendMessage(1:Common.Message message)
}

        开发业务服务接口的实现类,所有实现类统一用自定义注解类ServiceAnnotation加以标注。

@ServiceAnnotation(serviceClass=Hello.class, version="1.0.0")
public class HelloServiceImpl implements Hello.Iface{
	@Override
	public String helloString(String param) throws TException {
		return param;
	}
	
	@Override
	public Response sendMessage(Message message) throws TException {
		System.out.println(message.getType() + ", " + new String(message.getData()));
		
		Response response = new Response(0, "success");
		return response;
	}
}

        根据自定义注解类扫描和加载业务服务实现类。

private ConcurrentHashMap<String, ServiceInformation> loadService(String packagePath) throws Exception {
	ConcurrentHashMap<String, ServiceInformation> serviceInformationMap = new ConcurrentHashMap<String, ServiceInformation>();
	
	Reflections reflections = new Reflections(packagePath);
	
	//查找有指定注解类的服务类
	Set<Class<?>> serviceImplementClassSet = reflections.getTypesAnnotatedWith(ServiceAnnotation.class);
	for(Class<?> serviceImplementClass : serviceImplementClassSet){
		ServiceAnnotation serviceAnnotation = serviceImplementClass.getAnnotation(ServiceAnnotation.class);
		
		//服务相关信息封装在ServiceInformation类中
		ServiceInformation serviceInformation = new ServiceInformation();
		serviceInformation.setId(serviceInformation.getId());
		serviceInformation.setServiceClass(serviceAnnotation.serviceClass());
		serviceInformation.setVersion(serviceAnnotation.version());
		serviceInformation.setTimeout(serviceAnnotation.timeout());
		serviceInformation.setServiceImplementClassInstance(serviceImplementClass.newInstance());
		
		String key = serviceAnnotation.serviceClass().getName();
		serviceInformationMap.put(key, serviceInformation);
		logger.debug("Class [" + key + "] loaded!");
	}
	
	return serviceInformationMap;
}
 

2、发布业务服务到thrift处理器

        使用多路复用处理器TMultiplexedProcessor注册多个业务服务,这样通过监听一个端口即可提供多种服务。

private void buildMultiplexedProcessor(){
	multiplexedProcessor = new TMultiplexedProcessor();
	
	for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){
		String serviceClassFullname = it.next();
		logger.debug("serviceClassFullname=" + serviceClassFullname);
		
		//服务名
		String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1);
		logger.debug("serviceName=" + serviceName);
		
		Object serviceImplementClassInstance = serviceInformationMap.get(serviceClassFullname).getServiceImplementClassInstance();
		
		TProcessor serviceProcessor = createServiceProcessor(serviceClassFullname, serviceImplementClassInstance);
		if(serviceProcessor != null){
			//以接口主类的SimpleName作为服务名
			multiplexedProcessor.registerProcessor(serviceName, serviceProcessor);
			serviceInformationMap.get(serviceClassFullname).setProcessorRegistered(true);
			logger.info("Processor [" + serviceName + "] published!");
		}
	}
}

private TProcessor createServiceProcessor(String serviceClassFullname, Object serviceImplementClassInstance){
	try{
		String processorClassName = serviceClassFullname + "$Processor";
		String ifaceClassName = serviceClassFullname + "$Iface";
		
		Class<?> processorClass = Class.forName(processorClassName);
		Class<?> ifaceClass = Class.forName(ifaceClassName);
		
                //一个服务实现类对应一个Processor
		Constructor<?> constructor = processorClass.getDeclaredConstructor(new Class[]{ifaceClass});
		TProcessor processor = (TProcessor) constructor.newInstance(new Object[]{serviceImplementClassInstance});
		
		return processor;
		
	}catch(Exception ex){
		logger.error("create service[" + serviceClassFullname + "] processor error", ex);
	}
	return null;
}
 

3、注册业务服务到注册中心

        功能采用Zookeeper作为服务注册中心,使用Curator与Zookeeper进行通信。服务注册需要用到Curator扩展包curator-x-discovery.jar

 

        构造CuratorFramework实例

curator = CuratorFrameworkFactory.builder()
		.connectString(connectString)
		.namespace("thrift-microservice")
		.build();
curator.start();
 

        构造ServiceDiscovery实例

serviceDiscovery = ServiceDiscoveryBuilder.builder(ThriftServicePayload.class)
	.client(curator)
	.serializer(new JsonInstanceSerializer<>(ThriftServicePayload.class))
	.basePath(basePath)
	.build();
serviceDiscovery.start();
 

        将业务服务注册到Zookeeper注册中心

private void registerBusinessService(){
	for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){
		String serviceClassFullname = it.next();
		String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1);
		ServiceInformation serviceInformation = serviceInformationMap.get(serviceClassFullname);
		
		if(serviceInformation.isProcessorRegistered()){
			try{
				//构造ServiceInstance对象,该对象表示一个业务服务,用于存储业务服务相关的参数数据
				ServiceInstance<ThriftServicePayload> serviceInstance = ServiceInstance.<ThriftServicePayload>builder()
					.name(serviceName)
					.id(StringUtil.isEmpty(serviceInformation.getId()) ? serviceName : serviceInformation.getId())
					.address(EnvUtil.getLocalIp())
					.port(getPort())
					.payload(new ThriftServicePayload(serviceInformation.getVersion(), serviceInformation.getServiceClass().getName()))
					.registrationTimeUTC(System.currentTimeMillis())
					.serviceType(ServiceType.DYNAMIC)
					.build();
				
				serviceRegistry.registerService(serviceInstance);
				serviceInformation.setServiceRegistered(true);
				logger.info("Service [" + serviceName + "] registered!");
				
			}catch(Exception ex){
				logger.error("register service[" + serviceName + "] error", ex);
			}
		}
	}
}

  

4、创建thrift服务端

private void startServer()throws Exception{
	serverSocket = new TNonblockingServerSocket(getPort());
	
	TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket);
	tArgs.processor(multiplexedProcessor);
	tArgs.transportFactory(new TFramedTransport.Factory());
	tArgs.protocolFactory(new TCompactProtocol.Factory());

	tserver = new TNonblockingServer(tArgs);
	tserver.setServerEventHandler(new DefaultServerEventHandler());
	tserver.serve();
}

  

5、从注册中心查找业务服务

        根据业务服务的Client类从注册中心查找对应的服务配置信息,并根据服务配置信息实例化服务Client类的对象。

public <T> T getServiceClient(Class<T> serviceClientClass){
	try{
		String serviceClientClassName = serviceClientClass.getName();
		if(!serviceClientClassName.endsWith("$Client")){
			throw new IllegalArgumentException("serviceClientClass must be $Client class");
		}
		
		String serviceName = serviceClientClassName.replace("$Client", "");
		serviceName = serviceName.substring(serviceName.lastIndexOf(".")+1);
		
		Object object = getServiceClient(serviceName);
		if(object != null){
			return (T)object;
		}
		
	}catch(Exception ex){
		logger.error("Failed to get ServiceClient", ex);
	}
	return null;
}

public Object getServiceClient(String serviceName) {
	try{
		ServiceInstance<ThriftServicePayload> serviceInstance = queryForInstance(serviceName);
		if(serviceInstance != null){
			//服务所在机器的IP地址
			String host = serviceInstance.getAddress();
			
			//服务所在机器的监听端口
			int port = serviceInstance.getPort();
			
			ServiceClientFactory factory = null;
			ServiceClientWrapper wrapper = null;
			
			String key = host + ":" + port;
			if(serviceClientFactoryMap.containsKey(key)){
				factory = serviceClientFactoryMap.get(key);
				wrapper = factory.getServiceClientWrapper(serviceName);
			}else{
				logger.info("create ServiceClientFactory...");
				factory = ServiceClientFactory.getInstance();
				factory.setHost(host);
				factory.setPort(port);
				factory.open();
				
				serviceClientFactoryMap.put(key, factory);
			}
			
			if(wrapper == null){
				Class<?> serviceClientClass = Class.forName(serviceInstance.getPayload().getInterfaceName() + "$Client");
				
				wrapper = new ServiceClientWrapper(serviceInstance, serviceClientClass, serviceName);
				factory.addServiceClientWrapper(wrapper);
				
				return factory.getServiceClientWrapper(serviceName).getServiceClientInstanceObject();
			}else{
				return wrapper.getServiceClientInstanceObject();
			}
		}else{
			//服务不存在
			logger.error("service not found: " + serviceName);
		}
		
	}catch(Exception ex){
		logger.error("Failed to get ServiceClient", ex);
	}
	return null;
}

    一个ServiceClientFactory对象代表一个服务提供者,一个服务提供者可以提供多个业务服务,每个业务服务对应的客户端对象用ServiceClientWrapper实例表示。

public void open(){
	try{
		if(StringUtil.isNotEmpty(this.host) && this.port != 0){
			if(transport == null){
				transport = new TFramedTransport(new TSocket(this.host, this.port));
				transport.open();
				logger.debug("Transport opened --> " + this.host + ":" + this.port);
			}else if(!transport.isOpen()){
				transport.open();
			}
		}else{
			throw new IllegalArgumentException("parameter host or port is invalid!");
		}
	}catch(Exception ex){
		close();
		throw new RuntimeException("Failed to open service Socket", ex);
	}
}

public ServiceClientWrapper getServiceClientWrapper(String serviceName){
	return serviceClientWrapperMap.get(serviceName);
}

public void addServiceClientWrapper(ServiceClientWrapper wrapper){
	if(!serviceClientWrapperMap.containsKey(wrapper.getServiceName())){
		wrapper = instanceServiceClient(wrapper);
		serviceClientWrapperMap.put(wrapper.getServiceName(), wrapper);
	}
}

private ServiceClientWrapper instanceServiceClient(ServiceClientWrapper wrapper){
	try{
		logger.debug("instance ServiceClient: " + wrapper.getServiceClientClass().getName());
		TProtocol protocol = new TCompactProtocol(transport);
		TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(protocol, wrapper.getServiceName());
		Class[] classes = new Class[]{TProtocol.class};
		Object serviceClientInstanceObject = wrapper.getServiceClientClass().getConstructor(classes).newInstance(multiplexedProtocol);
		wrapper.setServiceClientInstanceObject(serviceClientInstanceObject);
		return wrapper;
	}catch(Exception ex){
		logger.error("instance ServiceClient error", ex);
	}
	return wrapper;
}
public class ServiceClientWrapper {
	private ServiceInstance<ThriftServicePayload> serviceInstance;
    private Class<?> serviceClientClass;
    private String serviceName;
	private Object serviceClientInstanceObject; //服务客户端类的实例对象
    
    public ServiceClientWrapper(ServiceInstance<ThriftServicePayload> serviceInstance, Class<?> serviceClientClass, String serviceName){
    	this.serviceInstance = serviceInstance;
    	this.serviceClientClass = serviceClientClass;
    	this.serviceName = serviceName;
    }

	public ServiceInstance<ThriftServicePayload> getServiceInstance() {
		return serviceInstance;
	}

	public Class<?> getServiceClientClass() {
		return serviceClientClass;
	}

	public String getServiceName() {
		return serviceName;
	}
	
	public Object getServiceClientInstanceObject() {
		return serviceClientInstanceObject;
	}

	public void setServiceClientInstanceObject(Object serviceClientInstanceObject) {
		this.serviceClientInstanceObject = serviceClientInstanceObject;
	}
}

  

6、调用业务服务接口

        客户端获取服务Client实例对象的方式:

Hello.Client client = getServiceClient(Hello.Client.class)

或者

Object object = clientBootstrap.getServiceClient("Hello");
if(object != null){
	Hello.Client client = (Hello.Client)object;
}

        访问接口方法:

client.helloString("hello string")

ByteBuffer data = ByteBuffer.wrap("hello world".getBytes("UTF-8"));
Response response = client.sendMessage(new Message(1, data));
System.out.println(response.getCode() + ", " + response.getMessage());

 7、注册中心管理(截图)

Thrift + Curator实现服务注册与发现功能
 


Thrift + Curator实现服务注册与发现功能
 


Thrift + Curator实现服务注册与发现功能
 

相关推荐