默认
打赏 发表评论 42
想开发IM:买成品怕坑?租第3方怕贵?找开源自已撸?尽量别走弯路了... 找站长给点建议
跟着源码学IM(三):基于Netty,从零开发一个IM服务端
微信扫一扫关注!

本文由“yuanrw”分享,职业:Java工程师,博客:juejin.im/user/5cefab8451882510eb758606,即时通讯网收录时内容有改动和修订。


0、引言


站长提示:本文适合IM新手阅读,但最好有一定的网络编程经验,必竟实践性的代码上手就是网络编程。如果你对网络编程,以及IM的一些理论知识知之甚少,请务必首先阅读:新手入门一篇就够:从零开发移动端IM》,该文为IM小白分类整理了详尽的理论资料,请按需补充相关知识。

配套源码:本文写的比较浅显但不太易懂,建议结合代码一起来读,文章配套的完整源码 请从本文文末 “11、完整源码下载” 处下载

1、内容概述


首先讲讲IM(即时通讯)技术可以用来做什么:

  • 1)聊天:qq、微信;
  • 2)直播:斗鱼直播、抖音;
  • 3)实时位置共享、游戏多人互动等等。

可以说几乎所有高实时性的应用场景都需要用到IM技术。

本篇将带大家从零开始搭建一个轻量级的IM服务端。

麻雀虽小,五脏俱全,我们搭建的IM服务端实现以下功能:

  • 1)一对一的文本消息、文件消息通信;
  • 2)每个消息有“已发送”/“已送达”/“已读”回执;
  • 3)存储离线消息;
  • 4)支持用户登录,好友关系等基本功能;
  • 5)能够方便地水平扩展。

通过这个项目能学到很多后端必备知识:

  • 1)rpc通信;
  • 2)数据库;
  • 3)缓存;
  • 4)消息队列;
  • 5)分布式、高并发的架构设计;
  • 6)docker部署。

2、相关文章


本文是系列文章中的第3篇:


相关IM架构方面的文章:


3、消息通信


3.1文本消息


我们先从最简单的特性开始实现:一个普通消息的发送。

消息格式如下:
message ChatMsg{
    id = 1;
    //消息id
    fromId = Alice
    //发送者userId
    destId = Bob
    //接收者userId
    msgBody = hello
    //消息体
}

1.png

如上图,我们现在有两个用户:Alice和Bob连接到了服务器,当Alice发送消息message(hello)给Bob,服务端接收到消息,根据消息的destId进行转发,转发给Bob。

3.2发送回执


那我们要怎么来实现回执的发送呢?

我们定义一种回执数据格式ACK,MsgType有三种,分别是sent(已发送),delivered(已送达), read(已读)。

消息格式如下:
message AckMsg {
    id;
    //消息id
    fromId;
    //发送者id
    destId;
    //接收者id
    msgType;
    //消息类型
    ackMsgId;
    //确认的消息id
}

enum MsgType {
    DELIVERED;
    READ;
}

当服务端接受到Alice发来的消息时:

1)向Alice发送一个sent(hello)表示消息已经被发送到服务器:
message AckMsg {
    id = 2;
    fromId = Alice;
    destId = Bob;
    msgType = SENT;
    ackMsgId = 1;
}

2.png

2)服务器把hello转发给Bob后,立刻向Alice发送delivered(hello)表示消息已经发送给Bob:
message AckMsg {
    id = 3;
    fromId = Bob;
    destId = Alice;
    msgType = DELIVERED;
    ackMsgId = 1;
}

3.png

3)Bob阅读消息后,客户端向服务器发送read(hello)表示消息已读:
message AckMsg {
    id = 4;
    fromId = Bob;
    destId = Alice;
    msgType = READ;
    ackMsgId = 1;
}

4.png

这个消息会像一个普通聊天消息一样被服务器处理,最终发送给Alice。

在服务器这里不区分ChatMsg和AckMsg,处理过程都是一样的:解析消息的destId并进行转发。

4、水平扩展


当用户量越来越大,必然需要增加服务器的数量,用户的连接被分散在不同的机器上。此时,就需要存储用户连接在哪台机器上。

我们引入一个新的模块来管理用户的连接信息。

4.1管理用户状态


5.png

模块叫做user status,共有三个接口:
public interface UserStatusService {

    /**
     * 用户上线,存储userId与机器id的关系
     *
     * @param userId
     * @param connectorId
     * @return 如果当前用户在线,则返回他连接的机器id,否则返回null
     */
    String online(String userId, String connectorId);

    /**
     * 用户下线
     *
     * @param userId
     */
    void offline(String userId);

