# JavaSE —— BIO/NIO/AIO

# 一、网络编程基础

# 1. URL 的解析和构造

image-20210917101524042

URI 和 URL 的区别与联系

  • URI: Uniform Resource Identifier
  • URL: Uniform Resource Locator
  • URN: Uniform Resource Name

URI 是抽象的定义,不管用什么方法表示,只要能定位一个资源,就叫 URI,本来设想的的使用两种方法定位:

  1. URL:用地址定位
  2. URN:用名称定位

# 2. DNS 解析

域名 -> IP 地址

# 3. 网络协议

img

# 4. Java IO

# 5. Socket

对于底层网络应用开发者而言,几乎所有网络编程都是 Socket,因为大部分底层网络 的编程都离不开 Socket 编程。 HTTP 编程、 Web 开发、 IM 通信 、视频流传输的底层都是 Socket 编程 。

日常生活中我们每天打开浏览器浏览网页、使用 QQ 聊天、 邮件收发、 直播等,客户端和服务器端的通信在底层看来都是依靠 Socket 通信的。

Socket 起源于 UNIX,而 UNIX 的基本哲学之一就是“一切皆文件”,都可以用 “打开(open)→读写(write/read)→关闭(close)” 模式来操作, Socket 就是该模式的一个实现,网络的 Socket 数据传输是 一种特殊的 I/0,Socket 也是一种文件描述符。 Socket 也具有一个类似于打开文件的函数调用:Socket(), 该函数返回一个整型的 Socket 描述符, 随后的连接建立、数据传输等操作都是通过该 Socket 实现的 。

网络中的进程之间如何通过 Socket 通信呢?首要解决的问题是如何唯一标识一个进程,否则通信无从谈起 ! 在本地可以通过进程 PID 来唯一标识一个进程,但是在网络中这是行不通的。其实 TCP/IP 协议族己经帮我们解决了这个问题,网络层的“IP 地址”可以唯一标识网络中的主机,而传输层的“协议+端口” 可以唯一标识主机中的应用程序(进程)。这样利用三大要素(IP 地址协议端口)就可以标识网络的进程了,网络中需要互相通信的进程,就可以利用这个标志在它们之间进行交互。

使用 TCP/IP 协议的应用程序通常采用应用编程接口:UNIX BSD 的套接字和 UNIX System V 的 TLI(己经被淘汰),来实现网络进程之间的通信。就目前而言, 几乎所有的应用程序都是采用 Socket,而现在又是网络时代,网络中进程通信是无处不在的,这就是为什么说“一切皆 Socket”。

Socket 有两种:

  • TCPSocket
  • UDP Socket

TCP 和 UDP 是协议,而要确定一个进程得需要三要素, 所以还需要 IP 地址和端口。

# 6. 阻塞/非阻塞 vs 异步/同步

阻塞/非阻塞

👀 从调用方的角度来看)

  • 如果调用方在被调用方返回结果之前只能傻傻等待,那就是阻塞的。
  • 如果调用方在被调用方返回结果之前可以先干别的事情,那就是非阻塞的。

同步/异步

