玩一玩 webSocket 聊聊天
前言 websocket 是什么?
—— WebSockikevt是HTML5开始提供的一种浏览器与服务器间进行全双工通讯的网络技k'i术。在 WebSocket API 中,浏览器和服务器只需要要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。
—— 比如说我们登录了公司公告平台、这时我们想看有没有新的通告信息在传统的http中我们就得手动刷新一次浏览器才能实现,而websocket从你登录成功之后打开了连接只要不关闭会话、只要服务端有新的消息返回你都能实时看到。
1. 导入maven坐标
<!-- webSocket-start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- webSocket-end -->
2.添加 webSocket 配置类
/**
* @author zhouhouxing
* @date In 2019,
*/
@Configuration
public class WebSocketAutoConfig {
@Bean
public ServerEndpointExporter serverEndpointExporterExporter() {
return new ServerEndpointExporter();
}
}
3. 编写配置类 - 用于用户最大心跳时间、休眠时间等
@Data
@Component
@ConfigurationProperties(prefix = "scaffold.websocket")
public class WebSocketProperties {
// 心跳允许间隔时间
private Long heartbeatAllowIntervalTime = 50000L;
// 心跳校验间隔时间
private Long heartbeatCheckIntervalTime = 50100L;
// 连接等待间隔时间
private Long wsWaitingIntervalTime = 2L;
}
4、最主要的部分 websocket 服务类
/**
* 链接打开地址 ws://localhost:prot/chat/JSNkd15684用户ID
*/
@Slf4j
@Component
@ServerEndpoint(value = "/chat/{clientId}", encoders = MessageEncoder.class, decoders = MessageDecoder.class)
public class WebSocketServer {
private String fromId;
private Thread localThread;
private final static ConcurrentHashMap<String, Session> wsSessionMap = new ConcurrentHashMap<>(64);
private final static ConcurrentHashMap<String, LocalDateTime> heartbeatKeepTimeMap
= new ConcurrentHashMap<>(64);
@Autowired
private PushRest pushRest;
Autowired
private WebSocketProperties webSocketProperties;
public WebSocketServer() {} // 公开构造子
@OnOpen
public void onOpen(@PathParam("clientId") String clientId, Session session) {
this.fromId = clientId;
// 连接打开 相当于一个心跳
heartbeatKeepTimeMap.put(Constant.WS_CLIENT_ + clientId, LocalDateTime.now());
// 只允许一个客户端登录 再次登录则关闭原有的连接
Session connectedSession = wsSessionMap.get(Constant.WS_CLIENT_ + clientId);
try {
if (!Objects.isNull(connectedSession)) {
connectedSession.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭session 设置为null 垃圾回收
connectedSession = null;
}
wsSessionMap.put(Constant.WS_CLIENT_ + clientId, session);
// 将用户存入 redis 在线用户列表
chatSessionService.setUserOnlineList(fromId); //
log.info("--------> [clientId={}]-已连接ws服务器!当前在线客户端为{}", clientId, wsSessionMap.size());
// 开启一个线程检测用户未读信息 (根据自身实际业务需求)
this.localThread = new Thread(new PushMsg(Long.valueOf(clientId)));
this.localThread.setName("push-" + RandomStringUtils.randomNumeric(10));
this.localThread.start();
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(@PathParam("clientId") String clientId, Session session, String message) {
HeartMessage wsMessage = null;
try {
wsMessage = JsonUtil.fromJson(message, HeartMessage.class);
} catch (Exception e) {
log.error("--------> 解析客户端消息失败,错误信息:{}", e.getMessage());
return;
}
WSMsgType wsMsgType = WSMsgType.contains(wsMessage.getType());
switch (wsMsgType) {
case WS_MSG_HEART:
log.info("--------> [clientId={}]-收到客户端心跳消息:{}", clientId, message);
handleHeartbeat(wsMessage, session);
break;
case WS_MSG_ACK:
log.info("--------> [clientId={}]-收到客户端ack消息:{}", clientId, message);
sendAckMessage(session, message);
break;
default:
log.error("--------> [clientId={}]-匹配不到消息类型,信息为:{}", clientId, message);
break;
}
}
// 发送ACK回应
private void sendAckMessage(Session session, String wsMsgResponse) {
sendMessage(session, wsMsgResponse);
}
// 处理业务消息
private void handleRequest(Message wsMessage, Session session) { // 处理业务逻辑}
// 处理心跳消息
private void handleHeartbeat(HeartMessage wsMessage, Session session) {
wsMessage.setTime(LocalDateTime.now().toString());
// 返回应答
wsMessage.setType(WSMsgType.WS_MSG_ACK.getValue());
String ackMsg = null;
try {
ackMsg = JsonUtil.toJson(wsMessage);
} catch (JsonProcessingException e) {
log.error("--------> 序列化心跳对象失败,错误信息:{}", e.getMessage());
return;
}
try {
session.getBasicRemote().sendText(ackMsg);
} catch (IOException e) {
log.error("--------> 发送心跳信息失败,错误信息:{}", e.getMessage());
}
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Throwable error) {
// 不建议在发送错误调用时,关闭WebSocket操作。
log.error("--------> [clientId={}] - error 信息={} 链接超时或非正常关闭链接", fromId, error.getMessage());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
String clientId = this.fromId;
// 无论是客户端还是服务器主动调用关闭WebSocket, 都会调用该方法。这里可以做关闭后清理工作。
Session mapSession = wsSessionMap.get(Constant.WS_CLIENT_ + clientId);
if (mapSession != null) {
heartbeatKeepTimeMap.remove(Constant.WS_CLIENT_ + clientId);
wsSessionMap.remove(Constant.WS_CLIENT_ + clientId);
JedisUtil.delKey(Constant.PREFIX_USER_MESG_TOKEN + clientId);
try {
mapSession.close();
} catch (IOException e) {
e.printStackTrace();
}
}
log.info("--------> [clientId={}]-关闭链接、sessionSize -->[{}]", clientId, wsSessionMap.size());
}
public void sendMessage(Session session, String message) {
try {
if (!session.isOpen()) {
// 检测长连接是否正常
return;
}
session.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("------> { sendMessage({}) 抛出异常!【{}】}<<-------", session.getId(), e);
}
}
public static ConcurrentHashMap<String, LocalDateTime> getHeartbeatKeepTimeMap() {
return heartbeatKeepTimeMap;
}
public void checkSessionAliveAndRemove(String clientId, LocalDateTime heartbeatKeepTime, LocalDateTime now) {
// 时间差 单位毫秒
Long intervalTime = Duration.between(heartbeatKeepTime, now).toMillis();
if (intervalTime.compareTo(webSocketProperties.getHeartbeatAllowIntervalTime()) > 0) {
// 大于心跳允许间隔时间 证明该客户端已死
heartbeatKeepTimeMap.remove(clientId);
try {
wsSessionMap.get(clientId).close();
} catch (IOException e) {
log.error("------> { 检查用户({})心跳 抛出异常!【{}】}<<-------", clientId, e);
}
wsSessionMap.remove(clientId);
log.warn("--------> 移除 webSocket, clientId=", clientId);
}
}
/**
* 发送 未读信息
*
* @param toId
* @param entity
*/
public void sendOne(String toId, Message entity) {
Session session = wsSessionMap.get(Constant.WS_CLIENT_ + toId);
if (session == null) return;
try {
if (session.isOpen()) {
WebSocketServer socketServer = new WebSocketServer();
log.info(" 推送消息到窗口:" + toId + " ,推送内容:" + entity.getMessage());
socketServer.sendMessage(session, getData(toId, entity));
}
} catch (Exception e) {
log.error("------> { sendOne({}) 抛出异常!【{}】}", toId, e);
}
}
/**
* 指定窗口推送消息
*
* @param entity 推送消息
* @param toId 接收方ID
*/
public void sendTo(String toId, Message entity) {
Session session = wsSessionMap.get(Constant.WS_CLIENT_ + toId);
if (session == null || !session.isOpen()) {
log.info("------ >>[{}] 发送给用户:[{}] 当前不在线,信息:[{}] ,等待下次用户上线推送给他!", entity.getFrom().getId(), toId, entity.getMessage());
mesgService.insertMsg(entity.getFrom().getId(), entity.getTo().getId(), (String) entity.getMessage(), entity.getType());
// 存入 redis 历史记录
chatSessionService.pushMessage(entity.getFrom().getId() + "", toId, (String) entity.getMessage(), entity.getType());
if (JedisUtil.getJson(Constant.USER_CLIENT_ + toId) == null) return;
if (((String) entity.getMessage()).contains("http")) entity.setMessage("[图片消息]");
pushRest.pushToSingle(toId, "私信提醒", entity.getFrom().getName() + ":" + entity.getMessage());
return;
}
try {
WebSocketServer socketServer = new WebSocketServer();
log.info(entity.getFrom().getId() + " 推送消息到窗口:" + toId + " ,推送内容:" + entity.getMessage());
entity.setTo(chatSessionService.findById(null, toId));
entity.setFrom(chatSessionService.findById(toId, entity.getFrom().getId()+""));
entity.setTime(DateUtils.timeToStamp(DateUtils.converts(new Date())));
socketServer.sendMessage(session, getData(toId, entity));
} catch (Exception e) {
log.error("------> { sendTo({}) 抛出异常!【{}】}<<-------", toId, e);
}
chatSessionService.pushMessage(entity.getFrom().getId() + "", toId, (String) entity.getMessage(), entity.getType());
}
/**
* 封装返回消息
*
* @param toId 指定窗口ID
* @param message 消息内容
* @return
* @throws IOException
*/
private String getData(String toId, Message message) throws IOException {
return JSONObject.toJSONString(new ResponseBean(HttpStatus.OK.value(), "", message));
}
5. 因为消息对象是自定义的 所以要加上消息对象编码器
/**
* Created by zhouhouxing on In 2019,
* 消息编码转换类
*/
public class MessageEncoder implements javax.websocket.Encoder.Text<Message> {
@Override
public String encode(Message message) throws EncodeException {
return JSON.toJSONString(message);
}
@Override
public void init(EndpointConfig endpointConfig) {
}
@Override
public void destroy() {
}
}
/**
* Created by zhouhouxing on In 2019,
* 消息对象转换类
*/
public class MessageDecoder implements javax.websocket.Decoder.Text<Message> {
@Override
public Message decode(String message) throws DecodeException {
return JSON.parseObject(message, Message.class);
}
@Override
public boolean willDecode(String s) {
return true;
}
@Override
public void init(EndpointConfig endpointConfig) {
}
@Override
public void destroy() {
}
}
6. 消息类型枚举类
public enum WSMsgType {
WS_MSG_ACK(0, "应答消息"),
WS_MSG_HEART(1, "心跳消息类型"),
WS_MSG_REQUEST(2, "业务消息"),
;
public Integer value;
public String msg;
WSMsgType(Integer value, String msg) {
this.value = value;
this.msg = msg;
}
public Integer getValue() { return value; }
public String getMsg() { return msg; }
public static WSMsgType contains(Integer value) {
for (WSMsgType type : WSMsgType.values()) {
if (type.value.equals(value)) {
return type;
}
}
return null;
}
}
7. 业务消息实体
/**
* 会话消息实体
*
* @author zhouhouxing
* @date In 2019,
*/
public class Message implements Serializable {
/**
* 消息推送者
*/
private User from;
/**
* 消息内容
*/
private Object message;
/**
* 消息类型
*/
private int type;
/**
* 消息接收者:
* 如果是私有(向指定窗口推送),to即为接受者User对象
* 如果是公共消息(群组聊天),to设为null
*/
private User to;
/**
* 创建时间
*/
private String time;
/*public void setMessage(String message) {
this.message = message == null ? "" : message.replaceAll("\r\n|\r|\n", "");
}*/
}
心跳消息实体
/**
* @author Created by John on In 2019,
*/
public class HeartMessage implements Serializable {
private long uid;
private String message;
private int type;
private String time;
}
自此差不多完结了…
当然实际运用中复杂度相对高一点还需自身去丰富有任何更好的意见也可以跟博主交流。
谢谢各位的观看