rabbitmq 延时队列
前言
某个产品 或者订单,有个有效期 过了有效期要取消
方法一 : 写个脚本,用crontab 定时扫描 改变状态 但是最低只能一分钟 ,不适合
方法二 : 用swoole得毫秒定时器,每秒钟去扫描表 明显占用资源 mysql受不了
方法三 :用rabbitmq延时队列 一开始将其丢入mq 死信队列,设置有效期,过时转发到其他队列,再启动一个消费者 消费 更改表状态
php 安装mq扩展
https://www.cnblogs.com/brady-wang/p/7662393.html
搭建mq服务
https://www.cnblogs.com/brady-wang/p/7660174.html
创建生产者和消费者
生产者 publish.php
<?php
header(‘Content-Type:text/html;charset=utf8;‘);
$time = 30;
$params = array(
‘exchangeName‘ => ‘test_cache_exchange‘."_".$time,
‘queueName‘ => ‘test_cache_queue‘."_".$time,
‘routeKey‘ => ‘test_cache_route‘."_".$time,
);
$connectConfig = array(
‘host‘ => ‘127.0.0.1‘,
‘port‘ => 5672,
‘login‘ => ‘admin‘,
‘password‘ => ‘password‘,
‘vhost‘ => ‘/‘
);
//var_dump(extension_loaded(‘amqp‘));
//
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die(‘Conexiune esuata‘);
//TODO 记录日志
echo ‘rabbit-mq 连接错误:‘, json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die(‘Connection through channel failed‘);
//TODO 记录日志
echo ‘rabbit-mq Connection through channel failed:‘, json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//持久化
$exchange->setName($params[‘exchangeName‘]?:‘‘);
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params[‘queueName‘]?:‘‘);
$queue->setFlags(AMQP_DURABLE);
// 和普通生产者区别 在这 下面是过期时间和转发到的路由
$queue->setArguments(array(
‘x-dead-letter-exchange‘ => ‘delay_exchange‘,
‘x-dead-letter-routing-key‘ => ‘delay_route‘,
‘x-message-ttl‘ => $time*1000,
));
$queue->declareQueue();
//绑定
$queue->bind($params[‘exchangeName‘], $params[‘routeKey‘]);
} catch(Exception $e) {
}
//$num = mt_rand(100, 500);
$num = 1;
//生成消息
$exchange->publish(date("Y-m-d H:i:s"), $params[‘routeKey‘], AMQP_MANDATORY, array(‘delivery_mode‘=>2));消费者 consumer.php
<?php
header(‘Content-Type:text/html;charset=utf8;‘);
$params = array(
‘exchangeName‘ => ‘delay_exchange‘,
‘queueName‘ => ‘delay_queue‘,
‘routeKey‘ => ‘delay_route‘,
);
$connectConfig = array(
‘host‘ => ‘localhost‘,
‘port‘ => 5672,
‘login‘ => ‘admin‘,
‘password‘ => ‘password‘,
‘vhost‘ => ‘/‘
);
//var_dump(extension_loaded(‘amqp‘));
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die(‘Conexiuneesuata‘);
//TODO记录日志
echo ‘rabbit-mq连接错误:‘, json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
//die(‘Connectionthroughchannelfailed‘);
//TODO记录日志
echo ‘rabbit-mqConnectionthroughchannelfailed:‘, json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
$exchange->setName($params[‘exchangeName‘] ?: ‘‘);
$exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params[‘queueName‘] ?: ‘‘);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//绑定
$queue->bind($params[‘exchangeName‘], $params[‘routeKey‘]);
} catch (Exception$e) {
echo $e->getMessage();
exit();
}
function callback(AMQPEnvelope $message){
global $queue;
if ($message) {
$body = $message->getBody();
echo $body . PHP_EOL;
$queue->ack($message->getDeliveryTag());
} else {
echo ‘nomessage‘ . PHP_EOL;
}
}
//$queue->consume(‘callback‘);第一种消费方式,但是会阻塞,程序一直会卡在此处
//第二种消费方式,非阻塞
$start = time();
while (true) {
$message = $queue->get();
if (!empty($message)) {
echo $message->getBody()."--失效时间 ".date("Y-m-d H:i:s"). PHP_EOL;
$queue->ack($message->getDeliveryTag());//应答,代表该消息已经消费
// $end = time();
// echo ‘<br>‘ . ($end - $start);
} else {
//echo‘messagenotfound‘.PHP_EOL;
}
}执行推送 我改了不同时间推送,会生成不同的交换机 路由 队列,因为我用得是direct类型 要一一匹配

消费者开启
[ mq]# php consumer.php 2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42 2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42 2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43 2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43 2020-07-18 11:13:04--失效时间 2020-07-18 11:13:24 2020-07-18 11:21:00--失效时间 2020-07-18 11:21:10 2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02 2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02 2020-07-18 11:21:22--失效时间 2020-07-18 11:22:12 2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13 2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13
发现正常,都是我设置的事件过期后就到处理队列,在这里消费,处理逻辑即可
参考 https://www.cnblogs.com/Zhangcsc/p/11739754.html
https://blog.csdn.net/weixin_34310369/article/details/92262465?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2
相关推荐
程序员伊成 2020-08-06
waitzkj 2020-07-25
powrexly 2020-07-20
shenzhenzsw 2020-06-21
Soongp 2020-06-07
waitzkj 2020-06-04
cj0 2020-06-01
zhuxue 2020-10-14
shenzhenzsw 2020-10-09
shyoldboy 2020-09-27
leihui00 2020-09-16
lishijian 2020-08-17
ljcsdn 2020-07-27
liym 2020-07-20
zhoucheng0 2020-07-19
shenzhenzsw 2020-07-18
woaishanguosha 2020-07-18
zhoucheng0 2020-07-08