Java物联网开发(一) —— MQTT协议

基于 Publish/Subscribe 模式的物联网通信协议MQTT

    • 一. 简介
    • 二. 特点
      • QoS消息发布质量介绍
    • 三. 应用场景
    • 四. 原理
      • MQTT协议中的方法
    • 五. MQTT协议数据包结构
      • 固定头
        • 数据包类型
        • 标志位
      • 可变头
        • 协议名称
        • 协议版本
        • 连接标志
        • 保活心跳(Keep Alive)
      • 消息体(Payload)
      • 总结
    • 六. 常见的MQTT Broker 总结

一. 简介

MQTT 是基于 Publish/Subscribe 模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山. MQTT在物联网方向的开发出现的频率非常高, 因此从事该行业方向开发有必要对其进行下系统的学习

官方文档传送门
mqtt官网
mqtt中文网
mqtt 3.1.1 英文文档
mqtt 5.0 英文文档
b站mqtt教程


二. 特点

  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接,提供有序,无损,双向连接。
  • 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
  • 消息QoS支持,可靠传输保证

注意:

  1. 主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。
  2. 关于Qos, 需要指出的是三种消息发布质量.

QoS消息发布质量介绍

  • QoS0:“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要用于普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,即使再次联网也收不到了。

  • QoS1:“至少一次”,确保消息到达,但消息重复可能会发生

  • QoS2:“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。


三. 应用场景

MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。

  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 移动即时消息,例如Facebook Messenger
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场

四. 原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
    在这里插入图片描述

网络链接

由底层传输协议提供给MQTT使用的架构

  • 底层传输协议能够连通客户端和服务端
  • 底层传输协议提供有序的,可靠的,双向字节流

应用消息

指通过MQTT在网络中传输的应用程序数据。 当应用消息通过MQTT传输的时候会附加上质量服务(QoS)和话题名称。

客户端

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以

  • 发布, 订阅消息
  • 退订和删除应用程序消息
  • 断开和服务器连接

服务端

MQTT服务器被称为"消息代理"(Broker),它可以是一个应用程序或一台设备。是位于消息发布者和订阅者之间,它可以:

  • 接收客户端消息
  • 处理客户端订阅和退订请求
  • 向订阅的客户端转发消息

主题名

指附着于应用消息的标签,服务端用它来匹配订阅。服务端给每个匹配到的客户端发送一份应用信息的拷贝。

主题过滤器

包含在订阅里的一个表达式,用来表示一个或多个感兴趣的话题。话题过滤器可以包含通配符。

MQTT控制包

通过网络连接发送的包含一定信息的数据包。MQTT规范定义了14个不同类型的控制包,其中一个(PUBLISH包)用来传输应用信息。

发布/订阅、主题、会话

  • 订阅: 一个订阅由一个话题过滤器和一个最大的QoS组成。一个订阅只能关联一个会话。一个会话可以包含多个订阅。每个订阅都有不同的主题/话题过滤器。

  • 会话: 一个有状态的客户端和服务端的交互。有些会话的存续依赖于网络连接,而其他则可以跨越一个客户端和服务端之间的多个连续的网络连接。

  • 主题(Topic): 以 ‘/’ 为分隔符区分不同的层级
    包含通配符 ‘+’ 或 ‘#’ 的主题又称为 主题过滤器(Topic Filters);
    不含通配符的称为主题名(Topic Names) 例如:

    chat/room/1 
    sensor/10/temperature 
    sensor/+/temperature 
    $SYS/broker/metrics/packets/received 
    $SYS/broker/metrics/#'+': 表示通配一个层级,例如a/+,匹配a/x, a/y 
    '#': 表示通配多个层级(必须在末尾),例如a/#,匹配a/x, a/b/c/d 
    注意: 发布时不可以订阅通配的主题. 而只有订阅才可以
    

至此可以初步总结下mqtt工作流程

  1. 客户端发送连接请求到服务器, 在服务器确认(认证)后则建立连接.
  2. 之后客户端则可以将消息以主题的形式 发布 到服务器 broker
  3. 然后其他客户端则可以 订阅 相关主题, 接收对应主题的信息(依照订阅发布模型)
  4. 同时消息服务器broker 会接收客户端的心跳请求并返回心跳响应, 用于监测客户端,服务器健康状况.

MQTT协议中的方法

MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。
这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。
通常来说,资源指服务器上的文件或输出。主要方法有:

  • CONNECT:客户端连接到服务器
  • CONNACK:连接确认
  • PUBLISH:发布消息
  • PUBACK:发布确认
  • PUBREC:发布的消息已接收
  • PUBREL:发布的消息已释放
  • PUBCOMP:发布完成
  • SUBSCRIBE:订阅请求
  • SUBACK:订阅确认
  • UNSUBSCRIBE:取消订阅
  • UNSUBACK:取消订阅确认
  • PINGREQ:客户端发送心跳
  • PINGRESP:服务端心跳响应
  • DISCONNECT:断开连接
  • AUTH:认证

