package io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 * Single-threaded NIO echo server.
 *
 * <p>One {@link Selector} watches the listening channel for new connections and every
 * accepted channel for incoming data. Each message that is read is printed and written
 * back to the client that sent it.
 */
public class Server {

    /** Wire charset; the Client class in this package encodes its messages as UTF-8. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    ServerSocketChannel serverSocketChannel;

    // Selector on which the listening channel and all client channels are registered.
    private Selector selector;

    /**
     * Opens the listening channel on {@code port}, switches it to non-blocking mode and
     * registers it with a new selector for accept events.
     *
     * @param port TCP port to bind
     * @throws IOException if the channel or selector cannot be opened, bound or registered
     */
    private void initServer(int port) throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        this.selector = Selector.open();
        // After this registration, selector.select() returns once a connection is pending.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server初始化成功");
    }

    /**
     * Runs the event loop forever: waits for ready keys and dispatches accept/read events.
     *
     * @throws IOException if the selector itself fails
     */
    private void listen() throws IOException {
        System.out.println("服务端启动成功!");
        while (true) {
            // select() blocks until at least one registered channel is ready.
            selector.select();
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the key from the selected set so it is not handled twice.
                ite.remove();
                if (!key.isValid()) {
                    // Querying readiness on a cancelled key throws CancelledKeyException.
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }

    /**
     * Accepts a pending connection, greets the client and registers it for read events.
     * A failure while setting up one client closes that client only.
     */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel = server.accept();
        if (channel == null) {
            // Non-blocking accept() returns null when no connection is pending any more.
            return;
        }
        System.out.println("有新的连接");
        try {
            channel.configureBlocking(false);
            channel.write(ByteBuffer.wrap("向客户端发送了一条消息".getBytes(UTF_8)));
            channel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(channel);
        }
    }

    /**
     * Reads whatever is available from the client behind {@code key}, prints it and echoes
     * it back. Closes the channel on end-of-stream or on an I/O error.
     *
     * <p>At most 100 bytes are handled per call; a multi-byte character that straddles two
     * reads is decoded with replacement characters.
     *
     * @param key a readable key whose channel is a {@link SocketChannel}
     */
    public void read(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (!channel.isConnected()) {
                System.out.println("连接断开");
                return;
            }
            ByteBuffer buffer = ByteBuffer.allocate(100);
            int read = channel.read(buffer);
            if (read == -1) {
                // Orderly shutdown by the peer: release the key and the socket.
                key.cancel();
                channel.close();
                return;
            }
            if (read == 0) {
                return;
            }
            // Decode only the bytes that were actually read, not the whole backing array.
            buffer.flip();
            String msg = UTF_8.decode(buffer).toString().trim();
            System.out.println("服务端收到信息:" + msg);
            // Echo the message back to the sender.
            channel.write(ByteBuffer.wrap(msg.getBytes(UTF_8)));
        } catch (IOException e) {
            closeQuietly(channel);
            System.out.println("客户端断开了连接~~");
        }
    }

    /**
     * Drains and prints the data currently available on {@code socketChannel}.
     * Returns when a read yields no data or the peer has closed the connection.
     */
    private void read(SocketChannel socketChannel) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        // In non-blocking mode read() may return 0 (nothing available) or -1 (end of stream).
        while (socketChannel.read(buf) > 0) {
            buf.flip();
            System.out.println("收到消息: " + UTF_8.decode(buf));
            buf.clear();
        }
    }

    /** Closes {@code channel}, reporting but not propagating a failure to close. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.initServer(8000);
        server.listen();
    }
}
package io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * Console client: connects to a server on 127.0.0.1:8000 and sends every line typed on
 * standard input as a UTF-8 encoded message.
 */
public class Client {

    private static final Charset UTF_8 = Charset.forName("UTF-8");

    SocketChannel socketChannel;

