前言Angular 是一个基于 TypeScript 构建的开发平台。它包括一个基于组件的框架,用于构建可伸缩的 Web 应用;一组完美集成的库,涵盖路由、表单管理、客户端-服务器通信等各种功能;一套开发工具,可帮助用户开发、构建、测试和更新代码。 MQTT 是一种基于发布/订阅模式的轻量级物联网消息传输协议。该协议提供了一对多的消息分发和应用程序的解耦,传输消耗小,可最大限度减少网络流量,同时具有三种不同消息服务质量等级,满足不同投递需求的优势。 本文将介绍如何在 Angular 项目中使用 MQTT 协议,实现客户端与 MQTT 服务器的连接、订阅、收发消息、取消订阅等功能。 项目初始化新建项目参考链接如下:使用 Angular CLI 创建 Angular 项目 示例: ng new my-app 安装 MQTT 客户端库本次使用的是库为 ngx-mqtt,这个库不仅仅是 MQTT.js 的包装器,用于 angular >= 2。它使用 observables 并负责订阅处理和消息路由, ngx-mqtt 非常适合具有许多组件和许多订阅者的应用程序。 通过命令行安装 ngx-mqtt,可以使用 npm 或 yarn 命令(二者选一) npm install ngx-mqtt --save yarn add ngx-mqtt MQTT 的使用连接 MQTT 服务器本文将使用 EMQX 提供的免费公共 MQTT 服务器,该服务基于 MQTT 云服务器 - EMQX Cloud 创建。EMQX 是一款大规模分布式物联网 MQTT 消息服务器,可高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网平台与应用。 服务器接入信息如下: Broker: broker.emqx.io TCP Port: 1883 WebSocket Port: 8083
连接关键代码: import { IMqttMessage, IMqttServiceOptions, MqttService, IPublishOptions, } from 'ngx-mqtt'; import { IClientSubscribeOptions } from 'mqtt-browser'; import { Subscription } from 'rxjs';
@Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.scss'], }) export class AppComponent { constructor(private _mqttService: MqttService) { this.client = this._mqttService; } private curSubscription: Subscription | undefined; connection = { hostname: 'broker.emqx.io', port: 8083, path: '/mqtt', clean: true, // 保留会话 connectTimeout: 4000, // 超时时间 reconnectPeriod: 4000, // 重连时间间隔 // 认证信息 clientId: 'mqttx_597046f4', username: 'emqx_test', password: 'emqx_test', protocol: 'ws', } subscription = { topic: 'topic/mqttx', qos: 0, }; publish = { topic: 'topic/browser', qos: 0, payload: '{ "msg": "Hello, I am browser." }', }; receiveNews = ''; qosList = [ { label: 0, value: 0 }, { label: 1, value: 1 }, { label: 2, value: 2 }, ]; client: MqttService | undefined; isConnection = false; subscribeSuccess = false;
// 创建连接 createConnection() { // 连接字符串, 通过协议指定使用的连接方式 // ws 未加密 WebSocket 连接 // wss 加密 WebSocket 连接 // mqtt 未加密 TCP 连接 // mqtts 加密 TCP 连接 try { this.client?.connect(this.connection as IMqttServiceOptions) } catch (error) { console.log('mqtt.connect error', error); } this.client?.onConnect.subscribe(() => { this.isConnection = true console.log('Connection succeeded!'); }); this.client?.onError.subscribe((error: any) => { this.isConnection = false console.log('Connection failed', error); }); this.client?.onMessage.subscribe((packet: any) => { this.receiveNews = this.receiveNews.concat(packet.payload.toString()) console.log(`Received message ${packet.payload.toString()} from topic ${packet.topic}`) }) } } 订阅主题连接 MQTT 服务器成功之后,调用当前 MQTT 实例的 subscribe 方法,传入 Topic 和 QoS 参数,即可订阅成功。 doSubscribe() { const { topic, qos } = this.subscription this.curSubscription = this.client?.observe(topic, { qos } as IClientSubscribeOptions).subscribe((message: IMqttMessage) => { this.subscribeSuccess = true console.log('Subscribe to topics res', message.payload.toString()) }) } 取消订阅unsubscribe 方法可以释放订阅持有的资源。 doUnSubscribe() { this.curSubscription?.unsubscribe() this.subscribeSuccess = false } 消息发布unsafePublish发布带有可选选项的主题的消息,如 QoS、Retain 等选项,如下所示。 doPublish() { const { topic, qos, payload } = this.publish console.log(this.publish) this.client?.unsafePublish(topic, payload, { qos } as IPublishOptions) } 断开连接disconnect 断开与 MQTT 客户端的连接,传入参数 True 表示强制断开与 MQTT 客户端的连接。 destroyConnection() { try { this.client?.disconnect(true) this.isConnection = false console.log('Successfully disconnected!') } catch (error: any) { console.log('Disconnect failed', error.toString()) } } 测试我们使用 Angular 编写了如下简单的浏览器应用,该应用具备:创建连接、订阅主题、收发消息、取消订阅、断开连接等功能。 使用 MQTT 5.0 客户端工具 - MQTT X 作为另一个客户端进行消息收发测试。 在 MQTT X 发送第二条消息之前,在浏览器端进行取消订阅操作,浏览器端将不会收到 MQTT X 发送的后续消息。 总结综上所述,我们实现了在 Angular 项目中创建 MQTT 连接,模拟了客户端与 MQTT 服务器进行订阅、收发消息、取消订阅以及断开连接的场景。 Angular 作为三大主流的前端框架之一,既能够在浏览器端使用,也能够在移动端使用,结合 MQTT 协议及 MQTT 物联网云服务 可以开发出很多有趣的应用,比如客服聊天系统或实时监控物联网设备信息的管理系统等。 版权声明: 本文为 EMQ 原创,转载请注明出处。 原文链接:https://www./zh/blog/how-to-use-mqtt-in-angular
|