本文作者小傅哥,原题“使用DDD+Netty,开发一个分布式IM(即时通信)系统”。为了提升阅读体验,即时通讯网有大量修订和改动,感谢原作者。
Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。 Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。
agreement └── src ├── main │ ├── java │ │ └── org.itstack.naive.chat │ │ ├── codec │ │ │ ├── ObjDecoder.java │ │ │ └── ObjEncoder.java │ │ ├── protocol │ │ │ ├── demo │ │ │ ├── Command.java │ │ │ └── Packet.java │ │ └── util │ │ └── SerializationUtil.java │ ├── resources │ │ └── application.yml │ └── webapp │ └── chat │ └── res │ └── index.html └── test └── java └── org.itstack.demo.test └── ApiTest.java
public abstract class Packet { private final static Map<Byte, Class<? extends Packet>> packetType = new ConcurrentHashMap<>(); static { packetType.put(Command.LoginRequest, LoginRequest.class); packetType.put(Command.LoginResponse, LoginResponse.class); packetType.put(Command.MsgRequest, MsgRequest.class); packetType.put(Command.MsgResponse, MsgResponse.class); packetType.put(Command.TalkNoticeRequest, TalkNoticeRequest.class); packetType.put(Command.TalkNoticeResponse, TalkNoticeResponse.class); packetType.put(Command.SearchFriendRequest, SearchFriendRequest.class); packetType.put(Command.SearchFriendResponse, SearchFriendResponse.class); packetType.put(Command.AddFriendRequest, AddFriendRequest.class); packetType.put(Command.AddFriendResponse, AddFriendResponse.class); packetType.put(Command.DelTalkRequest, DelTalkRequest.class); packetType.put(Command.MsgGroupRequest, MsgGroupRequest.class); packetType.put(Command.MsgGroupResponse, MsgGroupResponse.class); packetType.put(Command.ReconnectRequest, ReconnectRequest.class); } public static Class<? extends Packet> get(Byte command) { return packetType.get(command); } /** * 获取协议指令 * * @return 返回指令值 */ public abstract Byte getCommand(); }
public class AddFriendHandler extends MyBizHandler<AddFriendRequest> { public AddFriendHandler(UserService userService) { super(userService); } @Override public void channelRead(Channel channel, AddFriendRequest msg) { // 1. 添加好友到数据库中[A->B B->A] List<UserFriend> userFriendList = new ArrayList<>(); userFriendList.add(new UserFriend(msg.getUserId(), msg.getFriendId())); userFriendList.add(new UserFriend(msg.getFriendId(), msg.getUserId())); userService.addUserFriend(userFriendList); // 2. 推送好友添加完成 A UserInfo userInfo = userService.queryUserInfo(msg.getFriendId()); channel.writeAndFlush(new AddFriendResponse(userInfo.getUserId(), userInfo.getUserNickName(), userInfo.getUserHead())); // 3. 推送好友添加完成 B Channel friendChannel = SocketChannelUtil.getChannel(msg.getFriendId()); if (null == friendChannel) return; UserInfo friendInfo = userService.queryUserInfo(msg.getUserId()); friendChannel.writeAndFlush(new AddFriendResponse(friendInfo.getUserId(), friendInfo.getUserNickName(), friendInfo.getUserHead())); } }
public class MsgHandler extends MyBizHandler<MsgRequest> { public MsgHandler(UserService userService) { super(userService); } @Override public void channelRead(Channel channel, MsgRequest msg) { logger.info("消息信息处理:{}", JSON.toJSONString(msg)); // 异步写库 userService.asyncAppendChatRecord(new ChatRecordInfo(msg.getUserId(), msg.getFriendId(), msg.getMsgText(), msg.getMsgType(), msg.getMsgDate())); // 添加对话框[如果对方没有你的对话框则添加] userService.addTalkBoxInfo(msg.getFriendId(), msg.getUserId(), Constants.TalkType.Friend.getCode()); // 获取好友通信管道 Channel friendChannel = SocketChannelUtil.getChannel(msg.getFriendId()); if (null == friendChannel) { logger.info("用户id:{}未登录!", msg.getFriendId()); return; } // 发送消息 friendChannel.writeAndFlush(new MsgResponse(msg.getUserId(), msg.getMsgText(), msg.getMsgType(), msg.getMsgDate())); } }
// Channel 状态定时巡检;3 秒后每 5 秒执行一次 scheduledExecutorService.scheduleAtFixedRate(() -> {while (!nettyClient.isActive()) {System.out.println("通信管道巡检:通信管道状态" + nettyClient.isActive()); try {System.out.println("通信管道巡检:断线重连 [Begin]"); Channel freshChannel = executorService.submit(nettyClient).get(); if (null == CacheUtil.userId) continue; freshChannel.writeAndFlush(new ReconnectRequest(CacheUtil.userId)); } catch (InterruptedException | ExecutionException e) {System.out.println("通信管道巡检:断线重连 [Error]");} } }, 3, 5, TimeUnit.SECONDS);
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
精华主题数超过100个。
连续任职达2年以上的合格正式版主
为论区做出突出贡献的开发者、版主等。
Copyright © 2014-2024 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.109375 second(s), 47 queries , Gzip On.