    /**
     * 通过用户id查找他当前连接的机器id
     *
     * @param userId
     * @return
     */
    String getConnectorId(String userId);
}

这样我们就能够对用户连接状态进行管理了,具体的实现应考虑服务的用户量、期望性能等进行实现。

此处我们使用redis来实现,将userId和connectorId的关系以key-value的形式存储。

4.2消息转发


除此之外,还需要一个模块在不同的机器上转发消息,如下结构:
6.png

此时我们的服务被拆分成了connector和transfer两个模块,connector模块用于维持用户的长链接,而transfer的作用是将消息在多个connector之间转发。

现在Alice和Bob连接到了两台connector上,那么消息要如何传递呢?

1)Alice上线,连接到机器[1]上时:

  • 1.1)将Alice和它的连接存入内存中。
  • 1.2)调用user status的online方法记录Alice上线。

2)Alice发送了一条消息给Bob:

  • 2.1)机器[1]收到消息后,解析destId,在内存中查找是否有Bob。
  • 2.2)如果没有,代表Bob未连接到这台机器,则转发给transfer。

3)transfer调用user status的getConnectorId(Bob)方法找到Bob所连接的connector,返回机器[2],则转发给机器[2]。

流程图:
7.png

4.3总结


引入user status模块管理用户连接,transfer模块在不同的机器之间转发,使服务可以水平扩展。为了满足实时转发,transfer需要和每台connector机器都保持长链接。

5、离线消息


如果用户当前不在线,就必须把消息持久化下来,等待用户下次上线再推送,这里使用mysql存储离线消息。

为了方便地水平扩展,我们使用消息队列进行解耦:

  • 1)transfer接收到消息后如果发现用户不在线,就发送给消息队列入库;
  • 2)用户登录时,服务器从库里拉取离线消息进行推送。

6、用户登录、好友关系


用户的注册登录、账户管理、好友关系链等功能更适合使用http协议,因此我们将这个模块做成一个restful服务,对外暴露http接口供客户端调用。

至此服务端的基本架构就完成了:
8.png

7、中场休息 ... ...


本文以上内容,本篇帮大家构建了IM服务端的架构,但还有很多细节需要我们去思考。

例如:

  • 1)如何保证消息的顺序和唯一
  • 2)多个设备在线如何保证消息一致性
  • 3)如何处理消息发送失败
  • 4)消息的安全性
  • 5)如果要存储聊天记录要怎么做
  • 6)数据库分表分库
  • 7)服务高可用
  • ……

更多细节实现请继续读下半部分啦~

8、可靠性


什么是可靠性?对于一个IM系统来说,可靠的定义至少是不丢消息、消息不重复、不乱序,满足这三点,才能说有一个好的聊天体验。

8.1不丢消息


我们先从不丢消息开始讲起。

首先复习一下上面章节中设计的服务端架构:
1.png

我们先从一个简单例子开始思考:当Alice给Bob发送一条消息时,可能要经过这样一条链路:
2.png
  • 1)client-->connecter
  • 2)connector-->transfer
  • 3)transfer-->connector
  • 4)connector-->client


在这整个链路中的每个环节都有可能出问题,虽然tcp协议是可靠的,但是它只能保证链路层的可靠,无法保证应用层的可靠。

例如在第一步中,connector收到了从client发出的消息,但是转发给transfer失败,那么这条消息Bob就无法收到,而Alice也不会意识到消息发送失败了。

如果Bob状态是离线,那么消息链路就是:

  • 1)client-->connector
  • 2)connector-->transfer
  • 3)transfer-->mq

如果在第三步中,transfer收到了来自connector的消息,但是离线消息入库失败,那么这个消息也是传递失败了。

为了保证应用层的可靠,我们必须要有一个ack机制,使发送方能够确认对方收到了这条消息。

具体的实现,我们模仿tcp协议做一个应用层的ack机制。

tcp的报文是以字节(byte)为单位的,而我们以message单位。

3.png

发送方每次发送一个消息,就要等待对方的ack回应,在ack确认消息中应该带有收到的id以便发送方识别。

其次,发送方需要维护一个等待ack的队列。 每次发送一个消息之后,就将消息和一个计时器入队。

另外存在一个线程一直轮询队列,如果有超时未收到ack的,就取出消息重发。

超时未收到ack的消息有两种处理方式:

  • 1)和tcp一样不断发送直到收到ack为止。
  • 2)设定一个最大重试次数,超过这个次数还没收到ack,就使用失败机制处理,节约资源。例如如果是connector长时间未收到client的ack,那么可以主动断开和客户端的连接,剩下未发送的消息就作为离线消息入库,客户端断连后尝试重连服务器即可。

