本文原文由作者“张小方”原创发布于“高性能服务器开发”微信公众号,原题《心跳包机制设计详解》,即时通讯网收录时有改动。
//on 是 1 表示打开 keepalive 选项,为 0 表示关闭,0 是默认值 int on = 1; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on));
//发送 keepalive 报文的时间间隔 int val = 7200; setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)); //两次重试报文的时间间隔 int interval = 75; setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)); int cnt = 9; setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
[root@iZ238vnojlyZ ~]# sysctl -a | grep keepalive net.ipv4.tcp_keepalive_intvl = 75 net.ipv4.tcp_keepalive_probes = 9 net.ipv4.tcp_keepalive_time = 7200
//开启 keepalive 选项 const char on = 1; setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, (char *)&on, sizeof(on); // 设置超时详细信息 DWORD cbBytesReturned; tcp_keepalive klive; // 启用保活 klive.onoff = 1; klive.keepalivetime = 7200; // 重试间隔为10秒 klive.keepaliveinterval = 1000 * 10; WSAIoctl(socket, SIO_KEEPALIVE_VALS, &klive, sizeof(tcp_keepalive), NULL, 0, &cbBytesReturned, NULL, NULL);
bool CIUSocket::Send() { int nSentBytes = 0; int nRet = 0; while (true) { nRet = ::send(m_hSocket, m_strSendBuf.c_str(), m_strSendBuf.length(), 0); if (nRet == SOCKET_ERROR) { if (::WSAGetLastError() == WSAEWOULDBLOCK) break; else { LOG_ERROR("Send data error, disconnect server:%s, port:%d.", m_strServer.c_str(), m_nPort); Close(); return false; } } else if (nRet < 1) { //一旦出现错误就立刻关闭Socket LOG_ERROR("Send data error, disconnect server:%s, port:%d.", m_strServer.c_str(), m_nPort); Close(); return false; } m_strSendBuf.erase(0, nRet); if (m_strSendBuf.empty()) break; ::Sleep(1); } { //记录一下最近一次发包时间 std::lock_guard<std::mutex> guard(m_mutexLastDataTime); m_nLastDataTime = (long)time(NULL); } return true; } bool CIUSocket::Recv() { int nRet = 0; char buff[10 * 1024]; while (true) { nRet = ::recv(m_hSocket, buff, 10 * 1024, 0); if (nRet == SOCKET_ERROR) //一旦出现错误就立刻关闭Socket { if (::WSAGetLastError() == WSAEWOULDBLOCK) break; else { LOG_ERROR("Recv data error, errorNO=%d.", ::WSAGetLastError()); //Close(); return false; } } else if (nRet < 1) { LOG_ERROR("Recv data error, errorNO=%d.", ::WSAGetLastError()); //Close(); return false; } m_strRecvBuf.append(buff, nRet); ::Sleep(1); } { std::lock_guard<std::mutex> guard(m_mutexLastDataTime); //记录一下最近一次收包时间 m_nLastDataTime = (long)time(NULL); } return true; } void CIUSocket::RecvThreadProc() { LOG_INFO("Recv data thread start..."); int nRet; //上网方式 DWORD dwFlags; BOOL bAlive; while (!m_bStop) { //检测到数据则收数据 nRet = CheckReceivedData(); //出错 if (nRet == -1) { m_pRecvMsgThread->NotifyNetError(); } //无数据 else if (nRet == 0) { long nLastDataTime = 0; { std::lock_guard<std::mutex> guard(m_mutexLastDataTime); nLastDataTime = m_nLastDataTime; } if (m_nHeartbeatInterval > 0) { //当前系统时间与上一次收发数据包的时间间隔超过了m_nHeartbeatInterval //则发一次心跳包 if (time(NULL) - nLastDataTime >= m_nHeartbeatInterval) SendHeartbeatPackage(); } } //有数据 else if (nRet == 1) { if (!Recv()) { m_pRecvMsgThread->NotifyNetError(); continue; } DecodePackages(); }// end if }// end while-loop LOG_INFO("Recv data thread finish..."); }
void BusinessSession::send(const char* pData, int dataLength) { bool sent = TcpSession::send(pData, dataLength); //发送完数据更新下发包时间 updateHeartbeatTime(); } void BusinessSession::handlePackge(char* pMsg, int msgLength, bool& closeSession, std::vector<std::string>& vectorResponse) { //对数据合法性进行校验 if (pMsg == NULL || pMsg[0] == 0 || msgLength <= 0 || msgLength > MAX_DATA_LENGTH) { //非法刺探请求,不做任何应答,直接关闭连接 closeSession = true; return; } //更新下收包时间 updateHeartbeatTime(); //省略包处理代码... } void BusinessSession::updateHeartbeatTime() { std::lock_guard<std::mutex> scoped_guard(m_mutexForlastPackageTime); m_lastPackageTime = (int64_t)time(nullptr); } bool BusinessSession::doHeartbeatCheck() { const Config& cfg = Singleton<Config>::Instance(); int64_t now = (int64_t)time(nullptr); std::lock_guard<std::mutex> lock_guard(m_mutexForlastPackageTime); if (now - m_lastPackageTime >= cfg.m_nMaxClientDataInterval) { //心跳包检测,超时,关闭连接 LOGE("heartbeat expired, close session"); shutdown(); return true; } return false; } void TcpServer::checkSessionHeartbeat() { int64_t now = (int64_t)time(nullptr); if (now - m_nLastCheckHeartbeatTime >= m_nHeartbeatCheckInterval) { m_spSessionManager->checkSessionHeartbeat(); m_nLastCheckHeartbeatTime = (int64_t)time(nullptr); } } void SessionManager::checkSessionHeartbeat() { std::lock_guard<std::mutex> scoped_lock(m_mutexForSession); for (const auto& iter : m_mapSessions) { //这里调用 BusinessSession::doHeartbeatCheck() iter.second->doHeartbeatCheck(); } }
ChatSession::ChatSession(const std::shared_ptr<TcpConnection>& conn, int sessionid) : TcpSession(conn), m_id(sessionid), m_seq(0), m_isLogin(false) { m_userinfo.userid = 0; m_lastPackageTime = time(NULL); //这里设置了非调试模式下才开启心跳包检测功能 #ifndef _DEBUG EnableHearbeatCheck(); #endif }
void BusinessSession::send(std::string_view strResponse) { bool success = WebSocketSession::send(strResponse); if (success) { bool enablePingPongLog = Singleton<Config>::Instance().m_bPingPongLogEnabled; //其他消息正常打印,心跳消息按需打印 if (strResponse != "pong" || enablePingPongLog) { LOGI("msg sent to client [%s], sessionId: %s, session: 0x%0x, clientId: %s, accountId: %s, frontId: %s, msg: %s", getClientInfo(), m_strSessionId.c_str(), (int64_t)this, m_strClientID.c_str(), m_strAccountID.c_str(), BusinessSession::m_strFrontId.c_str(), strResponse.data()); } } }
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
引用此评论
引用:设置一个上次包时间,每次收数据和发数据时,都更新一下这个包时间
引用:椎锋陷陈 发表于 2021-09-14 10:02 这里在发数据时也更新上次包时间是否不太合适?因为发数据本身不一定成功,如果直接更新反而可能造成心跳 ...
引用:大雄528 发表于 2021-10-26 11:26 发送数据时,更新包时间,是为了能够利用非心跳包(如正常的聊天)的发送来达到心跳的目的。真正的保活还 ...
引用:椎锋陷陈 发表于 2021-10-27 10:12 大概能理解了,其实这里的“上次包时间”本质上是减少客户端发送心跳的频率,客户端利用非心跳包的发送来 ...
精华主题数超过100个。
连续任职达2年以上的合格正式版主
为论区做出突出贡献的开发者、版主等。
在线时长累积7天(即7 * 8 = 56小时)。
持有金钱达到500。
本人属:狗
积极发起、参与各类话题的讨论等,主题、发帖内容较有价值。
在线时长累积30天(即30 * 8 = 240小时)。
Copyright © 2014-2024 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.125000 second(s), 41 queries , Gzip On.