AMQP协议学习

Published on
Authors

最近在阅读AMQP协议。AMQP协议算是消息队列里无法绕开的一个协议,通过阅读该协议来学习消息队列以及自有协议设计。该协议的阅读体验非常好,协议本身没有过于复杂,会解释各个地方的设计思路。

AMQP协议中的各个概念和组件

AMQP的全称:Advanced Message Queuing Protocol

AMQP所覆盖的内容包含了网络协议以及服务端服务

  • 一套被称作 “高级消息队列协议模型(AMQ Model)”的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
  • 一个网络层协议,AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。

AMQP更像是一个把东西连在一起的语言,而不是一个系统。其设计目标是:让服务端可通过协议编程。

AMQP协议是一个二进制协议,具有一些现代特性:多通道(multi-channel),可协商(negotiated),异步,安全,便携,中立,高效的。分成两层:

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled.png

**功能层:**定义了一系列的命令

**传输层:**携带了从应用 → 服务端的方法,用于处理多路复用、分帧、编码、心跳、data-representation、错误处理。

这样分层之后,可以把传输层替换为其它传输协议,而不需要修改功能层。同样,也可以使用同样的传输层,而实现不同的上层协议。可能RabbitMQ也是因为类似的原因,能够较容易的支持MQTT、STOMP等协议的吧。

AMQ Model的设计是由以下需求驱动的:

  • 确保符合标准的实现之间的互操作性。
  • 提供清晰且直接的控制QoS
  • 保持一致和明确的命名
  • 通过协议,能够修改服务端的各种配置
  • 使用可以轻松映射到应用程序级API的命令符号。
  • 清晰,每个操作只能做一件事。

AMQP传输层是由以下需求驱动的

  • 紧凑。能够快速封包和解包
  • 可以携带任意大小的消息,没有明显的限制
  • 同一个连接可以承载多个通道
  • 长时间存活,没有显著的限制
  • 允许异步命令流水线
  • 容易扩展。易于处理新需求、或者变更需求
  • 向前兼容
  • 使用强大的断言模型,可修复
  • 对编程语言保持中立
  • 适合代码生成过程

在设计过程中,希望支持不同的消息架构:

  • 先存后发。有多个writer,只有一个reader
  • 分散工作负载。多个writer,多个reader
  • 发布订阅,多个writer,多个reader
  • 基于内容的路由,多个writer,多个reader
  • 队列文件传输,多个writer,多个reader
  • 两个节点之间,点对点连接
  • 市场数据(Market data)分发。多个源,多个reader

AMQ Model

