------ Vậy nên kích thước file vào khoảng 5 - nổ hũ mạt chược

/imgposts/5ruy18dk.jpg

ConsumeQueue thực chất là vị trí của các tin nhắn thuộc một topic cụ thể nằm ở đâu trong CommitLog, và nó có kích thước cố định.

1234

| ``` // Kích thước file ConsumeQueue, mặc định là 300.000 đơn vị lưu trữ private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; public static final int CQ_STORE_UNIT_SIZE = 20;


---|---  

Vậy nên kích thước file vào khoảng 5.7MB.

!5udpag  
Quá trình xây dựng ConsumeQueue được thực hiện thông qua phương thức `doReput` của lớp `org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService`, và việc khởi động `reputFromOffset` được cài đặt và kích hoạt bởi đoạn mã sau trong `org.apache.rocketmq.store.DefaultMessageStore#start`.

1234


| ```
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
        maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();

---|---

Bây giờ hãy xem logic của phương thức doReput.

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677

| ``` private void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { log.warn("reputFromOffset={} nhỏ hơn minPyOffset={}, điều này thường chỉ ra rằng dispatch bị chậm quá nhiều và commitlog đã hết hạn.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); } for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }


---|---  

Logic phân phối dừng lại tại đây.

1234567891011121314151617181920


| ```
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE: [nổ hũ mạt chược](/blog/2f2ab9c50c37df85.html) 
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

---|--- the thao 24h

Thực tế việc lưu trữ diễn ra ở đây.

12345678910111213

| ``` private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { if (offset + size <= this.maxPhysicOffset) { log.warn("Có thể đang cố gắng xây dựng hàng đợi tiêu thụ lặp lại maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); }


---|---  

Từ đây chúng ta cũng có thể thấy định dạng lưu trữ của ConsumeQueue:

[]( "AA6Tve")

Vị trí tin nhắn, kích thước tin nhắn, và giá trị hash của tag.