Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8988] Support dispatchBehindMilliseconds #8989

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,10 @@ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long consu
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
}
@Override
public long dispatchBehindMilliseconds() {
return this.reputMessageService.behindMs();
}

public long flushBehindBytes() {
if (this.messageStoreConfig.isTransientStorePoolEnable()) {
Expand Down Expand Up @@ -2793,6 +2797,7 @@ public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
class ReputMessageService extends ServiceThread {

protected volatile long reputFromOffset = 0;
protected volatile long currentReputTimestamp = System.currentTimeMillis();

public long getReputFromOffset() {
return reputFromOffset;
Expand All @@ -2802,6 +2807,10 @@ public void setReputFromOffset(long reputFromOffset) {
this.reputFromOffset = reputFromOffset;
}

public long getCurrentReputTimestamp() {
return currentReputTimestamp;
}

@Override
public void shutdown() {
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
Expand All @@ -2824,6 +2833,15 @@ public long behind() {
return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset;
}

public long behindMs() {
long lastCommitLogFileTimeStamp = System.currentTimeMillis();
MappedFile lastMappedFile = DefaultMessageStore.this.commitLog.getMappedFileQueue().getLastMappedFile();
if (lastMappedFile != null) {
lastCommitLogFileTimeStamp = lastMappedFile.getStoreTimestamp();
}
return Math.max(0, lastCommitLogFileTimeStamp - this.currentReputTimestamp);
}

public boolean isCommitLogAvailable() {
return this.reputFromOffset < getReputEndOffset();
}
Expand All @@ -2838,7 +2856,11 @@ public void doReput() {
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
boolean isCommitLogAvailable = isCommitLogAvailable();
if (!isCommitLogAvailable) {
currentReputTimestamp = System.currentTimeMillis();
}
for (boolean doNext = true; isCommitLogAvailable && doNext; ) {

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);

Expand All @@ -2861,6 +2883,7 @@ public void doReput() {

if (dispatchRequest.isSuccess()) {
if (size > 0) {
currentReputTimestamp = dispatchRequest.getStoreTimestamp();
DefaultMessageStore.this.doDispatch(dispatchRequest);

if (!notifyMessageArriveInBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,13 @@ CompletableFuture<QueryMessageResult> queryMessageAsync(final String topic, fina
*/
long dispatchBehindBytes();

/**
* Get number of the milliseconds that have been stored in commit log and not yet dispatched to consume queue.
*
* @return number of the milliseconds to dispatch.
*/
long dispatchBehindMilliseconds();

/**
* Flush the message store to persist all data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ public long dispatchBehindBytes() {
return next.dispatchBehindBytes();
}

@Override
public long dispatchBehindMilliseconds() {
return next.dispatchBehindMilliseconds();
}

@Override
public long flush() {
return next.flush();
Expand Down
Loading