RabbitMQ ack la gì
Ở trong một bài viết trước mình đã có tìm hiểu về Sidekiq - backgroud job, hiểu cách implement để đảm bảo tính tin cậy. Hôm nay nhân một người bạn có hỏi về một framework khác sử dụng RabbitMQ để làm queue, thấy khá thú vị nên dành chút thời gian đọc quá. Ý tưởng là sẽ viết một 2 để đóng gói các task thành dạng message và gửi vào queue. Một 3 chạy ở chế độ backgroud sẽ pop task từ queue ra và xử lý. Ta có thể chạy nhiều worker để tăng tốc xử lý các task nếu cần. Nói chung ý tưởng tương tự bài trước, không có gì quá đặc biệt.Ở đây mình sẽ dùng gem bunny để kết nối tới server RabbitMQ, đây là một gem khá phổ biến, có thể dùng cho cả producer và consumer. 1. SenderĐầu tiên ta sẽ cần require thư viện vào để xài
Sau đó tạo connect tới RabbitMQ server, ở đây mình xài mặc định bằng việc kết nối với 127.0.0.1:5672
Và tạo một channel
Để có thể gửi, ta sẽ cần khai báo một queue và sau đó publish message vào queue
Bình thường ta sẽ cần queue các task cần xử lý tốn thời gian như resize image hoặc render pdf, nhưng ở đây để đơn giản ta sẽ dùng method 4 để fake busy task. Thời gian sleep sẽ phụ thuộc vào số dấu chấm 5 của message truyền vào. Ví dụ nếu truyền vào 6 nghĩa là sẽ sleep 3 giây (giả sử là mất 3 giây để xử lý).Cuối cùng ta có 2 như sau:
2. WorkerTương tự với producer, ta sẽ mở connect và channel và khai báo một queue để sử dụng, queue phải giống với producer ở phía trên. Lí do cần khai báo queue là có thể consumer sẽ start trước producer và ta cần đảm bảo là queue đã tồn tại trước khi consumer bắt đầu pop message.
Ở đây ta sẽ dùng 1, nó tương tự như ở bài trước mình nói về blocking-queue, để trong trường hợp queue rỗng thread sẽ bị block và không trả về con trỏ trên terminal.Một lợi điểm của task-queue đó là ta có thể start nhiều 3 để scale, RabbitMQ sẽ gửi tuần tự round-robin message tới từng worker.Tuy nhiên, một task được xử lý bởi worker có thể mất một khoảng thời gian khá dài. Chuyện gì xảy ra nếu một worker bị crash giữa chừng trong khi xử lý message. Với đoạn code trên, sau khi RabbitMQ gửi message tới worker, nó sẽ ngay lập tức gán nhán cho message đó là delete và nếu worker crash thì ta mất hết số message chưa kịp xử lý. RabbitMQ hỗ trợ một cơ chế gọi là message acknowledgment, một tín hiệu 3 sẽ được gửi ngược lại RabbitMQ từ consumer để thông báo là message đã được nhận, được xử lý và RabbitMQ có thể xóa nó đi.Nếu consumer chết (channel bị close, connection bị close hoặc TCP connection rớt kết nối), RabbitMQ sẽ hiểu là message chưa được xử lý thành công và nó sẽ requeue lại message đó, một consumer khác có thể pop message đó ra và xử lý bình thường. Ta sẽ cần sửa lại phần subcribe phía trên với 4:
Ta đã giải quyết được vấn đề là nếu consumer bị crash mà task cần xử lý vẫn không bị mất. Nhưng task vẫn sẽ bị mất nếu RabbitMQ bị crash. Ta cần giải quyết tiếp 2 vấn đề đó là:
Với queue, ở mỗi đầu producer và consumer cần khai báo 5.Còn với message, như ở trên 2 ta sẽ dùng thuộc tính 9 để đảm bảo một phần là message sẽ được lưu xuống đĩa.Một vấn đề tiếp theo đối với việc xử lý round-robin trong worker đó là các worker có thể sẽ không hoạt động đồng đều. Ví dụ ta có những task với số thứ tự lẻ là các heavy task cần nhiều thời gian xử lý và các task với số thứ tự chẵn là các task nhẹ nhàng hơn. Vấn đề là sẽ có worker xui xẻo nhận toàn các heavy task và lúc nào cũng busy, nhưng sẽ có những worker lại rất nhàn dỗi. Lí do là RabbitMQ không biết được số lượng unacknowledged messages của consumer nên nó cứ gửi message tuần tự tới consumer. Để giải quyết vấn đề này, ta sẽ dùng method 8 để nói với RabbitMQ rằng không gửi hơn 1 message tới worker tại một thời điểm, hoặc không gửi message cho tới khi worker gửi lại tín hiệu ACK đã hoàn thành message trước đó. Thay vào đó RabbitMQ sẽ kiếm các worker nào đang nhàn rỗi và gửi message tới nó.Cuối cùng 2 sẽ như sau:
Và 3 sẽ như sau:
3. SneakersMột người bạn giới thiệu cho mình về sneakers mình có đọc qua một xíu thì thấy bản chất cũng không khác gì cách sidekiq implement như ở bài trước mình có nói ngoại trừ việc dùng RabbitMQ làm queue. Thực ra cuối cùng để đảm bảo tính tin cậy thì vẫn phải cover đủ 3 layer dù có dùng cái gì đi chăng nữa:
Một số lý do khác khiến tác giả viết sneakers thay vì dùng sidekiq như vấn đề về performance (xử lý đc 1000 req/s với ack + persisted so với 600 req/s của sidekiq), support all core thì tùy nhiều yếu tố để đánh đổi. 4. Exchange typeNói thêm một chút xíu là message khi được gửi từ 2 hay producer sẽ không được gửi trực tiếp vào queue giống các message broker khác, thay vào đó nó sẽ được gửi tới exchange trước, sau đó sẽ được route tới queue tương ứng => routing chính là một trong các ưu điểm của RabbitMQ với các mesage broker khác. Hoạt động như hình phía dưới sau:Trong RabbitMQ cũng có nhiều loại exchange khác nhau như:
Ở đây mình sẽ giải thích ngắn gọn và một số use-case của từng loại exchange. Chi tiết có thể đọc thêm trong document của RabbitMQ. 4.1 Fanout ExchangeFanout exchange sẽ routing message tới tất cả các queue liên quan, nếu có N queue liên quan tới fanout exchange thì khi một message được publish vào exchange sẽ có N bản copy của message được gửi tới tất cả N queue, ý tưởng là một dạng broadcast routing. Fanout Exchange sẽ phù hợp với các bài toán dạng broadcast như các trang sport-news muốn distributing score update tới tất cả các client near-realtime (nghe có vẻ giống cách mà ForzaFootball đang làm) hoặc các ứng dụng chat nhóm cần distrubute message tới những người trong nhóm. 4.2 Direct ExchangeDirect exchange là loại exchange sẽ gửi message tới các queue dựa trên message routing key, ví dụ message dạng 5 sẽ được gửi tới queue 6, message dạng 7 sẽ được gửi tới 8.Direct exchange thường được dùng để distribute tasks giữa nhiều worker theo thuật toán round-robin. Default exchange là direct exchange mà không có tên (nameless), đây là một trường hợp đặc biệt, message sẽ được gửi tới một queue có tên bằng routing-key của message. Có bao nhiêu message thì sẽ có bấy nhiêu queue và các queue này sẽ tự động liên kết với default-exchange với routing key trùng với queue-name. Ở ví dụ trên ta sử dụng chính là default exchange. Ví dụ với 2 như sau:
Ta sẽ gửi vài message khác nhau vào RabbitMQ: 0Sau đó list queue trên như sau: 14.3 Topic ExchangeTopic exchange sẽ route messate tới một hoặc nhiều queue dựa trên việc matching giữa message routing key và một pattern nào đó được liên kết với queue. Một ví dụ đó là việc routing tới các geographic location cụ thể nào đó. Ví dụ với routing key là 0 message sẽ được gửi tới các queue như: |