RabbitMQ ack la gì

Ở trong một bài viết trước mình đã có tìm hiểu về Sidekiq - backgroud job, hiểu cách implement để đảm bảo tính tin cậy. Hôm nay nhân một người bạn có hỏi về một framework khác sử dụng RabbitMQ để làm queue, thấy khá thú vị nên dành chút thời gian đọc quá.

Ý tưởng là sẽ viết một

# create connection
co = Bunny.new
co.start
2 để đóng gói các task thành dạng message và gửi vào queue. Một
# create connection
co = Bunny.new
co.start
3 chạy ở chế độ backgroud sẽ pop task từ queue ra và xử lý. Ta có thể chạy nhiều worker để tăng tốc xử lý các task nếu cần. Nói chung ý tưởng tương tự bài trước, không có gì quá đặc biệt.

Ở đây mình sẽ dùng gem bunny để kết nối tới server RabbitMQ, đây là một gem khá phổ biến, có thể dùng cho cả producer và consumer.

1. Sender

Đầu tiên ta sẽ cần require thư viện vào để xài

#!/usr/bin/env ruby

require 'bunny'

Sau đó tạo connect tới RabbitMQ server, ở đây mình xài mặc định bằng việc kết nối với 127.0.0.1:5672

# create connection
co = Bunny.new
co.start

Và tạo một channel

# create queue
ch = co.create_channel

Để có thể gửi, ta sẽ cần khai báo một queue và sau đó publish message vào queue

q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"

Bình thường ta sẽ cần queue các task cần xử lý tốn thời gian như resize image hoặc render pdf, nhưng ở đây để đơn giản ta sẽ dùng method

# create connection
co = Bunny.new
co.start
4 để fake busy task. Thời gian sleep sẽ phụ thuộc vào số dấu chấm
# create connection
co = Bunny.new
co.start
5 của message truyền vào. Ví dụ nếu truyền vào
# create connection
co = Bunny.new
co.start
6 nghĩa là sẽ sleep 3 giây (giả sử là mất 3 giây để xử lý).

Cuối cùng ta có

# create connection
co = Bunny.new
co.start
2 như sau:

#!/usr/bin/env ruby

require 'bunny'

# create connection
co = Bunny.new
co.start

# create queue
ch = co.create_channel
q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"

  • Có nhiều loại exchange, nhưng ở đây ta dùng default exchange -> phần này sẽ được nói ở khúc sau.
  • Ngoài ra khi
    # create connection
    co = Bunny.new
    co.start
    
    8 mesasge, ta gán thêm metadata
    # create connection
    co = Bunny.new
    co.start
    
    9 để nói với RabbitMQ là sẽ persist message này xuống đĩa. Nhưng lưu ý là vẫn không đảm bảo 100% không mất message, lý do là RabbitMQ vẫn cần một khoảng thời gian nhỏ để chấp nhận message và có thể chưa lưu kịp xuống đĩa do RabbitMQ không sử dụng
    # create queue
    ch = co.create_channel
    
    0 với mọi message. Cơ chế persistent này không đủ mạnh nhưng đủ cho các task queue đơn giản.

2. Worker

Tương tự với producer, ta sẽ mở connect và channel và khai báo một queue để sử dụng, queue phải giống với producer ở phía trên. Lí do cần khai báo queue là có thể consumer sẽ start trước producer và ta cần đảm bảo là queue đã tồn tại trước khi consumer bắt đầu pop message.

#!/usr/bin/env ruby

require 'bunny'

co = Bunny.new
co.start

ch = co.create_channel
q = ch.queue('hello')

begin
  puts ' [*] Waiting for messages. To exit press CTRL+C'
  q.subscribe(block: true) do |_delivery_info, _properties, body|
    wait = body.count('.').to_i
    sleep wait
    puts " [x] Receive #{body} after #{wait} seconds"
  end
rescue Interrupt => _
  co.close
  exit(0)
end

Ở đây ta sẽ dùng

# create queue
ch = co.create_channel
1, nó tương tự như ở bài trước mình nói về blocking-queue, để trong trường hợp queue rỗng thread sẽ bị block và không trả về con trỏ trên terminal.

Một lợi điểm của task-queue đó là ta có thể start nhiều

# create connection
co = Bunny.new
co.start
3 để scale, RabbitMQ sẽ gửi tuần tự round-robin message tới từng worker.

