分布式系统--封装Redis消息队列--消息队列下的异步场景

一、什么是消息队列?
1、消息就是数据。
2、队列有队尾和队头,队列有入队和出队,队列先进先出。
3、生产者
存数据入口
4、消费者
取数据入口

分布式系统--封装Redis消息队列--消息队列下的异步场景

二、推模型--发布订阅模型--阻塞

主动把消息推给订阅者。
数据实时要求高,用推。

三、拉模型--生产者消费者模型--非阻塞
消费者自己去拉取数据。
数据实时要求不高,用拉。

四、它有哪些优势?为什么使用它?
可以解决一些分布式场景,如:异步场景,应用解耦,流量削峰,今天讲讲解决异步场景。

五、同步场景

客户端发起一个请求,创建订单,创建完订单需要增加积分,然后发送短信,假设创建订单花费1s,增加积分花费1s,发送短信花费1s,实则花费了3s。

缺陷:耗时时间太长。

分布式系统--封装Redis消息队列--消息队列下的异步场景

分布式系统--封装Redis消息队列--消息队列下的异步场景

六、异步场景

如果在订单服务开启1个异步线程去处理发送短信服务,这样做会有下面的缺陷。

缺陷:1个客户端请求就是1个线程,在订单服务端开启的异步线程一多,就会导致订单服务端的线程数减少,间接会导致订单服务并发量降低。

分布式系统--封装Redis消息队列--消息队列下的异步场景

七、消息队列下的异步场景

 积分服务和短信服务订阅消息队列,消息队列推送消息到积分服务,短信服务,这样创建订完单响应给客户端只需要花费1s,但是又不会影响订单服务的并发量。

 分布式系统--封装Redis消息队列--消息队列下的异步场景

八、封装Redis消息队列,解决消息队列下的异步场景

1、封装Redis消息队列

namespace MyRedisUnitty
{
    /// <summary>
    /// 封装Redis消息队列
    /// </summary>
    public class RedisMsgQueueHelper : IDisposable
    {
        /// <summary>
        /// Redis客户端
        /// </summary>
        public RedisClient redisClient { get; }

        public RedisMsgQueueHelper(string redisHost)
        {
            redisClient = new RedisClient(redisHost);
        }

        /// <summary>
        /// 入队
        /// </summary>
        /// <param name="qKeys">入队key</param>
        /// <param name="qMsg">入队消息</param>
        /// <returns></returns>
        public long EnQueue(string qKey, string qMsg)
        {
            //1、编码字符串
            byte[] bytes = System.Text.Encoding.UTF8.GetBytes(qMsg);

            //2、Redis消息队列入队
            long count = redisClient.LPush(qKey, bytes);

            return count;
        }

        /// <summary>
        /// 出队(非阻塞) === 拉
        /// </summary>
        /// <param name="qKey">出队key</param>
        /// <returns></returns>
        public string DeQueue(string qKey)
        {
            //1、redis消息出队
            byte[] bytes = redisClient.RPop(qKey);
            string qMsg = null;
            //2、字节转string
            if (bytes == null)
            {
                Console.WriteLine($"{qKey}队列中数据为空");
            }
            else
            {
                qMsg = System.Text.Encoding.UTF8.GetString(bytes);
            }

            return qMsg;
        }

        /// <summary>
        /// 出队(阻塞) === 推
        /// </summary>
        /// <param name="qKey">出队key</param>
        /// <param name="timespan">阻塞超时时间</param>
        /// <returns></returns>
        public string DeQueueBlock(string qKey, TimeSpan? timespan)
        {
            // 1、Redis消息出队
            string qMsg = redisClient.BlockingPopItemFromList(qKey, timespan);

            return qMsg;
        }

        /// <summary>
        /// 获取队列数量
        /// </summary>
        /// <param name="qKey">队列key</param>
        /// <returns></returns>
        public long GetQueueCount(string qKey)
        {
            return redisClient.GetListCount(qKey);
        }

        /// <summary>
        /// 关闭Redis
        /// </summary>
        public void Dispose()
        {
            redisClient.Dispose();
        }
    }
}

2、订单服务发送积分消息和短信消息

