本文原作者:“水晶虾饺”,原文由“玉刚说”写作平台提供写作赞助,原文版权归“玉刚说”微信公众号所有,即时通讯网收录时有改动。
1.png (22.22 KB, 下载次数: 1735)
下载附件 保存到相册
6 年前 上传
2.jpg (19.54 KB, 下载次数: 1804)
3.png (357.73 KB, 下载次数: 1787)
public class EchoServer { private final ServerSocket mServerSocket; public EchoServer(int port) throws IOException { // 1. 创建一个 ServerSocket 并监听端口 port mServerSocket = new ServerSocket(port); } public void run() throws IOException { // 2. 开始接受客户连接 Socket client = mServerSocket.accept(); handleClient(client); } private void handleClient(Socket socket) { // 3. 使用 socket 进行通信 ... } public static void main(String[] argv) { try { EchoServer server = new EchoServer(9877); server.run(); } catch (IOException e) { e.printStackTrace(); } } }
public class EchoClient { private final Socket mSocket; public EchoClient(String host, int port) throws IOException { // 创建 socket 并连接服务器 mSocket = new Socket(host, port); } public void run() { // 和服务端进行通信 } public static void main(String[] argv) { try { // 由于服务端运行在同一主机,这里我们使用 localhost EchoClient client = new EchoClient("localhost", 9877); client.run(); } catch (IOException e) { e.printStackTrace(); } } }
public class EchoServer { // ... private void handleClient(Socket socket) throws IOException { InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); byte[] buffer = new byte[1024]; int n; while ((n = in.read(buffer)) > 0) { out.write(buffer, 0, n); } } }
public class EchoClient { // ... public void run() throws IOException { Thread readerThread = new Thread(this::readResponse); readerThread.start(); OutputStream out = mSocket.getOutputStream(); byte[] buffer = new byte[1024]; int n; while ((n = System.in.read(buffer)) > 0) { out.write(buffer, 0, n); } } private void readResponse() { try { InputStream in = mSocket.getInputStream(); byte[] buffer = new byte[1024]; int n; while ((n = in.read(buffer)) > 0) { System.out.write(buffer, 0, n); } } catch (IOException e) { e.printStackTrace(); } } }
Thread readerThread = new Thread(new Runnable() { @Override public void run() { readResponse(); } });
$ javac EchoServer.java $ java EchoServer $ javac EchoClient.java $ java EchoClient hello Server hello Server foo foo
在 Java 的 SDK 中,socket 的共有两个接口:用于监听客户连接的 ServerSocket 和用于通信的 Socket。
socket.setKeepAlive(true);
{ "type": 0, // 0 表示心跳 // ... }
public final class LongLiveSocket { /** * 错误回调 */ public interface ErrorCallback { /** * 如果需要重连,返回 true */ boolean onError(); } /** * 读数据回调 */ public interface DataCallback { void onData(byte[] data, int offset, int len); } /** * 写数据回调 */ public interface WritingCallback { void onSuccess(); void onFail(byte[] data, int offset, int len); } public LongLiveSocket(String host, int port, DataCallback dataCallback, ErrorCallback errorCallback) { } public void write(byte[] data, WritingCallback callback) { } public void write(byte[] data, int offset, int len, WritingCallback callback) { } public void close() { } }
public final class LongLiveSocket { private static final String TAG = "LongLiveSocket"; private static final long RETRY_INTERVAL_MILLIS = 3 * 1000; private static final long HEART_BEAT_INTERVAL_MILLIS = 5 * 1000; private static final long HEART_BEAT_TIMEOUT_MILLIS = 2 * 1000; /** * 错误回调 */ public interface ErrorCallback { /** * 如果需要重连,返回 true */ boolean onError(); } /** * 读数据回调 */ public interface DataCallback { void onData(byte[] data, int offset, int len); } /** * 写数据回调 */ public interface WritingCallback { void onSuccess(); void onFail(byte[] data, int offset, int len); } private final String mHost; private final int mPort; private final DataCallback mDataCallback; private final ErrorCallback mErrorCallback; private final HandlerThread mWriterThread; private final Handler mWriterHandler; private final Handler mUIHandler = new Handler(Looper.getMainLooper()); private final Object mLock = new Object(); private Socket mSocket; // guarded by mLock private boolean mClosed; // guarded by mLock private final Runnable mHeartBeatTask = new Runnable() { private byte[] mHeartBeat = new byte[0]; @Override public void run() { // 我们使用长度为 0 的数据作为 heart beat write(mHeartBeat, new WritingCallback() { @Override public void onSuccess() { // 每隔 HEART_BEAT_INTERVAL_MILLIS 发送一次 mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS); mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS); } @Override public void onFail(byte[] data, int offset, int len) { // nop // write() 方法会处理失败 } }); } }; private final Runnable mHeartBeatTimeoutTask = () -> { Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout"); closeSocket(); }; public LongLiveSocket(String host, int port, DataCallback dataCallback, ErrorCallback errorCallback) { mHost = host; mPort = port; mDataCallback = dataCallback; mErrorCallback = errorCallback; mWriterThread = new HandlerThread("socket-writer"); mWriterThread.start(); mWriterHandler = new Handler(mWriterThread.getLooper()); mWriterHandler.post(this::initSocket); } private void initSocket() { while (true) { if (closed()) return; try { Socket socket = new Socket(mHost, mPort); synchronized (mLock) { // 在我们创建 socket 的时候,客户可能就调用了 close() if (mClosed) { silentlyClose(socket); return; } mSocket = socket; // 每次创建新的 socket,会开一个线程来读数据 Thread reader = new Thread(new ReaderTask(socket), "socket-reader"); reader.start(); mWriterHandler.post(mHeartBeatTask); } break; } catch (IOException e) { Log.e(TAG, "initSocket: ", e); if (closed() || !mErrorCallback.onError()) { break; } try { TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS); } catch (InterruptedException e1) { // interrupt writer-thread to quit break; } } } } public void write(byte[] data, WritingCallback callback) { write(data, 0, data.length, callback); } public void write(byte[] data, int offset, int len, WritingCallback callback) { mWriterHandler.post(() -> { Socket socket = getSocket(); if (socket == null) { // initSocket 失败而客户说不需要重连,但客户又叫我们给他发送数据 throw new IllegalStateException("Socket not initialized"); } try { OutputStream outputStream = socket.getOutputStream(); DataOutputStream out = new DataOutputStream(outputStream); out.writeInt(len); out.write(data, offset, len); callback.onSuccess(); } catch (IOException e) { Log.e(TAG, "write: ", e); closeSocket(); callback.onFail(data, offset, len); if (!closed() && mErrorCallback.onError()) { initSocket(); } } }); } private boolean closed() { synchronized (mLock) { return mClosed; } } private Socket getSocket() { synchronized (mLock) { return mSocket; } } private void closeSocket() { synchronized (mLock) { closeSocketLocked(); } } private void closeSocketLocked() { if (mSocket == null) return; silentlyClose(mSocket); mSocket = null; mWriterHandler.removeCallbacks(mHeartBeatTask); } public void close() { if (Looper.getMainLooper() == Looper.myLooper()) { new Thread() { @Override public void run() { doClose(); } }.start(); } else { doClose(); } } private void doClose() { synchronized (mLock) { mClosed = true; // 关闭 socket,从而使得阻塞在 socket 上的线程返回 closeSocketLocked(); } mWriterThread.quit(); // 在重连的时候,有个 sleep mWriterThread.interrupt(); } private static void silentlyClose(Closeable closeable) { if (closeable != null) { try { closeable.close(); } catch (IOException e) { Log.e(TAG, "silentlyClose: ", e); // error ignored } } } private class ReaderTask implements Runnable { private final Socket mSocket; public ReaderTask(Socket socket) { mSocket = socket; } @Override public void run() { try { readResponse(); } catch (IOException e) { Log.e(TAG, "ReaderTask#run: ", e); } } private void readResponse() throws IOException { // For simplicity, assume that a msg will not exceed 1024-byte byte[] buffer = new byte[1024]; InputStream inputStream = mSocket.getInputStream(); DataInputStream in = new DataInputStream(inputStream); while (true) { int nbyte = in.readInt(); if (nbyte == 0) { Log.i(TAG, "readResponse: heart beat received"); mUIHandler.removeCallbacks(mHeartBeatTimeoutTask); continue; } if (nbyte > buffer.length) { throw new IllegalStateException("Receive message with len " + nbyte + " which exceeds limit " + buffer.length); } if (readn(in, buffer, nbyte) != 0) { // Socket might be closed twice but it does no harm silentlyClose(mSocket); // Socket will be re-connected by writer-thread if you want break; } mDataCallback.onData(buffer, 0, nbyte); } } private int readn(InputStream in, byte[] buffer, int n) throws IOException { int offset = 0; while (n > 0) { int readBytes = in.read(buffer, offset, n); if (readBytes < 0) { // EoF break; } n -= readBytes; offset += readBytes; } return n; } } }
public class EchoClient { private static final String TAG = "EchoClient"; private final LongLiveSocket mLongLiveSocket; public EchoClient(String host, int port) { mLongLiveSocket = new LongLiveSocket( host, port, (data, offset, len) -> Log.i(TAG, "EchoClient: received: " + new String(data, offset, len)), // 返回 true,所以只要出错,就会一直重连 () -> true); } public void send(String msg) { mLongLiveSocket.write(msg.getBytes(), new LongLiveSocket.WritingCallback() { @Override public void onSuccess() { Log.d(TAG, "onSuccess: "); } @Override public void onFail(byte[] data, int offset, int len) { Log.w(TAG, "onFail: fail to write: " + new String(data, offset, len)); // 连接成功后,还会发送这个消息 mLongLiveSocket.write(data, offset, len, this); } }); } }
03:54:55.583 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received 03:55:00.588 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received 03:55:05.594 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received 03:55:09.638 12691-12710/com.example.echo D/EchoClient: onSuccess: 03:55:09.639 12691-12713/com.example.echo I/EchoClient: EchoClient: received: hello 03:55:10.595 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received 03:55:14.652 12691-12710/com.example.echo D/EchoClient: onSuccess: 03:55:14.654 12691-12713/com.example.echo I/EchoClient: EchoClient: received: echo 03:55:15.596 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received 03:55:20.597 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received 03:55:25.602 12691-12713/com.example.echo I/LongLiveSocket: readResponse: heart beat received
来源:即时通讯网 - 即时通讯开发者社区!
轻量级开源移动端即时通讯框架。
快速入门 / 性能 / 指南 / 提问
轻量级Web端即时通讯框架。
详细介绍 / 精编源码 / 手册教程
移动端实时音视频框架。
详细介绍 / 性能测试 / 安装体验
基于MobileIMSDK的移动IM系统。
详细介绍 / 产品截图 / 安装体验
一套产品级Web端IM系统。
详细介绍 / 产品截图 / 演示视频
引用此评论
引用:大马仕格 发表于 2018-06-29 17:22 写的很详细很系统,赞!
引用:梦醒 发表于 2019-03-05 16:33 学习了,写的很好,看的出作者的功底极其深厚
引用:lyuscott 发表于 2019-10-09 18:34 太感谢了, 很详细
引用:zyf552311 发表于 2020-08-17 17:09 问什么不用C讲啊
精华主题数超过100个。
连续任职达2年以上的合格正式版主
为论区做出突出贡献的开发者、版主等。
Copyright © 2014-2024 即时通讯网 - 即时通讯开发者社区 / 版本 V4.4
苏州网际时代信息科技有限公司 (苏ICP备16005070号-1)
Processed in 0.156250 second(s), 45 queries , Gzip On.