分布式系统--封装Redis消息队列--消息队列下的异步场景
一、什么是消息队列?
1、消息就是数据。
2、队列有队尾和队头,队列有入队和出队,队列先进先出。
3、生产者
存数据入口
4、消费者
取数据入口

二、推模型--发布订阅模型--阻塞
主动把消息推给订阅者。
数据实时要求高,用推。
三、拉模型--生产者消费者模型--非阻塞
消费者自己去拉取数据。
数据实时要求不高,用拉。
四、它有哪些优势?为什么使用它?
可以解决一些分布式场景,如:异步场景,应用解耦,流量削峰,今天讲讲解决异步场景。
五、同步场景
客户端发起一个请求,创建订单,创建完订单需要增加积分,然后发送短信,假设创建订单花费1s,增加积分花费1s,发送短信花费1s,实则花费了3s。
缺陷:耗时时间太长。


六、异步场景
如果在订单服务开启1个异步线程去处理发送短信服务,这样做会有下面的缺陷。
缺陷:1个客户端请求就是1个线程,在订单服务端开启的异步线程一多,就会导致订单服务端的线程数减少,间接会导致订单服务并发量降低。

七、消息队列下的异步场景
积分服务和短信服务订阅消息队列,消息队列推送消息到积分服务,短信服务,这样创建订完单响应给客户端只需要花费1s,但是又不会影响订单服务的并发量。

八、封装Redis消息队列,解决消息队列下的异步场景
1、封装Redis消息队列
namespace MyRedisUnitty
{
/// <summary>
/// 封装Redis消息队列
/// </summary>
public class RedisMsgQueueHelper : IDisposable
{
/// <summary>
/// Redis客户端
/// </summary>
public RedisClient redisClient { get; }
public RedisMsgQueueHelper(string redisHost)
{
redisClient = new RedisClient(redisHost);
}
/// <summary>
/// 入队
/// </summary>
/// <param name="qKeys">入队key</param>
/// <param name="qMsg">入队消息</param>
/// <returns></returns>
public long EnQueue(string qKey, string qMsg)
{
//1、编码字符串
byte[] bytes = System.Text.Encoding.UTF8.GetBytes(qMsg);
//2、Redis消息队列入队
long count = redisClient.LPush(qKey, bytes);
return count;
}
/// <summary>
/// 出队(非阻塞) === 拉
/// </summary>
/// <param name="qKey">出队key</param>
/// <returns></returns>
public string DeQueue(string qKey)
{
//1、redis消息出队
byte[] bytes = redisClient.RPop(qKey);
string qMsg = null;
//2、字节转string
if (bytes == null)
{
Console.WriteLine($"{qKey}队列中数据为空");
}
else
{
qMsg = System.Text.Encoding.UTF8.GetString(bytes);
}
return qMsg;
}
/// <summary>
/// 出队(阻塞) === 推
/// </summary>
/// <param name="qKey">出队key</param>
/// <param name="timespan">阻塞超时时间</param>
/// <returns></returns>
public string DeQueueBlock(string qKey, TimeSpan? timespan)
{
// 1、Redis消息出队
string qMsg = redisClient.BlockingPopItemFromList(qKey, timespan);
return qMsg;
}
/// <summary>
/// 获取队列数量
/// </summary>
/// <param name="qKey">队列key</param>
/// <returns></returns>
public long GetQueueCount(string qKey)
{
return redisClient.GetListCount(qKey);
}
/// <summary>
/// 关闭Redis
/// </summary>
public void Dispose()
{
redisClient.Dispose();
}
}
}2、订单服务发送积分消息和短信消息
namespace MyReidsMsgQueue.Async
{
class OrderService
{
public string CreateOrder()
{
//统计时间
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
//1、订单号生成
string orderNo = GetOrderGenrator();
//1.1、模拟存储到数据库,需要花费1s时间
Thread.Sleep(1000);
Console.WriteLine($"订单:{orderNo}保存成功");
//2、添加积分
//Console.WriteLine($"*******************开始调用积分服务*******************");
//PointsService pointsService = new PointsService();
//pointsService.AddPoints(orderNo);
//Console.WriteLine($"*******************积分服务调用完成*******************");
////3、发送短信
//Console.WriteLine($"*******************开始调用短信服务*******************");
//SmsService smsService = new SmsService();
//smsService.SendSms(orderNo);
//Console.WriteLine($"*******************短信服务调用完成*******************");
//Redis优化
using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
{
// 1、发送积分消息
msgQueue.EnQueue("My_Points", orderNo);
// 2、发送短信消息
msgQueue.EnQueue("My_Sms", orderNo);
}
stopwatch.Stop();
Console.WriteLine($"订单完成耗时:{stopwatch.ElapsedMilliseconds}ms");
return orderNo;
}
/// <summary>
/// 订单号生成器
/// </summary>
/// <returns></returns>
private string GetOrderGenrator()
{
Random ran = new Random();
return "O-" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ran.Next(1000, 9999).ToString();
}
}
}3、消费积分消息
namespace MyRedisPoints
{
class Program
{
static void Main(string[] args)
{
//Redis优化
using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
{
Console.WriteLine("积分消息......");
//1、获取积分消息--反复消费
while (true)
{
string msgPoints = msgQueue.DeQueueBlock("My_Points", TimeSpan.FromSeconds(60));
if (msgPoints != null)
{
//2、添加积分
PointsService pointsService = new PointsService();
pointsService.AddPoints(msgPoints);
}
}
}
}
}
}amespace MyRedisPoints.Async
{
public class PointsService
{
public void AddPoints(string orderNo)
{
//1、模拟积分添加到数据库,需要花费1s时间
Thread.Sleep(1000);
Console.WriteLine($"增加积分:orderNo:{orderNo}成功");
}
}
}4、消费短信消息
namespace MyRedisSms
{
class Program
{
static void Main(string[] args)
{
//Redis优化
using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
{
Console.WriteLine("短信消息......");
//1、获取短信消息--反复消费
while (true)
{
string msgSms = msgQueue.DeQueueBlock("My_Sms", TimeSpan.FromSeconds(60));
if (msgSms != null)
{
//2、发送短信
SmsService smsService = new SmsService();
smsService.SendSms(msgSms);
}
}
}
}
}
}namespace MyRedisSms.Async
{
public class SmsService
{
public void SendSms(string orderNo)
{
//1、模拟调用第三方短信接口发送短信,需要花费1s时间
Thread.Sleep(1000);
Console.WriteLine($"发送短信:orderNo:{orderNo}成功");
}
}
}十、客户端调用订单服务
namespace MyReidsMsgQueue
{
class Program
{
static void Main(string[] args)
{
#region 异步处理
{
OrderService orderService = new OrderService();
orderService.CreateOrder();
}
#endregion
Console.ReadKey();
}
}
}十一、运行效果


十二、项目结构
MyReidsMsgQueue里的PointsService.cs和SmsService.cs是为了演示同步场景订单服务消耗的时间

思考:从队列中取出积分消息,去添加积分失败了怎么办?也就是消费消息失败了怎么办?