什么是消息中间件
消息中间件是用于在分布式系统中传递和处理消息的软件组件或服务,它提供了一种可靠、解耦合和异步通信的方式,使得不同的应用程序和服务可以相互协作和通信。通俗来说,实现就是软件和软件之间通信的软件。
有哪些主流的消息中间件
- Apache Pulsar(2018年):一款新兴的分布式消息中间件,使用多租户、无共享的架构,具有高性能、高可靠性和灵活性的特点,适用于云原生应用和大规模数据处理等场景。
- Apache Kafka(2011年):一款高吞吐量、低延迟的分布式消息中间件,基于发布-订阅模式,适用于大数据处理和实时流处理等场景。
- RabbitMQ(2007年):一款开源的AMQP(高级消息队列协议)消息中间件,提供可靠、可扩展的消息传递机制,适用于各种场景。
- ActiveMQ(2004年):一款开源的JMS(Java消息服务)消息中间件,支持消息的异步通信和事务处理,具有高可靠性和可扩展性。
- Redis(2009年):一款开源的内存数据存储系统,同时也支持消息中间件功能,适用于高并发和实时通信等场景。
为什么选择RabbitMQ
目前最受欢迎的消息中间件有两个,分别是Kafka和RabbitMQ。Kafka和RabbitMQ都是功能强大的消息中间件,但它们的设计理念和使用场景略有不同。
Kafka和RabbitMQ都支持发布/订阅模式,但它们的实现方式有所不同。Kafka的设计重点是高吞吐量和低延迟的流处理,适用于处理大量实时数据的场景;而RabbitMQ的设计则更加注重消息的可靠性和灵活性,适用于需要确保消息可靠传递和具有更高处理能力的场景。
因此,在一些对实时性和吞吐量有较高要求的场景,比如流处理和实时数据处理等方面,Kafka可能更为适合;而在一些更加注重消息的可靠性和传递的场景,比如任务调度、异步通信和事务处理等方面,RabbitMQ可能更为适合。
所以,中间件的选择不过是取决于具体的应用场景和需求。
为什么RabbitMQ高效
- AMQP协议:RabbitMQ基于AMQP(Advanced Message Queuing Protocol)协议,这是一种高级消息队列协议。AMQP协议采用二进制协议格式,使得消息传递效率更高。
- 基于Erlang语言:RabbitMQ的底层是基于Erlang语言实现的,Erlang是一种并发编程语言,具有轻量级进程和可扩展的消息传递机制,使得RabbitMQ具有高并发和高可用性的特点。
- 高效的消息路由机制:RabbitMQ使用一种称为Exchange的机制,可以根据消息的路由键将消息分发到相应的队列中,同时支持多种路由算法和交换机类型,使得消息的路由效率更高。
- 异步IO和多线程机制:RabbitMQ使用多个线程来处理消息的投递和消费,同时采用异步IO机制,使得消息传递效率更高。
- 可扩展性:RabbitMQ采用分布式架构,支持多个节点之间的消息传递和负载均衡,可以有效地提高系统的可扩展性和可用性。
RabbitMQ的执行流程结构图
- Broker:接收和分发消息的应用,RabbitMQ就是Message Broker
- Virtual Host:虚拟Broker,将多个单元隔开
- Connection:Publisher/Consumer和Broker之间的TCP连接
- Channel:Connection内部建立的逻辑连接,通常每个线程创建单独的Channel
- Routing Key:路由键,用来指示路由的消息转发
- Exchange:交换机,可以接受Publisher发送的消息,并根据Routing Key将消息发送给指定的Queue
- Queue:存放Exchange发送过来的消息,最终被Consumer消费
- Bingding:Exchange和Queue之间的虚拟连接,用于message的分发
RabbitMQ工作原理
- 生产者是连接到 Server,建立一个连接,开启一个信道。
- 生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。
- 消费者也需要进行建立连接,开启信道等操作,便于接收消息。
- 生产者发送消息,发送到服务端中的虚拟主机。
- 虚拟主机中的交换器根据路由键选择路由规则,发送到不同的消息队列中。
- 订阅了消息队列的消费者就可以获取到消息,进行消费。
RabbitMQ的核心Exchange交换机
其实RabbitMQ的Exchange跟现实中交换机实现数据传输逻辑相同。这里用交换机设备来类比Exchange路由转发流程。物理交换机上有很多的网口,每个网口都有一个网口编号,比如LAN1、LAN2等。交换机通过网线连接LAN口和另一头的子设备。那么这里我们可以将子设备比做RabbitMQ中的Queue,Bingding就是类比网线,Bingding Key就是网口编号。
当消费者Publisher通过Routing Key路由键将消息发送到Exchange中,Exchange就根据传过来的Routing Key和Bingding Key做对比后,将消息转发到具体的Queue中。而Exchange转发消息常用的规则有三种,分别是direct、fanout和topic。
- Direct Exchange:这种转发模式就是对比Routing Key和Bingding Key,如果两者的值一样,就转发到Bingding Key对应的Queue中
- Fanout Exchange:这种转发模式不需要对比Routing Key和Bingding Key,直接将消息转发到所有该Exchange绑定的所有Queue。可以理解物理交换机中接受到路由消息后,直接向所有网口发送数据
- Topic Exchange:这种转发模式也是要对比Routing Key和Bingding Key,但是不同的是,RabbitMQ为Bingding Key提供一系列通配规则(不仅仅是简单的字符串,必须要加上”.”符号连接)。通配符有两种:”*” 、 “#”。
- *符号:有且只匹配一个词。比如Bingding Key是”a.*”,那么当Routing Key是”a.b”、”a.c”时,可以发送到对应Queue中,但是如果Routing Key是”a.b.c”,则无法发送到指定Queue。
- #符号:匹配一个或多个词。比如”rabbit.#”既可以匹配到”rabbit.a.b”、”rabbit.a”,也可以匹配到”rabbit.a.b.c”。
如果想更好的在感官上了解Exchange的转发机制,可以通过RabbitMQ模拟器模拟消息转发:RabbitMQ Simulator
RabbitMQ在ubuntu上的安装
RabbitMQ 服务器是用 Erlang 语言编写的,它的安装包里并没有集成 Erlang 的环境,因此需要先安装 Erlang。
sudo apt install erlang
erl --version
Erlang 安装成功后,就可以安装 RabbitMQ 了。
# 更新软件包
sudo apt upgrade
# 安装rabbitMQ
sudo apt install rabbitmq-server
# 查看rabbitMQ状态
systemctl status rabbitmq-server
启动RabbitMQ的Web服务
通过命令开启插件
sudo rabbitmq-plugins enable rabbitmq_management
访问路径http://localhost:15672,默认账户密码都是guest
RabbitMQ添加账户
# 添加账号密码
rabbitmqctl add_user admin 123456
# 添加访问权限
rabbitmqctl set_premissions -p "/" admin ".*" ".*" ".*"
# 设置超级权限
rabbitmqctl set_user_tags admin administator
RabbitMQ的命令行
sudo rabbitmqctl --help
项目实战
待写~
如何保证消息的可靠性
发送端确认机制(生产端)
生产者发送消息到 RabbitMQ 时,可以通过开启发送端确认机制来确保消息的可靠性。在发送端确认模式下,生产者发送消息到 RabbitMQ 后,RabbitMQ 会给生产者返回一个确认信息,表示消息已经成功到达 RabbitMQ 服务器。
如果生产者在指定的时间内没有收到服务器的确认信息,那么就认为消息发送失败。生产者可以根据需要选择同步确认或异步确认。
// 同步确认机制,在开启confirmSelect方法后,通过在发送消息后立即调用waitForConfigrms方法获取bool结果
try {
channel.confirmSelect(); // 启动发送端确认模式
channel.basicPublish(exchangeName, routingKey, null, messageBody.getBytes());
if (channel.waitForConfirms()) {
System.out.println("单条消息发送成功");
} else {
System.out.println("单条消息发送失败");
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
// 异步确认,在开启confirmSelect方法后,通过调用addConfirmListener方法并传入ConfirmLIstener对象实现异地确认
final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
try(Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();){
String message = objectMapper.writeValueAsString(orderMessageDTO);
// 消息确认集合
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("确认成功,{} : {}", deliveryTag, multiple);
if (multiple) {
unconfirmedSet.headSet(deliveryTag + 1).clear();
} else {
unconfirmedSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("确认失败,{} : {}", deliveryTag, multiple);
}
});
for (int i = 0; i < 10; i++) {
System.out.println("订单微服务向商家微服务发送消息");
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(exchangeName, routingKey, null, messageBody.getBytes());
unconfirmedSet.add(nextPublishSeqNo);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
while (!unconfirmedSet.isEmpty()){
Thread.sleep(100);
}
消息路由返回机制(生产端)
在 RabbitMQ 中,可以使用消息返回机制来处理不能路由的消息。当一条消息无法被路由到目标队列或订阅的消费者时,RabbitMQ 会将消息返回给生产者。我们可以通过设置回调函数来处理返回的消息。
设置basicPublish中mandatory属性为true,并回调监听:
try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();) {
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
log.info("消息路由不到队列,返回的{},{},{},{},{},{}", i, s, s1, s2, basicProperties, new String(bytes));
}
});
channel.basicPublish(exchangeName, routingKey, modatory, null, messageBody.getBytes()); // 将modatory设置为true
}
第二种监听方式,这个aReturn对象中就包含了上面的全部参数:
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return aReturn) {
log.info("消息路由不到队列,返回的{}",aReturn);
}
});
消费端确认机制
消费者在接收到消息后,需要向 RabbitMQ 服务器确认已经处理完该消息。如果消费者没有确认该消息,那么 RabbitMQ 就会认为该消息没有被成功处理,重新将该消息投递给消费者,直到被确认为止。
- 在消费者中开启手动确认模式:在消费者创建消费者实例时,通过设置 channel.basicConsume() 方法的第二个参数 autoAck 为 false,即可开启手动确认模式。
- 消费者处理完消息后发送确认回执:在消费者处理完消息后,可以通过
channel.basicAck()
方法发送确认回执,告诉 Broker 该消息已经被成功处理。 - 消费者处理消息出现异常时发送拒绝回执:如果消费者在处理消息时出现异常,可以通过
channel.basicNack()
方法发送拒绝回执,告诉 Broker 该消息未被正确地处理。此时可以选择将该消息重新投递到队列中(设置requeue
参数为 true),或者将该消息丢弃(设置requeue
参数为 false)。
try{
// 模拟消费者处理消息的过程
String messageBody=new String(message.getBody(),"UTF-8");
System.out.println("Received message: "+messageBody);
// 手动发送确认回执
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(Exception e){
// 处理消息出现异常,发送拒绝回执,将消息重新投递到队列中
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
消费端限流机制
消费端可以通过限流机制来控制消费者接收消息的数量,从而避免因过多的消息导致消费者崩溃。在 RabbitMQ 中,消费端可以使用 basicQos 方法来限流,它允许消费者在处理完指定数量的消息后,再接收新的消息。
channel.basicQos(1);
消息过期机制
在 RabbitMQ 中,消息可以设置过期时间。如果消息在指定的时间内没有被消费者消费,那么就会被自动删除。
// 单条过期消息的设置(在队列发送处)
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish(
exchangeName, routingKey, basicProperties, messageBody.getBytes()
);
// 队列内统一设置消息过期(在队列申明处)
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",15000);
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args
);
死信队列
当消息无法被消费时(超时、超容、拒收),RabbitMQ 会将该消息转移到一个特殊的队列,称为死信队列。可以对死信队列进行监控和处理,以确保所有的消息都能得到处理。
// 声明死信交换机和队列并绑定
channel.exchangeDeclare(
"exchange.dlx",
BuiltinExchangeType.TOPIC,
true,
false,
null
);
channel.queueDeclare(
"queue.dlx",
true,
false,
false,
null
);
channel.queueBind(
"queue.dlx",
"exchange.dlx",
"#"
);
死信队列不需要做任何设置,需要设置的是普通队列,需要给普通队列添加x-dead-letter-exchange属性,让队列中的消息在异常时路由到死信交换机。
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 15000);
args.put("x-dead-letter-exchange","exchange.dlx");
channel.queueDeclare(
"queue.restaurant",
true,
false,
false,
args
);