主要包含了三个主要的组件:

  • exchange (交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
  • message queue (消息队列):存储消息。直到消息被安全的投递给了消费者。
  • binding :定义了 message queueexchange 之间的关系,提供了消息路由的规则。

AMQ Model的整体架构

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%201.png

可以把AMQP的架构理解为一个邮件服务:

  • 一个AMQP消息类似于一封邮件信息
  • 消息队列类似于一个邮箱(mailbox)
  • 消费者类似一个邮件客户端,能够拉取和删除邮件。
  • 交换器类似一个MTA(邮件服务器)。检查邮件,基于邮件里的路由信息、路由表,来决定如何把邮件发送到一个或多个邮箱里。
  • Routing Key类似于邮件中的 To:Cc:Bcc: 的地址。不包含服务端信息。
  • 每一个交换器实例,类似于各个MTA进程。用于处理不同子域名的邮件,或者特定类型的邮件。
  • Binding 类似于MTA中的路由表。

在AMQP里,生产者直接把消息发到服务端,然后服务端把这些消息路由到邮箱中。消费者直接从邮箱里取消息。但在AMQP之前的很多中间件中,发布者是直接把消息发到对应的邮箱里(存储发布队列),或者直接发到邮件列表里(类似topic订阅)。

这里的区别在于,用户可以控制消息队列和交换器的保定规则,而不是内部的代码。这样就可以做很多有趣的事情。比如定义一个这样的规则:”把所有包含这样和这样Header的消息,都复制一份到这个消息队列中。“

而这一点也是我认为AMQP和其他一些消息队列最重要的差异。

生命周期

消息的生命周期

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%202.png

  1. **消息由生产者产生。**生产者把内容放到消息里,并设置一些属性,以及消息的路由。然后生产者把消息发给服务端
  2. 服务端收到消息,交换器(大部分情况)把消息路由到若干个该服务器上的消息队列中。如果这个消息找不到路由,则会丢弃或者退回给生产者(生产者可自行决定)
  3. 一条消息可以存在于许多消息队列中。 服务器可以通过复制消息,引用计数等方式来实现。这不会影响互操作性。 但是,将一条消息路由到多个消息队列时,每个消息队列上的消息都是相同的。 没有可以区分各种副本的唯一标识符。
  4. 消息到达消息队列。消息队列会立即尝试通过AMQP将其传递给消费者。 如果做不到,消息队列将消息存储(按生产者的要求存储在内存中或磁盘上),并等待消费者准备就绪。 如果没有消费者,则消息队列可以通过AMQP将消息返回给生产者(同样,如果生产者要求这样做)。
  5. 当消息队列可以将消息传递给消费者时,它将消息从其内部缓冲区中删除。以立即删除,也可以在使用者确认其已成功处理消息之后删除(ack)。 由消费者选择“确认”消息的方式和时间。 消费者也可以拒绝消息(否定确认)。
  6. 生产者发消息与消费者确认,被分组成一个事务。当一个应用同时扮演多个角色时:发消息,发ack,commit或者回滚事务。消息从服务端投递给消费者这个过程不是事务的。消费者对消息进行确认就够了。

在这个过程中,生产者只能把所有消息发到一个单点(交换器),而不能直接把消息发到某个消息队列(message-queue)中。

交换器(exchange)的生命周期

每个AMQP服务端都会自己创建一些交换器,这些不能被销毁。AMQP程序也可以创建其自己的交换器。AMQP并不使用 create 这个方法,而是使用 declare 方法来表示:如果不存在,则创建,存在了则继续。程序可以创建交换器用于私有使用,并在任务完成后销毁它们。虽然AMQP提供了销毁交换器的方法,但一般来讲程序不需要销户它。

队列(queue)的生命周期

队列分为两种,

  • 持久化消息队列:由很多消费者共享。当消费者都退出后,队列依然存在,并会继续收集消息。
  • **临时消息队列:**临时消息队列对于消费者是私有和绑定的。当消费者断开连接,则消息队列被删除。

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%203.png

绑定(Bindings)

绑定是交换器和消息队列之间的关系,告诉交换器如何路有消息。

Queue.Bind <queue> TO <exchange> WHERE <condition>

绑定命令的伪代码

几个经典的使用案例:共享队列、私有的回复队列、发布-订阅

构造一个共享队列

    Queue.Declare queue=app.svc01 // 声明一个叫做 app.svc01 的队列
    
    // Comsumer
    Basic.Consume queue=app.svc01 // 消费者消费该队列
    
    // Producer
    Basic.Publish routing-key=app.svc01 // 生产者发布消息。routingKey为队列名称

构造一个共享队列

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%204.png

构造一个私有回复队列

一般来讲,回复队列是私有的、临时的、由服务端命名、只有一个消费者。(该例子没有直接使用AMQP协议中的列子,而是使用了RabbitMQ的列子)

    Queue.Declare queue=rpc_queue // 调用的队列
    
    // Server
    Basic.Consume queue=rpc_queue
    
    // Client
    Queue.Declare queue=<empty> exclusive=TRUE
    S:Queue.Declare-Ok queue=amq.gen-X... // AMQP服务端告诉队列名称
    Basic.Publish queue=rpc_queue reply_to=amq_gen-X... // 客户端向服务端发送请求
    
    // Server
    handleMessage()
    // 服务端处理好消息后,向消息列的reply-to字段中的队列发送响应
    Basic.Publish exchange=<empty> routing-key={message.replay_to} 

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%205.png

构造一个发布-订阅队列

在传统的中间件中,术语 subscription 含糊不清。至少包含两个概念:匹配消息的条件集,和一个临时队列存放匹配的消息。AMQP把这两部分拆成:bindingmessage queus。在AMQP中,并没有一个实体叫做 subscription

AMQP的发布订阅模型为:

  • 给一个消费者保留消息。(一些场景下是多个消费者)
  • 从多个源收集消息,比如匹配Topic、消息的字段、或者内容等方式

订阅队列与命名队列或回复队列之间的关键区别在于,订阅队列名称与路由目的无关,并且路由是根据抽象的匹配条件完成的,而不是路由键字段的一对一匹配。

    // Consumer
    Queue.Declare queue=<empty> exclusive=TRUE
    // 这里是使用服务端下发的队列名称,并设置为独占。
    // 也可以使用约定的队列名称。这样就相当于把发布-订阅模型与共享队列组合使用了
    S:Queue.Declare-Ok queue=tmp.2
    Queue.Bind queue=tmp.2 TO exchange=amq.topic WHERE routing-key=*.orange.*
    Basic.Consume queue=tmp.2
    
    // Producer
    Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%206.png

AMQP命令架构

中间件很复杂,所以在设计协议结构的挑战是要驯服其复杂性。方法是基于类来建立传统API模型。类中包含方法,并定义了方法明确应该做什么。

有两种不同的方式进行对话:

  • 同步请求-响应。一个节点发送请求,另一个阶段发送响应。适用于性能不重要的方法。发送同步请求时,该节点直到收到回复后,才能发送下一个请求
  • 异步通知。一个节点发送数据,但是不期待回复。一般用于性能很重要的地方。异步请求会尽可能快的发送消息,不等待确认。只在需要的时候在更上层(比如消费者层)实现限流等功能。AMQP中可以没有确认,要么成功,要么就会收到关闭Channel或者连接的异常。如果需要明确的追踪成功或者失败,那么应该使用事务。

AMQP中的类

Connection类

AMQP是一个长连接协议。Connection被设计为长期使用的,可以携带多个cannel。Connection的生命周期是:

  1. 客户端打开到服务端的TCP/IP连接,发送protocol头。这是唯一的,客户端发送的数据不能被解析为方法的。
  2. 服务端返回其协议版本、属性(比如支持的安全机制列表)。 the Start method
  3. 客户端选择安全机制 Start-Ok
  4. 服务端开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询 Secure
  5. 客户端向服务端发送一个认证响应 Secure-Ok 。比如,如果使用 plain 认证机制,则响应会包含登录名和密码
  6. 客户端重复质询Secure或转到协商步骤,发送一系列参数,如最大帧大小 Tune
  7. 客户端接受,或者调低这些参数 Tune-Ok
  8. 客户端正式打开连接,并选择一个Vhost Open
  9. 服务端确认VHost有效 Open-Ok
  10. 客户端可以按照预期使用连接
  11. 当一个节点打算结束连接 Close
  12. 另一个节点需要结束握手 Close-Ok
  13. 服务端和客户端关闭Socket连接。

如果在发送或者收到 Open 或者 Open-Ok 之前,某一个节点发现了一个错误,则必须直接关闭Socket,且不发送任何数据。

Channel类

AMQP是一个多通道协议。Channel提供了一种方式,在比较重的TCP/IP连接上建立多个轻量级的连接。这会让协议更加的防火墙友好,因为端口使用是可预知的。它也意味着流量调整和其他QoS特性也很容易支持。

Channels相互是独立的,可以同步执行不同的功能。可用带宽会在当前活动之间共享.

这里期望也鼓励多线程客户端程序应该使用 每个线程一个channel 的模型。不过,一个客户端在一个或多个AMQP服务端上打开多个连接也是可以的。

Channel的生命周期为:

  1. 客户端打开一个新通道 Open
  2. 服务端确认新通道准备就绪 Open-Ok
  3. 客户端和服务端按预期来使用通道.
  4. 一个节点关闭了通道 Close
  5. 另一个节点对通道关闭进行握手 Close-Ok

Exchange类

Exchange类能够让应用操作服务端的交换器。这个类能够让程序自己设置路由,而不是通过某些配置。注意:大部分程序并不需要这个级别的复杂度,过去的中间件也不只支持这个语义。

Exchange的生命周期为:

  1. 客户端让服务端确保该exchange存在 Declare 。客户端可以细化为:“如果交换器不存在则进行创建” 或 “如果交换器不存在,警告我,不需要创建”
  2. 客户端向Exchange发消息
  3. 客户端也可以选择删掉Exchange Delete

Queue类

该类用于让程序管理服务端上的消息队列。几乎所有的消费者应用都是基本步骤,至少要验证使用的消息队列是否存在。

一个持久化消息队列的生命周期非常简单

  1. 客户端断言这个消息队列存在 Declare(设置 passive 参数)
  2. 服务端确认消息队列存在 Declare-Ok
  3. 客户端消息队列中读消息

一个临时消息队列的生命周期会更有趣些:

  1. 客户端创建消息队列 Declare(不提供队列名称,服务器会分配一个名称)。服务端确认 Declare-Ok
  2. 客户端在消息队列上启动一个消费者
  3. 客户端取消消费,可以是显示取消,也可以是通过关闭通道或者连接连接隐式取消的
  4. 当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,服务端会删除消息队列

AMQP实现了Topic订阅的分发模型。这可以让订阅在合作的订阅者间进行负载均衡。涉及到额外的绑定阶段的生命周期:

  1. 客户端创建一个队列Declare,服务端确认Declare-Ok
  2. 客户端绑定消息队列到一个topic exchange上Bind,服务端确认Bind-Ok
  3. 客户端像之前一样使用消息队列。

Basic类

Basic实现本规范中描述的消息功能。支持如下语义:

  • 从客户端→服务端发消息。异步Publish
  • 开始或者停止消费ConsumeCancel
  • 从服务端到客户端发消息。异步DeliverReturn
  • 确认消息AckReject
  • 同步的从消息队列中读取消息Get

事务类:

AMQP支持两种类型的事务:

  1. 自动事务。每个发布的消息和应答都处理为独立事务.
  2. 服务端本地事务:服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.

Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:

  1. 应用程序要求服务端事务,在需要的每个channel里Select
  2. 应用程序做一些工作PublishAck
  3. 应用程序提交或回滚工作 CommitRoll-back
  4. 应用程序正常工作,循环往复。

事务包含发布消息和ack,不包含分发。所以,回滚并不能重入队列或者重新分发任何消息。客户端有权在事务中确认这些消息。

功能说明

AMQP的功能描述,一定程度上也是RabbitMQ的功能描述,不过RabbitMQ基于AMQP做了一些扩展

消息和内容

消息会携带一些属性,以及具体内容(二进制数据)

消息是可被持久化的。持久化消息是可以安全的存在硬盘上的,即使发生了验证的网络错误、服务端崩溃溢出等情况,也可以确保被投递。

消息可以有优先级。同一个队列中,高优先级的消息会比低优先级的消息先被发送。当消息需要被丢弃时(比如服务端内存不足等),将会优先丢弃低优先级消息

服务端一定不能修改消息的内容。但服务端可能会在消息头上添加一些属性,但一定不会移除或者修改已经存在的属性。

虚拟主机(VHost)

虚拟主机是服务端的一个数据分区。在多租户使用是,可以方便进行管理。

虚拟主机有自己的命名空间、交换器、消息队列等等。所有连接,只可能和一个虚拟主机建立

交换器(Exchange)

交换器是一个虚拟主机内的消息路由Agent。用于处理消息的路由信息(一般是Routing-Key),然后将其发送到消息队列或者内部服务中。交换器可能是持久化的、临时的、自动删除的。交换器把消息路由到消息队列时可以是并行的。这会创建一个消息的多个实例。

Direct 交换器

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%207.png

  1. 一个消息队列使用RoutingKey K 绑定到交换器
  2. 生产者向交换器发送RoutingKey为R的消息
  3. K=R时,消息被转发到该消息队列中

Fanout 交换器

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%208.png

  1. 一个消息队列没有使用任何参数绑定交换器
  2. 生产者向交换器发了一条消息
  3. 这个消息无条件的发送到该消息队列

Topic 交换器

  1. 消息队列使用路由规则 P 绑定到交换器
  2. 生产者使用RoutingKey R 发送消息到交换器
  3. 如果 R 能够匹配 P,则把消息发到该消息队列。

RoutingKey必须由若干个被点.分隔的单词组成。每个单词只能包含字母和数字。其中 * 匹配一个单词,# 匹配0个或者多个单词。比如 *.stock.# 匹配 usd.stockeur.stock.db 但是不匹配 stock.nasdaq

Headers 交换器

  1. 消息队列使用Header的参数表来绑定。不适用RoutingKey
  2. 生产者向交换器发送消息,Header中包含了指定的键值对
  3. 如果匹配,则传给消息队列。

比如:

    format=json,type=log,x-match=all
    format=line,type=log,x-match=any

如果 x-matchall,则必须都匹配才行。如果x-matchany,则有任意一个header匹配即可。

系统交换器

这个略过吧。

AMQP的传输架构

解释了命令如何映射到传输层的。想要设计自有协议时,可以参考一下它的设计思路,以及中间需要注意的问题。

AMQP是一个二进制协议。有不同类型的帧frame 构成。帧会携带协议的方法以及其他信息。所有的帧都有相同的基本结构,即:帧头,payload,帧尾。payload格式取决于帧的类型。

我们假设使用的是面向流的可靠网络层(比如TCP/IP)。单个Socket连接上可以有多个独立的控制线程,也就是通道Channel 。不同的通道共享一个连接,每个通道上的帧都是按严格的顺序排列,这样可以用一个状态机来解析协议。

传输层(wire-level)的格式被设计为扩展性强、且足够通用,可以用于任何更高层的协议(不仅仅是AMQP)。我们假设AMQP是会被扩展、优化的。

主要涉及这几个部分:数据类型、协议协商、分帧方式、帧细节、方法帧、内容帧、心跳帧、错误处理、通道与连接的关闭

数据类型

AMQP的数据类型用于方法帧中,他们有

  • 整数(1-8个字节),表示大小,数量,范围等。全都是无符号整数
  • Bits。用于表示为开/关值,会被封包为字节。
  • 短字符串。用于存放短的文本属性。最多255字节,解析时不用担心缓冲区溢出。
  • 长字符串:用于存放二进制数据块
  • 字段表(Field Table),用于存放键值对

协议协商

客户端连接时,和服务端协商可接受的配置。当两个节点达成一致后,连接才能继续使用。通过协商,可以让我们断言假设和前提条件。主要协商这几方面的信息

  • 实现的协议和版本。服务端可能会在同一端口提供多种协议的支持
  • 加密参数和验证
  • 最大帧尺寸、Channel的数量、某些操作的限制。

如果协商达成一致,双方会根据协商预分配缓冲区避免死锁。传入的帧如果满足协商条件,则认为其实安全的。如果超过了,那么另一方必须断开连接。

分帧方式

TCP/IP是流协议。没有内置的分帧机制。现有的协议一般有这几种方式进行分帧:

  • 每个连接只发送一个帧。简单,但是慢。
  • 在流中加入分隔符来分帧。简单,但是解析较慢(因为需要不断的读取,去寻找分隔符)
  • 计算帧的尺寸,并在每个帧之前发送尺寸。简单且快速。也是AMQP的选择

帧细节

帧头包括:帧类型、通道、尺寸。

帧尾包含错误检测信息。

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%209.png

处理一个帧的步骤:

  1. 读帧头,检查帧类型和Channel
  2. 根据帧类型,读取payload并处理
  3. 读帧尾校验

在实现时,性能很重要的时候,我们会使用 read-ahead buffering 或者 gathering reads 去避免读帧时进行三次系统调用。

方法帧

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%2010.png

处理方式:

  1. 读取方法帧的payload
  2. 解包为结构
  3. 检查方法在当前上下文中是否允许
  4. 检查参数是否有效
  5. 执行方法。

方法帧是由AMQP数据字段组成。编码代码直接从协议规范中生成,速度非常快。

内容帧

内容是端到端直接发送的应用数据。内容由一系列属性和二进制数据组成。其中一系列的属性组成了 ”内容帧的帧头“。而二进制数据,可以是任意大小,它可能被拆分成多个块发送,每个块是一个 content-body帧

一些方法(比如 Basic.PublishBasic.Deliver)是会携带内容的。一个内容帧的帧头如下结构:

AMQP%2088f8cadffe93436a8b528d185f970a1d/Untitled%2011.png

这里把 content-body 作为单独的帧,这样就可以支持Zero-copy技术,这部分内容就不需要被编码。把内容属性放到自己的帧里,这样收件人就可以选择性的丢弃不想处理的内容。

通道与连接的关闭

对于客户端,只要发送了 Open 就认为连接和通道是打开的。对于服务端则是 Open-Ok 。如果一个节点想要关闭通道和连接必须要进行握手。

如果突然或者意外关闭,没办法立刻被检测到,可能会导致丢失返回值。所以需要在关闭之前进行握手。在一个节点发送 Close 后,另一个节点必须发送 Close-Ok 来回复。然后双方可以关闭通道或者连接。如果节点忽略了 Close 操作,当双方同时发送 Close 时,可能会导致死锁。