白话RabbitMQ(三):发布/订阅

推广

RabbitMQ专题讲座
CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

在第二章中我们描述了任务队列,在任务队列中一个消息只会发送给一个消费者。而在这一章中我们将消息发送给许多个消费者,我们称之为“发布/订阅”

为了更好的阐述这个模式,我们会建立一个新的简单的logging系统,包含2个步骤-第一步发送log信息,第二步能够接受并将信息打印出来,而且在第二步中所有的消费者都会接受到同样的消息,比如一个消费者用来将log信息写到磁盘,另外一个接受信息并显示在屏幕上。因此一旦有有消息,消息会广播到所有的消费者。

交换机(Exchanges)

前面的章节中我们是直接通过queue来处理消息,现在我们来介绍一种更完善的模式

让我们迅速浏览一遍前面的主题:

  1. 生产者是一个客户端程序,用来发送消息
  2. 队列是一个缓冲,用来存储消息
  3. 消费者是一个客户端程序,用来接受消息

RabbitMQ的核心思想是生产者不会将消息直接发送给队列,意味着生产者是完全看不到队列的。反之,生产者只能将消息发送给路由器(Exchange),再由路由器来决定该如何来处理消息,是将消息发送给一个队列呢,还是发送给许多个队列,或者直接无视,具体的规则是根据路由器的类型而定的。

白话RabbitMQ(三):发布/订阅

路由器的类型有这样几种:直连路由器(dirct), 主题路由器(topic),头部路由器(headers),以及多广播路由器(fanout)

channel.exchangeDeclare("logs", "fanout");

广播路由器听起来就很简单,它会将消息广播到所有的它所知道的队列,而这正是我们所需要的。

默认路由器

在前面的章节中虽然没有设置任何路由器,但依然能够将消息发送到队列,这是因为我们的是默认路由器:使用空字符串("")来做的定义:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是exchange的名称,在这里是空字符串,消息会通过路由健(routingKey)发送到该键所对应的队列。

然而现在,我们有了确认的路由器

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列
我们之前队列都有名字(Hello队列和task_queue队列),给队列起名字非常重要-需要将消费者绑定到特定的queue上面,以及需要把消息从生产者发送给特定的消费者。

但对于日志来说,消息会发送到所有的消费者,而并非个别,We're also interested only in currently flowing messages not in the old ones.为了满足当前需求我们可以做两件事

  1. 一旦连接上RabbitMQ,需要一个新的空队列来接受消息,我们可以随机起个名字,甚至根本不起名,而让RabbitMQ来命名它。
  2. 一旦消费者断开连接,这个队列就能被删除掉

我们可以这样定义一个不需要持久化、独立的、能够被自动删除的队列

String queueName = channel.queueDeclare().getQueue();

这个名称是RabbitMQ随机分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.

绑定

白话RabbitMQ(三):发布/订阅

我们已经声明了一个广播路由器,现在需要告诉这个路由器需要把信息发送给哪些队列,路由器和队列间的这个关系就称之为绑定

channel.queueBind(queueName, "logs", "");

如此一来路由器就能够把消息发送给相应的队列了。

整合

白话RabbitMQ(三):发布/订阅

发送者与我们之前的代码基本相同,最重大的区别我们现在是发送给带名称的路由器了,同时我们也需要一个路由键,但这里也不需要,因为广播路由器会忽略这个值,这是我们EmitLog.java的代码

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

可以看到,一旦我们建立的连接立即定义了一个路由器,这个步骤对我们非常重要,因为是严禁将消息发送给并不存在的路由的。

同时,如果路由器没有绑定队列,消息也会丢失掉,但这对于我们来说是ok的:如果并没有消费者在监听,我们可以直接丢弃掉这个消息。

ReciveLogs.java代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

编译代码

javac -cp $CP EmitLog.java ReceiveLogs.java

如果你希望将log存储到本机上

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕上显示log信息,打开一个新的终端:

java -cp $CP ReceiveLogs

发送消息

java -cp $CP EmitLog

如此一来,就能够存储消息的同时进行打印了。

相关推荐