mq预取机制

总括:

预取时生产者向消费者推预取的数量,消费完后再给,不消费玩不给(计划经济)(批量给相应的消费者)

预取为0时消费者主动获取

 ActiveMQ的一个主要的设计目标是:提供一个高性能的消息中间件。它使用了SEDA(Staged Event Driven Architecture)架构及异步传输。为了提供更高的性能,很重要的一点是 尽快地将消息传送给消费者,这样消费者利用消息缓冲区等待处理,而不是等待消息。
  然后,这样也有很大风险:不断地向 消费者 传送消息可能使得其消息缓冲溢出,因为传送的速度比消费者真正“消费”消息更快,这是很可能。

  因此,ActiveMQ使用了 消息”预取限制“(prefetch limit):表示在某个时间段内,可能向消费者传输的最大消息量,如果达到该上限,那么停止发送,直到ActiveMQ收到消费者的acknowledgements(确认,表示已经处理了该消息)。prefetch limit可以针对每个不同的consumer来设置。
  为了获取更高的性能,prefetch limit当然是越大越好,只要consumer有足够大的消息缓冲区(messagevolume)。如果消息的总量非常少,而且每个消息的处理时间非常的长,那么,可以将prefetch设置为1,这样,每次向consumer发送一个消息,等其确认已经处理完毕后,再发送第二个。
  特别地,如果prefetch设置为0,表示consumer每次 主动向activeMQ要求传输最大的数据量,而不是被动地接收消息。

如何指定prefetech的值:
  通过指定ActiveMQConnection或ActiveMQConnectionFactory的ActiveMQPretchPolicy来设置所有的pretch值:各种不同的消息服务都可以指定不同的值,如下:
   

  • persistent queues (default value: 1000)
  • non-persistent queues (default value: 1000)
  • persistent topics (default value: 100)
  • non-persistent topics (default value: Short.MAX_VALUE -1)

To change the prefetch size for all consumer types you would use a connection URI similar to:

tcp://localhost:61616?jms.prefetchPolicy.all=50

To change the prefetch size for just queue consumer types you would use a connection URI similar to:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

It can also be configured on a per consumer basis using Destination Options.

  1.  
    queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
  2.  
    consumer = session.createConsumer(queue);

P

PPooled Connections and prefetch

当连接来自一个连接池中,消费消息可能出现一些由于“prefetch”而产生的问题:预取的消息(还未被处理)当连接关闭时会被释放(即,可以在activeMQ中再次读取到该消息)。而连接池中的连接只有在连接池关闭后才真正的销毁。这使得 预取的消息直到连接被重用时才会被处理(或者等连接池关闭,可再次从activeMQ中读取)。这样导致了消息可能丢失,或者当连接池中有多个连接时,消息乱序(out-of-sequence)!


一个处理方法是:将生产者放入连接池,消费者则不放入连接池!当然,这样在消费者的性能上,会受到影响(当有多个线程快速的消费消息时,对象被不断的创建销毁)。

另外一种方法:将消费者的连接池中数量设为1。

第三种方法:在使用连接池的情况下,将prefetch设为1或者0。当使用Spring JMS和MessageDrivenPojo时,只能将prefetch设为1,而不能为0;

相关推荐