ZeroMQ发布-订阅模式(套接字类型ZMQ_PUB、ZMQ_SUB、ZMQ_XPUB等)
0MQ (ZeroMQ) 是一个轻量级消息内核。它可用于C、C++、Python、.NET /Mono、Fortran 和 Java 语言。它运行在AIX ,FreeBSD的,基于HP - UX , Linux和MacOS下, OpenBSD系统, OpenVMS , QNX Neutrino,Solaris 和 Windows 操作系统。
ZeroMQ (also known as ØMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It‘s fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.
一、0MQ模式总览
本文介绍0MQ的“发布-订阅”模式
二、发布-订阅模式
在发布-订阅模式中,有一个发布者用来发送消息,该模式中有很多订阅者会接收发布者发布的消息
“发布-订阅”模型支持的套接字类型有4种:
ZMQ_PUB
ZMQ_SUB
ZMQ_XPUB
ZMQ_XSUB
三、“PUB-SUB”套接字类型
PUB就是发布者,SUB就是订阅者

ZMQ_PUB
发布者使用类型为ZMQ_PUB的套接字来分发数据。发送的消息以扇出方式分发给所有连接的对等方
在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数
当ZMQ_PUB套接字由于已达到订阅者的高水位标记而进入静音状态时,将发送给有问题的订阅者的任何消息都将被丢弃,直到静音状态结束为止。关于“高水位标记”请参阅:
对于该套接字类型,zmq_msg_send()函数将永远不会阻塞
ZMQ_SUB
订阅者使用ZMQ_SUB类型的套接字来订阅发布者分发的数据
ZMQ_SUB套接字创建完成之后,ZMQ_SUB套接字未订阅任何消息,请使用zmq_setsockopt()的ZMQ_SUBSCRIBE选项指定要订阅的消息
在ZMQ_PUB类型的套接字上不能执行zmq_msg_recv()等接收数据的函数
演示案例 下面编写一个使用“SUB-PUB”的发布订阅演示案例: 发布者:类似于一个天气更新服务器,向订阅者发送天气更新,内容包括邮政编码、温度、湿度等信息 订阅者:它监听发布者更新的数据流,过滤只接收与特定邮政编码相关的天气信息,默认接收接收10条数据 发布者代码如下:
// wuserver.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <zmq.h>
// 随机生成0...num-1的随机数
#define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))
// 将string消息格式化为zmq_meg_t对象, 然后发往socket套接字上
static int s_send(void *socket, char *string);
int main()
{
// 1.初始化上下文
void *context = zmq_ctx_new();
// 2.创建、绑定套接字
void *publisher = zmq_socket(context, ZMQ_PUB);
assert(publisher != NULL);
// 此处我们将发布者绑定到一个tcp节点上和一个ipc节点上, 但是本案例我们只使用tcp, ipc那个只是演示说明zmq的套接字可以绑定到多个节点上
int rc = zmq_bind(publisher, "tcp://*:5555");
assert(rc == 0);
rc = zmq_bind(publisher, "ipc://weather.ipc");
assert(rc == 0);
// 3.初始化随机数发生器
srandom((unsigned)time(NULL));
// 4.循环发送数据
while(1)
{
// 5.随机生成邮政编码、温度、适度
int zipcode, temperature, relhumidity;
zipcode = randof(100000);
temperature = randof(215) - 80;
relhumidity = randof(50) + 10;
// 6.将消息发送给所有的订阅者
char update[20];
sprintf(update, "%05d %d %d", zipcode, temperature, relhumidity);
rc = s_send(publisher, update);
assert(rc);
}
// 7.关闭套接字、销毁上下文
zmq_close(publisher);
zmq_ctx_destroy(context);
return 0;
}
static int s_send(void *socket, char *string)
{
// 初始化一个zmq_msg_t对象, 分配的大小为string的大小
zmq_msg_t msg;
zmq_msg_init_size(&msg, strlen(string));
memcpy(zmq_msg_data(&msg), string, strlen(string));
// 发送数据
int rc = zmq_msg_send(&msg, socket, 0);
// 关闭zmq_msg_t对象
zmq_msg_close(&msg);
return rc;
}- 订阅者代码如下:
// 源码链接:https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/wuclient.c
// wuclient.c
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <zmq.h>
// 从socket接收数据, 并将数据返回
char *s_recv(void *socket);
int main(int argc, char *argv[])
{
// 1.初始化上下文
void *context = zmq_ctx_new();
// 2.创建套接字、连接发布者
void *subscriber = zmq_socket(context, ZMQ_SUB);
assert(subscriber != NULL);
int rc = zmq_connect(subscriber, "tcp://localhost:5555");
assert(rc == 0);
// 3.因为自己是订阅者, 因此需要使用设置过滤器, 显式指定自己是否需要接收什么类型的消息
// 程序运行时可以输入参数, 参数代表邮政编码, 如果参数为空, 那么就过滤10001的消息
char *filter = (argc > 1) ? argv[1] : "10001";
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
assert( rc == 0);
// 4.从发布者那里接收消息, 接收10条自己想要的数据
int update_nbr;
long total_temp = 0;
for(update_nbr = 0; update_nbr < 10; update_nbr++)
{
// 5.接收数据
char *string = s_recv(subscriber);
assert(string != NULL);
// 6.将数据中的邮政编码、温度、适度分别存储变量中
int zipcode, temperature, relhumidity;
sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);
total_temp += temperature;
free(string);
}
// 7.接收完成之后, 打印一下平均温度
printf("Average tempature for zipcode ‘%s‘ was %dF\n", filter, (int)(total_temp / update_nbr));
// 8.关闭套接字、销毁上下文
zmq_close(subscriber);
zmq_ctx_destroy(context);
return 0;
}
char *s_recv(void *socket)
{
// 创建zmq_msg_t对象接收数据
zmq_msg_t msg;
zmq_msg_init(&msg);
int size = zmq_msg_recv(&msg, socket, 0);
if(size == -1)
{
return NULL;
}
// 将zmq_msg_t对象中的数据保存到字符串中
char *string = (char*)malloc(size + 1);
memcpy(string, zmq_msg_data(&msg), size);
zmq_msg_close(&msg);
string[size] = 0;
return string;
}- 编译如下:
gcc -o wuserver wuserver.c -lzmq gcc -o wuclient wuclient.c -lzmq
- 一次运行如下,左侧为发布者,右侧为订阅者,订阅者没有传入参数,因此默认订阅的是邮政编码为“10001”的数据

