MQTT.js入门使用的方法是什么

寻技术 JS脚本 2023年07月18日 90

这篇文章主要介绍“MQTT.js入门使用的方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“MQTT.js入门使用的方法是什么”文章能帮助大家解决问题。

    简介

    MQTT.js 是一个开源的 MQTT 协议的客户端库,使用 JavaScript 编写,主要用于 Node.js 和 浏览器环境中。是目前 JavaScript 生态中使用最为广泛的 MQTT 客户端库。

    MQTT 是一种基于发布/订阅模式的轻量级物联网消息传输协议,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

    由于 JavaScript 单线程特性,MQTT.js 是全异步 MQTT 客户端,MQTT.js 支持 MQTT/TCP、MQTT/TLS、MQTT/WebSocket,在不同运行环境支持的度如下:

    • 浏览器环境:MQTT over WebSocket(包括微信小程序、支付宝小程序等定制浏览器环境)

    • Node.js 环境:MQTT、MQTT over WebSocket

    注意:不同环境里除了少部分连接参数不同,其他 API 均是相同的。且在 MQTT.js v3.0.0 及以上版本后,已经完整支持到 MQTT 5.0。

    安装

    使用 npm 或 yarn 安装

    npm install mqtt --save
    
    # 或使用 yarn
    
    yarn add mqtt

    注意:如果您的 Node 环境是 v12 或 v14 及以上版本,请使用 MQTT.js 4.0.0 及以上版本

    使用 CDN 安装

    浏览器环境中,我们还可以使用 CDN 的方式引入 MQTT.js。MQTT.js 的 bundle 包通过 http://unpkg.com 管理,我们可以直接添加 unpkg.com/mqtt/dist/mqtt.min.js 来进行使用。

    <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
    <script>
      // 将在全局初始化一个 mqtt 变量
      console.log(mqtt)
    </script>

    全局安装

    除了上述的安装方式外,MQTT.js 还提供了全局安装的方式,使用命令行工具来完成 MQTT 的连接、发布和订阅等。

    npm install mqtt -g

    我们会在下文中的一些使用教程中详细描述如何使用 MQTT.js 的命令行工具。

    使用

    本文将使用 EMQX Cloud 提供的 免费公共 MQTT 服务器 作为本次测试的 MQTT 服务器地址,服务器接入信息如下:

    • Broker: broker.emqx.io

    • TCP Port: 1883

    • SSL/TLS Port: 8883

    更多详情请访问 EMQX Cloud 官网,或查看 EMQX Cloud 文档。

    简单例子

    我们简单编写一段代码实现连接到 EMQX Cloud 并完成订阅主题、收发消息的简单例子。因为在浏览器环境中仅支持使用 WebSocket 连接,所以我们将使用在浏览器环境Node.js 环境两种不同的连接参数来完成连接。不过除连接地址外,其它参数均是相同的,因此读者可根据自己的实际情况选择使用。

    const mqtt = require('mqtt')
    
    /***
     * 浏览器环境
     * 使用协议为 ws 和 wss 的 MQTT over WebSocket 连接
     * EMQX 的 ws 连接默认端口为 8083,wss 为 8084
     * 注意需要在连接地址后加上一个 path, 例如 /mqtt
     */
    const url = 'ws://broker.emqx.io:8083/mqtt'
    /***
     * Node.js 环境
     * 使用协议为 mqtt 和 mqtts 的 MQTT over TCP 连接
     * EMQX 的 mqtt 连接默认端口为 1883,mqtts 为 8084
     */
    // const url = 'mqtt://broker.emqx.io:1883'
    
    // 创建客户端实例
    const options = {
      // Clean session
      clean: true,
      connectTimeout: 4000,
      // 认证信息
      clientId: 'emqx_test',
      username: 'emqx_test',
      password: 'emqx_test',
    }
    const client = mqtt.connect(url, options)
    client.on('connect', function () {
      console.log('Connected')
      // 订阅主题
      client.subscribe('test', function (err) {
        if (!err) {
          // 发布消息
          client.publish('test', 'Hello mqtt')
        }
      })
    })
    
    // 接收消息
    client.on('message', function (topic, message) {
      // message is Buffer
      console.log(message.toString())
      client.end()
    })

    命令行

    在全局安装完 MQTT.js 后,我们同样可以使用命令行工具来完成主题订阅消息发布接收的动作。

    示例连接到

    broker.emqx.io
    并订阅
    testtopic/#
    主题:
    mqtt sub -t 'testtopic/#' -h 'broker.emqx.io' -v

    示例连接到

    broker.emqx.io
    并向
    testtopic/hello
    主题发送消息
    mqtt pub -t 'testtopic/hello' -h 'broker.emqx.io' -m 'from MQTT.js'

    如需使用功能更加全面的 MQTT 命令行工具,可参考使用 MQTT X CLI。

    API 介绍

    mqtt.connect([url], options)

    连接到指定的 MQTT Broker 的函数,并始终返回一个 Client 对象。第一个参数传入一个 URL 值,URL 可以是以下协议:

    mqtt
    ,
    mqtts
    ,
    tcp
    ,
    tls
    ,
    ws
    ,
    wss
    。URL 也可以是一个由 URL.parse() 返回的对象。然后再传入一个 Options 对象,用于配置 MQTT 连接时的选项。当使用 WebSocket 连接时需要注意地址后是否需要加上一个 path,例如
    /mqtt

    下面列举一些常用的 Options 对象中的属性值:

    • Options

      • topic
        : 遗嘱发送的 Topic
      • payload
        : 遗嘱发布的消息
      • QoS
        : 遗嘱发送的 QoS 值
      • retain
        : 遗嘱发布的消息的 retain 标志
      • keepalive
        : 单位为
        ,数值类型,默认为 60 秒,设置为 0 时禁止
      • clientId
        : 默认为
        'mqttjs_' + Math.random().toString(16).substr(2, 8)
        ,可以支持自定义修改的字符串
      • protocolVersion
        : MQTT 协议版本号,默认为 4(v3.1.1)可以修改为 3(v3.1)和 5(v5.0)
      • clean
        : 默认为
        true
        ,是否清除会话。当设置为
        true
        时,断开连接后将清除会话,订阅过的 Topics 也将失效。当设置为
        false
        时,离线状态下也能收到 QoS 为 1 和 2 的消息
      • reconnectPeriod
        : 重连间隔时间,单位为毫秒,默认为 1000 毫秒,注意:当设置为 0 以后将取消自动重连
      • connectTimeout
        : 连接超时时长,收到 CONNACK 前的等待时间,单位为毫秒,默认 30000 毫秒
      • username
        : 认证用户名,如果 Broker 要求用户名认证的话,请设置该值
      • password
        : 认证密码,如果 Broker 要求密码认证的话,请设置该值
      • will
        : 遗嘱消息,一个可配置的对象值,当客户端非正常断开连接时,Broker 就会向遗嘱 Topic 里面发布一条消息,格式为:
      • properties
        : MQTT 5.0 新增,可配置的对象的属性值,详情请参考:https://github.com/mqttjs/MQTT.js#mqttclientstreambuilder-options
    • 如果需要配置 SSL/TLS 连接,Option 对象会被传递给

      tls.connect()
      ,因此可以在 option 中配置以下属性
      • rejectUnauthorized
        : 是否验证服务端证书链和地址名称,设置为 false 时将跳过验证,会暴露在中间人的攻击之下,所以不建议在生产环境中使用这种配置,当设置为 true 时,将开启强认证模式,且如果是自签名证书,请在证书配置时设置 Alt name。
      • ca
        : 只有在服务器使用自签名证书时才有必要,自签名证书中生成的 CA 文件
      • cert
        : 只有当服务器需要客户证书认证时才有必要(双向认证),客户端证书
      • key
        : 只有当服务器需要客户证书认证时才有必要(双向认证),客户端密钥

    Client 事件

    当连接成功后,返回的 Client 对象可通过 on 方法监听多个事件,业务逻辑可在监听的回调函数中完成。以下列举一些常用的事件:

    connect

    当连接成功时触发,参数为 connack

    client.on('connect', function (connack) {
      console.log('Connected')
    })

    reconnect

    当断开连接后,经过重连间隔时间重新自动连接到 Broker 时触发

    client.on('reconnect', function () {
      console.log('Reconnecting...')
    })

    close

    在断开连接以后触发

    client.on('close', function () {
      console.log('Disconnected')
    })

    disconnect

    在收到 Broker 发送过来的断开连接的报文时触发,参数 packet 即为断开连接时接收到的报文,MQTT 5.0 中的功能

    client.on('disconnect', function (packet) {
      console.log(packet)
    })

    offline

    当客户端下线时触发

    client.on('offline', function () {
      console.log('offline')
    })

    error

    当客户端无法成功连接时或发生解析错误时触发,参数 error 为错误信息

    client.on('error', function (error) {
      console.log(error)
    })

    message

    当客户端收到一个发布过来的 Payload 时触发,其中包含三个参数,topic、payload 和 packet,其中 topic 为接收到的消息的 topic,payload 为接收到的消息内容,packet 为 MQTT 报文信息,其中包含 QoS、retain 等信息

    client.on('message', function (topic, payload, packet) {
      // Payload is Buffer
      console.log(`Topic: ${topic}, Message: ${payload.toString()}, QoS: ${packet.qos}`)
    })

    Client 方法

    Client 除监听事件外,也内置一些方法,用来进行发布订阅的操作等,以下列举一些常用的方法。

    Client.publish(topic, message, [options], [callback])

    向某一 topic 发布消息的函数方法,其中包含四个参数:

    // 向 testtopic 主题发送一条 QoS 为 0 的测试消息
    client.publish('testtopic', 'Hello, MQTT!', { qos: 0, retain: false }, function (error) {
      if (error) {
        console.log(error)
      } else {
        console.log('Published')
      }
    })
    • topic: 要发送的主题,为字符串

    • message: 要发送的主题的下的消息,可以是字符串或者是 Buffer

    • options: 可选值,发布消息时的配置信息,主要是设置发布消息时的 QoS、Retain 值等。

    • callback: 发布消息后的回调函数,参数为 error,当发布失败时,该参数才存在

    Client.subscribe(topic/topic array/topic object, [options], [callback])

    订阅一个或者多个 topic 的方法,当连接成功需要订阅主题来获取消息,该方法包含三个参数:

    // 订阅一个名为 testtopic QoS 为 0 的 Topic
    client.subscribe('testtopic', { qos: 0 }, function (error, granted) {
      if (error) {
        console.log(error)
      } else {
        console.log(`${granted[0].topic} was subscribed`)
      }
    })
    • topic: 可传入一个字符串,或者一个字符串数组,也可以是一个 topic 对象,

      {'test1': {qos: 0}, 'test2': {qos: 1}}
    • options: 可选值,订阅 Topic 时的配置信息,主要是填写订阅的 Topic 的 QoS 等级的

    • callback: 订阅 Topic 后的回调函数,参数为 error 和 granted,当订阅失败时 error 参数才存在, granted 是一个 {topic, qos} 的数组,其中 topic 是一个被订阅的主题,qos 是 Topic 是被授予的 QoS 等级

    Client.unsubscribe(topic/topic array, [options], [callback])

    取消订阅单个主题或多个主题,该方法包含三个参数:

    // 取消订阅名为 testtopic 的 Topic
    client.unsubscribe('testtopic', function (error) {
      if (error) {
        console.log(error)
      } else {
        console.log('Unsubscribed')
      }
    })
    • topic: 可传入一个字符串或一个字符串数组

    • options: 可选值,取消订阅时的配置信息

    • callback: 取消订阅时的回调函数,参数为 error,当取消订阅失败时 error 参数才存在

    Client.end([force], [options], [callback])

    关闭客户端,该方法包含三个参数:

    client.end()
    • force: 设置为 true 时将立即关闭客户端,而无需等待断开连接的消息被接受。这个参数是可选的,默认为 false。注意:使用该值为 true 时,Broker 无法接收到 disconnect 的报文

    • options: 可选值,关闭客户端时的配置信息,主要是可以配置 reasonCode,断开连接时的 Reason Code

    • callback: 当客户端关闭时的回调函数

    关闭

    用微信“扫一扫”