(从被调用方的角度来看👀

  • 如果被调用方被调用之后需要立刻返回结果,那么就是同步的。
  • 如果被调用方被调用之后先返回一个空的结果,等到任务执行完成后再通知调用方,那就是异步的。

同步和异步的区别:是否开启新线程。

阻塞和非阻塞的区别:当前线程是否挂起,即是否释放 CPU。

# 7. Linux 5 种 IO

# 二、BIO

# 1. 简介

  • Java BIO 就是传统的 java io 编程,其相关接口在 java.io。
  • BIO(Blocking I/O):同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时,服务器端就需要一个启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。
  • BIO 适用于连接数目较小且固定架构,这种方式对服务器资源要求较高,并发局限于应用中,JDK1.4 之前的唯一选择,程序简单易理解。
# 阻塞体现在哪里?
accept() :阻塞接收客户端的连接
read()/write()
connect():和服务端建立连接,连接的过程中 connect() 会阻塞

# 2. 编程模型

  1. 服务器启动一个 ServerSocket;
  2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器需要对每个客户建立一个线程与之通讯;
  3. 客户端发出请求后,先咨询服务器,是否有线程响应,如果没有则会等待,或者被拒绝;
  4. 如果有响应,客户端线程会等待请求结束后,再继续执行。

# 3. 缺点

  1. 每个请求都需要创建独立线程,与对应的客户端进行数据 Read 业务处理,数据 Write。
  2. 当并发数量较大,需要创建大量线程来处理连接,系统资源占用较大。
  3. 连接建立后,如果当前线程没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。

# 4. 实战多人聊天室

# 4.1 架构设计

bio-chatroom-sequence-diagram

服务端:

  • 一个线程来负责添加和删除客户端,并维护一个在线的客户端列表;
  • 一个线程来负责接收和广播客户端的信息;

客户端:

  • 一个线程来负责等待用户输入;
  • 一个线程来负责接收其他客户端发来的信息;
# 目录结构
.
├── client
│   ├── ChatClient.java
│   ├── ChatClientStarter.java
│   └── UserInputHandler.java
└── server
    ├── ChatHandler.java
    ├── ChatServer.java
    └── ChatServerStarter.java

# 4.2 服务端实现

  • ChatServer

    public class ChatServer {
    
        private int DEFAULT_PORT = 8888;                                // 默认端口
        private final String QUIT = "quit";                             // 客户端退出命令
    
        private ServerSocket serverSocket;                              // socket
    
        private ExecutorService executorService;                        // 线程池
        private HashMap<Integer, Writer> connectedClients;              // 端口:写对象
    
        public ChatServer() {
            connectedClients = new HashMap<>();
            executorService = Executors.newFixedThreadPool(10);
        }
    
        /**
         * 客户端上线
         * @param socket        accept 到的客户端 socket
         * @throws IOException  获取 socket 的 outputStream 时可能抛出 IOException
         */
        public synchronized void addClient(Socket socket) throws IOException{
            if (socket != null) {
                int port = socket.getPort();
                BufferedWriter writer = new BufferedWriter(
                        new OutputStreamWriter(socket.getOutputStream())
                );
                // 添加
                connectedClients.put(port, writer);
                // 日志
                System.out.println("客户端 [" + port + "] 已连接到服务器");
            }
        }
    
        /**
         * 客户端下线
         * @param socket        accept 到的客户端 socket
         * @throws IOException  关闭 socket 的 outputStream 时可能抛出 IOException
         */
        public synchronized void removeClient(Socket socket) throws IOException {
            if (socket != null){
                int port = socket.getPort();
                if (connectedClients.containsKey(port)) {
                    // 关闭 writer 对象
                    connectedClients.get(port).close();
                    // 移除
                    connectedClients.remove(port);
                    // 日志
                    System.out.println("客户端 [" + socket.getPort() + "] 已下线");
                }
            }
        }
    
        /**
         * 服务端转发信息给除发送者之外的所有客户端
         * @param socket        发送者
         * @param message       消息
         * @throws IOException  向 socket 的 outputStream 进行写操作时可能抛出 IOException
         */
        public synchronized void forwardMessage(Socket socket, String message) throws IOException {
            if (socket != null && !message.isEmpty()) {
                for (Integer port: connectedClients.keySet()) {
                    if (!port.equals(socket.getPort())) {
                        Writer writer = connectedClients.get(port);
                        writer.write(message);
                        writer.flush();
                    }
                }
            }
        }
    
        /**
         * 检查用户是否准备退出
         */
        public boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
    
        /**
         * 服务端主线程
         * 1. 监听客户端,等待客户端连接
         * 2. 接收客户端信息,并进行转发
         * 3. 维护在线的客户端列表
         */
        public void start(){
            try {
                // 绑定监听端口
                serverSocket = new ServerSocket(DEFAULT_PORT);
                System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "...");
                // 监听客户端请求
                Socket accept;
                while (true) {
                    // 等待客户端连接
                    accept = serverSocket.accept();
                    // 创建 ChatHandler 线程
                    executorService.execute(new ChatHandler(this, accept));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                close();
            }
    
        }
    
        /**
         * 关闭服务器
         */
        private synchronized void close(){
            if (serverSocket != null){
                try {
                    serverSocket.close();
                    System.out.println("服务器正常退出...");
                }catch (IOException e) {
                    System.out.println("服务器退出异常...");
                    e.printStackTrace();
                }
            }
        }
    }
    
  • ChatHandler

    public class ChatHandler implements Runnable{
    
        private ChatServer chatServer;          // 服务端
        private Socket socket;                  // 客户端
    
        public ChatHandler(ChatServer chatServer, Socket socket) {
            this.chatServer = chatServer;
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try {
                // 存储新上线的客户端
                chatServer.addClient(socket);
                System.out.println("添加客户端 [" + socket.getPort() + "] 成功!");
    
                // 读取用户发送的信息
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream())
                );
                String msg = null;
                while ((msg = reader.readLine()) != null) {
                    // 检查用户是否是退出命令
                    if (chatServer.readyToQuit(msg.trim())) {
                        break;
                    }
    
                    // 转发消息到其他在线的客户端
                    chatServer.forwardMessage(socket, "客户端 [" + socket.getPort() + "]: " + msg + "\n");
    
                }
            } catch (IOException e) {
                System.out.println("添加客户端 [" + socket.getPort() + "] 失败...");
                e.printStackTrace();
            } finally {
                try {
                    // 移除客户端
                    chatServer.removeClient(socket);
                } catch (IOException e) {
                    System.out.println("客户端 [" + socket.getPort() + "] 下线异常...");
                    e.printStackTrace();
                }
            }
        }
    }
    
  • ChatServerStarter

    public class ChatServerStarter {
        public static void main(String[] args) {
            ChatServer chatServer = new ChatServer();
            chatServer.start();
        }
    }
    

