介绍 MQ 的概念,缺点,AMQP和 JMS
chou403
/ MQ
/ c:
/ u:
/ 19 min read
概述
介绍
消息队列是典型的: 生产者,消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
本质上说消息队列就是一个队列结构的中间件,也就是说消息放入这个中间件之后就可以直接返回,并不需要系统立即处理,而另外会有一个程序读取这些数据,并按顺序进行逐次处理。
也就是说当你遇到一个并发特别大并且耗时特别长同时还不需要立即返回处理结果,使用消息队列可以解决这类问题。
消息队列主要解决了应用耦合,异步处理,流量削锋等问题。
当前使用较多的消息队列有 RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMQ等,而部分数据库如 Redis,Mysql以及PhxSQL也可实现消息队列的功能。
应用场景
- 数据冗余: 比如订单系统,后续需要严格的进行数据转换和记录,消息队列可以把这些数据持久化的存储在队列中,然后有订单,后续处理程序进行获取,后续处理完之后在把这条记录进行删除来保证每一条记录都能够处理完成。
- 系统解耦: 使用消息系统之后,入队系统和出队系统是分开的,也就说只要一天崩溃了,不会影响另外一台系统正常运转。
- 流量削锋: 例如秒杀和抢购,我们可以配合缓存来使用消息队列,能够有效的顶住瞬间访问量,防止服务器承受不住导致崩溃。
- 异步通信: 消息本身使用入队之后可以直接返回。
- 扩展性: 例如订单队列,不仅可以处理订单,还可以给其他业务使用。
- 排序保证: 有些场景需要按照产品的顺序进行处理比如单进单出从而保证数据按照一定的顺序处理,使用消息队列是可以的。
以上都是消息队列常见的使用场景,消息队列只是一个中间件,可以配合其他产品进行使用。
- 消息通讯: 消息通讯是指,消息队列一般都内置了搞笑的通信机制,因此也可以用作消息通讯。比如实现点对点消息队列,或者聊天室等。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。
消息队列的缺点
- 降低系统的可用性: 系统引入的外部依赖越多,越容易挂掉。
- 系统复杂度提高: 使用MQ后可能需要保证消息没有被重复消费,处理消息丢失的情况,保证消息传递的顺序性等等问题。
- 一致性问题: A系统处理完了直接返回成功了,但问题是: 要是B,C,D三个系统那里,B和D两个系统写库成功了,结果C系统写库失败了,就造成了数据不一致了。
如何保证消息队列的顺序性
- 生产者有序的情况下,单线程消费来保证消息的顺序性。
- 生产者无序的情况下,对消息进行编号,消费者处理时根据编号判断顺序。多个消费者,可以考虑增加分布式锁。
如何保证消息不被重复消费/如何保证消息的幂等性
- 生产者发送每条数据的时候,里面添加一个全局唯一的id,消费者消费到消息后,先根据消息id去redis中查询,如果redis不存在,就处理消息,然后将消息id写入redis。如果redis中存在,说明消息已经消费过,就不用处理。
- 基于数据库的唯一键,保证重复数据不会重复插入。因为有唯一键的约束,所以重复数据只会插入报错,不会导致数据库中出现脏数据。
AMQP 和 JMS
AMQP
简介
AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品,不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步,安全,高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。
通俗来说,在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接收的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
核心组成
- 消息(Message): 即客户端与消息中间件传送的数据。
- 生产者(Producer): 消息生产者。
- 消费者(Consumer): 消息消费者。
- 连接(Connection): 一个网络连接,比如TCP/IP连接。AMQP连接通常是长连接,当一个应用不再需要连接到AMQP代理的时候,需要释放掉 AMQP 连接,而不是直接将TCP连接关闭。
- 信道(Channel): 网络信道,是建立在Connection连接之上的一种轻量级的连接,可以创建多个信道。
- 交换机(Exchange): 接收消息,并将消息路由转发给消息队列。
- 虚拟主机(Virtual Host): 进行逻辑隔离,一个虚拟主机可以创建若干个交换机和队列。
- 绑定(Binding): 交换机和队列之间的虚拟连接。
- 路由键(Routing Key): 路由规则,虚拟机可以用来确定如何路由一个特定的消息。
- 队列(Queue): 存储即将被消费者消费掉的消息。
- 中间件(Broker ): 实现AMQP实体服务,比如常见的RabbitMQ,Azure Service Bus等。
工作过程
- 生产者发布消息,经由交换机。
- 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列。
- 最后消息中间件会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
JMS
JMS 简介
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解: 两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。
优势
-
Asynchronous(异步)
JMS is asynchronous by default. So to receive a message, the client is not required to send the request. The message will arrive automatically to the client as they become available.(JMS 原本就是一个异步的消息服务,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端)。
-
Reliable(可靠)
JMS provides the facility of assurance that the message will delivered once and only once. You know that duplicate messages create problems. JMS helps you avoiding such problems.(JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题。)
JMS 的消息模型
JMS具有两种通信模式:
- Point-to-Point Messaging Domain (点对点)
- Publish/Subscribe Messaging Domain (发布/订阅模式)
在JMS API出现之前,大部分产品使用”点对点”和”发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范,它们相互独立。任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。
####### Point-to-Point Messaging Domain(点对点通信模型)
-
在点对点通信模式中,应用程序由消息队列,发送方,接收方组成。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时
-
特点
- 每个消息只要一个消费者
- 发送者和接收者在时间上是没有时间的约束,也就是说发送者在发送完消息之后,不管接收者有没有接受消息,都不会影响发送方发送消息到消息队列中。
- 发送方不管是否在发送消息,接收方都可以从消息队列中去到消息(The receiver can fetch message whether it is running or not when the sender sends the message)
- 接收方在接收完消息之后,需要向消息队列应答成功
####### Publish/Subscribe Messaging Domain(发布/订阅通信模型)
- 在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。该模式下,发布者与订阅者都是匿名的,即发布者与订阅者都不知道对方是谁。并且可以动态的发布与订阅Topic。Topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
- 特点
- 一个消息可以传递个多个订阅者(即: 一个消息可以有多个接受方)
- 发布者与订阅者具有时间约束,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
- 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
JMS接收消息
在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
-
同步(Synchronous)
在同步消费信息模式模式中,订阅者/接收方通过调用 receive()方法来接收消息。在receive()方法中,线程会阻塞直到消息到达或者到指定时间后消息仍未到达。
-
异步(Asynchronous)
使用异步方式接收消息的话,消息订阅者需注册一个消息监听者,类似于事件监听器,只要消息到达,JMS服务提供者会通过调用监听器的onMessage()递送消息。
JMS编程模型
-
管理对象(Administered objects)-连接工厂(Connection Factories)和目的地(Destination)
-
Connection Factories
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息。
QueueConnectionFactory queueConnFactory = (QueueConnectionFactory) initialCtx.lookup ("primaryQCF"); Queue purchaseQueue = (Queue) initialCtx.lookup ("Purchase_Queue"); Queue returnQueue = (Queue) initialCtx.lookup ("Return_Queue");
-
Destination
目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题:
-
创建一个队列Session:
QueueSession ses = con.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); //get the Queue object Queue t = (Queue) ctx.lookup ("myQueue"); //create QueueReceiver QueueReceiver receiver = ses.createReceiver(t);
-
创建一个Topic Session:
QueueSession ses = con.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); //get the Queue object Queue t = (Queue) ctx.lookup ("myQueue"); //create QueueReceiver QueueReceiver receiver = ses.createReceiver(t);
-
-
-
连接对象(Connections)
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型: QueueConnection和TopicConnection。连接对象封装了与JMS提供者之间的虚拟连接,如果我们有一个ConnectionFactory对象,可以使用它来创建一个连接。
Connection connection = connectionFactory.createConnection();
-
会话(Sessions)
Session 是我们对消息进行操作的接口,可以通过session创建生产者,消费者,消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
我们可以在连接创建完成之后创建session:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
消息生产者(Message Producers)
消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地,队列或话题创建生产者。
MessageProducer producer = session.createProducer(dest); MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(topic);
-
消息消费者(Message Consumers)
消息消费者由Session创建,用于接收被发送到Destination的消息。
MessageConsumer consumer = session.createConsumer(dest); MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(topic);
-
消息监听者(Message Listeners)
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
常见MQ
现在比较常见的MQ产品主要是ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
常见的消息队列有以下几种:
- Apache ActiveMQ: 基于JMS的消息队列,支持多种配置。
- RabbitMQ: 一个可靠的开源消息队列系统,支持多种编程语言和操作系统。
- Apache Kafka: 一种高吞吐量,分布式发布订阅消息系统,可以处理大量数据并保持持久性。
- RocketMQ: 阿里巴巴开源的低延迟,高可靠,可伸缩的消息队列服务。
- ZeroMQ: 一种高性能,高可用的封装层,可以在不同的消息队列上运行。
- NATS: 一种基于云计算的消息队列系统,具有高性能和可扩展性。
- Redis: 一种内存数据库,支持多种数据结构和发布订阅功能,可以实现消息队列的功能。