Lập trình với thư viện MQTTnet

    0

    Trong các bài học trước bạn đã thực hành viết code sử dụng thư viện MQTTnet cũng như học chi tiết về giao thức MQTT. Trong bài học này chúng ta sẽ đi phân tích chi tiết code của bài thực hành đã làm, qua đó giới thiệu kỹ hơn về cách lập trình cơ bản với thư viện MQTTnet.

    Thư viện MQTTnet

    MQTTnet là một thư viện mã nguồn mở miễn phí thực thi giao thức MQTT cho các ứng dụng .NET hoạt động đa nền tảng (Windows, Linux, macOS). 

    Thư viện MQTTnet cung cấp các tính năng như:

    • Quản lý kết nối và đăng ký với máy chủ MQTT
    • Gửi và nhận các tin nhắn MQTT với các cấp độ QoS khác nhau
    • Hỗ trợ mã hóa TLS/SSL và xác thực người dùng
    • Hỗ trợ các tính năng mới của MQTT 5.0, như thuộc tính tin nhắn, chủ đề được chia sẻ và quyền truy cập chủ đề
    • Hỗ trợ các giao diện lập trình ứng dụng (API) đồng bộ và không đồng bộ
    • Hỗ trợ các phiên bản MQTT 3.1, 3.1.1 và 5.0.
    • Cung cấp các gói mở rộng hỗ trợ thêm cho MQTT, như Managed Client (với các tính năng nâng cao), RPC (gọi hàm từ xa).

    Để sử dụng thư viện MQTTnet trong ứng dụng, bạn có thể tải gói thư viện từ NuGet như đã thực hiện trong phần ví dụ ở bài học trước. Thư viện này được cài đặt chung cho cả Publisher và Subscriber. Sau khi cài đặt thư viện cho dự án, bạn cần thực hiện các thao tác riêng cho các loại client. Các đoạn code ví dụ ở đây đều trích từ bài thực hành đã làm trong bài học trước.

    Khởi tạo MQTTnet

    Bước đầu tiên trong lập trình với thư viện này là khởi tạo đối tượng của lớp MqttClient. Việc khởi tạo này là bắt buộc với cả Publisher và Subscriber.

    Lớp MqttClient là lớp trung tâm trong hoạt động của thư viện này. Lớp này chứa đầy đủ các phương thức để kết nối tới broker, phát thông điệp, đăng ký chủ đề, nhận thông điệp, ngắt kết nối với broker.

    Để khởi tạo đối tượng của lớp MqttClient, bạn cần sử dụng một lớp hỗ trợ là MqttFactory. Lớp này sử dụng mẫu thiết kế Factory để đơn giản hóa việc khởi tạo đối tượng cho MqttClient. Code khởi tạo MqttClient như sau:

    var mqttFactory = new MqttFactory();
    var mqttClient = mqttFactory.CreateMqttClient();

    Ở đây cần lưu ý không nên sử dụng cách khởi tạo đối tượng thông thường của C# vì nó rắc rối hơn nhiều.

    Kết nối tới broker

    Bước thứ hai là kết nối MqttClient tới broker. Bước này cũng bắt buộc với cả Publisher và Subscriber. 

    Code thực hiện kết nối tới broker như sau:

    var mqttClientOptions = new MqttClientOptionsBuilder()
        .WithTcpServer("broker.hivemq.com")
        .Build();
    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

    Để kết nối tới broker cần cung cấp các tham số cơ bản nhất là địa chỉ của broker và số cổng TCP mà broker đang chờ yêu cầu kết nối. Địa chỉ của broker có thể là địa chỉ IP hoặc tên miền. Nếu broker sử dụng cổng mặc định (1883 cho kết nối không mã hóa) thì không cần cung cấp giá trị cổng. Trong đoạn code trên chúng ta sử dụng broker của HiveMQ tại địa chỉ broker.hivemq.com và cổng mặc định.

    Các tham số kết nối được lưu trong đối tượng của lớp MqttClientOptions. Đối tượng của lớp MqttClientOptions được tạo ra nhờ hỗ trợ của một đối tượng của lớp MqttClientOptionsBuilder.  Lớp MqttClientOptionsBuilder sử dụng cú pháp fluent để đơn giản hóa việc cung cấp các tham số. Phương thức Build cần được gọi cuối cùng trong chuỗi để sinh đối tượng cần thiết.

    Tham số kết nối được sử dụng khi gọi phương thức kết nối ConnectAsync của MqttClient. Phương thức này yêu cầu một tham số thứ hai CancellationToken để có thể hủy bỏ quá trình bất đồng bộ khi cần thiết. Trong đoạn code trên chúng ta sử dụng gia trị CancellationToken.None, nghĩa là không cho phép hủy bỏ quá trình kết nối bất đồng bộ.

    MQTTnet hầu như hoàn toàn sử dụng các phương thức bất đồng bộ TAP khi làm việc với broker. Vì vậy các phương thức đều có hậu tố Async, được gọi cùng từ khóa await, và phải đặt trong phương thức khai báo cùng từ khóa async.

    Phát thông điệp ở Publisher

    Sau khi kết nối thành công với broker, Publisher có thể bắt đầu phát đi thông điệp. Chúng ta cần thực hiện hai bước: Tạo thông điệp, và gửi thông điệp. Code thực hiện công việc này như sau:

    var applicationMessage = new MqttApplicationMessageBuilder()
        .WithTopic("mype/test/command/2")
        .WithPayload(payload)
        .Build();
    
    await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
    

    Mỗi thông điệp trong MQTTnet được lưu trong một đối tượng của lớp MqttApplicationMessage. Đối tượng của lớp này được tạo ra nhờ lớp hỗ trợ MqttApplicationMessageBuilder với cú pháp fluent. Lớp MqttClientOptionsBuilder giúp cung cấp các tham số để tạo thông điệp tiện lợi hơn. 

    Tham số bắt buộc khi tạo thông điệp là chuỗi chủ đề, được cung cấp qua phương thức WithTopic. Chuỗi chủ đề trong MQTT có thể xem như là một lệnh. Về cấu trúc của chuỗi chủ đề chúng ta đã xem xét ở phần lý thuyết về MQTT. Trong ví dụ trên, chuỗi chủ đề là “mype/test/command/2”. Khi tạo chuỗi chủ đề nên lưu ý tới đặc điểm phân cấp của các thiết bị thực tế. Ví dụ, nếu trong một ngôi nhà có nhiều phòng, mỗi phòng có vài thiết bị, thì có thể tạo chủ đề phân cấp theo kiểu “house/livingroom/airconditioner/1”.

    Tham số thứ hai bạn có thể muốn cung cấp là giá trị payload, được cung cấp qua phương thức WithPayload. Payload có thể xem như dữ liệu / tham số phục vụ cho lệnh (tức là chủ đề). Payload là không bắt buộc khi tạo thông điệp. MQTTnet cung cấp 5 overload khác nhau của phương thức WithPayload để hỗ trợ bạn gửi các loại giá trị khác nhau, bao gồm chuỗi ký tự (string), luồng dữ liệu (Stream), mảng byte (byte[] hoặc IEnumerable<byte>). Nếu cần gửi các kiểu dữ liệu khác, bạn cần tự mình thực hiện chuyển đổi về một trong các kiểu trên qua cơ chế serialization.

    Sau khi tạo thông điệp, bạn có thể gửi nó đi qua lời gọi phương thức PublishAsync của MqttClient. Phương thức này yêu cầu tham số thứ nhất là thông điệp vừa tạo, và tham số thứ 2 là CancellationToken, tương tự như ở phương thức kết nối. Lời gọi PublishAsync sẽ đẩy thông điệp vừa tạo lên broker.

    Ở đây cần lưu ý rằng, publisher là bên chủ động phát thông điệp, nghĩa là nó phải là nguồn dữ liệu. Như vậy, các thiết bị IoT gắn với cảm biến thu thập thông số từ môi trường ngoài sẽ thường đóng vai trò là publisher. Nếu ứng dụng người dùng cần phát đi lệnh thì nó cũng đóng vai trò publisher. 

    Nhận thông điệp ở Subscriber

    Subscriber là bên thụ động nhận và xử lý thông điệp. Quá trình Subscriber nhận thông điệp hoạt động theo mô hình hướng sự kiện và bất đồng bộ. Các thao tác tại Subscriber phức tạp hơn so với bên Publisher và bao gồm các bước:

    • Xây dựng phương thức xử lý thông điệp nhận được;
    • Chỉ định phương thức xử lý thông điệp nhận được cho MqttClient;
    • Đăng ký chủ đề cần theo dõi.

    Phương thức xử lý thông điệp được xây dựng theo mô hình của một phương thức xử lý sự kiện tiêu chuẩn của C#, với tham số đầu vào kiểu MqttApplicationMessageReceivedEventArgs.

    static async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e) {
        Console.WriteLine($"# Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
        await Task.CompletedTask;
    }
    

    Trong phương thức xử lý sự kiện này, nếu cần lấy thông tin gì từ thông điệp, bạn có thể sử dụng tham số e.ApplicationMessage. Ví dụ, nếu cần lấy chuỗi chủ đề từ thông điệp, bạn cần truy cập vào thuộc tính e.ApplicationMessage.Topic, hoặc nếu cần lấy ra phần payload từ thông điệp, bạn cần truy cập vào thuộc tính e.ApplicationMessage.Payload.

    Một điều cần lưu ý nữa là phương thức xử lý sự kiện này cũng phải xây dựng theo mô hình bất đồng bộ. Do đó, kiểu trả về của nó phải là Task. Trong ví dụ trên, do phương thức xử lý sự kiện chỉ đơn giản là in kết quả ra màn hình console và không có lời gọi bất đồng bộ nào khác, chúng ta phải có lệnh await Task.CompletedTask để phù hợp với yêu cầu.

    Để chỉ định phương thức xử lý sự kiện cho MqttClient, chúng ta sử dụng cú pháp gán sự kiện của C# cho sự kiện ApplicationMessageReceivedAsync của lớp MqttClient.

    mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived;

    Để đăng ký chủ đề cần theo dõi với broker, Subscriber cần thực hiện như sau:

    var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(f => f.WithTopic("mype/test/command/1"))
        .WithTopicFilter("mype/test/command/2")
        .Build();
    
    await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
    

    Code đăng ký này bao gồm hai phần: (1) Tạo danh sách chủ đề, (2) Đăng ký.

    Danh sách chủ đề cần đăng ký được chứa trong một đối tượng của lớp MqttClientSubscribeOptions. Đối tượng của lớp này lại được tạo ra nhờ phương thức hỗ trợ Build của lớp MqttClientSubscribeOptionsBuilder.

    Đối tượng của lớp MqttClientSubscribeOptionsBuilder được tạo ra nhờ lệnh gọi phương thức CreateSubscribeOptionsBuilder của lớp MqttFactory (vốn cũng được cùng để chế tạo đối tượng MqttClient).

    Mỗi chủ đề cần đăng ký được cung cấp qua một lời gọi phương thức WithTopicFilter. Bạn có thể gọi phương thức này nhiều lần để đăng ký cùng lúc nhiều chủ đề khác nhau. 

    Phương thức WithTopicFilter có một số overload khác nhau. Trong ví dụ trên sử dụng hai overload khác nhau để đăng ký 2 chủ đề: 

    • Overload thứ nhất yêu cầu cung cấp một phương thức làm tham số có kiểu Action<MqttTopicFilterBuilder>, thường được viết ở dạng hàm lambda.
    • Overload thứ hai chấp nhận thẳng một chuỗi ký tự làm chủ đề. Đây là cách khai báo chủ đề đơn giản hơn cả.

    Các chủ đề mà Subscriber đăng ký cần phù hợp với chủ đề mà Publisher xuất bản. Danh sách các chủ đề có thể xem như một tập lệnh mà hai bên thỏa thuận với nhau.

    Ngắt kết nối với broker

    Bước này có thể thực hiện ở cả Publisher và Subscriber. 

    Việc ngắt kết nối với broker là không bắt buộc. Nếu đóng client, liên kết TCP sẽ bị hủy bỏ, và broker sẽ hủy bỏ phiên làm việc của client. Tuy nhiên, chúng ta nên kiểm soát việc ngắt kết nối để đảm bảo chương trình hoạt động đúng theo cách mong muốn. Việc phát lệnh ngắt kết nối cần xem xét trong tình huống cụ thể. 

    Nếu Publisher không thường xuyên gửi thông điệp thì bạn có thể ngắt kết nối sau mỗi lần gửi. Nhưng nếu Publisher cần gửi thông điệp thường xuyên thì bạn nên giữ kết nối. Vì việc tạo kết nối mới (cũng như hủy kết nối) đều cần nhiều thời gian để thực hiện.

    Subscriber thông thường sẽ duy trì kết nối để nhận được thông điệp kịp thời vì đây là chương trình thụ động nhờ nhận lệnh. Nó không thể biết khi nào Publisher sẽ phát thông điệp.

    Việc ngắt kết nối MQTT rất đơn giản:

    await mqttClient.DisconnectAsync();

    Chú ý, đây cũng là một lời gọi phương thức bất đồng bộ.

    Một số kinh nghiệm làm việc với thư viện MQTTnet

    Thư viện MQTTnet không quá phức tạp khi khai thác. Tuy nhiên, các phương thức do nó cung cấp đều khá rắc rối khi sử dụng với rất nhiều tham số. Nếu phải sử dụng bộ thư viện này liên tục trong nhiều dự án, bạn nên xây dựng một bộ thư viện dạng wrapper quanh thư viện MQTTnet để giảm bớt lượng code cần viết.

    Dưới đây sẽ giới thiệu với bạn code cho thư viện như vậy.

    Hãy tạo một dự án dạng class library và copy code sau vào các file cùng tên.

    namespace Mqtt;
    /// <summary>
    /// Class for storing configuration for Mqtt broker
    /// </summary>
    public class MqttConfig {
        public MqttConfig() {
        }
        public string Uri { get; set; } = "broker.hivemq.com";
        public int Port { get; set; } = 1883;
    } 
    using MQTTnet;
    using MQTTnet.Client;
    namespace Mqtt.Unmanaged;
    public class ClientBase {
        protected readonly IMqttClient _client;
        protected readonly MqttConfig _config;
        public IMqttClient Client => _client;
        public ClientBase(IMqttClient client, MqttConfig config) {
            _client = client;
            _config = config;
        }
        public ClientBase(MqttConfig config) {
            _config = config;
            _client = new MqttFactory().CreateMqttClient();
        }
        public async Task ConnectAsync() {
            var options = new MqttClientOptionsBuilder()
                    //.WithClientId(clientId)
                    .WithTcpServer(_config.Uri, _config.Port)
                    .WithCleanSession()
                    .Build();
            await _client.ConnectAsync(options);
        }
        public async Task DisconnectAsync() {
            await _client.DisconnectAsync();
        }
    }
    
    using MQTTnet;
    using MQTTnet.Client;
    using MQTTnet.Protocol;
    namespace Mqtt.Unmanaged;
    public class Publisher : ClientBase {
        public Publisher(IMqttClient client, MqttConfig config) : base(client, config) { }
        public Publisher(MqttConfig config) : base(config) { }
        public async Task PublishNoPayloadAsync(string topic, bool retainFlag = false, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(new byte[] { 0 })
                .WithContentType("0")
                .WithQualityOfServiceLevel(qos)
                .WithRetainFlag(retainFlag)
                .Build();
            await _client.PublishAsync(message);
        }
        public async Task PublishStringAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce, bool retailFlag = false) {
            await _client.PublishStringAsync(topic, payload, qos, retailFlag);
        }
        public async Task PublishBinaryAsync(string topic, byte[] payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce, bool retailFlag = false) {
            await _client.PublishBinaryAsync(topic, payload, qos, retailFlag);
        }
    }
    
    using MQTTnet;
    using MQTTnet.Client;
    using MQTTnet.Packets;
    using MQTTnet.Protocol;
    using System;
    namespace Mqtt.Unmanaged;
    public class Subscriber : ClientBase {
        public Subscriber(IMqttClient client, MqttConfig config) : base(client, config) { }
        public Subscriber(MqttConfig config) : base(config) { }
        public Dictionary<string, Func<MqttApplicationMessage, Task>> Handlers { get; set; } = new();
        public Func<string, Task>? ErrorHandler { get; set; }
        public Func<string, Task>? LogHandler { get; set; }
        public async Task StartAsync() {
            _client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
            await ConnectAsync();
            await SubscribeAsync(Handlers.Keys.ToArray());
        }
        private async Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) {
            string topic = arg.ApplicationMessage.Topic;
            // if this is a rpc call - abort
            if (topic.StartsWith("MQTTnet.RPC/")) { return; }
            try {
                await Handlers[topic].Invoke(arg.ApplicationMessage);
            } catch (Exception e) {
                if (ErrorHandler != null)
                    await ErrorHandler.Invoke(e.Message);
            }
        }
        public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) {
            var topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(topic)
                .WithQualityOfServiceLevel(qos)
                .Build();
            await _client.SubscribeAsync(topicFilter);
        }
        public async Task SubscribeAsync(string[] topics, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce) {
            foreach (var topic in topics) {
                await _client.SubscribeAsync(topic, qos);
            }
        }
        public async Task UnsubscribeAsync(string topic) {
            await _client.UnsubscribeAsync(topic);
        }
    }
    

    Khi này bạn có thể sử dụng bộ wrapper trên cho Subscriber như sau:

    using Mqtt;
    using Mqtt.Unmanaged;
    using System.Text;
    // subscriber
    internal class Program {    
        private static async Task Main(string[] args) {
            MqttConfig mqttConfig = new() { Uri = "broker.hivemq.com" };
            Subscriber subscriber = new(mqttConfig);
            subscriber.Handlers["test/command/1"] = async e => {
                Console.WriteLine($"#Message: {Encoding.UTF8.GetString(e.Payload)}");
                await Task.CompletedTask;
            };
            await subscriber.StartAsync();        
            Thread.Sleep(-1);
        }
    }

    và cho Publisher như sau:

    using Mqtt;
    using Mqtt.Unmanaged;
    // publisher
    internal class Program {    
        private static async Task Main(string[] args) {
            MqttConfig mqttConfig = new() { Uri = "broker.hivemq.com" };
            Publisher publisher = new(mqttConfig);
            await publisher.ConnectAsync();
            while(true) {
                Console.Write("Message >>> ");
                var payload = Console.ReadLine();
                await publisher.PublishStringAsync("test/command/1", payload);
            }        
        }
    }

    Kết quả chạy như sau:

    Với hỗ trợ của thư viện wrapper vừa viết, việc sử dụng MQTTnet đơn giản và nhanh gọn hơn rất nhiều.

    + Nếu bạn thấy site hữu ích, trước khi rời đi hãy giúp đỡ site bằng một hành động nhỏ để site có thể phát triển và phục vụ bạn tốt hơn.
    + Nếu bạn thấy bài viết hữu ích, hãy giúp chia sẻ tới mọi người.
    + Nếu có thắc mắc hoặc cần trao đổi thêm, mời bạn viết trong phần thảo luận cuối trang.
    Cảm ơn bạn!

    Kết luận

    Trong bài học này các bạn đã học chi tiết một số kỹ thuật cơ bản để lập trình với thư viện MQTTnet, bao gồm các thao tác tạo client, kết nối tới broker, ngắt kết nối, phát thông điệp, nhận và xử lý thông điệp.

    Cần nhấn mạnh rằng, đây là một bài học sơ lược về thư viện MQTTnet. Nếu muốn áp dụng trong những dự án thực tế, bạn cần tự mình tìm hiểu chi tiết hơn các kỹ thuật nâng cao qua trang wiki của dự án trên github.

    Theo dõi
    Thông báo của
    guest

    0 Thảo luận
    Phản hồi nội tuyến
    Xem tất cả bình luận