websocket整合rabbitMQ——实现消息推送

一、实现功能:当前页面点击请求后,另一个页面立即收到一条消息,用springboot+websocket+rabbitMQ实现。

二、java代码:

1、先在pom里导入需要的坐标

<!--rabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

        <!--webSocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>
<!--lombok,可用于log输出到控制台(和System.out.println()一样)--><dependency>    <groupId>org.projectlombok</groupId>    <artifactId>lombok</artifactId>    <version>2.1.4.RELEASE</version></dependency>

2、编写websocket配置类(这个配置除了监听队列的名字外,以后回来直接Ctrl+C)

@Slf4j
@ServerEndpoint("/websocket")//把当前类标识成一个WebSocket的服务端,值是访问的URL地址
@Component//spring注入
public class WebSocketServer {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    //存放session的集合,很重要!!
    private static CopyOnWriteArraySet<WebSocketServer> sessions = new CopyOnWriteArraySet<WebSocketServer>();

    //用于存所有的连接服务的客户端,这个对象存储是安全的(因为HashMap不支持线程同步)
    private static ConcurrentHashMap<String, WebSocketServer> webSocketSet = new ConcurrentHashMap<>();


    //监听队列,从队列中把刚发送的消息取出来
    @RabbitListener(queues = "pointQueue")
    public void getMessAge(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

        System.out.println("发来消息:" + message);

        channel.basicAck(tag, false);//确认收到--消费

        //用来判断session中是否存在数据
        if (sessions.size() != 0) {
            for (WebSocketServer s : sessions) {
                if (s != null) {
                    s.session.getBasicRemote().sendText(message);//向已连接客户端发送信息
                }
            }
        }
    }


    //连接成功
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        //这个一定要写,第一次很容易忽略!
        sessions.add(this);
        log.info("[WebSocket] 连接成功,当前连接人数为:={}", webSocketSet.size());

    }

    //连接断开
    @OnClose
    public void onClose() {
        //释放
        sessions.remove(this);
        log.info("[WebSocket] 退出成功,当前连接人数为:={}", webSocketSet.size());
    }

    //收到消息
    @OnMessage
    public String onMessage(String message) {
        log.info("[WebSocket] 收到消息:{}", message);

        //这里是自己测试的,可以忽略
        if (message.equals("一只羊")) {//不能用==
            return "1:1个大白羊";
        }
        if (message.equals("两只羊")) {
            return "2:2个大白羊";
        }
        if (message.equals("三只羊")) {
            return "3:个大白羊";
        }

        return "你已成功连接,这是webSocket服务端的返回信息!";
    }

}

3、rabbitMQ创建队列与交换机绑定,这里不记录了,主要是在里面加上这一条

//websocket 放入到spring容器
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

二、Html代码:注意这里是接收页面的js代码!!!

<script type="text/javascript">

    //设置一个变量,用于存储websocket
    var socket = null;

    //判断当前浏览器是否支持WebSocket
    if (‘WebSocket‘ in window) {
        //alert(‘当前浏览器支持WebSocket‘)
        //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接        //这个url必须写对!ws固定的,ip是自己配置的,路径是自己命名的
        socket = new WebSocket("ws://127.0.0.1:7777/websocket");
    }
    else {
        alert(‘当前浏览器 Not support websocket‘)
    }


    //连接成功(回调方法)
    socket.onopen = function () {
        alert("Socket 连接成功");
    }

    //连接错误(回调方法)
    socket.onerror = function () {
        alert(‘连接出错‘)
        //可尝试刷新页面--循环
        //window.location.reload();
    };

    //接收消息(回调方法)
    socket.onmessage = function (event) {
        alert("你收到了一条返回消息:"+event.data);
    }

    //连接关闭(回调方法)
    socket.onclose = function () {
        alert("连接关闭");
    }

    //窗口关闭,关闭websocket(否则server端会抛异常)
    window.onbeforeunload = function () {
        socket.close();
    }
    

</script>

 

四、总结:

到这里最核心的东西已经搭建完了,但肯定没有写全,不是自己不想写,因为一些简单枯燥的东西不必要罗列出来,否则杂多不知其意。

所以放在最后用最简洁的语言,描述大量的简单代码是最高效的:

  1. 首先创建一个发送消息的html页面,用简单的http请求向一个controller,如ajax/axios,把想要发送的消息作为参数发送过去。
  2. controller接收到消息参数后,调用rabbitMQ的方法放入队列,此时候监听这个队列的方法立马取出队列中的消息(并消费)。
  3. 监听队列方法中,用session.getBasicRemote().sendText("你的消息"); 发送给已连接websocket的接收页面,这时页面就会alert()弹出你发送页面发送的消息。