    /**
     * Opens a non-blocking channel and starts connecting to {@code ip:port}.
     * The caller must complete the connection with {@code finishConnect()}.
     *
     * @throws IOException if the channel cannot be opened or the connect attempt fails
     */
    private void initClient(String ip, int port) throws IOException {
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress(ip, port));
    }

    /**
     * Sends {@code msg} to the server.
     *
     * @throws IOException if the write fails
     */
    private void sendMsg(String msg) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(msg.getBytes(UTF_8));
        // A non-blocking write may accept only part of the buffer; keep going until it is empty.
        while (buf.hasRemaining()) {
            socketChannel.write(buf);
        }
    }

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.initClient("127.0.0.1", 8000);
        try {
            boolean announced = false;
            while (!client.socketChannel.finishConnect()) {
                // Report the wait once instead of flooding the console on every poll.
                if (!announced) {
                    System.out.println("等待连接...");
                    announced = true;
                }
                Thread.yield();
            }
            InputStream inputStream = System.in;
            BufferedReader brin = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = brin.readLine()) != null) {
                System.out.println("client:" + line);
                client.sendMsg(line);
            }
        } finally {
            // Release the socket when input ends or an error occurs.
            client.socketChannel.close();
        }
    }
}
多线程处理
package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded NIO chat server listening on port 9999.
 *
 * <p>Every message received from one client is printed with a timestamp and forwarded
 * to all other connected clients.
 */
public class ChatServer {

    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    /**
     * Opens the listening channel on port 9999 in non-blocking mode and registers it
     * with a new selector for accept events.
     *
     * @throws IOException if the channel or selector cannot be opened, bound or registered
     */
    public ChatServer() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);
        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop forever, accepting clients and relaying their messages.
     *
     * @throws IOException if the selector or the listening channel fails
     */
    public void start() throws IOException {
        while (true) {
            // Wait at most two seconds so the loop can do other work when nothing is ready.
            if (selector.select(2000) == 0) {
                System.out.println("没有客户端连接,我可以做别的");
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // Remove the key from the selected set so it is not handled again.
                iterator.remove();
                if (!selectionKey.isValid()) {
                    // Querying readiness on a cancelled key throws CancelledKeyException.
                    continue;
                }
                if (selectionKey.isAcceptable()) {
                    acceptClient();
                } else if (selectionKey.isReadable()) {
                    readMsg(selectionKey);
                }
            }
        }
    }

    /** Accepts a pending connection and registers it with the selector for read events. */
    private void acceptClient() throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending any more.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println(socketChannel.getRemoteAddress().toString().substring(1) + " 上线了...");
    }

    /**
     * Reads one chunk from the client behind {@code selectionKey}, prints it and forwards
     * it to the other clients. Closes the client on end-of-stream or on a read error.
     */
    private void readMsg(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int count;
        try {
            count = socketChannel.read(byteBuffer);
        } catch (IOException e) {
            disconnect(socketChannel);
            return;
        }
        if (count < 0) {
            // End of stream: without closing, the key stays readable and the loop would spin.
            disconnect(socketChannel);
            return;
        }
        if (count == 0) {
            return;
        }
        // Decode only the bytes actually read, not the zero padding of the backing array.
        // The platform default charset is kept because ChatClient encodes with getBytes().
        String message = new String(byteBuffer.array(), 0, count);
        printMsg(message);
        broadCast(message, socketChannel);
    }

    /**
     * Sends {@code message} to every registered client channel except {@code socketChannel}.
     * A target whose write fails is closed; the sender and the other targets are unaffected.
     */
    private void broadCast(String message, SocketChannel socketChannel) throws IOException {
        byte[] payload = message.getBytes();
        // selector.keys() holds every registered channel, including the listening channel.
        for (SelectionKey key : selector.keys()) {
            if (!key.isValid()) {
                continue;
            }
            Channel channel = key.channel();
            if (channel instanceof SocketChannel && channel != socketChannel) {
                SocketChannel targetChannel = (SocketChannel) channel;
                try {
                    targetChannel.write(ByteBuffer.wrap(payload));
                } catch (IOException e) {
                    disconnect(targetChannel);
                }
            }
        }
    }

    /** Closes {@code channel} and reports the disconnect on standard output. */
    private void disconnect(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
        System.out.println("客户端断开了连接~~");
    }

    /** Prints {@code msg} prefixed with the current time. */
    private void printMsg(String msg) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(dateFormat.format(new Date()) + " " + msg);
    }

    public static void main(String[] args) throws IOException {
        new ChatServer().start();
    }
}
package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;

/**
 * Chat client that connects to 127.0.0.1:9999 over a non-blocking channel.
 * The local socket address is used as the user name.
 */
public class ChatClient {

    private final String IP = "127.0.0.1";
    private int port = 9999;
    private SocketChannel socketChannel;
    private String userName;

    /**
     * Opens the channel, completes the connection and derives the user name.
     *
     * @throws IOException if the channel cannot be opened or the connection fails
     */
    public ChatClient() throws IOException {
        socketChannel = SocketChannel.open();
        try {
            socketChannel.configureBlocking(false);
            if (!socketChannel.connect(new InetSocketAddress(IP, port))) {
                // The connection is still in progress; poll until it completes.
                while (!socketChannel.finishConnect()) {
                    System.out.println("连接服务器的同事还可以做别的事");
                }
            }
            // Local address without the leading '/' serves as the user name.
            userName = socketChannel.getLocalAddress().toString().substring(1);
        } catch (IOException e) {
            // Do not leak the socket when construction fails.
            socketChannel.close();
            throw e;
        }
        System.out.println("------------------"+userName+" is ready---------------");
    }

    /**
     * Sends {@code message} to the server prefixed with the user name.
     * The message "bye" (case-insensitive) closes the channel instead of being sent.
     *
     * @param message text entered by the user
     * @throws IOException if the write or close fails, including when the channel is
     *     already closed
     */
    public void sendMsg(String message) throws IOException {
        if (message.equalsIgnoreCase("bye")) {
            socketChannel.close();
            return;
        }
        String msg = userName + "说:" + message;
        ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
        // A non-blocking write may accept only part of the buffer; keep going until it is empty.
        while (byteBuffer.hasRemaining()) {
            socketChannel.write(byteBuffer);
        }
    }

    /**
     * Reads whatever the server has sent so far and prints it. Does nothing when no data
     * is available or the channel is already closed; closes the channel when the server
     * has ended the stream.
     *
     * @throws IOException if the read fails for a reason other than the channel being closed
     */
    public void receiveMsg() throws IOException {
        if (!socketChannel.isOpen()) {
            return;
        }
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int count;
        try {
            count = socketChannel.read(byteBuffer);
        } catch (ClosedChannelException e) {
            // The channel was closed by sendMsg("bye") on another thread.
            return;
        }
        if (count > 0) {
            // Decode only the bytes actually read rather than the whole backing array.
            System.out.println(new String(byteBuffer.array(), 0, count).trim());
        } else if (count < 0) {
            // End of stream: the server closed the connection.
            socketChannel.close();
        }
    }
}
package nio;

import java.io.IOException;
import java.util.Scanner;

/**
 * Console front end for {@link ChatClient}: a background thread polls for incoming
 * messages while the main thread sends every line typed on standard input.
 */
public class TestChat {

    public static void main(String[] args) throws IOException {
        final ChatClient client = new ChatClient();

        Thread receiver = new Thread() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        client.receiveMsg();
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                    } catch (IOException e) {
                        // The connection is unusable; stop polling instead of failing forever.
                        e.printStackTrace();
                        return;
                    }
                }
            }
        };
        // Daemon thread: it must not keep the JVM alive once the user has left the chat.
        receiver.setDaemon(true);
        receiver.start();

        Scanner scanner = new Scanner(System.in);
        try {
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                client.sendMsg(msg);
                if (msg.equalsIgnoreCase("bye")) {
                    // sendMsg has closed the channel; any further send would throw.
                    break;
                }
            }
        } finally {
            // ChatClient closes its channel on "bye"; this releases the socket when input
            // ends without one (closing an already closed channel has no effect).
            client.sendMsg("bye");
        }
    }
}