Tuy nhiên, một task được xử lý bởi worker có thể mất một khoảng thời gian khá dài. Chuyện gì xảy ra nếu một worker bị crash giữa chừng trong khi xử lý message. Với đoạn code trên, sau khi RabbitMQ gửi message tới worker, nó sẽ ngay lập tức gán nhán cho message đó là delete và nếu worker crash thì ta mất hết số message chưa kịp xử lý.

RabbitMQ hỗ trợ một cơ chế gọi là message acknowledgment, một tín hiệu

# create queue
ch = co.create_channel
3 sẽ được gửi ngược lại RabbitMQ từ consumer để thông báo là message đã được nhận, được xử lý và RabbitMQ có thể xóa nó đi.

Nếu consumer chết (channel bị close, connection bị close hoặc TCP connection rớt kết nối), RabbitMQ sẽ hiểu là message chưa được xử lý thành công và nó sẽ requeue lại message đó, một consumer khác có thể pop message đó ra và xử lý bình thường.

Ta sẽ cần sửa lại phần subcribe phía trên với

# create queue
ch = co.create_channel
4:

q.subscribe(manunal_ack: true,  block: true) do |_delivery_info, _properties, body|
  wait = body.count('.').to_i
  sleep wait
  puts " [x] Receive #{body} after #{wait} seconds"
  ch.ack(delivery_info.delivery_tag)
end

Ta đã giải quyết được vấn đề là nếu consumer bị crash mà task cần xử lý vẫn không bị mất. Nhưng task vẫn sẽ bị mất nếu RabbitMQ bị crash. Ta cần giải quyết tiếp 2 vấn đề đó là:

  • Queue phải durable.
  • Message cũng phải durable.

Với queue, ở mỗi đầu producer và consumer cần khai báo

# create queue
ch = co.create_channel
5.

Còn với message, như ở trên

# create connection
co = Bunny.new
co.start
2 ta sẽ dùng thuộc tính
# create connection
co = Bunny.new
co.start
9 để đảm bảo một phần là message sẽ được lưu xuống đĩa.

Một vấn đề tiếp theo đối với việc xử lý round-robin trong worker đó là các worker có thể sẽ không hoạt động đồng đều. Ví dụ ta có những task với số thứ tự lẻ là các heavy task cần nhiều thời gian xử lý và các task với số thứ tự chẵn là các task nhẹ nhàng hơn. Vấn đề là sẽ có worker xui xẻo nhận toàn các heavy task và lúc nào cũng busy, nhưng sẽ có những worker lại rất nhàn dỗi. Lí do là RabbitMQ không biết được số lượng unacknowledged messages của consumer nên nó cứ gửi message tuần tự tới consumer.

Để giải quyết vấn đề này, ta sẽ dùng method

# create queue
ch = co.create_channel
8 để nói với RabbitMQ rằng không gửi hơn 1 message tới worker tại một thời điểm, hoặc không gửi message cho tới khi worker gửi lại tín hiệu ACK đã hoàn thành message trước đó. Thay vào đó RabbitMQ sẽ kiếm các worker nào đang nhàn rỗi và gửi message tới nó.

Cuối cùng

# create connection
co = Bunny.new
co.start
2 sẽ như sau:

#!/usr/bin/env ruby

require 'bunny'

# create connection
co = Bunny.new
co.start

# create queue
ch = co.create_channel
q = ch.queue('hello', durable: true)

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"

co.close

# create connection
co = Bunny.new
co.start
3 sẽ như sau:

#!/usr/bin/env ruby

require 'bunny'

co = Bunny.new
co.start

ch = co.create_channel
q = ch.queue('hello', durable: true)

ch.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'

begin
  q.subscribe(manual_ack: true, block: true) do |_delivery_info, _properties, body|
    wait = body.count('.').to_i
    sleep wait
    puts " [x] Receive #{body} after #{wait} seconds"
    ch.ack(_delivery_info.delivery_tag)
  end
rescue Interrupt => _
  co.close
  exit(0)
end

3. Sneakers

