# 6. RabbitMQ 支持的消息模型

image-20210319171402317

# 6.0 模型总览

image-20200921092035006

image-20200921092049674

注意:使用的时候需要先在项目中注入 rabbitmq 的依赖坐标:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.9.0</version>
</dependency>

# 6.1 Hello World 模型

image-20200921115219866

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

# 6.1.1 生产者

public class Provider {
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. 配置连接工厂
        connectionFactory.setHost("172.16.208.138");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("hedon");
        connectionFactory.setPassword("hedon");
        connectionFactory.setVirtualHost("/hedon");
        //3. 获取连接对象
        Connection connection = connectionFactory.newConnection();
        //4. 创建通道
        Channel channel = connection.createChannel();
        /**
         * 5. 申明队列
         *
         * 参数1(queue):队列名,没有的话会自动创建
         * 参数2(durable):是否持久化,在 RabbitMQ 重启时是否会自动删除该队列(即使保存队列,队列中的消息还是会被清空)
         * 参数3(exclusive):是否独占队列
         * 参数4(autoDelete):当队列中没有消息时是否自动删除
         * 参数5(arguments):队列的其他属性,这里可以填构造方法中的一些参数
         */
        channel.queueDeclare("hello",false,false,false,null);
        /
        /**
         * 6. 发布信息
         *
         * 参数1(exchange):交换机,直连模式不需要交换机
         * 参数2(routingKey):路由key,直连模式就是对列名称
         * 参数3(props):其他属性
         * 参数4(body):要发布的信息,需要转为字节码
         */
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

        //7. 关闭通道
        channel.close();
        //8. 关闭连接
        connection.close();
    }
}

生产者运行6次,会发送6条消息到通道中:

image-20200921115504489

