springboot整合websocket(详解、教程、代码)

springboot整合websocket(教程及代码)

大家好,我是酷酷的韩~

1.websocket定义

WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。websocket 协议是在 http 协议上的一种补充协议,是 html5 的新特性,是一种持久化的协议。

2.websocket工作原理:

握手(建立连接)

web浏览器和服务器都必须使用websocket来建立维护连接,也可以理解为HTTP握手 (handshake)和TCP数据传输
(1)浏览器向http一样发起一个请求,等待服务器响应
(2)服务器返回握手响应,告诉浏览器将后续的数据按照websocket的制定格式传输过来
(3)服务器接收到了之后,服务器与浏览器之间连接不中断,此时连接已经是双工的了
(4)浏览器和服务器由任何需要传输的数据时使用长连接来进行数据传递;

数据传输

一旦WebSocket客户端、服务端建立连接后,后续的操作都是基于数据帧的传递。这样做有几个好处:
1.数据可以进行分片传输,每条消息可能被切分成多个数据帧,这样就不用考虑数据大小导致的长度标志位不足够的情况。
ps :FIN=1表示当前数据帧为消息的最后一个数据帧,此时接收方已经收到完整的消息,可以对消息进行处理。FIN=0,则接收方还需要继续监听接收其余的数据帧;
2. 和http的chunk一样,可以边生成数据边传递消息,即提高传输效率;

3.websocket优点:

(1)较少的控制开销。在连接创建后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对较小。在不包含扩展的情况下,对于服务器到客户端的内容,此头部大小只有2至10字节(和数据包长度有关);对于客户端到服务器的内容,此头部还需要加上额外的4字节的掩码。相对于HTTP请求每次都要携带完整的头部,此项开销显著减少了。
(2)更强的实时性。由于协议是全双工的,所以服务器可以随时主动给客户端下发数据。相对于HTTP请求需要等待客户端发起请求服务端才能响应,延迟明显更少;即使是和Comet等类似的长轮询比较,其也能在短时间内更多次地传递数据。
(3)保持连接状态。与HTTP不同的是,Websocket需要先创建连接,这就使得其成为一种有状态的协议,之后通信时可以省略部分状态信息。而HTTP请求可能需要在每个请求都携带状态信息(如身份认证等)。
(4)更好的二进制支持。Websocket定义了二进制帧,相对HTTP,可以更轻松地处理二进制内容。
(5)可以支持扩展。Websocket定义了扩展,用户可以扩展协议、实现部分自定义的子协议。如部分浏览器支持压缩等。
(6)更好的压缩效果。相对于HTTP压缩,Websocket在适当的扩展支持下,可以沿用之前内容的上下文,在传递类似的数据时,可以显著地提高压缩率。

4.websocket应用

(1)系统实时通告
(2)即时聊天
(3)弹幕
(4)实时数据更新:比如体育实况更新、股票基金报价实时更新
(5)代替轮询,提高效率。

5.springBoot集成

5.1pom依赖

  
  
      org.springframework.boot
      spring-boot-starter-websocket
  

5.2websocket配置类:

package com.loit.park.common.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author hanjinqun
 * @date 2022/10/24
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {

        return new ServerEndpointExporter();
    }
}

5.3websocket操作类

注意:@ServerEndpoint路径支持自定义

package com.loit.park.common.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author hanjinqun
 * @date 2022/10/24
 * websocket操作类
 */
@Component
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
    /**
     * 日志工具
     */
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 用户id
     */
    private String userId;
    /**
     * 用来存放每个客户端对应的MyWebSocket对象
     */
    private static CopyOnWriteArraySet webSockets = new CopyOnWriteArraySet<>();
    /**
     * 用来存在线连接用户信息
     */
    private static ConcurrentHashMap sessionPool = new ConcurrentHashMap();

    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            this.session = session;
            this.userId = userId;
            webSockets.add(this);
            sessionPool.put(userId, session);
            logger.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            logger.info("【websocket消息】连接断开,总数为:" + webSockets.size());
        } catch (Exception e) {
        }
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message) {
        logger.info("【websocket消息】收到客户端消息:" + message);
    }

    /**
     * 发送错误时的处理
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.error("用户错误,原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 此为广播消息
     */
    public void sendAllMessage(String message) {
        logger.info("【websocket消息】广播消息:" + message);
        for (WebSocketServer webSocket : webSockets) {
            try {
                if (webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 此为单点消息
     */
    public void sendOneMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                logger.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 此为单点消息(多人)
     */
    public void sendMoreMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            Session session = sessionPool.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    logger.info("【websocket消息】 单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

重要类方法说明:
(1)@ServerEndpoint(“/api/pushMessage/{userId}”) 前端通过此 URI 和后端交互,建立连接
(2)@Component 不用说将此类交给 spring 管理
(3)@OnOpen websocket 建立连接的注解,前端触发上面 URI 时会进入此注解标注的方法
(4)@OnMessage 收到前端传来的消息后执行的方法
(5)@OnClose 顾名思义关闭连接,销毁 session

5.4前端测试代码




    
    websocket通讯




【socket开启者的ID信息】:

【客户端向服务器发送的内容】:

【操作】:

【操作】:

5.5模拟发送数据接口

package com.loit.park.modules.controller.websocket;

import com.loit.park.common.constant.BusinessConstants;
import com.loit.park.common.websocket.WebSocketServer;
import com.loit.test1.common.json.AjaxJson;
import com.loit.test1.common.json.LoitStatusMsg;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author hanjinqun
 * @date 2022/10/24
 * websocket接口
 */
@RestController
@RequestMapping(value = "/api/v1/websocket")
@Api(tags = "websocket接口", value = "AlarmDpController")
public class WebSocketController {
    @Autowired
    private WebSocketServer webSocketServer;

    /**
     * 模拟数据发送
     */
    @ApiOperation(value = "模拟数据发送", notes = "模拟数据发送")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "query", name = "message", value = "模拟消息", required = true, dataType = "String"),
    })
    @RequestMapping(value = "/sendTestMessage", method = RequestMethod.GET)
    public AjaxJson sendTestMessage(String message) {
        AjaxJson ajaxJson = new AjaxJson();
        try {
            webSocketServer.sendAllMessage(message);
        } catch (Exception e) {
            e.printStackTrace();
            return AjaxJson.returnExceptionInfo(LoitStatusMsg.LOIT_USER_LOGIN_FAIL);
        }
        return ajaxJson;
    }
}

5.6模拟操作

亲爱的,你要努力,你想要的的,你要自己给自己。 ------酷酷的韩