|
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.RandomAccessFile;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.ClosedChannelException;
|
|
|
import java.security.InvalidParameterException;
|
|
|
import java.util.EnumSet;
|
|
@@ -55,6 +56,7 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
import org.jboss.netty.channel.Channel;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
|
|
|
/**
|
|
@@ -362,6 +364,30 @@ class OpenFileCtx {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting // visibility is widened for tests, per the annotation
|
|
|
+ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) { // Trims request in place so it starts at cachedOffset, dropping the bytes in [offset, cachedOffset).
|
|
|
+ long offset = request.getOffset();
|
|
|
+ int count = request.getCount();
|
|
|
+ long smallerCount = offset + count - cachedOffset; // number of bytes from cachedOffset to the end of the request
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
|
|
|
+ + " current offset %d," + " drop the overlapped section (%d-%d)"
|
|
|
+ + " and append new data (%d-%d).", offset, (offset + count - 1),
|
|
|
+ cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
|
|
|
+ + count - 1)));
|
|
|
+ }
|
|
|
+ // Skip the overlapped prefix of the payload; the arithmetic below relies on the buffer starting at position 0.
|
|
|
+ ByteBuffer data = request.getData(); // NOTE(review): assumes getData() returns the request's own buffer so the new position sticks - confirm
|
|
|
+ Preconditions.checkState(data.position() == 0,
|
|
|
+ "The write request data has non-zero position"); // checkState throws IllegalStateException when the condition is false
|
|
|
+ data.position((int) (cachedOffset - offset)); // this method does not validate offset < cachedOffset < offset + count; the visible call site checks it, which keeps the difference below count
|
|
|
+ Preconditions.checkState(data.limit() - data.position() == smallerCount,
|
|
|
+ "The write request buffer has wrong limit/position regarding count"); // bytes left in the buffer must equal the new count
|
|
|
+ // Rewrite the request's offset and count to describe only the bytes that remain.
|
|
|
+ request.setOffset(cachedOffset);
|
|
|
+ request.setCount((int) smallerCount);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Creates and adds a WriteCtx into the pendingWrites map. This is a
|
|
|
* synchronized method to handle concurrent writes.
|
|
@@ -374,12 +400,40 @@ class OpenFileCtx {
|
|
|
long offset = request.getOffset();
|
|
|
int count = request.getCount();
|
|
|
long cachedOffset = nextOffset.get();
|
|
|
-
|
|
|
+ int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("requesed offset=" + offset + " and current offset="
|
|
|
+ cachedOffset);
|
|
|
}
|
|
|
|
|
|
+ // Handle a special case first
|
|
|
+ if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
|
|
|
+ // One Linux client behavior: after a file is closed and reopened to
|
|
|
+ // write, the client sometimes combines previously written data (could still
|
|
|
+ // be in kernel buffer) with newly appended data in one write. This is
|
|
|
+ // usually the first write after the file is reopened. In this
|
|
|
+ // case, we log the event and drop the overlapped section.
|
|
|
+ LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
|
|
|
+ + " current offset %d," + " drop the overlapped section (%d-%d)"
|
|
|
+ + " and append new data (%d-%d).", offset, (offset + count - 1),
|
|
|
+ cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
|
|
|
+ + count - 1)));
|
|
|
+
|
|
|
+ if (!pendingWrites.isEmpty()) {
|
|
|
+ LOG.warn("There are other pending writes, fail this jumbo write");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.warn("Modify this write to write only the appended data");
|
|
|
+ alterWriteRequest(request, cachedOffset);
|
|
|
+
|
|
|
+ // Update local variable
|
|
|
+ originalCount = count;
|
|
|
+ offset = request.getOffset();
|
|
|
+ count = request.getCount();
|
|
|
+ }
|
|
|
+
|
|
|
// Fail non-append call
|
|
|
if (offset < cachedOffset) {
|
|
|
LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
|
|
@@ -389,8 +443,9 @@ class OpenFileCtx {
|
|
|
DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
|
|
|
: WriteCtx.DataState.ALLOW_DUMP;
|
|
|
WriteCtx writeCtx = new WriteCtx(request.getHandle(),
|
|
|
- request.getOffset(), request.getCount(), request.getStableHow(),
|
|
|
- request.getData().array(), channel, xid, false, dataState);
|
|
|
+ request.getOffset(), request.getCount(), originalCount,
|
|
|
+ request.getStableHow(), request.getData(), channel, xid, false,
|
|
|
+ dataState);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Add new write to the list with nextOffset " + cachedOffset
|
|
|
+ " and requesed offset=" + offset);
|
|
@@ -421,8 +476,7 @@ class OpenFileCtx {
|
|
|
WRITE3Response response;
|
|
|
long cachedOffset = nextOffset.get();
|
|
|
if (offset + count > cachedOffset) {
|
|
|
- LOG.warn("Haven't noticed any partial overwrite for a sequential file"
|
|
|
- + " write requests. Treat it as a real random write, no support.");
|
|
|
+ LOG.warn("Treat this jumbo write as a real random write, no support.");
|
|
|
response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
|
|
|
WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
} else {
|
|
@@ -641,6 +695,7 @@ class OpenFileCtx {
|
|
|
private void addWrite(WriteCtx writeCtx) {
|
|
|
long offset = writeCtx.getOffset();
|
|
|
int count = writeCtx.getCount();
|
|
|
+ // Add the write to pendingWrites keyed by its half-open byte range [offset, offset + count): min is inclusive, max is exclusive
|
|
|
pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
|
|
|
}
|
|
|
|
|
@@ -753,19 +808,7 @@ class OpenFileCtx {
|
|
|
long offset = writeCtx.getOffset();
|
|
|
int count = writeCtx.getCount();
|
|
|
WriteStableHow stableHow = writeCtx.getStableHow();
|
|
|
- byte[] data = null;
|
|
|
- try {
|
|
|
- data = writeCtx.getData();
|
|
|
- } catch (Exception e1) {
|
|
|
- LOG.error("Failed to get request data offset:" + offset + " count:"
|
|
|
- + count + " error:" + e1);
|
|
|
- // Cleanup everything
|
|
|
- cleanup();
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
- Preconditions.checkState(data.length == count);
|
|
|
-
|
|
|
FileHandle handle = writeCtx.getHandle();
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
|
|
@@ -774,8 +817,8 @@ class OpenFileCtx {
|
|
|
|
|
|
try {
|
|
|
// The write is not protected by lock. asyncState is used to make sure
|
|
|
- // there is one thread doing write back at any time
|
|
|
- fos.write(data, 0, count);
|
|
|
+ // there is one thread doing write back at any time
|
|
|
+ writeCtx.writeData(fos);
|
|
|
|
|
|
long flushedOffset = getFlushedOffset();
|
|
|
if (flushedOffset != (offset + count)) {
|
|
@@ -784,10 +827,6 @@ class OpenFileCtx {
|
|
|
+ (offset + count));
|
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("After writing " + handle.getFileId() + " at offset "
|
|
|
- + offset + ", update the memory count.");
|
|
|
- }
|
|
|
|
|
|
// Reduce memory occupation size if request was allowed dumped
|
|
|
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
|
|
@@ -795,6 +834,11 @@ class OpenFileCtx {
|
|
|
if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
|
|
|
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
|
|
|
updateNonSequentialWriteInMemory(-count);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("After writing " + handle.getFileId() + " at offset "
|
|
|
+ + offset + ", updated the memory count, new value:"
|
|
|
+ + nonSequentialWriteInMemory.get());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -802,6 +846,11 @@ class OpenFileCtx {
|
|
|
if (!writeCtx.getReplied()) {
|
|
|
WccAttr preOpAttr = latestAttr.getWccAttr();
|
|
|
WccData fileWcc = new WccData(preOpAttr, latestAttr);
|
|
|
+ if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
|
|
|
+ LOG.warn("Return original count:" + writeCtx.getOriginalCount()
|
|
|
+ + " instead of real data count:" + count);
|
|
|
+ count = writeCtx.getOriginalCount();
|
|
|
+ }
|
|
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
|
|
|
fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|
|
@@ -809,7 +858,7 @@ class OpenFileCtx {
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
|
|
|
- + offset + " and length " + data.length, e);
|
|
|
+ + offset + " and length " + count, e);
|
|
|
if (!writeCtx.getReplied()) {
|
|
|
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
|
|
|
Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
|