# 6.1.2 消费者

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2. 配置连接工厂
        connectionFactory.setHost("172.16.208.138");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("hedon");
        connectionFactory.setPassword("hedon");
        connectionFactory.setVirtualHost("/hedon");
        //3. 获取连接对象
        Connection connection = connectionFactory.newConnection();
        //4. 创建通道
        Channel channel = connection.createChannel();
        /**
         * 5. 申明队列 => 这里需要跟发布者一一对应
         * 
         * 参数1(queue):队列名,没有的话会自动创建
         * 参数2(durable):是否持久化,在 RabbitMQ 重启时是否会自动删除该队列(即使保存队列,队列中的消息还是会被清空)
         * 参数3(exclusive):是否独占队列
         * 参数4(autoDelete):当队列中没有消息时是否自动删除
         * 参数5(arguments):队列的其他属性,这里可以填构造方法中的一些参数
         */
        channel.queueDeclare("hello",false,false,false,null);
        /**
         * 6. 获取消息
         *
         * 参数1(queue):队列名
         * 参数2(autoAck):如果服务器应考虑消息传递后已确认,则为true
         * 参数3(callback):一个实现了 Consumer 接口的对象
         */
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            /**
             * 7. 处理信息的接收
             *
             * @param consumerTag  消费者标签
             * @param envelope     消息的打包数据
             * @param properties   AMQP的属性,消息的内容头信息
             * @param body         信息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //输出消息
                System.out.println(new String(body));
            }
        });

        //这里不关闭的话会一直监听,一但有消息进入,就会输出。
        //channel.close();
        //connection.close();

    }
}

消费者运行,会一直监听,把所有消息都输出,如果有新的消息进入,也会输出:

image-20200921115652871

image-20200921115709080

# 6.2 Work Queues 模型

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

image-20200921130118819

# 6.2.1 平均消费

# 6.2.1.1 生产者

public class Provider {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 申明队列
        channel.queueDeclare("work",true,false,false,null);
        //4. 发布信息
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("","work",null,("发布消息"+i).getBytes());
        }
        //5. 通过自行封装的方法关闭资源
        RabbitMqUtils.closeChannelAndConnection(channel,connection);
    }
}

# 6.2.1.2 消费者一

public class ConsumerOne {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 申明队列
        channel.queueDeclare("work",true,false,false,null);
        //4. 消费消息
        channel.basicConsume("work",new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("这是消费者1在消费信息:" + new String(body));
            }
        });
    }
}

# 6.2.1.3 消费者二

public class ConsumerTwo {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 申明队列
        channel.queueDeclare("work",true,false,false,null);
        //4. 消费消息
        channel.basicConsume("work",new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("这是消费者2在消费信息:" + new String(body));
            }
        });
    }
}

# 6.2.1.4 测试

先运行两个消费者,然后运行生产者产生消息,观察两个消费者消费消息的情况:

image-20200922170812945

image-20200922170825819

通过上图我们可以发现 work 模型在默认情况下,RabbitMQ 会按顺序将每个消息发送给下一个消费者者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。

# 6.2.2 能者多劳

# 6.2.2.1 平均消费可能出现的问题

在平均消费模式中,消费者只要从队列中拿到消息,就立刻发送确认机制,有可能在处理消息的时候就突然宕机了或者出现意外了,这样消息还没来得及消费就遗失了,就造成业务数据的丢失。

另外,也有可能两个消费者处理消息的效率不一样,就有可能造成一个消费者已经消费完消息然后闲着,而另外一个消费者拿到了消息,却一直处于处理消息的状态,造成资源的浪费。

所以我们需要做==三件事==:

  • 关闭消息自动确认机制

    //第二个参数 = false:关闭消息自动确认机制
    channel.basicConsume(队列名,false,new DefaultConsumer(channel){
      ....
    })
    
  • 不能一次性把消息交给消费者

    channel.basicQos(1); //每一次只消费一个消息
    
  • 主动进行确认

    //参数1:确认的是队列中的哪个具体消息
    //参数2:是否开启多个消息同时确认
    channel.basicAck(envelope.getDeliveryTag(),false);
    

# 6.2.2.2 修改消费者

image-20200922173808342

# 6.2.2.3 再次测试

image-20200922173945794

image-20200922174005139

从上述结果中可以发现,由于我先前修改了消费者1的代码,让它的执行时间边长,所以它就只来得及接收并处理一条信息,其他信息都被消费者2接收并处理了,这样就达到了“能者多劳”的效果了。

# 6.3 Fanout 模型

fanout 模型是一种广播模型,也就是一个生产者可以发一个消息,进行广播,让多个消费者消费同一个消息。

image-20200922220711728

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

# 6.3.1 生产者

public class Provider {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 声明交换机 => fanout 模式的交换机 type 需要制定为 fanout
        channel.exchangeDeclare("logs","fanout");
        //4. 广播消息
        channel.basicPublish("logs","",null,"这是一条广播".getBytes());
        //5. 关闭资源
        RabbitMqUtils.closeChannelAndConnection(channel,connection);
    }
}

# 6.3.2 消费者一

public class ConsumerOne {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 绑定交换机 => fanout 模式的交换机 type 需要制定为 fanout
        channel.exchangeDeclare("logs","fanout");
        //4. 创建临时队列,这里会随机生成一个队列名称
        String queue = channel.queueDeclare().getQueue();
        //5. 将临时队列绑定到交换机上,fanout模式暂时还不需要routingKey
        channel.queueBind(queue,"logs","");
        //6. 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到消息:"+new String(body));
            }
        });
    }
}

消费者二同理。

# 6.3.3 测试

image-20200922222633967

image-20200922222645571

# 6.4 Routing 模型

在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的Exchange。

image-20200922222723227

在 Direct 模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

# 6.6.1 生产者

public class Provider {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 声明交换机 => routing 模型的交换机类型是 direct
        channel.exchangeDeclare("ex_direct","direct");
        //4. 广播消息 => 一条是 error,另一条是 info
        channel.basicPublish("ex_direct","error",null,"这是一条error广播".getBytes());
        channel.basicPublish("ex_direct","info",null,"这是一条info广播".getBytes());
        //5. 关闭资源
        RabbitMqUtils.closeChannelAndConnection(channel,connection);
    }
}

# 6.6.2 消费者一

public class ConsumerOne {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 绑定交换机 => routing 模型的交换机类型是 direct
        channel.exchangeDeclare("ex_direct","direct");
        //4. 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //5. 将临时队列绑定到交换机上 => 消费者1只能消费 routingKey 为 error 的消息
        channel.queueBind(queue,"ex_direct","error");
        //6. 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到消息:"+new String(body));
            }
        });
    }
}

# 6.6.3 消费者二

public class ConsumerTwo {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("ex_direct","direct");
        String queue = channel.queueDeclare().getQueue();
        //消费者2可以消费 routingKey 为 info、error 和 warning 的消息
        channel.queueBind(queue,"ex_direct","info");
        channel.queueBind(queue,"ex_direct","error");
        channel.queueBind(queue,"ex_direct","warning");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("这是消费者2在消费消息:"+new String(body));
            }
        });
    }
}

# 6.6.4 测试

image-20200922223137183

image-20200922223152961

# 6.5 Topics 模型

Topics类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

image-20200922223252428

# 统配符
		*   匹配不多不少恰好1个词
		#   匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者 audit.irs 等
    audit.*    只能匹配 audit.irs

# 6.5.1 生产者

public class Provider {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 声明交换机 => topics 模型的交换机类型为 topic
        channel.exchangeDeclare("ex_topic","topic");
        //4. 广播消息
        channel.basicPublish("ex_topic","logs",null,"这是一条 logs 广播".getBytes());
        channel.basicPublish("ex_topic","logs.a",null,"这是一条 logs.a 信息".getBytes());
        channel.basicPublish("ex_topic","logs.a.b",null,"这是一条 logs.a.b 广播".getBytes());
        channel.basicPublish("ex_topic","logs.a.b.c",null,"这是一条 logs.a.b.c 广播".getBytes());
        //5. 关闭资源
        RabbitMqUtils.closeChannelAndConnection(channel,connection);
    }
}

# 6.5.2 消费者一

public class ConsumerOne {

    public static void main(String[] args) throws IOException {
        //1. 通过自行封装的方法获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //2. 获取通道对象
        Channel channel = connection.createChannel();
        //3. 绑定交换机 => topics 模型的交换机类型为 topic
        channel.exchangeDeclare("ex_topic","topic");
        //4. 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //5. 将临时队列绑定到交换机上 => 消费者1只能消费 routingKey 为 error 的消息
        //routingKey 的每个单词之间用"."来分割,通配符 * 表示只能匹配1个单词,如 logs.a
        channel.queueBind(queue,"ex_topic","logs.*");
        //6. 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到消息:"+new String(body));
            }
        });
    }
}

# 6.5.3 消费者二

public class ConsumerTwo {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("ex_topic","topic");
        String queue = channel.queueDeclare().getQueue();
        //消费者2可以消费 routingKey 为 info、error 和 warning 的消息
        //每个单词之间用"."来分割,通配符 # 表示可以匹配多个单词,如 logs.a,logs.a.b
        channel.queueBind(queue,"ex_topic","logs.#");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("这是消费者2在消费消息:"+new String(body));
            }
        });
    }
}

# 6.5.4 测试

如下图,消费者1的 routingKey 为“logs.*”,所以 logs. 后面必须有一个单词才能被消费者1消费,而消费者2的 routingKey 为“logs.#”,所以 logs. 后面没有单词或者有多个单词都可以被消费者2消费。

image-20200922224334440

image-20200922224348242

# 6.6 RPC 模型

在 Work Queues 模型中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

但是,如果我们需要在远程计算机上运行功能并等待结果怎么办?那就可以用 RPC 模型(远程过程调用)。

我们将使用 RabbitMQ 构建RPC系统:客户端和可伸缩 RPC 服务器。由于我们没有值得分配的耗时任务,因此我们将创建一个虚拟 RPC服务,该服务返回斐波那契数。

image-20200922230825652

==图解:==

  • 对于RPC请求,客户端发送一条消息,该消息具有两个属性:replyTo(设置为仅为该请求创建的匿名互斥队列)和correlationId(设置为每个请求的唯一值)。

  • 该请求被发送到rpc_queue队列。

  • RPC工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它会使用 replyTo 字段中的队列来完成工作,并将消息和结果发送回客户端。

  • 客户端等待答复队列中的数据。出现消息时,它将检查correlationId属性。如果它与请求中的值匹配,则将响应返回给应用程序。

==补充:消息的属性==

image-20200922232442807

# 6.6.1 客户端

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue"; //发送请求的队列名称

    public static void main(String[] args) throws IOException, InterruptedException {
        //初始化 RPCClient
        RPCClient rpcClient = new RPCClient();
        rpcClient.connection = RabbitMqUtils.getConnection();
        rpcClient.channel = rpcClient.connection.createChannel();
        //发送 request 请求信息
        for (int i = 0; i < 5; i++) {
            String i_str = Integer.toString(i);
            System.out.println("现在客户端希望计算 fic("+i+")");
            String response = rpcClient.call(i_str);
            System.out.println("现在计算出来 fic("+i+") = "+ response);
        }
        //关闭资源
        close(rpcClient);
    }

    //发送请求,希望调用远程的函数
    public String call(String message) throws IOException, InterruptedException {
        //定义一个相关ID
        final String correlationId = UUID.randomUUID().toString();
        //回调队列
        String replyQueueName = channel.queueDeclare().getQueue();
        //定义消息属性
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
                .Builder()
                .correlationId(correlationId)  //设置相关ID
                .replyTo(replyQueueName)       //设置回调队列
                .build();

        //存储相应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        //发送消息
        channel.basicPublish("",requestQueueName,basicProperties,message.getBytes("UTF-8"));

        //消费消息
        String ctag = channel.basicConsume(replyQueueName, true
            //参数3:服务器端传过来的回调对象
          , new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                if (message.getProperties().getCorrelationId().equals(correlationId)) {
                    response.offer(new String(message.getBody(), "UTF-8"));
                }
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {

            }
        });
        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    //关闭资源
    public static void close(RPCClient rpcClient){
        RabbitMqUtils.closeChannelAndConnection(rpcClient.channel,rpcClient.connection);
    }

}

# 6.6.2 服务端

public class RPCServer {

    //定义队列名称
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    //计算斐波那契数列
    public static int fic(int a){
        if (a == 0) return 0;
        if (a == 1) return 1;
        return fic(a-1) + fic(a-2);
    }

    public static void main(String[] args) throws IOException {
        //调用自行封装的工具类获取连接对象
        Connection connection = RabbitMqUtils.getConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
        //清除队列
        channel.queuePurge(RPC_QUEUE_NAME);
        //每次处理一条信息
        channel.basicQos(1);
        //定义一个监听器
        Object monitor = new Object();
        //定义回调信息
        DeliverCallback deliverCallback = new DeliverCallback() {
            /**
             * @param consumerTag 消费者标签,可以与消费者建立联系
             * @param message     消费者发送过来的消息(消息属性、消息封装体、消息没人)
             */
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                //定义信息属性
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
                                                    .Builder()
                                                    .correlationId(message.getProperties().getCorrelationId()) //指明关联ID
                                                    .build();
                //定义相应信息
                String response = "";
                //解析request信息
                try{
                    String s = new String(message.getBody(),"UTF-8");
                    int i = Integer.parseInt(s);
                    System.out.println("正在计算 fic("+i+")");
                    response += fic(i);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    //发布 response 消息
                    channel.basicPublish("",message.getProperties().getReplyTo(),basicProperties,response.getBytes("UTF-8"));
                    //手动确认信息
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    //RabbitMq 消费者工作线程通知 RPC 服务器所有者线程
                    synchronized (monitor){
                        /**
                         * notify()
                         *
                         * 唤醒处于等待的线程
                         */
                        monitor.notify();
                    }
                }
            }
        };
        //消费客户端发送过来的请求消息
        channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, new CancelCallback() {
            // 等待并准备好使用来自RPC客户端的消息
            @Override
            public void handle(String consumerTag) throws IOException {
                while (true){
                    synchronized (monitor){
                        try {
                            /**
                             * wait()
                             *
                             * 使得当前线程立刻停止运行,处于等待状态(WAIT),
                             * 并将当前线程置入锁对象的等待队列中,
                             * 直到被通知(notify)或被中断为止。
                             */
                            monitor.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
    }
}

# 6.6.3 测试

image-20200923102623691

image-20200923102637743

# 6.6.4 图解上述两段代码

上面两段代码是笔者模仿官网代码写出来的,刚开始写的时候也是一头雾水,现尝试画张图来加深理解。

image-20200923112003146

# 6.7 Publisher Confirms 模型

待补。

上次更新: 8/29/2022, 12:17:25 AM