|
@@ -48,6 +48,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
|
|
|
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
|
|
|
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
|
|
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
|
|
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
|
|
|
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
|
|
|
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
|
|
|
import org.apache.hadoop.nfs.nfs3.response.WccData;
|
|
@@ -69,12 +70,18 @@ class OpenFileCtx {
|
|
|
// Pending writes water mark for dump, 1MB
|
|
|
private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
|
|
|
|
|
|
- public final static int COMMIT_FINISHED = 0;
|
|
|
- public final static int COMMIT_WAIT = 1;
|
|
|
- public final static int COMMIT_INACTIVE_CTX = 2;
|
|
|
- public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
|
|
|
- public final static int COMMIT_ERROR = 4;
|
|
|
+ static enum COMMIT_STATUS { // Outcome of a commit check on this open file, as returned by checkCommit
|
|
|
+ COMMIT_FINISHED, // no pending writes remain, so there is nothing for the commit to wait on
|
|
|
+ COMMIT_WAIT, // the commit was queued in pendingCommits; processCommits sends the reply later
|
|
|
+ COMMIT_INACTIVE_CTX, // the stream is no longer active and has no pending writes
|
|
|
+ COMMIT_INACTIVE_WITH_PENDING_WRITE, // the stream is no longer active but writes are still pending
|
|
|
+ COMMIT_ERROR, // the flushed offset could not be read, or the sync hit a stream error
|
|
|
+ COMMIT_DO_SYNC; // data up to the commit offset is already flushed; checkCommit performs the hsync
|
|
|
+ }
|
|
|
|
|
|
+ private final DFSClient client;
|
|
|
+ private final IdUserGroup iug;
|
|
|
+
|
|
|
// The stream status. False means the stream is closed.
|
|
|
private volatile boolean activeState;
|
|
|
// The stream write-back status. True means one thread is doing write back.
|
|
@@ -87,11 +94,58 @@ class OpenFileCtx {
|
|
|
private AtomicLong nextOffset;
|
|
|
private final HdfsDataOutputStream fos;
|
|
|
|
|
|
- // TODO: make it mutable and update it after each writing back to HDFS
|
|
|
- private final Nfs3FileAttributes latestAttr;
|
|
|
+ // It's updated after each sync to HDFS
|
|
|
+ private Nfs3FileAttributes latestAttr;
|
|
|
|
|
|
private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
|
|
|
|
|
|
+ private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
|
|
|
+
|
|
|
+ static class CommitCtx { // Immutable record of one COMMIT request parked in pendingCommits until it can be answered
|
|
|
+ private final long offset; // offset the commit waits on; processCommits compares it with the flushed offset
|
|
|
+ private final Channel channel; // channel the commit response is written to
|
|
|
+ private final int xid; // request xid, passed back when the response is written
|
|
|
+ private final Nfs3FileAttributes preOpAttr; // presumably the pre-operation attributes; stored but not read in the code visible here
|
|
|
+
|
|
|
+ // Remember time for debug purpose
|
|
|
+ private final long startTime; // System.currentTimeMillis() taken in the constructor
|
|
|
+
|
|
|
+ long getOffset() {
|
|
|
+ return offset;
|
|
|
+ }
|
|
|
+
|
|
|
+ Channel getChannel() {
|
|
|
+ return channel;
|
|
|
+ }
|
|
|
+
|
|
|
+ int getXid() {
|
|
|
+ return xid;
|
|
|
+ }
|
|
|
+
|
|
|
+ Nfs3FileAttributes getPreOpAttr() {
|
|
|
+ return preOpAttr;
|
|
|
+ }
|
|
|
+
|
|
|
+ long getStartTime() {
|
|
|
+ return startTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ CommitCtx(long offset, Channel channel, int xid,
|
|
|
+ Nfs3FileAttributes preOpAttr) { // stores the arguments as given and timestamps the request
|
|
|
+ this.offset = offset;
|
|
|
+ this.channel = channel;
|
|
|
+ this.xid = xid;
|
|
|
+ this.preOpAttr = preOpAttr;
|
|
|
+ this.startTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() { // used in debug logging; omits channel and preOpAttr
|
|
|
+ return String.format("offset: %d xid: %d startTime: %d", offset, xid,
|
|
|
+ startTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// The last write, commit request or write-back event. Updating time to keep
|
|
|
// output stream alive.
|
|
|
private long lastAccessTime;
|
|
@@ -130,7 +184,7 @@ class OpenFileCtx {
|
|
|
}
|
|
|
|
|
|
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
|
|
|
- String dumpFilePath) {
|
|
|
+ String dumpFilePath, DFSClient client, IdUserGroup iug) {
|
|
|
this.fos = fos;
|
|
|
this.latestAttr = latestAttr;
|
|
|
// We use the ReverseComparatorOnMin as the comparator of the map. In this
|
|
@@ -138,6 +192,9 @@ class OpenFileCtx {
|
|
|
// retrieve the last element to write back to HDFS.
|
|
|
pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
|
|
|
OffsetRange.ReverseComparatorOnMin);
|
|
|
+
|
|
|
+ pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
|
|
|
+
|
|
|
updateLastAccessTime();
|
|
|
activeState = true;
|
|
|
asyncStatus = false;
|
|
@@ -153,6 +210,8 @@ class OpenFileCtx {
|
|
|
assert(nextOffset.get() == this.fos.getPos());
|
|
|
} catch (IOException e) {}
|
|
|
dumpThread = null;
|
|
|
+ this.client = client;
|
|
|
+ this.iug = iug;
|
|
|
}
|
|
|
|
|
|
public Nfs3FileAttributes getLatestAttr() {
|
|
@@ -547,19 +606,23 @@ class OpenFileCtx {
|
|
|
// of reordered writes and won't send more writes until it gets
|
|
|
// responses of the previous batch. So here send response immediately
|
|
|
// for unstable non-sequential write
|
|
|
- if (request.getStableHow() == WriteStableHow.UNSTABLE) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("UNSTABLE write request, send response for offset: "
|
|
|
- + writeCtx.getOffset());
|
|
|
- }
|
|
|
- WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
|
|
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
|
|
- fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
- Nfs3Utils
|
|
|
- .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
|
|
|
- xid, new VerifierNone()), xid);
|
|
|
- writeCtx.setReplied(true);
|
|
|
+ if (stableHow != WriteStableHow.UNSTABLE) {
|
|
|
+ LOG.info("Have to change stable write to unstable write:"
|
|
|
+ + request.getStableHow());
|
|
|
+ stableHow = WriteStableHow.UNSTABLE;
|
|
|
}
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("UNSTABLE write request, send response for offset: "
|
|
|
+ + writeCtx.getOffset());
|
|
|
+ }
|
|
|
+ WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
|
|
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
|
|
+ fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
+ Nfs3Utils
|
|
|
+ .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
|
|
|
+ xid, new VerifierNone()), xid);
|
|
|
+ writeCtx.setReplied(true);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -637,19 +700,52 @@ class OpenFileCtx {
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
+ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, // NOTE(review): dfsClient is not used in this method
|
|
|
+ Channel channel, int xid, Nfs3FileAttributes preOpAttr) { // classifies a commit request and, if the data is already flushed, syncs it here
|
|
|
+ // Keep stream active
|
|
|
+ updateLastAccessTime();
|
|
|
+ Preconditions.checkState(commitOffset >= 0); // a negative commit offset is rejected outright
|
|
|
+
|
|
|
+ COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid, // may queue a CommitCtx built from channel/xid/preOpAttr
|
|
|
+ preOpAttr);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Got commit status: " + ret.name());
|
|
|
+ }
|
|
|
+ // Do the sync outside the lock (checkCommitInternal is synchronized; this method is not)
|
|
|
+ if (ret == COMMIT_STATUS.COMMIT_DO_SYNC) {
|
|
|
+ try {
|
|
|
+ // Sync file data and length
|
|
|
+ fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
|
|
+ // Nothing to do for metadata since attr related change is pass-through
|
|
|
+ } catch (ClosedChannelException cce) { // stream already closed: outcome depends on whether writes are still queued
|
|
|
+ if (pendingWrites.isEmpty()) {
|
|
|
+ ret = COMMIT_STATUS.COMMIT_FINISHED;
|
|
|
+ } else {
|
|
|
+ ret = COMMIT_STATUS.COMMIT_ERROR;
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Got stream error during data sync:" + e);
|
|
|
+ // Do nothing. Stream will be closed eventually by StreamMonitor.
|
|
|
+ // status = Nfs3Status.NFS3ERR_IO;
|
|
|
+ ret = COMMIT_STATUS.COMMIT_ERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ret; // still COMMIT_DO_SYNC when the hsync above succeeded; the caller has to handle that value
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
|
|
|
- * COMMIT_INACTIVE_CTX, COMMIT_ERROR
|
|
|
+ * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, COMMIT_DO_SYNC
|
|
|
*/
|
|
|
- public int checkCommit(long commitOffset) {
|
|
|
- return activeState ? checkCommitInternal(commitOffset)
|
|
|
- : COMMIT_INACTIVE_CTX;
|
|
|
- }
|
|
|
-
|
|
|
- private int checkCommitInternal(long commitOffset) {
|
|
|
- if (commitOffset == 0) {
|
|
|
- // Commit whole file
|
|
|
- commitOffset = nextOffset.get();
|
|
|
+ private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
|
|
+ Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
|
|
|
+ if (!activeState) {
|
|
|
+ if (pendingWrites.isEmpty()) {
|
|
|
+ return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
|
|
|
+ } else {
|
|
|
+ // TODO: return success if already committed
|
|
|
+ return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
long flushed = 0;
|
|
@@ -657,39 +753,38 @@ class OpenFileCtx {
|
|
|
flushed = getFlushedOffset();
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Can't get flushed offset, error:" + e);
|
|
|
- return COMMIT_ERROR;
|
|
|
+ return COMMIT_STATUS.COMMIT_ERROR;
|
|
|
}
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
|
|
|
}
|
|
|
- if (flushed < commitOffset) {
|
|
|
- // Keep stream active
|
|
|
- updateLastAccessTime();
|
|
|
- return COMMIT_WAIT;
|
|
|
- }
|
|
|
|
|
|
- int ret = COMMIT_WAIT;
|
|
|
- try {
|
|
|
- // Sync file data and length
|
|
|
- fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
|
|
- // Nothing to do for metadata since attr related change is pass-through
|
|
|
- ret = COMMIT_FINISHED;
|
|
|
- } catch (ClosedChannelException cce) {
|
|
|
- ret = COMMIT_INACTIVE_CTX;
|
|
|
- if (pendingWrites.isEmpty()) {
|
|
|
- ret = COMMIT_INACTIVE_CTX;
|
|
|
+ if (commitOffset > 0) {
|
|
|
+ if (commitOffset > flushed) {
|
|
|
+ CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
|
|
|
+ preOpAttr);
|
|
|
+ pendingCommits.put(commitOffset, commitCtx);
|
|
|
+ return COMMIT_STATUS.COMMIT_WAIT;
|
|
|
} else {
|
|
|
- ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
|
|
|
+ return COMMIT_STATUS.COMMIT_DO_SYNC;
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Got stream error during data sync:" + e);
|
|
|
- // Do nothing. Stream will be closed eventually by StreamMonitor.
|
|
|
- ret = COMMIT_ERROR;
|
|
|
}
|
|
|
|
|
|
- // Keep stream active
|
|
|
- updateLastAccessTime();
|
|
|
- return ret;
|
|
|
+ Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
|
|
|
+
|
|
|
+ // Commit whole file, commitOffset == 0
|
|
|
+ if (pendingWrites.isEmpty()) {
|
|
|
+ // Note that, there is no guarantee data is synced. TODO: We could still
|
|
|
+ // do a sync here though the output stream might be closed.
|
|
|
+ return COMMIT_STATUS.COMMIT_FINISHED;
|
|
|
+ } else {
|
|
|
+ // Insert commit
|
|
|
+ long maxOffset = key.getKey().getMax() - 1;
|
|
|
+ Preconditions.checkState(maxOffset > 0);
|
|
|
+ CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
|
|
|
+ pendingCommits.put(maxOffset, commitCtx);
|
|
|
+ return COMMIT_STATUS.COMMIT_WAIT;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void addWrite(WriteCtx writeCtx) {
|
|
@@ -734,8 +829,18 @@ class OpenFileCtx {
|
|
|
LOG.debug("The asyn write task has no pending writes, fileId: "
|
|
|
+ latestAttr.getFileId());
|
|
|
}
|
|
|
+ // process pending commit again to handle this race: a commit is added
|
|
|
+ // to pendingCommits map just after the last doSingleWrite returns.
|
|
|
+ // There is no pending write and the commit should be handled by the
|
|
|
+ // last doSingleWrite. Due to the race, the commit is left alone and
|
|
|
+ // can't be processed until cleanup. Therefore, we should do another
|
|
|
+ // processCommits to fix the race issue.
|
|
|
+ processCommits(nextOffset.get()); // nextOffset has same value as
|
|
|
+ // flushedOffset
|
|
|
this.asyncStatus = false;
|
|
|
- } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
|
|
|
OffsetRange range = lastEntry.getKey();
|
|
|
WriteCtx toWrite = lastEntry.getValue();
|
|
@@ -750,6 +855,7 @@ class OpenFileCtx {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("The next sequencial write has not arrived yet");
|
|
|
}
|
|
|
+ processCommits(nextOffset.get()); // handle race
|
|
|
this.asyncStatus = false;
|
|
|
} else if (range.getMin() < offset && range.getMax() > offset) {
|
|
|
// shouldn't happen since we do sync for overlapped concurrent writers
|
|
@@ -757,6 +863,7 @@ class OpenFileCtx {
|
|
|
+ range.getMax() + "), nextOffset=" + offset
|
|
|
+ ". Silently drop it now");
|
|
|
pendingWrites.remove(range);
|
|
|
+ processCommits(nextOffset.get()); // handle race
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
|
|
@@ -771,7 +878,7 @@ class OpenFileCtx {
|
|
|
}
|
|
|
return toWrite;
|
|
|
}
|
|
|
- }
|
|
|
+
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -793,7 +900,7 @@ class OpenFileCtx {
|
|
|
|
|
|
if (!activeState && LOG.isDebugEnabled()) {
|
|
|
LOG.debug("The openFileCtx is not active anymore, fileId: "
|
|
|
- + +latestAttr.getFileId());
|
|
|
+ + latestAttr.getFileId());
|
|
|
}
|
|
|
} finally {
|
|
|
// make sure we reset asyncStatus to false
|
|
@@ -801,6 +908,71 @@ class OpenFileCtx {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void processCommits(long offset) { // syncs the stream once and answers every queued commit whose offset is already flushed
|
|
|
+ Preconditions.checkState(offset > 0); // offset is the file size expected after the sync (see the size check below)
|
|
|
+ long flushedOffset = 0;
|
|
|
+ Entry<Long, CommitCtx> entry = null;
|
|
|
+
|
|
|
+ int status = Nfs3Status.NFS3ERR_IO; // pessimistic default; only a successful hsync sets NFS3_OK
|
|
|
+ try {
|
|
|
+ flushedOffset = getFlushedOffset(); // if this throws, entry stays null and no response is sent below
|
|
|
+ entry = pendingCommits.firstEntry(); // lowest queued commit offset
|
|
|
+ if (entry == null || entry.getValue().offset > flushedOffset) {
|
|
|
+ return; // nothing queued, or even the lowest commit is not flushed yet
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now do sync for the ready commits
|
|
|
+ // Sync file data and length
|
|
|
+ fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
|
|
|
+ status = Nfs3Status.NFS3_OK;
|
|
|
+ } catch (ClosedChannelException cce) {
|
|
|
+ if (!pendingWrites.isEmpty()) {
|
|
|
+ LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
|
|
|
+ + ". Channel closed with writes pending");
|
|
|
+ }
|
|
|
+ status = Nfs3Status.NFS3ERR_IO;
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Got stream error during data sync:" + e);
|
|
|
+ // Do nothing. Stream will be closed eventually by StreamMonitor.
|
|
|
+ status = Nfs3Status.NFS3ERR_IO;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Update latestAttr (attempted even when the sync above failed)
|
|
|
+ try {
|
|
|
+ latestAttr = Nfs3Utils.getFileAttr(client,
|
|
|
+ Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
|
|
|
+ } catch (IOException e) { // latestAttr keeps its previous value in this case
|
|
|
+ LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId());
|
|
|
+ status = Nfs3Status.NFS3ERR_IO;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (latestAttr.getSize() != offset) { // a size mismatch turns the reply into an I/O error
|
|
|
+ LOG.error("After sync, the expect file size: " + offset
|
|
|
+ + ", however actual file size is: " + latestAttr.getSize());
|
|
|
+ status = Nfs3Status.NFS3ERR_IO;
|
|
|
+ }
|
|
|
+ WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr); // NOTE(review): built from latestAttr only; each commit's stored preOpAttr is not used
|
|
|
+
|
|
|
+ // Send response for the ready commits (all of them share the same status and wccData)
|
|
|
+ while (entry != null && entry.getValue().offset <= flushedOffset) {
|
|
|
+ pendingCommits.remove(entry.getKey());
|
|
|
+ CommitCtx commit = entry.getValue();
|
|
|
+
|
|
|
+ COMMIT3Response response = new COMMIT3Response(status, wccData,
|
|
|
+ Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
+ Nfs3Utils.writeChannelCommit(commit.getChannel(), response
|
|
|
+ .writeHeaderAndResponse(new XDR(), commit.getXid(),
|
|
|
+ new VerifierNone()), commit.getXid());
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("FileId: " + latestAttr.getFileid() + " Service time:" // NOTE(review): spelled getFileid here but getFileId elsewhere in this method; confirm both exist
|
|
|
+ + (System.currentTimeMillis() - commit.getStartTime())
|
|
|
+ + "ms. Sent response for commit:" + commit);
|
|
|
+ }
|
|
|
+ entry = pendingCommits.firstEntry(); // re-read the head: the previous one was just removed
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void doSingleWrite(final WriteCtx writeCtx) {
|
|
|
Channel channel = writeCtx.getChannel();
|
|
|
int xid = writeCtx.getXid();
|
|
@@ -856,6 +1028,10 @@ class OpenFileCtx {
|
|
|
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|
|
|
new XDR(), xid, new VerifierNone()), xid);
|
|
|
}
|
|
|
+
|
|
|
+ // Handle the waiting commits without holding any lock
|
|
|
+ processCommits(writeCtx.getOffset() + writeCtx.getCount());
|
|
|
+
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
|
|
|
+ offset + " and length " + count, e);
|
|
@@ -937,4 +1113,29 @@ class OpenFileCtx {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){ // test hook: exposes the pendingWrites map itself, not a copy
|
|
|
+ return pendingWrites;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){ // test hook: exposes the pendingCommits map itself, not a copy
|
|
|
+ return pendingCommits;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ long getNextOffsetForTest() { // test hook: reads the current value of the nextOffset counter
|
|
|
+ return nextOffset.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ void setNextOffsetForTest(long newValue) { // test hook: overwrites nextOffset without touching the underlying stream
|
|
|
+ nextOffset.set(newValue);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ void setActiveStatusForTest(boolean activeState) { // test hook: sets the activeState flag directly (false means the stream is treated as closed)
|
|
|
+ this.activeState = activeState;
|
|
|
+ }
|
|
|
}
|