26. tháng 2 2025
Trước bóng đá anh tiên, chúng ta cùng xem qua đoạn demo chính nohu95 thức bắn cá ăn xu online từ tài liệu.
1public static void main(String[] args) throws InterruptedException, MQClientException {
2 /*
3 * Tạo một đối tượng với tên nhóm consumer được chỉ định.
4 * Các consumer thuộc cùng loại sẽ tiêu thụ các loại tin nhắn giống nhau và có logic xử lý nhất quán.
5 * Nhóm consumer giúp dễ dàng đạt được cân bằng tải và khả năng chịu lỗi trong quá trình tiêu thụ tin nhắn.
6 * Lưu ý rằng các phiên bản consumer trong cùng một nhóm phải đăng ký hoàn toàn cùng một Topic.
7 * RocketMQ hỗ trợ hai chế độ tin nhắn: tiêu thụ tập hợp (Clustering) và phát sóng (Broadcasting).
8 */
9 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tên_nhóm_độc_quyền");
10 /*
11 * Chỉ định địa chỉ máy chủ tên.
12 * Địa chỉ dịch vụ tên có thể được chỉ định thông qua biến môi trường hoặc thiết lập tham số đối tượng.
13 * Ngoài ra, bạn có thể chỉ định địa chỉ máy chủ tên thông qua biến môi trường NAMESRV_ADDR.
14 */
15 consumer.setNamesrvAddr("dia-chi-ten-may-chu:9876");
16 /*
17 * Xác định nơi bắt đầu tiêu thụ trong trường hợp nhóm consumer là mới.
18 */
19 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
20 /*
21 * Đăng ký một hoặc nhiều chủ đề để tiêu thụ.
22 * Chủ đề và thẻ được chỉ định, biểu thức sau có thể là 'tag1 || tag2 || tag3'.
23 */
24 consumer.subscribe("TopicTest", "*");
25 /*
26 * Đăng ký phương thức gọi lại để thực thi khi tin nhắn được lấy từ broker.
27 */
28 consumer.registerMessageListener(new MessageListenerConcurrently() {
29 @Override
30 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
31 ConsumeConcurrentlyContext context) {
32 System.out.printf("%s Nhận Tin Nhắn Mới: %s %n", Thread.currentThread().getName(), msgs);
33 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
34 }
35 });
36 /*
37 * Khởi động phiên bản consumer.
38 */
39 consumer.start();
40 System.out.printf("Consumer Đã Bắt Đầu.%n");
41}