Một người bạn giới thiệu cho mình về sneakers mình có đọc qua một xíu thì thấy bản chất cũng không khác gì cách sidekiq implement như ở bài trước mình có nói ngoại trừ việc dùng RabbitMQ làm queue. Thực ra cuối cùng để đảm bảo tính tin cậy thì vẫn phải cover đủ 3 layer dù có dùng cái gì đi chăng nữa:

  • Client để enqueue và queue phải đảm bảo handle việc network fail bằng 1 giải pháp như local queue như ở bài trước mình trao đổi.
  • Phải đảm bảo queue RabbitMQ phải luôn sống, với RabbitMQ thì mô hình HA có cluster tuy nhiên như ở phía trên thì việc
    # create connection
    co = Bunny.new
    co.start
    
    9 của rabbitmq vẫn không đảm bảo 100% không mất message => nói chung mình thấy redis cũng vậy nhưng redis thì đơn giản, dễ hiểu hơn.
  • Worker sneakers để đảm bảo tính tin cậy có thể dùng
    # create queue
    ch = co.create_channel
    
    3 như trong ví dụ phía trên, bản chất cũng tương tự cách dùng redis với
    q = ch.queue('hello')
    
    msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
    q.publish(msg, persistent: true)
    
    puts" [x] Sent #{msg}"
    
    3 để implement reliable queue => tuy nhiên là sidekiq tính phí phần này.

Một số lý do khác khiến tác giả viết sneakers thay vì dùng sidekiq như vấn đề về performance (xử lý đc 1000 req/s với ack + persisted so với 600 req/s của sidekiq), support all core thì tùy nhiều yếu tố để đánh đổi.

4. Exchange type

Nói thêm một chút xíu là message khi được gửi từ

# create connection
co = Bunny.new
co.start
2 hay producer sẽ không được gửi trực tiếp vào queue giống các message broker khác, thay vào đó nó sẽ được gửi tới exchange trước, sau đó sẽ được route tới queue tương ứng => routing chính là một trong các ưu điểm của RabbitMQ với các mesage broker khác. Hoạt động như hình phía dưới sau:

RabbitMQ ack la gì

Trong RabbitMQ cũng có nhiều loại exchange khác nhau như:

  • Direct
  • Fanout
  • Topic
  • Headers

Ở đây mình sẽ giải thích ngắn gọn và một số use-case của từng loại exchange. Chi tiết có thể đọc thêm trong document của RabbitMQ.

4.1 Fanout Exchange

Fanout exchange sẽ routing message tới tất cả các queue liên quan, nếu có N queue liên quan tới fanout exchange thì khi một message được publish vào exchange sẽ có N bản copy của message được gửi tới tất cả N queue, ý tưởng là một dạng broadcast routing.

RabbitMQ ack la gì

Fanout Exchange sẽ phù hợp với các bài toán dạng broadcast như các trang sport-news muốn distributing score update tới tất cả các client near-realtime (nghe có vẻ giống cách mà ForzaFootball đang làm) hoặc các ứng dụng chat nhóm cần distrubute message tới những người trong nhóm.

4.2 Direct Exchange

Direct exchange là loại exchange sẽ gửi message tới các queue dựa trên message routing key, ví dụ message dạng

q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"
5 sẽ được gửi tới queue
q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"
6, message dạng
q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"
7 sẽ được gửi tới
q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"
8.

RabbitMQ ack la gì

Direct exchange thường được dùng để distribute tasks giữa nhiều worker theo thuật toán round-robin.

Default exchange là direct exchange mà không có tên (nameless), đây là một trường hợp đặc biệt, message sẽ được gửi tới một queue có tên bằng routing-key của message. Có bao nhiêu message thì sẽ có bấy nhiêu queue và các queue này sẽ tự động liên kết với default-exchange với routing key trùng với queue-name.

Ở ví dụ trên ta sử dụng chính là default exchange.

Ví dụ với

# create connection
co = Bunny.new
co.start
2 như sau:

#!/usr/bin/env ruby

require 'bunny'

# create connection
co = Bunny.new
co.start

# create queue
ch = co.create_channel
q = ch.queue()

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
ch.default_exchange.publish(msg, routing_key: q.name)

puts" [x] Sent #{msg}"

Ta sẽ gửi vài message khác nhau vào RabbitMQ:

# create connection
co = Bunny.new
co.start
0

Sau đó list queue trên như sau:

# create connection
co = Bunny.new
co.start
1

4.3 Topic Exchange

Topic exchange sẽ route messate tới một hoặc nhiều queue dựa trên việc matching giữa message routing key và một pattern nào đó được liên kết với queue.

Một ví dụ đó là việc routing tới các geographic location cụ thể nào đó. Ví dụ với routing key là

#!/usr/bin/env ruby

require 'bunny'

# create connection
co = Bunny.new
co.start

# create queue
ch = co.create_channel
q = ch.queue('hello')

msg = ARGV.empty? ? 'Hello World!' : ARGV.join(' ')
q.publish(msg, persistent: true)

puts" [x] Sent #{msg}"
0 message sẽ được gửi tới các queue như: