Lập trình với socket publisher / subscriber của NetMQ

    0

    Trong bài học này chúng ta sẽ phân tích chi tiết kỹ thuật lập trình với socket publisher / subscriber mà chúng ta đã thực hiện trong bài tập thực hành lập trình ZeroMQ cơ bản.

    Khởi tạo socket

    NetMQ cung cấp lớp PublisherSocket để tạo publisher. Việc khởi tạo socket của publisher thực hiện như sau:

    using var publisher = new PublisherSocket();
    publisher.Bind("tcp://*:1308");
    

    Cân lưu ý rằng, trong mô hình publisher / subscriber của ZMQ, publisher là bên có đầu cuối cố định để các subscriber kết nối tới. Do vậy, sau khi khởi tạo đối tượng, chúng ta phải gắn nó với một điểm cuối cố định thông qua gọi phương thức Bind. Phương thức này nhận tham số giống như đối với ResponseSocket.

    Chúng ta cũng có thể sử dụng cách thức thứ hai để khởi tạo publisher socket như sau:

    using var publisher = new PublisherSocket("@tcp://*:1308");

    Lưu ý ký tự @ trước chuỗi đầu cuối. Nó có cùng ý nghĩa như trong trường hợp ResponseSocket.

    Đối với subscriber, NetMQ cung cấp lớn SubscriberSocket. Quá trình thực hiện như sau:

    using var subscriber = new SubscriberSocket();
    subscriber.Connect("tcp://127.0.0.1:1308");
    

    Có thể thấy rằng, sau khi khởi tạo đối tượng của lớp SubscriberSocket, chúng ta cần kết nối nó trực tiếp với publisher. Đây là điểm khác biệt với mô hình publisher / subscriber của MQTT.

    Chúng ta cũng có thể gộp hai lệnh trên vào một lệnh duy nhất:

    using var subscriber = new SubscriberSocket(">tcp://127.0.0.1:1308");

    Cũng lưu ý ký tự > trước chuỗi đầu cuối. Cách viết này sẽ yêu cầu NetMQ tự động gọi phương thức Connect.

    Đăng ký chủ đề ở subscriber

    Ở phía subscriber có thêm một thao tác nữa sau khởi tạo: đăng ký các chủ đề cần theo dõi. Việc đăng ký chủ đề được thực hiện qua lời gọi phương thức Subscriber như sau:

    subscriber.Subscribe("hello_topic");

    Nếu muốn theo dõi những chủ đề nào thì gọi Subscribe cho từng chủ đề. Một cách khác để đăng ký theo dõi mọi chủ đề từ publisher là sử dụng phương thức SubscribeToAnyTopic. Cách tương đương để để đăng ký mọi chủ đề là cung cấp chủ đề rỗng “” cho phương thức Subscribe.

    Chủ đề trong ZMQ thực tế là một chuỗi byte. Tuy nhiên, chúng ta hoàn toàn có thể cung cấp các chuỗi ký tự làm chủ đề. NetMQ sẽ tự động chuyển đổi cho chúng ta.

    Chuỗi ký tự chủ đề trong ZMQ cũng được tổ chức theo cấu trúc phân cấp, sử dụng ký tự / để phân tách các cấp. Như vậy, chúng ta có thể có chủ đề như “house/living/air-conditioner”, “sport/football”, v.v..

    Một điều cần lưu ý là ZMQ so khớp chủ đề theo kiểu tiền tố. Ví dụ, nếu đăng ký chủ đề “topic” thì nó cũng đồng thời nhận được các thông điệp với chủ đề “topic/subtopic” hoặc thậm chí là “topical”. Bởi vì các chủ đề này đều có chung tiền tố “topic”. Đây là một điều cần lưu tâm nếu đang quen với chủ đề trong MQTT. Như vậy, cấu trúc phân cấp chủ đề của ZMQ mang tính hình thức để chủ đề dễ đọc, chứ nó không hỗ trợ lọc chủ đề như MQTT.

    Cũng do sử dụng chuỗi byte để so sánh, chủ đề trong ZMQ có phân biệt hoa – thường. Như vậy “topic” và “TOPIC” là hai chủ đề khác nhau. Cần lưu ý rằng, trong thông điệp của ZMQ, chủ đề luôn được lưu trong frame đầu tiên. Điều này sẽ ảnh hưởng đến cách publisher phát thông điệp và cách subscriber nhận thông điệp.

    Phát thông điệp ở publisher

    Trong ví dụ thực hành đã làm, code phát đi thông điệp ở publisher như sau:

    var message = ReadLine();
    publisher
        .SendMoreFrame("hello_topic") // Topic
        .SendFrame(message); // Message
    

    Đến đây chúng ta cần hiểu rõ hơn cách ZMQ và thư viện NetMQ đóng gói thông điệp.

    Mỗi thông điệp của ZMQ là một chuỗi byte. Chuỗi byte này được chia thành các phần rời rạc, gọi là frame. Mỗi frame bao gồm hai phần: kích thước phần dữ liệu và bản thân dữ liệu. Tức là mỗi thông điệp của ZMQ có thể hình dung là một mảng của các frame.

    Người dùng có quyền tạo ra số lượng frame tùy ý. Và mỗi frame cũng có thể chứa dữ liệu tùy ý, miễn là chúng ta phải chuyển dữ liệu đó được về dạng chuỗi byte.

    Mỗi frame được tạo ra (và đưa vào thông điệp ZMQ) qua lời gọi phương thức SendMoreFrame của socket. Riêng frame cuối cùng cần tạo qua phương thức SendFrame. Phương thức SendFrame cũng kết thúc việc tạo thông điệp và đưa nó vào hàng đợi chờ xử lý.

    Nếu thông điệp chỉ chứa một frame duy nhất thì chỉ cần gọi phương thức SendFrame là đủ. Đây là trường hợp chúng ta vừa gặp ở đoạn code gửi dữ liệu trong mô hình truy vấn / phản hồi.

    Việc xác định số lượng frame và cấu trúc dữ liệu chứa trong mỗi frame là nhiệm vụ của chương trình ứng dụng. ZeroMQ không quan tâm đến vấn đề này. Trong mô hình publisher / subscriber, thông điệp từ publisher phát đi cần có hai frame. Frame đầu tiên chứa chuỗi chủ đề. Frame thứ hai chứa dữ liệu payload. Để gửi đi thông điệp chứa nhiều frame như vậy, chúng ta cần gọi phương thức mở rộng SendMoreFrame trước, và kết thúc bằng phương thức SendFrame.

    Nhận thông điệp ở subscriber

    Do thông điệp của ZMQ chứa một mảng các frame, việc đọc dữ liệu cũng thực hiện theo từng frame. Nếu phía gửi phát đi thông điệp chứa bao nhiêu frame thì chúng ta cần gọi bấy nhiêu lần phương thức ReceiveFrame để lấy ra đủ chừng đó frame. Trong trường hợp socket request / response, thường request socket chỉ gửi đi 1 frame, nên chúng ta chỉ gọi 1 lần ReceiveFrame để lấy dữ liệu. Đối với socket publisher / subscriber, do publisher phát đi hai frame, chúng ta phải gọi ReceiveFrame hai lần. Lần thứ nhất để lấy chuỗi chủ đề. Lần thứ hai để lấy dữ liệu payload.

    while (true) {
               var topic = subscriber.ReceiveFrameString(); // lấy chủ đề
               var message = subscriber.ReceiveFrameString(); // lấy dữ liệu          
               WriteLine("#Received: {0} {1}", topic, message);
    }
    

    NetMQ tạo ra một số phương thức mở rộng để giúp đơn giản hóa việc đọc một số loại dữ liệu. Phương thức ReceiveFrameString lấy toàn bộ phần dữ liệu trong một frame và chuyển đổi nó thành chuỗi ký tự. Nếu cần nhận chuỗi byte, chúng ta có thể gọi phương thức ReceiveFrameBytes.

    Một cách khác để đọc toàn bộ dữ liệu trong thông điệp (từ tất cả các frame) là sử dụng phương thức ReceiveMultipartStrings hoặc ReceiveMultipartBytes. Các phương thức này sẽ đọc phần dữ liệu chứa trong tất cả các frame và đưa vào một danh sách dữ liệu tương ứng. Ví dụ ReceiveMultipartStrings sẽ lấy phần dữ liệu của tất cả các frame và đưa vào một đối tượng kiểu List<string>, với mỗi phần tử là dữ liệu của một frame. Cách này phù hợp hơn nếu các socket trao đổi nhiều frame trong mỗi thông điệp.

    Do subscriber thụ động nhận dữ liệu và không biết khi nào thông điệp sẽ tới, chúng ta đặt code nhận dữ liệu trong một vòng lặp vô tận. Một cách khác để xử lý thông điệp là sử dụng Proactor.

    Với socket subscriber chúng ta vẫn có thể áp dụng proactor, đặc biệt là khi xây dựng subscriber với giao diện đồ hoạ. Nếu không sử dụng proactor, chúng ta phải để code nhận và xử lý thông điệp của subscriber trong một luồng riêng để tránh khóa luồng thực thi chính, nơi vẽ ra giao diện đồ hoạ. Tương tự với socket response, trước khi kết thúc chương trình, chúng ta nên gọi phương thức Cleanup của NetMQConfig để chủ động hủy bỏ các luồng thực thi phụ. Điều này quan trọng đối với chương trình sử dụng giao diện đồ họa.

    + Nếu bạn thấy site hữu ích, trước khi rời đi hãy giúp đỡ site bằng một hành động nhỏ để site có thể phát triển và phục vụ bạn tốt hơn.
    + Nếu bạn thấy bài viết hữu ích, hãy giúp chia sẻ tới mọi người.
    + Nếu có thắc mắc hoặc cần trao đổi thêm, mời bạn viết trong phần thảo luận cuối trang.
    Cảm ơn bạn!

    Theo dõi
    Thông báo của
    guest

    0 Thảo luận
    Phản hồi nội tuyến
    Xem tất cả bình luận