namespace MyReidsMsgQueue.Async
{
    class OrderService
    {
        public string CreateOrder()
        {
            //统计时间
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();

            //1、订单号生成
            string orderNo = GetOrderGenrator();

            //1.1、模拟存储到数据库,需要花费1s时间
            Thread.Sleep(1000);
            Console.WriteLine($"订单:{orderNo}保存成功");

            //2、添加积分
            //Console.WriteLine($"*******************开始调用积分服务*******************");
            //PointsService pointsService = new PointsService();
            //pointsService.AddPoints(orderNo);
            //Console.WriteLine($"*******************积分服务调用完成*******************");

            ////3、发送短信
            //Console.WriteLine($"*******************开始调用短信服务*******************");
            //SmsService smsService = new SmsService();
            //smsService.SendSms(orderNo);
            //Console.WriteLine($"*******************短信服务调用完成*******************");

            //Redis优化
            using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
            {
                // 1、发送积分消息
                msgQueue.EnQueue("My_Points", orderNo);

                // 2、发送短信消息
                msgQueue.EnQueue("My_Sms", orderNo);
            }

            stopwatch.Stop();
            Console.WriteLine($"订单完成耗时:{stopwatch.ElapsedMilliseconds}ms");

            return orderNo;
        }

        /// <summary>
        /// 订单号生成器
        /// </summary>
        /// <returns></returns>
        private string GetOrderGenrator()
        {
            Random ran = new Random();
            return "O-" + DateTime.Now.ToString("yyyyMMddHHmmssfff") + ran.Next(1000, 9999).ToString();
        }
    }
}

3、消费积分消息

namespace MyRedisPoints
{
    class Program
    {
        static void Main(string[] args)
        {
            //Redis优化
            using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
            {
                Console.WriteLine("积分消息......");
                //1、获取积分消息--反复消费
                while (true)
                {
                    string msgPoints = msgQueue.DeQueueBlock("My_Points", TimeSpan.FromSeconds(60));

                    if (msgPoints != null)
                    {
                        //2、添加积分
                        PointsService pointsService = new PointsService();
                        pointsService.AddPoints(msgPoints);
                    }
                }
            }
        }
    }
}
amespace MyRedisPoints.Async
{
    public class PointsService
    {
        public void AddPoints(string orderNo)
        {
            //1、模拟积分添加到数据库,需要花费1s时间
            Thread.Sleep(1000);
           
            Console.WriteLine($"增加积分:orderNo:{orderNo}成功");
        }
    }
}

4、消费短信消息

namespace MyRedisSms
{
    class Program
    {
        static void Main(string[] args)
        {
            //Redis优化
            using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
            {
                Console.WriteLine("短信消息......");
                //1、获取短信消息--反复消费
                while (true)
                {
                    string msgSms = msgQueue.DeQueueBlock("My_Sms", TimeSpan.FromSeconds(60));

                    if (msgSms != null)
                    {
                        //2、发送短信
                        SmsService smsService = new SmsService();
                        smsService.SendSms(msgSms);
                    }
                }
            }
        }
    }
}
namespace MyRedisSms.Async
{
    public class SmsService
    {
        public void SendSms(string orderNo)
        {
            //1、模拟调用第三方短信接口发送短信,需要花费1s时间
            Thread.Sleep(1000);

            Console.WriteLine($"发送短信:orderNo:{orderNo}成功");
        }
    }
}

十、客户端调用订单服务

namespace MyReidsMsgQueue
{
    class Program
    {
        static void Main(string[] args)
        {
            #region 异步处理
            {
                OrderService orderService = new OrderService();
                orderService.CreateOrder();
            }
            #endregion

            Console.ReadKey();
        }
    }
}

十一、运行效果

分布式系统--封装Redis消息队列--消息队列下的异步场景

分布式系统--封装Redis消息队列--消息队列下的异步场景

十二、项目结构

MyReidsMsgQueue里的PointsService.cs和SmsService.cs是为了演示同步场景订单服务消耗的时间

分布式系统--封装Redis消息队列--消息队列下的异步场景

 思考:从队列中取出积分消息,去添加积分失败了怎么办?也就是消费消息失败了怎么办?

相关推荐