- 又运行一次如下,订阅者传入的参数为“10002”,因此订阅的是邮政编码为“10002”的数据

四、“XPUB-XSUB”套接字类型
“XPUB-XSUB”套接字类型与“PUB-SUB”套接字类型相同,也是属于发布-订阅
在“PUB-SUB”中,订阅者通过zmq_connect()向发布者发起订阅;但是“XPUB-XSUB”套接字类型允许订阅者通过发送一条订阅信息到发布者来完成订阅
ZMQ_XPUB
用法与ZMQ_PUB大部分相同
但是有一点与ZMQ_PUB不同:ZMQ_XPUB(自己)的订阅方可以向自己发送一个订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响
ZMQ_XSUB
用法与ZMQ_SUB大部分相同
但是有一点与ZMQ_SUB不同:自己可以向发布者发送一条订阅信息来进行订阅。订阅消息是字节1(用于订阅)或字节0(用于取消订阅),后跟订阅主体。也接收不带子/取消订阅前缀的消息,但对订阅状态没有影响
XSUB、XPUB应用之“代理”
动态发现问题:在设计大型的分布式体系结构时,会遇到的问题之一是——“发现”。也就是说,部件如何认识对象,以及部件增减或减少时,如何更新这些消息,因此,我们称之为“动态发现”
动态发现解决方案①:简单的方式是通过硬编码(或配置)的网络架构来完全避免,一般通过手工操作。但是这种方案导致体系结构变得脆弱和笨重。例如一个系统有一个发布者和一百个订阅者,你需要对每一个订阅者配置发布者端点来让每个订阅者连接到发布者服务器。订阅者是动态的,发布者是静态的,如果你又新增了新的发布者,那么就需要再配置这一百个订阅者,工作量相当的大
动态发现解决方案②:通过中间层(代理)来实现,这种方法比较推荐
在“发布-订阅”模型中,我们可以在中间增加一个新的代理节点,该节点绑定了XSUB套接字和XPUB套接字,发布者连接到XSUB中,订阅者连接到XPUB中。这样一来添加或者删除发布者或订阅者节点就变得微不足道了
对于代理节点,其需要执行订阅转发:SUB套接字需要将自己的订阅信息作为特殊消息发送到代理的XPUB端点上,代理转发这些订阅消息到XSUB上,然后XSUB再将消息发送到PUB,从而最终完成SUB到PUB的订阅
当完成订阅之后,PUB直接发送消息,SUB可以直接收到,不需要代理进行转发