8.2不重复、不乱序


有的时候因为网络原因可能导致ack收到较慢,发送方就会重复发送,那么接收方必须有一个去重机制。

去重的方式是给每个消息增加一个唯一id。这个唯一id并不一定是全局的,只需要在一个会话中唯一即可。例如某两个人的会话,或者某一个群。如果网络断连了,重新连接后,就是新的会话了,id会重新从0开始。

关于消息ID的生成算法方面的文章,请详细参考:


接收方需要在当前会话中维护收到的最后一个消息的id,叫做lastId。

每次收到一个新消息, 就将id与lastId作比较看是否连续,如果不连续,就放入一个暂存队列 queue中稍后处理。

例如:

  • 1)当前会话的lastId=1,接着服务器收到了消息msg(id=2),可以判断收到的消息是连续的,就处理消息,将lastId修改为2;
  • 2)但是如果服务器收到消息msg(id=3),就说明消息乱序到达了,那么就将这个消息入队,等待lastId变为2后,(即服务器收到消息msg(id=2)并处理完了),再取出这个消息处理。

因此,判断消息是否重复只需要判断msgId>lastId && !queue.contains(msgId)即可。如果收到重复的消息,可以判断是ack未送达,就再发送一次ack。

接收方收到消息后完整的处理流程如下:
4.png

伪代码如下:
class ProcessMsgNode{
    /**
     * 接收到的消息
     */
    private Message message;
    /**
     * 处理消息的方法
     */
    private Consumer<Message> consumer;
}

public CompletableFuture<Void> offer(Long id,Message     message,Consumer<Message> consumer) {
    if (isRepeat(id)) {
    //消息重复
        sendAck(id);
        return null;
    }
    if (!isConsist(id)) {
    //消息不连续
        notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer));
        return null;
    }
    //处理消息
    return process(id, message, consumer);
}

private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) {
    return CompletableFuture
        .runAsync(() -> consumer.accept(message))
        .thenAccept(v -> sendAck(id))
        .thenAccept(v -> lastId.set(id))
        .thenComposeAsync(v -> {
            Long nextId = nextId(id);
            if (notConsistMsgMap.containsKey(nextId)) {
                //队列中有下个消息
                ProcessMsgNode node = notConsistMsgMap.get(nextId);
                return process(nextId, node.getMessage(), consumer);
            } else {
                //队列中没有下个消息
                CompletableFuture<Void> future = new CompletableFuture<>();
                future.complete(null);
                return future;
            }
        })
        .exceptionally(e -> {
            logger.error("[process received msg] has error", e);
            return null;
        });
}

9、安全性


无论是聊天记录还是离线消息,肯定都会在服务端存储备份,那么消息的安全性,保护客户的隐私也至关重要。

因此所有的消息都必须要加密处理。

在存储模块里,维护用户信息和关系链有两张基础表,分别是im_user用户表和im_relation关系链表。

  • im_user表用于存放用户常规信息,例如用户名密码等,结构比较简单。
  • im_relation表用于记录好友关系。

结构如下:
CREATE TABLE `im_relation` (
  `id` bigint(20) COMMENT '关系id',
  `user_id1` varchar(100) COMMENT '用户1id',
  `user_id2` varchar(100) COMMENT '用户2id',
  `encrypt_key` char(33) COMMENT 'aes密钥',
  `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,
  `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, 
  PRIMARY KEY (`id`),
  UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)
);

  • 1)user_id1和user_id2是互为好友的用户id,为了避免重复,存储时按照user_id1<user_id2的顺序存,并且加上联合索引;
  • 2)encrypt_key是随机生成的密钥。当客户端登录时,就会从数据库中获取该用户的所有的relation,存在内存中,以便后续加密解密;
  • 3)当客户端给某个好友发送消息时,取出内存中该关系的密钥,加密后发送。同样,当收到一条消息时,取出相应的密钥解密。

客户端完整登录流程如下:
5.png
  • 1)client调用rest接口登录;
  • 2)client调用rest接口获取该用户所有relation;
  • 3)client向connector发送greet消息,通知上线;
  • 4)connector拉取离线消息推送给client;
  • 5)connector更新用户session。

那为什么connector要先推送离线消息再更新session呢?

我们思考一下如果顺序倒过来会发生什么:

  • 1)用户Alice登录服务器;
  • 2)connector更新session;
  • 3)推送离线消息;
  • 4)此时Bob发送了一条消息给Alice。

