默认
发表评论 8
想开发IM:买成品怕坑?租第3方怕贵?找开源自已撸?尽量别走弯路了... 找站长给点建议
求教-使用http+netty-server+netty-client发消息,如何等待消息发送结果
阅读(46242) | 评论(8 收藏 淘帖
需求:服务端业务模块发送消息给APP,要求返回发送结果

现有架构:

  • 业务方:消息产生侧
  • netty-server : netty 服务端,管理TCP 长链接
  • netty-client: netty 客户端,以sdk 的形式提供给APP,APP 负责启动netty 客户端

求助: 流程如下:

  • 1. 业务端使用 http 调用 netty-server 的 tomcat 端口发送消息
  • 2. netty-server 接受到发送请求根据终端id查询到对应的channel writeAndFlush 消息
  • 3. netty-client 接收到消息回调自定义消息处理模块,并写回给netty-server 消息已处理
  • 4. netty-server 接收到netty-client 的处理结果之后,将 全局标识改为 我收到消息id为 XX 的结果啦

伪代码:
Channel channel = getChannelByTerminalId(terminalId);
channel.writeAndFlush(msg);
while(返回结果不等于空){
    sleep(100);
    返回结果=全局标识的返回结果
}
retrurn 返回客户端处理结果;

疑问:第一步发送消息之后,不能返回给调用方这个请求已经完毕了,我需要 等待第三步的netty-client 写回结果,这里我需要while sleep (或者是其他通知等待机制),这里业界的通用处理是怎么样的呢?求教

即时通讯网 - 即时通讯开发者社区! 来源: - 即时通讯开发者社区!

标签:netty 消息 netty
上一篇:我写了个开源 IM 服务端,不知道写的好不好,欢迎拍砖下一篇:求教如何优雅解决IM中由群聊产生的循环拉取问题?
推荐方案
评论 8
你这是做的推送系统是吧。
引用:JackJiang 发表于 2020-11-23 14:57
你这是做的推送系统是吧。

是的,其实就是服务端向APP 推送一个消息,等待结果
引用:ing 发表于 2020-11-23 15:36
是的,其实就是服务端向APP 推送一个消息,等待结果

同步等待这从哪个角度看都不合理,也不优雅,异步是肯定要的。你还是应该换个方式,来考虑你这个http的结果反馈逻辑。

你可以具体说说,你到底想实现的是什么样的功能业务?
引用:JackJiang 发表于 2020-11-23 15:52
同步等待这从哪个角度看都不合理,也不优雅,异步是肯定要的。你还是应该换个方式,来考虑你这个http的结 ...

我也是感觉,可以提供查询或者用类似回调之类的感觉更合适,但是领导给出的架构图要求业务方发送给我们的消息之后,就要返回给他成功了还是失败了。

我们是做类似于米家的软件,各个摄像头收集到报警之后,发送到消息队列,各个业务方接收消息,处理之后发送给 netty-server , netty-server 负责发送给绑定该设备的APP。
引用:ing 发表于 2020-11-23 16:23
我也是感觉,可以提供查询或者用类似回调之类的感觉更合适,但是领导给出的架构图要求业务方发送给我们的 ...

这种情况,不应该用http啊,应该在业务方这边也用长连接才合适
如果你必须要同步等待一个消息的话:(其实只有第二步 netty-server 写消息之后,需要把异步的结果转换为同步即可,同时,你要接受性能上的损耗。具体损耗稍后讨论),代码在这里:

1 定义一个 Future 对象
public class ResponseFuture {
    private final String commandSeq;
    private final Channel processChannel;
    private final long timeoutMillis;
    private final BiConsumer<Command/*request*/, Command/*ack*/> consumer;
    private final long beginTimestamp = System.currentTimeMillis();
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    private final SemaphoreReleaseOnlyOnce once;

    private volatile Command request;
    private volatile Command response;
    private volatile boolean sendOk = true;
    private volatile Throwable cause;
」


2 netty-server 给客户端推送时这么写:
    
final ResponseFuture responseFuture = new ResponseFuture(channel, commandSeq, timeoutMillis, null, null, request);
            this.responseTable.put(commandSeq, responseFuture);
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                if(f.isSuccess()) {
                    responseFuture.setSendOk(true);
                    if(logger.isDebugEnabled()) {
                        logger.debug("发送命令 {}", JSON.toJSONString(request));
                    }
                    return;
                } else {
                    responseFuture.setSendOk(false);
                }

                requestFail(commandSeq, f.cause());
                throw new NettyException("invokeSync error", f.cause());
            });

            Command response = responseFuture.waitResponse(timeoutMillis);


3 waitResponse 方法中 await
    
public Command waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.response;
    }


4 最后,在你接受消息的线程中,判断 requestID,如果存在这样的 future 的话,就调用他的countDown 就好
签名: 。。。。。。
sendAndFlush 成功之后根据发送的消息ID生成一个HashedTimeWheel超时定时器,放到连接的一个队列里面。
客户端收到服务端的推送消息之后需要给服务端回复一个ACK消息,ID和收到的消息相同。 当这个ACK到服务端之后,就可以通过查询当前链接里面是否存在一个超时定时器,如果有并且没超时,就可以认为推送成功了,执行成功的逻辑。如果有且超时了,那么执行重试等逻辑。
打赏楼主 ×
使用微信打赏! 使用支付宝打赏!

返回顶部