/**
 * Base class of every message exchanged between chat client and server.
 *
 * <p>Each message carries a sequence id and exposes a numeric type code through
 * {@link #getMessageType()}. The type codes themselves are declared as the
 * {@code public static final int} constants of this class.
 */
public abstract class Message implements Serializable {

    // ---- message type codes ------------------------------------------------
    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;

    private int sequenceId;

    // Only written by setMessageType(); the authoritative type code is whatever
    // the concrete subclass returns from getMessageType().
    private int messageType;

    /**
     * @return the sequence id assigned to this message
     */
    public int getSequenceId() {
        return sequenceId;
    }

    /**
     * @param sequenceId the sequence id to assign to this message
     */
    public void setSequenceId(int sequenceId) {
        this.sequenceId = sequenceId;
    }

    /**
     * Stores a type code in this instance. Note that {@link #getMessageType()}
     * is abstract, so the stored value is not necessarily what callers observe.
     *
     * @param messageType the type code to store
     */
    public void setMessageType(int messageType) {
        this.messageType = messageType;
    }

    /**
     * @return the type code of the concrete message, one of the constants
     *         declared in this class
     */
    public abstract int getMessageType();

    /**
     * Renders the sequence id and the type code of this message.
     *
     * <p>The type code is taken from {@link #getMessageType()} rather than from
     * the private field, which stays at 0 unless {@link #setMessageType(int)}
     * was called and would therefore misreport the type of most messages.
     */
    @Override
    public String toString() {
        return "Message{"
                + "sequenceId=" + sequenceId
                + ", messageType=" + getMessageType()
                + '}';
    }
}
/**
 * Response to a login request.
 *
 * <p>Adds no state of its own: the success flag and the reason text are handed
 * straight to {@code AbstractResponseMessage}.
 */
public class LoginResponseMessage extends AbstractResponseMessage {

    /**
     * @param success whether the login succeeded
     * @param reason  text accompanying the result
     */
    public LoginResponseMessage(boolean success, String reason) {
        super(success, reason);
    }

    /**
     * @return the inherited {@code LoginResponseMessage} type-code constant
     */
    @Override
    public int getMessageType() {
        // In expression context this name resolves to the inherited int
        // constant, not to this class.
        return LoginResponseMessage;
    }
}
登录响应类的实现十分简单,仅由登录状态和登录消息组成。接着来看看登录的具体实现。
首先在客户端中,通过pipeline再添加一个处理器,如下:
// Shared state between the console-input thread started in channelActive and
// whatever code handles the server's login response (not shown in this snippet).

// One-shot latch: the console thread blocks on await() after sending the login
// request. Presumably counted down once the login response arrives - confirm.
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// Login outcome, read by the console thread after the latch opens; while it is
// still false the console thread closes the channel.
AtomicBoolean LOGIN = new AtomicBoolean(false);
// Exit flag, checked by the console thread after each blocking console read so
// it can stop instead of continuing the login dialogue.
AtomicBoolean EXIT = new AtomicBoolean(false);
// Reads the user's console input (user name and password).
Scanner scanner = new Scanner(System.in);
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
/**
 * Starts a separate thread that runs the console login dialogue when this
 * handler's {@code channelActive} callback fires.
 *
 * <p>The thread reads user name and password from the console, sends a
 * {@code LoginRequestMessage}, then waits on {@code WAIT_FOR_LOGIN}. If
 * {@code LOGIN} is still false afterwards, the channel is closed.
 *
 * @param ctx context used to send the login request and to close the channel
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Reads the user's console input and sends messages to the server. This
    // runs on its own thread because Scanner.nextLine() and the latch block.
    new Thread(() -> {
        System.out.println("请输入用户名:");
        String username = scanner.nextLine();
        // nextLine() may have blocked for a long time; stop if exit was requested meanwhile.
        if (EXIT.get()) {
            return;
        }
        System.out.println("请输入密码:");
        String password = scanner.nextLine();
        if (EXIT.get()) {
            return;
        }
        // Build the login request.
        LoginRequestMessage message = new LoginRequestMessage(username, password);
        // NOTE(review): if LoginRequestMessage.toString() includes the password,
        // this echoes it to the console - confirm and consider removing.
        System.out.println(message);
        // Send the request.
        ctx.writeAndFlush(message);
        System.out.println("等待后续操作...");
        try {
            // Block until the latch is released.
            WAIT_FOR_LOGIN.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt status that await() cleared when it threw,
            // so later blocking calls on this thread still observe it.
            Thread.currentThread().interrupt();
        }
        // Login failed (or the wait was interrupted before it succeeded).
        if (!LOGIN.get()) {
            ctx.channel().close();
            return;
        }
    }).start();
}
/**
 * Response to a group-creation request.
 *
 * <p>Adds no state of its own: the success flag and the reason text are handed
 * straight to {@code AbstractResponseMessage}.
 */
public class GroupCreateResponseMessage extends AbstractResponseMessage {

    /**
     * @param success whether the group was created
     * @param reason  text accompanying the result
     */
    public GroupCreateResponseMessage(boolean success, String reason) {
        super(success, reason);
    }

    /**
     * @return the inherited {@code GroupCreateResponseMessage} type-code constant
     */
    @Override
    public int getMessageType() {
        // In expression context this name resolves to the inherited int
        // constant, not to this class.
        return GroupCreateResponseMessage;
    }
}
/**
 * A chat room: a name plus the set of its member names.
 */
public class Group {

    /** Placeholder group named "empty" whose member set is {@code Collections.emptySet()}. */
    public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());

    // Chat room name
    private String name;

    // Chat room members
    private Set<String> members;

    /**
     * Creates a group. The given set is stored as-is, without copying.
     *
     * @param name    chat room name
     * @param members chat room members
     */
    public Group(String name, Set<String> members) {
        this.members = members;
        this.name = name;
    }

    // Remaining getters/setters and toString() are omitted here.
}
接着定义了一个群聊会话的顶级接口,如下:
/**
 * Top-level contract for managing group chat sessions.
 *
 * <p>What each method returns in edge cases (unknown group, duplicate name,
 * and so on) is decided by the implementation and is not specified here.
 */
public interface GroupSession {

    /**
     * Creates a group chat.
     *
     * @param name    group name
     * @param members initial members
     * @return a {@code Group}; exact semantics are implementation-defined
     */
    Group createGroup(String name, Set<String> members);

    /**
     * Adds a member to a group chat.
     *
     * @param name   group name
     * @param member member to add
     * @return a {@code Group}; exact semantics are implementation-defined
     */
    Group joinMember(String name, String member);

    /**
     * Removes a member from a group chat.
     *
     * @param name   group name
     * @param member member to remove
     * @return a {@code Group}; exact semantics are implementation-defined
     */
    Group removeMember(String name, String member);

    /**
     * Dissolves a group chat.
     *
     * @param name group name
     * @return a {@code Group}; exact semantics are implementation-defined
     */
    Group removeGroup(String name);

    /**
     * Returns the member list of a group chat.
     *
     * @param name group name
     * @return the group's members
     */
    Set<String> getMembers(String name);

    /**
     * Returns the channels of all members of a group chat that are currently online.
     *
     * @param name group name
     * @return channels of the online members
     */
    List<Channel> getMembersChannel(String name);
}