如果离线消息还在推送的过程中,Bob发送了新消息给Alice,服务器获取到Alice的session,就会立刻推送。这时新消息就有可能夹在一堆离线消息当中推过去了,那这时,Alice收到的消息就乱序了。

而我们必须保证离线消息的顺序在新消息之前。

那么如果先推送离线消息,之后才更新session。在离线消息推送的过程中,Alice的状态就是“未上线”,这时Bob新发送的消息只会入库im_offline,im_offline表中的数据被读完之后才会“上线”开始接受新消息。这也就避免了乱序。

10、存储设计


10.1存储离线消息


当用户不在线时,离线消息必然要存储在服务端,等待用户上线再推送。理解了上一个小节后,离线消息的存储就非常容易了。

增加一张离线消息表im_offline,表结构如下:
CREATE TABLE `im_offline` (
  `id` int(11) COMMENT '主键',
  `msg_id` bigint(20) COMMENT '消息id',
  `msg_type` int(2) COMMENT '消息类型',
  `content` varbinary(5000) COMMENT '消息内容',
  `to_user_id` varchar(100) COMMENT '收件人id',
  `has_read` tinyint(1) COMMENT '是否阅读',
  `gmt_create` timestamp COMMENT '创建时间',
  PRIMARY KEY (`id`)
);

msg_type用于区分消息类型(chat,ack),content加密后的消息内容以byte数组的形式存储。

用户上线时按照条件to_user_id=用户id拉取记录即可。

10.2防止离线消息重复推送


我们思考一下多端登录的情况,Alice有两台设备同时登陆,在这种并发的情况下,我们就需要某种机制来保证离线消息只被读取一次。

这里利用CAS机制来实现:

  • 1)首先取出所有has_read=false的字段;
  • 2)检查每条消息的has_read值是否为false,如果是,则改为true。这是原子操作:
    update im_offline set has_read = true where id = ${msg_id} and has_read = false
  • 3)修改成功则推送,失败则不推送。

相信到这里,同学们已经可以自己动手搭建一个完整可用的IM服务端了。

11、完整源码下载


从零开发一个IM服务端(完整源码)(52im.net).zip (266.94 KB , 下载次数: 1364 , 售价: 1 金币)

即时通讯网 - 即时通讯开发者社区! 来源: - 即时通讯开发者社区!

上一篇:求教关于IM离线聊天消息同步策略的的一些疑惑下一篇:跟着源码学IM(四):拿起键盘就是干,教你徒手开发一套分布式IM系统

本帖已收录至以下技术专辑

推荐方案
评论 42
文章还行
签名: 秋天到了,终于凉快了
有客户端可以跑跑吗
签名: 星期六,那又怎样,还是得上班
感谢
签名: 今天天气真好~签到。
还没学过netty看不懂
引用:Chowing 发表于 2019-10-25 14:18
还没学过netty看不懂

先学netty再来看不迟
新手学习,谢谢分享。
学习学习,还是比较好的东西
代码是Netty+spring。。。没有注释好难看的懂orz
关于离线消息获取状态那里,就算先设置登录状态,在发送离线消息,会导致Alice收到消息乱序,消息乱序会有什么印象吗?客户端不是会把不返回消息序号的消息放在一个队列里吗?这样的话不就解决了乱序问题?
感谢分享
写的真好
签名: 早上好
transfer服务模块必须可靠,而且还多了一次转发
为什么不每台结点互联?
引用:mozhengxin 发表于 2020-08-20 10:18
transfer服务模块必须可靠,而且还多了一次转发
为什么不每台结点互联?

是的,结点之间互联提高了可用性,解决了单点问题,但架构的复杂性就大大提高了
引用:JackJiang 发表于 2020-08-20 10:46
是的,结点之间互联提高了可用性,解决了单点问题,但架构的复杂性就大大提高了

请教个问题 为什么长连接建立WIFI切4G会断,而4G切WIFI不会?
引用:mozhengxin 发表于 2020-08-20 12:00
请教个问题 为什么长连接建立WIFI切4G会断,而4G切WIFI不会?

都会断。

来,补个课:

4G切WIFI不会断 亲自验证过
引用:JackJiang 发表于 2020-08-20 10:46
是的,结点之间互联提高了可用性,解决了单点问题,但架构的复杂性就大大提高了

请问节点互联复杂在哪里了,transfer不也需要跟每台机器维持长连接吗
超级感谢
签名: 不要看的太远,未来不可知
将client移植到android上,启动报错:java.lang.ClassNotFoundException:java.lang.management.ManagementFactory,导入rt.jar包也不行,请问一下怎么解决的。
打赏楼主 ×
使用微信打赏! 使用支付宝打赏!

返回顶部