# 4.3 客户端实现

  • ChatClient

    public class ChatClient {
    
        private final String DEFAULT_SERVER_HOST = "127.0.0.1";     // 服务端主机
        private final int DEFAULT_SERVER_PORT = 8888;               // 服务端端口
        private final String QUIT = "quit";                         // 客户端退出命令
    
        private Socket socket;  // 客户端 socket
        private BufferedReader reader;
        private BufferedWriter writer;
    
        /**
         * 发送消息
         */
        public void sendMessage(String message) throws IOException {
            if (socket != null && !socket.isOutputShutdown()) {
                if (writer != null) {
                    writer.write(message + "\n");
                    writer.flush();
                }
            }
        }
    
        /**
         * 接收消息
         */
        public String receiveMessage() throws IOException {
            if (socket != null && !socket.isInputShutdown()) {
                if (reader != null) {
                    return reader.readLine();
                }
            }
            return null;
        }
    
        /**
         * 检查用户是否准备退出
         */
        public boolean readyToQuit(String message) {
            return QUIT.equals(message);
        }
    
        /**
         * 启动客户端
         */
        public void start() {
            try {
                // 创建 socket,绑定服务端
                socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
    
                // 创建 IO 流
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
    
                // 处理用户输入
                new Thread(new UserInputHandler(this)).start();
    
                // 读取服务器转发的其他客户端的信息
                String message = null;
                while ((message = receiveMessage()) != null) {
                    System.out.println(message);
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                close();
            }
        }
    
        /**
         * 关闭客户端
         */
        private void close() {
    
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            if (writer != null) {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • UserInputHandler

    public class UserInputHandler implements Runnable{
    
        private ChatClient chatClient;          // 对应的客户端
    
        public UserInputHandler(ChatClient chatClient) {
            this.chatClient = chatClient;
        }
    
        @Override
        public void run() {
            // 等待用户输入消息
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            String message = null;
    
            try {
                while (true) {
                    // 读取用户输入
                    message = reader.readLine();
    
                    // 向服务器发送消息
                    chatClient.sendMessage(message);
    
                    // 检查用户是否准备退出
                    if (chatClient.readyToQuit(message)) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • ChatClientStarter

    public class ChatClientStarter {
        public static void main(String[] args) {
            ChatClient chatClient = new ChatClient();
            chatClient.start();
        }
    }
    

# 4.4 演示

image-20210919202314400

# 三、NIO

# 1. BIO 中的阻塞

  • ServerSocket.accept()
  • InputStream.read()
  • OutputStream.writer()
  • 无法在同一个线程里面处理多个 Stream I/O

# 2. 简介

NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 SocketServerSocket 相对应的 SocketChannelServerSocketChannel 两种不同的套接字通道实现。两种通道都支持阻塞和非阻塞两种模式。

阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。

对于低负载、低并发的应用程序,可以使用同步阻塞 I/O 来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

# 3. 特点

  • 使用 Channel 代替 Stream
  • 使用 Selector 监控多条 Channel
  • 可以在一个线程里处理多个 Channel I/O

# 4. Buffer

image-20210920152348572
  • position: 当前指针
  • limit: 能写或者能读的最远地方
  • capacity: 缓冲区容量

# 4.1 向 Buffer 写入数据

  • 从 position 位置写,然后移动 position 指针,在到达 limit 之前都是可以写的

# 4.2 从 Buffer 读取数据

  • 调用 flip() 方法,将 Buffer 的写模式反转成读模式;
  • flip() 放把 position 移动回写的初始位置,然后把 limit 移动到刚刚写入的最远位置;
image-20210920152823069

# 4.2 刷新 Buffer

  • Buffer 里所有数据都已经被读完了,调用 clear()
    • 会把 position 和 limit 都复原到原位置。
  • Buffer 里面数据只读了一半,调用 clear()
    • 会调用 compact() 方法把未读的位置挪到前面,然后移动 position 指针到已有数据的下面,然后继续接受写或者读。

# 5. Channel

Channel 通过 Buffer 来写入和读取数据,Channel 也可以直接跟 Channel 进行数据的传输。

几个重要的 Channel:

  • FileChannel
  • SocketChannel
  • ServerSocketChannel

# 6. 多种方式实现本地文件拷贝

interface FileCopyRunner {
    void copyFile(File source, File target);
}

public class FileCopyDemo {
		
  	// 性能测试
    private static void benchmark(FileCopyRunner fileCopyRunner) {
        Date start = new Date();
        File source = new File("a.txt");
        File target = new File("b.txt");
        for (int i = 0; i < 1000; i++) {
            fileCopyRunner.copyFile(source, target);
        }
        Date end = new Date();
        System.out.println("总时间:" + (end.getTime() - start.getTime()));
    }

  	// 关闭资源
    private static void closeResource(Closeable closeable) {
        if (closeable != null){
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        /**
         * ① BIO: 不利用缓冲区
         */
        FileCopyRunner noBufferStreamCopy = new FileCopyRunner() {
            @Override
            public void copyFile(File source, File target) {
                InputStream fin = null;
                OutputStream fout = null;
                try {
                    fin = new FileInputStream(source);
                    fout = new FileOutputStream(target);

                    // 读文件
                    int result;
                    while ( (result = fin.read() ) != -1) {
                        // 写文件
                        fout.write(result);
                    }

                } catch (IOException e ) {
                    e.printStackTrace();
                } finally {
                    closeResource(fin);
                    closeResource(fout);
                }
            }
        };

        /**
         * ② BIO: 利用缓冲区
         */
        FileCopyRunner bufferredStreamCopy = new FileCopyRunner() {
            @Override
            public void copyFile(File source, File target) {
                InputStream fin = null;
                OutputStream fout = null;

                try {
                    fin = new BufferedInputStream(new FileInputStream(source));
                    fout = new BufferedOutputStream(new FileOutputStream(target));

                    byte[] buffer = new byte[1024];
                    int result;

                    // 读
                    while ((result = fin.read(buffer)) != -1) {
                        // 写
                        fout.write(buffer, 0, result);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    closeResource(fin);
                    closeResource(fout);
                }
            }
        };

        /**
         * ③ NIO: 利用缓冲区
         */
        FileCopyRunner nioBufferCopy = new FileCopyRunner() {
            @Override
            public void copyFile(File source, File target) {
                FileChannel fin = null;
                FileChannel fout = null;

                try {
                    // 通过文件流来获取对应通道
                    fin = new FileInputStream(source).getChannel();
                    fout = new FileOutputStream(target).getChannel();
                    // 缓冲区
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    // 读数据,写 Buffer
                    while ( fin.read(byteBuffer) != -1) {
                        // 需要将 Buffer 从写模式转移到读模式
                        byteBuffer.flip();
                        // 读 Buffer,传进 channel
                        while (byteBuffer.hasRemaining()) {
                            fout.write(byteBuffer);
                        }
                        // 读模式再转为写模式
                        byteBuffer.compact();
                        byteBuffer.clear();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    closeResource(fin);
                    closeResource(fout);
                }
            }
        };

        /**
         * ④ NIO: 不利用缓冲区,channel 直接交互
         */
        FileCopyRunner nioTransferCopy = new FileCopyRunner() {
            @Override
            public void copyFile(File source, File target) {
                FileChannel fin = null;
                FileChannel fout = null;

                try {
                    fin = new FileInputStream(source).getChannel();
                    fout = new FileOutputStream(target).getChannel();
                    // channel 直接传输
                    long hasTransferred = 0L;
                    while (hasTransferred != fin.size()) {
                        hasTransferred += fin.transferTo(hasTransferred, fin.size(), fout);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    closeResource(fin);
                    closeResource(fout);
                }
            }
        };

      	// 测试
        System.out.print("noBufferStreamCopy ");
        benchmark(noBufferStreamCopy);
        System.out.print("bufferredStreamCopy ");
        benchmark(bufferredStreamCopy);
        System.out.print("nioBufferCopy ");
        benchmark(nioBufferCopy);
        System.out.print("nioTransferCopy ");
        benchmark(nioTransferCopy);
    }
}

输出:

noBufferStreamCopy 总时间:25588
bufferredStreamCopy 总时间:206
nioBufferCopy 总时间:271
nioTransferCopy 总时间:240

经过多次实验,得出结论:

  • 文件大的话,使用 Channel 的优势会更明显;
  • Buffer 的用处非常大;
  • 单线程下 BIO 和 NIO 区别不大,多线程下 NIO 的优势明显;

# 7. Selector

Selector 是用来管理 Channel 的一个工具,开发者可以将 Channel 注册到特定的 Selector,Selector 会为 Channel 生成一个 SelectorKey,然后就可以通过 SelectorKey 的各种方法来管理 Channel,如以下方法:

  • interestOps(): 开发者希望 Selector 监控到 Channel 的哪些状态;
  • readyOps(): 获得目前的 Channel 处于哪些可操作的状态;
  • channel(): 返回对应的 Channel 对象;
  • selector(): 返回 Channel 所注册 Selector 对象;
  • attachment(): 开发者可以将任意对象附着在 Channel 上,通过该方法可以获取该对象;

其中 Ops 代表了 Channel 的某种状态,也可以理解为 Channel 所监听的某种事件,有如下:

  • CONNECT
  • ACCEPT
  • READ
  • WRITE

Selector 对象有以下方法可以获取 Channel 的信息:

  • select(): 返回有几个 Channel 处于满足 Selector 所监听的状态;

# 8. 编程模型

image-20210920215427218

# 9. 实战多人聊天室

# 9.1 架构设计

与 BIO 不同的是,NIO 的实现方式不需要再手动维护一个 Map,因为 Selector 会维护一个所有注册到它上面 SelectionKey 组成的 Set。

另外,NIO 也不需要每连接一个客户端就派一个线程去处理,而是支持在单线程下对多个客户端进行处理,因为用到了 Channel,而 Selector 又可以用来管理 Channel。

读写数据都是要经过 Buffer 的,且要注意,每次往 Buffer 里面写完东西后,都需要调用 flip() 将 Buffer 从写模式切换到读模式:

  • 写数据:先写到 wBuffer,再转到 Channel
  • 读数据:从 Channel 将数据读到 rBuffer,再冲 rBuffer 中读出数据
# 目录结构
├── client
│   ├── ChatClient.java
│   ├── ChatClientStarter.java
│   └── UserInputHandler.java
└── server
    ├── ChatServer.java
    └── ChatServerStarter.java

# 9.2 服务端实现

  • ChatServer

    public class ChatServer {
    
        private static final int DEFAULT_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
    
        private ServerSocketChannel serverSocketChannel;
        private Selector selector;
        private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
        private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
        private Charset charset = Charset.forName("UTF-8");
    
        private int port;
    
        public ChatServer(){
            this(DEFAULT_PORT);
        }
    
        public ChatServer(int port){
            this.port = port;
        }
    
        /**
         * 启动服务端
         */
        public void start(){
            try {
                // 获得服务端的通道
                serverSocketChannel = ServerSocketChannel.open();
                // 修改为非阻塞模式
                serverSocketChannel.configureBlocking(false);
                // 绑定端口
                serverSocketChannel.bind(new InetSocketAddress(this.port));
    
                // 获得 Channel 控制器 Selector 对象
                selector = Selector.open();
                // 将服务端 Channel 注册到 Selector 中,注册 ACCEPT 事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("启动服务器,监听端口:" + this.port + "...");
    
                // Selector 监听事件
                while (true) {
                    selector.select();
                    // 处理所有被触发的事件
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys){
                        handles(selectionKey);
                    }
                    // 清空之前的事件集
                    selectionKeys.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeResource(selector);
            }
        }
    
        /**
         * 处理被触发的事件
         */
        private void handles(SelectionKey selectionKey) throws IOException {
            // ACCEPT 事件 —— 即和客户端建立连接
            if (selectionKey.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
                // 获取客户端 channel
                SocketChannel client = server.accept();
                // 将客户端 channel 转为非阻塞模式
                client.configureBlocking(false);
                // 为客户端 channel 注册 READ 事件
                // 当 READ 事件触发时,表示有客户端写东西了,channel 有可以读的东西
                client.register(selector, SelectionKey.OP_READ);
                System.out.println(getClientName(client) + "已连接");
            }
            // READ 事件 —— 即客户端发来信息,需要转发给其他客户端
            else if (selectionKey.isReadable()) {
                SocketChannel client = (SocketChannel)selectionKey.channel();
                // 获取客户端发来的信息
                String fwdMsg = receive(client);
                if (fwdMsg.isEmpty()) {
                    // 空信息 -> 客户端异常 -> 退出客户端
                    selectionKey.cancel();
                    selector.wakeup();
                } else {
                    System.out.println(getClientName(client) + ": " + fwdMsg);
    
                    // 转发信息
                    forwardMessage(client, fwdMsg);
    
                    // 判断用户是否准备退出
                    if (readyToQuit(fwdMsg)){
                        selectionKey.cancel();
                        selector.wakeup();
                        System.out.println(getClientName(client) + "已断开");
                    }
                }
            }
        }
    
        /**
         * 将 client 发来的信息转发给其他客户端
         */
        private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
            for (SelectionKey key : selector.keys()) {
                Channel connectedChannel = key.channel();
                // 不转发给服务端
                if (connectedChannel instanceof ServerSocketChannel) {
                    continue;
                }
                // 不转发给自身
                if (key.isValid() && connectedChannel instanceof SocketChannel && !client.equals(connectedChannel)) {
                    wBuffer.clear();
                    // 先将数据写到 wBuffer 中
                    wBuffer.put(charset.encode(getClientName(client) + ": " + fwdMsg));
                    wBuffer.flip();
                    // 将从 rBuffer 将数据送到 channel
                    while (wBuffer.hasRemaining()){
                        ((SocketChannel)connectedChannel).write(wBuffer);
                    }
                }
            }
        }
    
        /**
         * 接收客户端发来的信息
         */
        private String receive(SocketChannel client) throws IOException {
            rBuffer.clear();
            // 将 channel 数据读到 rBuffer
            while (client.read(rBuffer) > 0) {}
            // 将 rBuffer 的写模式转为读模式
            rBuffer.flip();
            return String.valueOf(charset.decode(rBuffer));
        }
    
        /**
         * 构建客户端名称
         */
        private String getClientName(SocketChannel client) {
            return "客户端 [" + client.socket().getPort() + "] ";
        }
    
        /**
         * 判断客户端是否准备退出
         */
        private boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
    
        /**
         * 释放资源
         */
        private void closeResource(Closeable closable) {
            if (closable != null) {
                try {
                    closable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • ChatServerStarter

    public class ChatServerStarter {
        public static void main(String[] args) {
            ChatServer chatServer = new ChatServer(7777);
            chatServer.start();
        }
    }
    

# 9.3 客户端实现

  • ChatClient

    public class ChatClient {
    
        private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
        private static final int DEFAULT_SERVER_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
    
        private String host;
        private int port;
        private SocketChannel clientSocketChannel;
        private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
        private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
        private Selector selector;
        private Charset charset = Charset.forName("UTF-8");
    
    
        public ChatClient() {
            this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
        }
    
        public ChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        /**
         * 启动客户端
         */
        public void start(){
            try {
                // 获取客户端 channel
                clientSocketChannel = SocketChannel.open();
                // 设置为非阻塞模式
                clientSocketChannel.configureBlocking(false);
    
                // 创建一个通道管理器 selector
                selector = Selector.open();
                // channel 注册 CONNECT 事件到 selector
                clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);
                // 连接服务端
                clientSocketChannel.connect(new InetSocketAddress(this.host, this.port));
    
                // 监听事件
                while (true){
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey key : selectionKeys) {
                        handles(key);
                    }
                    selectionKeys.clear();
                }
            }catch (IOException e){
                e.printStackTrace();
            } catch (ClosedSelectorException e) {
                // 正常退出,无需处理
            } finally {
                closeResource(selector);
            }
        }
    
        /**
         * 处理被触发的事件
         */
        private void handles(SelectionKey key) throws IOException {
            // CONNECT 事件 —— 客户端已经连接上服务端了
            if (key.isConnectable()) {
                SocketChannel client = (SocketChannel)key.channel();
                // 判断是否已经完成连接
                if (client.isConnectionPending()) {
                    client.finishConnect();
                    // 处理用户输入
                    new Thread(new UserInputHandler(this)).start();
                }
                // 注册 READ 事件
                client.register(selector, SelectionKey.OP_READ);
            }
            // READ 事件 —— 服务端转发别的客户端的消息过来
            else if (key.isReadable()) {
                SocketChannel clientSocketChannel = (SocketChannel) key.channel();
                String msg = receive(clientSocketChannel);
                if (msg.isEmpty()) {
                    // 空信息 -> 服务端异常 -> 客户端退出
                    closeResource(selector);
                } else {
                    System.out.println(msg);
                }
            }
        }
    
        /**
         * 从通道中读取信息
         */
        private String receive(SocketChannel clientSocketChannel) throws IOException {
            rBuffer.clear();
            while (clientSocketChannel.read(rBuffer) > 0){}
            rBuffer.flip();
            return String.valueOf(charset.decode(rBuffer));
        }
    
        /**
         * 发送消息
         */
        public void send(String input) throws IOException {
            if (input.isEmpty()) {
                return;
            }
    
            // 先写入 wBuffer
            wBuffer.clear();
            wBuffer.put(charset.encode(input));
            wBuffer.flip();
            // 再转到 channel
            while (wBuffer.hasRemaining()){
                clientSocketChannel.write(wBuffer);
            }
    
            // 监控用户是否退出
            if (readyToQuit(input)) {
                closeResource(selector);
            }
        }
    
        /**
         * 判断客户端是否准备退出
         */
        public boolean readyToQuit(String msg) {
            return QUIT.equals(msg);
        }
    
        /**
         * 释放资源
         */
        private void closeResource(Closeable closable) {
            if (closable != null) {
                try {
                    closable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • UserInputHandler

    public class UserInputHandler implements Runnable {
    
        private ChatClient chatClient;
    
        public UserInputHandler(ChatClient chatClient){
            this.chatClient = chatClient;
        }
    
        @Override
        public void run() {
    
            try {
                // 等待用户输入消息
                BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    String input = consoleReader.readLine();
                    // 像服务器发送消息
                    chatClient.send(input);
    
                    // 检查用户是否退出
                    if (chatClient.readyToQuit(input)) {
                        break;
                    }
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
    
  • ChatClientStarter

    public class ChatClientStarter {
        public static void main(String[] args) {
            ChatClient chatClient = new ChatClient("127.0.0.1", 7777);
            chatClient.start();
        }
    }
    

# 9.4 演示

image-20210923193826991

# 四、AIO

# 1. 简介

与 NIO 不同,使用 AIO 进行读写操作时,只须直接调用 API 的 readwrite 方法即可。这两种方法均为异步的:

  • 对于读操作而言,当有流可读取时,操作系统会将可读的流传入 read 方法的缓冲区,并通知应用程序;
  • 对于写操作而言,当操作系统将 write 方法传递的流写入完毕时,操作系统主动通知应用程序。

即可以理解为,read/write 方法都是异步的,完成后会主动调用回调函数。 在 JDK1.7 中,这部分内容被称作 NIO.2,主要在java.nio.channels 包下增加了下面四个异步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

其中的 read/write 方法,会返回一个带回调函数的对象,当执行完读取/写入操作后,直接调用回调函数。

# 2. 异步实现

# 2.1 通过 Future

image-20210924152147217

# 2.2 通过 CompletionHandler

image-20210924152342721

# 3. 原理

# 4. 编程模型

image-20210924163634768

# 5. 实现“回音壁”

  • 服务器端采用 CompletionHandler 来实现异步
  • 客户端采用 Future 来实现异步

Server:

public class Server {

    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    final int BUFFER = 1024;
    final String READ_OP = "read";
    final String WRITE_OP = "write";
    AsynchronousServerSocketChannel serverSocketChannel;


    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }


    /**
     * 启动服务端
     */
    public void start(){
        try {
            // 绑定监听端口
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "...");

            // 监听客户端,accept 的第二个参数就是一个回调函数 CompletionHandler
            while (true) {
                // 异步,直接返回
                serverSocketChannel.accept(null, new AcceptHandler());
                // 阻塞一下,避免一直循环
                System.in.read();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeResource(serverSocketChannel);
        }

    }

    /**
     * 释放资源
     */
    private void closeResource(Closeable closeable) {
        if(closeable != null){
            try{
                closeable.close();
                System.out.println("关闭" + closeable);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

    /**
     * IO 完成后要调用的回调
     *
     * CompletionHandler<V,A>
     *      @param   '<V>'     The result type of the I/O operation 这里我们要返回的就是客户端对应的异步通道
     *      @param   '<A>'     The type of the object attached to the I/O operation 要附带的对象的类型
     */
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object>{

        /**
         * IO 正常结束后要做的事情
         */
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            // 服务端还在线的话,要继续监听
            if (serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null, this);
            }

            // 处理客户端请求
            AsynchronousSocketChannel clientChannel = result;
            if (clientChannel != null && clientChannel.isOpen()) {
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                // attachment
                Map<String, Object> info = new HashMap<>();
                info.put("type", READ_OP);
                info.put("buffer", buffer);
                // 参数1:目标 Buffer
                // 参数2:attachment
                // 参数3:IO 完成后要执行的回调
                clientChannel.read(buffer, info, new ClientHandler(clientChannel));
            }
        }

        /**
         * IO 异常后要做的事情
         */
        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("AcceptHandler 出现异常了,exception: " + exc.toString() + ",attachment:" + attachment.toString());
        }
    }

    /**
     * 读取完客户端发来的信息后要执行的回调
     *
     * CompletionHandler<Integer,? super A> handler
     *      @param   '<Integer>'     The result type of the I/O operation 这里是返回读到了多少个字节
     *      @param   '<? super A>'   The type of the object attached to the I/O operation 需要是 A 的父类,A 是 attachment
     */
    private class ClientHandler implements CompletionHandler<Integer, Map<String, Object>>{

        private AsynchronousSocketChannel clientChannel;

        public ClientHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }

        /**
         * IO 正常结束后要做的事情
         */
        @Override
        public void completed(Integer result, Map<String, Object> attachment) {
            String type = (String) attachment.get("type");
            // READ -> 即服务端收到来自客户端的信息了
            if (type.equals(READ_OP)) {
                // 回音壁:读出客户端发来的数据,然后再回传回去
                ByteBuffer buffer = (ByteBuffer) attachment.get("buffer");
                // 从写模式切换为读模式
                buffer.flip();
                // 将 buffer 中的数据写进 channel 中
                attachment.put("type", WRITE_OP);
                clientChannel.write(buffer, attachment, this);
                buffer.clear();
            }
            // WRITE -> 即服务端回传数据了
            else if (type.equals(WRITE_OP)){
                // 服务端回传完后让服务端继续监听客户端有可能发来的数据
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                attachment.put("type", READ_OP);
                attachment.put("buffer", buffer);
                clientChannel.read(buffer, attachment, this);
            }
        }

        /**
         * IO 异常后要做的事情
         */
        @Override
        public void failed(Throwable exc, Map<String, Object> attachment) {
            System.out.println("ClientHandler 出现异常了,exception: " + exc.toString() + ",attachment:" + attachment.toString());
        }
    }
}

Client:

public class Client {

    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 8888;
    AsynchronousSocketChannel clientSocketChannel;

    public static void main(String[] args) {
        Client client = new Client();
        client.start();
    }

    /**
     * 启动客户端
     */
    public void start(){
        try {
            // 连接服务端
            clientSocketChannel = AsynchronousSocketChannel.open();
            // 获得 Future 对象
            Future<Void> connectFuture = clientSocketChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));

            // ========================================
            // 此处可以加一些其他的操作,因为可以是异步非阻塞的
            // ========================================

            // 等待连接建立完成
            connectFuture.get();

            // 处理用户输入
            while (true) {
                BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
                String input = consoleReader.readLine();
                byte[] inputBytes = input.getBytes();
                ByteBuffer buffer = ByteBuffer.wrap(inputBytes);

                // 传给服务端
                Future<Integer> writeFuture = clientSocketChannel.write(buffer);

                // ========================================
                // 此处可以加一些其他的操作,因为可以是异步非阻塞的
                // ========================================

                // 等待写入完成
                writeFuture.get();

                // 读取服务器返回的消息
                buffer.flip();
                buffer.clear();
                Future<Integer> readFuture = clientSocketChannel.read(buffer);

                // ========================================
                // 此处可以加一些其他的操作,因为可以是异步非阻塞的
                // ========================================

                // 等待读取完成
                readFuture.get();
                String echo = new String(buffer.array());
                buffer.clear();
                System.out.println("服务端:" + echo);
            }

        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeResource(clientSocketChannel);
        }
    }

    /**
     * 释放资源
     */
    private void closeResource(Closeable closeable) {
        if(closeable != null){
            try{
                closeable.close();
                System.out.println("关闭" + closeable);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

}

演示:

image-20210924162408192

# 6. 实战多人聊天室

# 6.1 架构设计

跟 BIO 很像,只不过这会儿是异步的。在服务端我们采用 CompletionHandler 来实现异步,在客户端我们采用 Future 来实现异步。

# 目录结构
├── client
│   ├── ChatClient.java							# 客户端实现
│   ├── ChatClientStarter.java			# 客户端启动器
│   └── UserInputHandler.java				# 处理用户输入
└── server
    ├── AcceptHandler.java					# 客户端连接到服务器时的回调
    ├── ChatServer.java							# 服务端实现
    ├── ChatServerStarter.java			# 服务端启动器
    └── ClientHandler.java				  # 服务器收到客户端信息时的回调

# 6.2 服务端实现

  • ChatServer

    public class ChatServer {
    
        private static final String LOCALHOST = "localhost";
        private static final int DEFAULT_PORT = 8888;
        private static final int THREAD_POOL_SIZE = 8;
    
        private AsynchronousChannelGroup channelGroup;                  // 自定义 asyncChannelGroup
        private AsynchronousServerSocketChannel serverSocketChannel;    // 服务端异步通道
    
        private List<ClientHandler> connectedClients;
        private int port;
    
        public ChatServer(){
            this(DEFAULT_PORT);
        }
    
        public ChatServer(int port){
            this.port = port;
            this.connectedClients = new ArrayList<>();
        }
    
        /**
         * 启动服务器
         */
        public void start() {
            // 定义一个线程池,给 channel group 用
            ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    
            try {
                // 自定义 asyncChannelGroup
                channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
                // 开一个服务端通道
                serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
                // 绑定端口
                serverSocketChannel.bind(new InetSocketAddress(LOCALHOST, this.port));
                System.out.println("启动服务端,监听端口:" + this.port + "...");
    
                // 监听客户端的连接请求
                while (true) {
                    // 参数1:附带对象
                    // 参数2:客户端连接后要进行的回调
                    serverSocketChannel.accept(null, new AcceptHandler(this.serverSocketChannel, this.connectedClients));
                    // 阻塞一下,避免一直循环。
                    // accept 后,read 前可以做其他一些操作,因为是异步非阻塞的
                    System.in.read();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeResource(serverSocketChannel);
                if (channelGroup != null) {
                    channelGroup.shutdown();
                }
            }
        }
    
        /**
         * 释放资源
         */
        private void closeResource(Closeable closeable){
            if (closeable != null){
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • ChatServerStarter

    public class ChatServerStarter {
        public static void main(String[] args) {
            new ChatServer(9999).start();
        }
    }
    
  • AcceptHandler

    /**
     * 客户端连接完成后要做的回调
     *
     * @param   '<V>'     The result type of the I/O operation                      这里要返回的是客户端对应的异步通道
     * @param   '<A>'     The type of the object attached to the I/O operation      附加对象的类型
     */
    public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
    
        private static final int BUFFER = 1024;
    
        private AsynchronousServerSocketChannel serverSocketChannel;
        private List<ClientHandler> connectedClients;
    
        public AcceptHandler(AsynchronousServerSocketChannel serverSocketChannel, List<ClientHandler> connectedClients) {
            this.serverSocketChannel = serverSocketChannel;
            this.connectedClients = connectedClients;
        }
    
        /**
         * IO 正常完成后要做的回调: 接收客户端发来的信息 -> 转发给其他客户端
         */
        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
            // 如果服务端还在线的话,要继续监听客户端的连接请求
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.accept(null, this);
            }
    
            if (clientChannel != null && clientChannel.isOpen()) {
                ClientHandler clientHandler = new ClientHandler(clientChannel, this.connectedClients);
                // 添加新客户端
                clientHandler.addClient(clientHandler);
    
                // 接收客户端发来的信息
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                // 参数1:把客户端发来的信息读要 buffer 缓冲区中
                // 参数2:将 buffer 作为附加对象传给回调对象
                // 参数3:回调对象,每个客户端对应一个自己的 ClientHandler
                clientChannel.read(buffer, buffer, clientHandler);
            }
        }
    
        /**
         * IO 异常结束后要做的回调
         */
        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("AcceptHandler 发生异常了,exception: " + exc + ",attachment: " + attachment);
        }
    }
    
  • ClientHandler

    /**
     * 收到客户端发来的信息后,IO 完成后要做的回调
     *
     * @param   '<V>'     The result type of the I/O operation                      这里是从客户端读到了多少数据,所以是 Integer
     * @param   '<A>'     The type of the object attached to the I/O operation      附加对象的类型,这里是 ByteBuffer
     */
    public class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {
    
        private static final String QUIT = "quit";
    
        private Charset charset = Charset.forName("UTF-8");
    
        private AsynchronousSocketChannel clientChannel;
        private List<ClientHandler> connectedClients;
    
        public ClientHandler(AsynchronousSocketChannel clientChannel, List<ClientHandler> connectedClients) {
            this.clientChannel = clientChannel;
            this.connectedClients = connectedClients;
        }
    
        /**
         * IO 正常完成后要做的回调: 将客户端发来的消息转发给其他客户端
         */
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
    
            ByteBuffer buffer = attachment;
    
            // 如果 attachment 为空,则表示之前服务器刚刚对 buffer 完成了读操作(即已经转发了),不需要额外的操作
            if (buffer == null) {
    
            }
            // 如果 attachment 不为空,则表示服务器刚刚对 buffer 完成了写操作(即还没转发),现在可以来读取它
            else {
                // 客户端异常
                if (result <= 0) {
                    removeClient(this);
                    return;
                }
                // 将 buffer 从写模式切换为读模式
                buffer.flip();
                // 获取客户端发来的消息
                String fwdMsg = receive(buffer);
                System.out.println(getClientName(this.clientChannel) + ":" + fwdMsg);
                // 转发
                forwardMessage(clientChannel, fwdMsg);
                buffer.clear();
    
                // 判断用户是否要退出
                if (readyToQuit(fwdMsg)) {
                    // 退出,移除客户端
                    removeClient(this);
                } else {
                    // 不退出,继续监听客户端信息
                    clientChannel.read(buffer, buffer, this);
                }
            }
    
        }
    
        /**
         * IO 异常结束后要做的回调
         */
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.out.println("ClientHandler 发生异常了,exception: " + exc + ",attachment: " + attachment);
        }
    
        /**
         * 获取客户端发来的消息
         */
        private String receive(ByteBuffer buffer) {
            return String.valueOf(charset.decode(buffer));
        }
    
        /**
         * 构建客户端名称
         */
        private String getClientName(AsynchronousSocketChannel socketChannel) {
            String port = "UNKNOWN_CLIENT";
            try {
                InetSocketAddress remoteAddress = (InetSocketAddress)clientChannel.getRemoteAddress();
                port = "" + remoteAddress.getPort();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return "客户端 [" + port + "] ";
        }
    
        /**
         * 判断客户端是否要下线
         */
        private boolean readyToQuit(String msg){
            return msg.equals(QUIT);
        }
    
        /**
         * 释放资源
         */
        private void closeResource(Closeable closeable){
            if (closeable != null){
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 添加新客户端
         */
        public synchronized void addClient(ClientHandler clientHandler) {
            this.connectedClients.add(clientHandler);
            System.out.println(getClientName(clientHandler.clientChannel) + "上线");
        }
    
        /**
         * 移除异常客户端
         */
        public synchronized void removeClient(ClientHandler clientHandler) {
            this.connectedClients.remove(clientHandler);
            System.out.println(getClientName(clientHandler.clientChannel) + "下线");
            closeResource(clientHandler.clientChannel);
        }
    
        /**
         * 转发 self 的消息给 connectedClients 中其他的信息
         */
        private synchronized void forwardMessage(AsynchronousSocketChannel self, String fwdMsg) {
            for (ClientHandler clientHandler: this.connectedClients) {
                // 不转发给自身
                if (clientHandler.clientChannel.equals(self)) {
                    continue;
                }
                // 转发给其他客户端
                try {
                    clientHandler.clientChannel.write(charset.encode(getClientName(self) + fwdMsg), null, clientHandler);
                }catch (Exception e){
                    // 捕获异常是为了避免某个客户端出意外而导致整个系统瘫痪
                    e.printStackTrace();
                }
            }
        }
    }
    

# 6.3 客户端实现

  • ChatClient

    public class ChatClient {
        private static final String LOCALHOST = "localhost";
        private static final int DEFAULT_PORT = 8888;
        private static final String QUIT = "quit";
        private static final int BUFFER = 1024;
    
        private Charset charset = Charset.forName("UTF-8");
    
        private String host;
        private int port;
    
        private AsynchronousSocketChannel clientSocketChannel;
    
        public ChatClient(){
            this(LOCALHOST, DEFAULT_PORT);
        }
    
        public ChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        /**
         * 启动客户端
         */
        public void start(){
    
            try {
                // 获得一个客户端异步通道
                clientSocketChannel = AsynchronousSocketChannel.open();
                // 连接服务端
                Future<Void> connectFuture = clientSocketChannel.connect(new InetSocketAddress(this.host, this.port));
    
                // ======================================
                // 此处可以加一些其他的操作,因为是异步非阻塞的
                // ======================================
    
                // 等待连接结束
                connectFuture.get();
    
                // 处理用户输入
                new Thread(new UserInputHandler(this)).start();
    
                // 接收其他客户端的消息
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
                while (true) {
                    Future<Integer> readFuture = clientSocketChannel.read(buffer);
                    int result = readFuture.get();
                    if (result <= 0){
                        System.out.println("服务器断开...");
                        break;
                    }
                    buffer.flip();
                    System.out.println(charset.decode(buffer));
                    buffer.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                closeResource(clientSocketChannel);
            }
    
        }
    
        /**
         * 向服务器发送信息
         */
        public void send(String msg) {
            if (msg.isEmpty()) {
                return;
            }
            Future<Integer> writeFuture = clientSocketChannel.write(charset.encode(msg));
            try {
                writeFuture.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 判断客户端是否要下线
         */
        public boolean readyToQuit(String msg){
            return msg.equals(QUIT);
        }
    
        /**
         * 释放资源
         */
        private void closeResource(Closeable closeable){
            if (closeable != null){
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
  • ChatClientStarter

    public class ChatClientStarter {
        public static void main(String[] args) {
            new ChatClient("localhost", 9999).start();
        }
    }
    
  • UserInputHandler

    public class UserInputHandler implements Runnable {
    
        private ChatClient chatClient;
    
        public UserInputHandler(ChatClient chatClient) {
            this.chatClient = chatClient;
        }
    
        @Override
        public void run() {
            try {
                BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    String input = consoleReader.readLine();
                    // 向服务器发送信息
                    chatClient.send(input);
    
                    // 检查用户是否退出
                    if (chatClient.readyToQuit(input)) {
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

# 6.4 演示

image-20210924200423608

# 五、使用场景

  • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序直观简单易理解。
  • NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4 开始支持。
  • AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

# 六、Web 服务器

# 1. Tomcat 架构

image-20210925082554884
  • Server

    • Tomcat 服务器的最顶层组件;
    • 负责运行 Tomcat 服务器;
    • 负责加载服务器资源和环境变量;
  • Service

    • 集合 Connector 和 Engine 等其他重要组件的抽象组件;
    • 一个 Server 可以包含多个 Server;
    • 一个 Server 可以包含多个 Connector 和一个 Engine;
  • Connector、Processor

    • Connector 提供基于不同特定协议的实现;
    • Connector 负责接收请求和回传响应;
    • Processor 负责接收来自 Connector 的请求并对其进行解析和处理,并派遣至 Engine 进行处理;
  • Engine

    • Engine 可以理解为容器,Tomcat 中一个处理请求的组件;
    • 容器内部的组件按照层级排列;
    • Engine 是容器的顶层组件;
    • Engine 根据请求的内容进行一定的解析,然后进一步派遣请求;
  • Host

    • Host 代表一个虚拟主机;
    • 一个 Engine 可以支持多个 Host 的请求;
    • Engine 通过解析请求来决定将请求发送给哪一个 Host;
  • Context

    • 每一个 Context 代表一个 Web Application;
    • Tomcat 最复杂的组件之一;
    • 负责应用资源管理、应用类加载、Servlet 管理和安全管理等;
  • Wrapper

    • Wrapper 是容器的最底层组件;
    • 包裹住 Servlet 实例;
    • 负责管理 Servlet 实例的生命周期;
  • Servlet

    • Servlet 是运行在 Web 服务器或应用服务器上的程序,它是作为来自 Web 浏览器或其他 HTTP 客户端的请求和 HTTP 服务器上的数据库或应用程序之间的中间层。
    • 使用 Servlet,可以收集来自网页表单的用户输入,呈现来自数据库或者其他源的记录,还可以动态创建网页。

# 2. 实现简易版 Web 服务器

  • 待补充。
上次更新: 10/27/2021, 10:33:23 PM