`

NIO Socket 编程

    博客分类:
  • java
阅读更多
Java NIO (Nonblocking IO)解决了常规IO的瓶颈:

a. 服务端的监听操作会阻塞而无法处理其它事务。多线程方式受到线程池和系统资源的限制,同步操作将会变得复杂。多线程操作磁盘将会导致响应慢甚至死锁。

b. 普通I/O通过Stream来操作,开发简单,但是对I/O的控制力弱

c. 普通IO的读取或写入会在JVM内存和操作系统内存之间进行复制,开销较大。

普通客户端和服务端的Socket通信:

Socket 编程 - 单个客户端


下面通过ServerSocketChannelSocketChannel来实现客户端和服务端的NIO通信:



1. 服务端
public class TCPEchoServerNIO {

	private static final int BUFSIZE = 256;
	private static final long TIMEOUT = 3000;

	public static void main(String[] args) throws IOException {
		args = new String[1];
		args[0] = "4451";

		Selector selector = Selector.open(); // 工厂模式创建选择器

		ServerSocketChannel servChan = ServerSocketChannel.open(); // 工厂模式创建服务套接字通道
		servChan.socket().bind(new InetSocketAddress(Integer.parseInt(args[0]))); // 绑定到指定端口
		servChan.configureBlocking(false); // 非阻塞模式(必须)
		servChan.register(selector, SelectionKey.OP_ACCEPT); // 通道注册到选择器中。是通道的方法,不是选择器的。

		while (true) {
			if (selector.select(TIMEOUT) == 0) { // 获取可进行I/O操作的通道的SelectionKey集
				System.out.println("No channel needs I/O operations");
				continue;
			}

			Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
			while (iter.hasNext()) {
				SelectionKey key = iter.next();
				if (key.isAcceptable()) { // key关联的通道已准备好接收套接字连接(当客户端connect服务端时)
					handleAccept(key);
				}

				if (key.isReadable()) { // key关联的通道已准备好读取(客户端发送的数据已到达)
					handleRead(key);
				}

				if (key.isValid() && key.isWritable()) { // key有效(自key创建后,至调用cancel()、或通道关闭,或selector关闭之前),且通道已准备好写(客户端读取服务端返回)
					handleWrite(key);
				}
				
				iter.remove(); // 每次select()都会append新的SelectionKey,所以移除已处理的,防止下次重复处理
			}
		}
	}

	private static void handleAccept(SelectionKey key) throws IOException, ClosedChannelException {
		SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
		if (clntChan != null) {
			clntChan.configureBlocking(false); // 通道必须为非阻塞模式
			clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); // 注册通道的读取事件(当客户端发送数据到服务端时发生),并创建一个缓冲区存放传输数据
		}
	}

	private static void handleRead(SelectionKey key) throws IOException {
		SocketChannel clntChan = (SocketChannel) key.channel();
		ByteBuffer buf = (ByteBuffer) key.attachment(); // 获取key关联的附件,也就是前面创建的缓冲区
		int bytesRead = clntChan.read(buf); // 将客户端发过来的数据放到缓冲区
		if (bytesRead == -1) { // 从客户端读取到的数据为空(当客户端关闭通道时,会触发该读取事件),说明已关闭
			clntChan.close();
		} else if (bytesRead > 0) {
			key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 接收 客户端发送数据事件 | 客户端读取服务端返回事件
		}
	}

	private static void handleWrite(SelectionKey key) throws IOException {
		ByteBuffer buf = (ByteBuffer) key.attachment();
		buf.flip(); // 从先前写入的起始位置读取
		SocketChannel clntChan = (SocketChannel) key.channel();
		clntChan.write(buf); // 将客户端发送的数据写回客户端,完成“回显”功能
		if (!buf.hasRemaining()) { // 缓冲区没有数据
			key.interestOps(SelectionKey.OP_READ); // 不需要再往通道写,只对读事件(客户端又发送数据到服务端)感兴趣
		}
		buf.compact(); // 缓冲区剩余的数据,移动到缓冲区的起始部位,给下次存放数据腾出空间
	}
}


2. 客户端

public class TCPEchoClientNIO {

	public static void main(String[] args) throws IOException {
		args = new String[3];
		args[0] = "localhost";
		args[1] = "Hello NIO";
		args[2] = "4451";
		
		String server = args[0];
		
		byte[] words = args[1].getBytes(); // 待发送字符串的字节数组形式
		
		int servPort = args.length == 3 ? Integer.parseInt(args[2]) : 4451;
		
		// 静态工厂方法创建通道,设置为非阻塞模式
		SocketChannel clntChan = SocketChannel.open();
		clntChan.configureBlocking(false);
		
		// 初始化连接,并等待连接过程结束
		// 调用非阻塞通道的方法总是立即返回而不管是否完成,所以要采用轮询方式获取状态
		if (!clntChan.connect(new InetSocketAddress(server, servPort))) { // 触发服务端通道的OP_ACCEPT
			while (!clntChan.finishConnect()) {
				System.out.println("Wait for connecting finished");
			}
		}
		
		ByteBuffer writeBuf = ByteBuffer.wrap(words);  // 发送缓冲区
		ByteBuffer readBuf = ByteBuffer.allocate(words.length); // 接收缓冲区,长度和发送数据相同(回显)
		int totalBytesRcvd = 0;
		int bytesRcvd;
		while (totalBytesRcvd < words.length) {
			if (writeBuf.hasRemaining()) { // 发送缓冲区还有数据
				clntChan.write(writeBuf); // 向服务器发送数据,触发通道的OP_READ
			}
			bytesRcvd = clntChan.read(readBuf); // 获取服务端返回的数据,触发通道的OP_WRITE
			totalBytesRcvd += bytesRcvd;
			System.out.println("Receiving " + bytesRcvd + " bytes");
		}

		System.out.println("Received: " + new String(readBuf.array(), 0, totalBytesRcvd)); // 转化为字符串
		
		clntChan.close(); // 关闭通道,触发通道的OP_READ,数据为空
	}
}
  • 大小: 53 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics