|
@@ -85,6 +85,7 @@ class OpenFileCtx {
|
|
|
private volatile boolean activeState;
|
|
|
// The stream write-back status. True means one thread is doing write back.
|
|
|
private volatile boolean asyncStatus;
|
|
|
+ private volatile long asyncWriteBackStartOffset;
|
|
|
|
|
|
/**
|
|
|
* The current offset of the file in HDFS. All the content before this offset
|
|
@@ -209,6 +210,7 @@ class OpenFileCtx {
|
|
|
updateLastAccessTime();
|
|
|
activeState = true;
|
|
|
asyncStatus = false;
|
|
|
+ asyncWriteBackStartOffset = 0;
|
|
|
dumpOut = null;
|
|
|
raf = null;
|
|
|
nonSequentialWriteInMemory = new AtomicLong(0);
|
|
@@ -582,6 +584,7 @@ class OpenFileCtx {
|
|
|
+ nextOffset.get());
|
|
|
}
|
|
|
asyncStatus = true;
|
|
|
+ asyncWriteBackStartOffset = writeCtx.getOffset();
|
|
|
asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -911,9 +914,11 @@ class OpenFileCtx {
|
|
|
/** Invoked by AsynDataService to write back to HDFS */
|
|
|
void executeWriteBack() {
|
|
|
Preconditions.checkState(asyncStatus,
|
|
|
- "The openFileCtx has false async status");
|
|
|
+ "openFileCtx has false asyncStatus, fileId:" + latestAttr.getFileid());
|
|
|
+ final long startOffset = asyncWriteBackStartOffset;
|
|
|
try {
|
|
|
while (activeState) {
|
|
|
+ // asyncStatus could be changed to false in offerNextToWrite()
|
|
|
WriteCtx toWrite = offerNextToWrite();
|
|
|
if (toWrite != null) {
|
|
|
// Do the write
|
|
@@ -929,8 +934,18 @@ class OpenFileCtx {
|
|
|
+ latestAttr.getFileId());
|
|
|
}
|
|
|
} finally {
|
|
|
- // make sure we reset asyncStatus to false
|
|
|
- asyncStatus = false;
|
|
|
+ // Make sure to reset asyncStatus to false unless a race happens
|
|
|
+ synchronized (this) {
|
|
|
+ if (startOffset == asyncWriteBackStartOffset) {
|
|
|
+ asyncStatus = false;
|
|
|
+ } else {
|
|
|
+ LOG.info("Another asyn task is already started before this one"
|
|
|
+ + " is finalized. fileId:" + latestAttr.getFileid()
|
|
|
+ + " asyncStatus:" + asyncStatus + " original startOffset:"
|
|
|
+ + startOffset + " new startOffset:" + asyncWriteBackStartOffset
|
|
|
+ + ". Won't change asyncStatus here.");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|