五. MQTT协议数据包结构

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

在这里插入图片描述

  • 固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识,如连接,发布,订阅,心跳等。其中固定头是必须的,所有类型的MQTT协议中,都必须包含固定头。
  • 可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。可变头部不是可选的意思,而是指这部分在有些协议类型中存在,在有些协议中不存在。
  • 消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。 与可变头一样,在有些协议类型中有消息内容,有些协议类型中没有消息内容

固定头

存在于所有MQTT数据包中, 头包含两部分内容: 报文头(byte1)和剩余消息报文长度(从byte2开始,长度为1-4字节)

  • 报文头包括 Mqtt控制报文类型 和 每种Mqtt控制报文类型的具体标识(标志位)
  • 剩余消息报文长度不包含用来编码剩余长度的字节。是当前包中剩余内容长度的字节数,包括变量头和有效负载中的数据

在这里插入图片描述

数据包类型

位置:第一个字节(Byte 1) 中的7-4个bit位(Bit[7-4]),表示4位无符号值
通过第一个字节的高4位确定消息报文的类型,4个bit位能确定16种类型,其中0000和1111是保留字段。

MQTT消息报文类型如下:

报文类型 字段值 数据方向 描述 7-4bit值
保留 0 禁用 保留字段 0000
CONNECT 1 Client—>Server 客户端连接到服务器 0001
CONNACK 2 Server —> Client 连接确认 0010
PUBLISH 3 Client—>Server 发布消息 0011
PUBACK 4 Client—>Server 发布确认(QoS1) 0100
PUBREC 5 Client—>Server 消息已接收(Qos2 第一阶段) 0101
PUBREL(发布的消息已释放) 6 Client—>Server 消息释放(QoS2 第二阶段) 0110
PUBCOMP 7 Client—>Server 发布结束(QoS2 第三阶段) 0111
SUBSCRIBE 8 Client—>Server 客户端订阅请求 1000
SUBACK 9 Server —> Client 服务端订阅确认 1001
UNSUBACRIBE 10 Client—>Server 客户端取消订阅 1010
UNSUBACK 11 Server —> Client 服务端取消订阅确认 1011
PINGREQ 12 Client—>Server 客户端发送心跳 1100
PINGRESP 13 Server —> Client 服务端回复心跳 1101
DISCONNECT 14 Client—>Server 客户端断开连接请求 1110
AUTH(V5.0使用) 15 Client—>Server 认证数据交换 1111

标志位

位置:第一个字节中的0-3个bit位(Bit[3-0])。意思是字节位Bit[3-0]用作报文的标识。
首字节的低4位(bit3~bit0)用来表示某些报文类型的控制字段,实际上只有少数报文类型有控制位,如下图:

报文类型 固定头标记 Bit 3 Bit 2 Bit 1 Bit 0
CONNECT 保留 0 0 0 0
CONNACK 保留 0 0 0 0
PUBLISH 保留 DUP Qos QoS RETAIN
PUBACK 保留 0 0 0 0
PUBREC 保留 0 0 0 0
PUBREL(发布的消息已释放) 保留 0 0 0 0
PUBCOMP 保留 0 0 0 0
SUBSCRIBE 保留 0 0 0 0
SUBACK 保留 0 0 0 0
UNSUBACRIBE 保留 0 0 0 0
UNSUBACK 保留 0 0 0 0
PINGREQ 保留 0 0 0 0
PINGRESP 保留 0 0 0 0
DISCONNECT 保留 0 0 0 0
  1. Bit[3]为DUP字段,如果该值为1,表明这个数据包是一条重复的消息;否则该数据包就是第一次发布的消息。
  2. Bit[2-1]为Qos字段:
    如果Bit 1和Bit 2都为0->(00),表示QoS 0:至多一次(如图1);
    如果Bit 1为1->(01),表示QoS 1:至少一次(如图2);
    如果Bit 2为1->(10),表示QoS 2:只有一次(如图3);
    如果同时将Bit 1和Bit 2都设置成1,那么客户端或服务器认为这是一条非法的消息,会关闭当前连接
  3. Bit0 为 RETAIN字段, 表示该消息是否是保留消息 如果该值为1 表示该数据包就是保留消息
    服务器 broker接收到此消息后, 除了转发给订阅者外, 还会将此消息保存.
    如果之后有新的订阅者之后, 则会将消息推送给新的订阅者然后释放

图1
在这里插入图片描述
图2
在这里插入图片描述
图3
在这里插入图片描述

注意:

目前Bit[3-0]只在PUBLISH协议中使用有效,并且表中指明了是MQTT 3.1.1版本(如上表)。
对于其它MQTT协议版本,内容可能不同。所有固定头标记为"保留"的协议类型,
Bit[3-0]必须保持与表中保持一致,如SUBSCRIBE协议,其Bit 1必须为1。
如果接收方接收到非法的消息,会强行关闭当前连接。

下图则是MQTT 5.0 版本标志位
在这里插入图片描述

可变头

  • 某些类型的MQTT控制包包含一个可变包头结构。位于固定包头和载荷之间。可变包头的内容取决于包的类型。可变包头中的包标识符字段在大多类型的包中比较常见。
  • CONNECT数据包的变量头按以下顺序包含四个字段:协议名称,协议版本,连接标志和保活心跳。

在这里插入图片描述

协议名称

  • 协议名称是UTF-8编码的字符串,代表协议名称“ MQTT”,大写,如下所示。字符串,其偏移量和长度将不会在MQTT规范的未来版本中更改。
  • 如果协议名称不正确,服务器可能会断开客户端的连接,或者可能会继续按照其他规范处理CONNECT数据包。
    在这里插入图片描述

协议版本

  • 8位无符号值表示客户端的版本等级。3.1.1版本的协议等级是4,MQTT v5.0的协议版本字段为5(0x05)
  • 如果服务器不支持协议级别 [MQTT-3.1.2-2] ,则服务器必须使用CONNACK返回码0x01(不可接受的协议级别)响应CONNECT数据包,然后断开客户端的连接。
    在这里插入图片描述

连接标志

  • 连接标志字节包含许多参数,这些参数指定MQTT连接的行为。它还指示有效载荷中字段的存在或不存在。
  • 服务器必须验证CONNECT控制包中的保留标志是否设置为零,如果不为零,则断开与客户端的连接 [MQTT-3.1.2-3]。
    在这里插入图片描述

保活心跳(Keep Alive)

Keep Alive是以秒为单位的时间间隔。用2字节表示,它指的是客户端从发送完成一个控制包到开始发送下一个的最大时间间隔。客户端有责任确保两个控制包发送的间隔不能超过Keep Alive的值。如果没有其他控制包可发,客户端必须发送PINGREQ包

心跳的作用:

  • MQTT客户端向服务器发起CONNECT请求时,通过KeepAlive参数设置保活周期。
  • 如果Keep Alive的值非0,而且服务端在一个半Keep Alive的周期内没有收到客户端的控制包,服务端必须作为网络故障断开网络连接
  • Keep Alive的值为0,就关闭了维持的机制。这意味着,在这种情况下,服务端不会断开静默的客户端。

更多可变头介绍可见mqtt官网 3.1.2.1 -3.1.2.11

消息体(Payload)

有些报文类型是包含Payload的,Payload意思是消息载体的意思. 如PUBLISH的Payload就是指消息内容(应用程序发布的消息内容)。而CONNECT的Payload则包含ClientIdentifier,Will Topic,Will Message,Username,Password等信息。包含payload的报文类型如下

在这里插入图片描述

总结

  • 我们介绍了MQTT协议的消息格式,MQTT消息格式包含Fixed Header, Variable Header和Payload。因为MQTT消息格式非常精简,所以可以高效的传输数据。
  • Fixed Header中包含首字节,高4位用来表示报文类型,低4位用于类型控制。目前只有PUBLISH使用了类型控制字段。其它控制字段被保留并且必须与协议定义保持一致。
  • Fixed Header同时包含Remaining Length,这是剩余消息长度,最大长度为4字节,理论上一条MQTT最大可以传输256MB数据。Remaining Length=Variable Header+Payload长度。
  • Variable Header是可变头部,有些报文类型中需要包含可变头部,可变头部根据报文类型不同而不同。比如Packet Identifier在发布,订阅/取消订阅等报文中都使用到。
  • Payload是消息内容,也只在某些报文类型中出现,其内容和格式也根据报文类型不同而不同。

六. 常见的MQTT Broker 总结

到目前为止,比较流行的 MQTT Broker 有几个:

  1. Eclipse Mosquitto: https://github.com/eclipse/mosquitto
    使用 C 语言实现的 MQTT Broker。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#
  2. EMQX: https://github.com/emqx/emqx
    使用 Erlang 语言开发的 MQTT Broker,支持许多其他 IoT 协议比如 CoAPLwM2M 等
  3. Mosca: https://github.com/mcollina/mosca
    使用 Node.JS 开发的 MQTT Broker,简单易用。
  4. VerneMQ: https://github.com/vernemq/vernemq
    同样使用 Erlang 开发的 MQTT Broker

从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。


而下面, 我们将介绍万物互联消息引擎 —— EMQ系列消息引擎
使用基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器 —— EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 来实现基于mqtt协议特性以及更多拓展特性的消息队列

Published by

风君子

独自遨游何稽首 揭天掀地慰生平