@@ -31,8 +31,6 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -61,13 +59,15 @@ import org.jboss.netty.channel.Channel;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
  * synchronized by its member lock.
  */
 class OpenFileCtx {
-  public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
+  public static final Logger LOG = LoggerFactory.getLogger(OpenFileCtx.class);
 
   // Pending writes water mark for dump, 1MB
   private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
@@ -210,10 +210,8 @@ class OpenFileCtx {
   /** Increase or decrease the memory occupation of non-sequential writes */
   private long updateNonSequentialWriteInMemory(long count) {
     long newValue = nonSequentialWriteInMemory.addAndGet(count);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value: "
-          + newValue);
-    }
+    LOG.debug("Update nonSequentialWriteInMemory by {} new value: {}",
+        count, newValue);
 
     Preconditions.checkState(newValue >= 0,
         "nonSequentialWriteInMemory is negative " + newValue
@@ -273,9 +271,7 @@ class OpenFileCtx {
   // Check if need to dump the new writes
   private void waitForDump() {
     if (!enabledDump) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Do nothing, dump is disabled.");
-      }
+      LOG.debug("Do nothing, dump is disabled.");
       return;
     }
 
@@ -286,9 +282,7 @@ class OpenFileCtx {
     // wake up the dumper thread to dump the data
     synchronized (this) {
       if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Asking dumper to dump...");
-        }
+        LOG.debug("Asking dumper to dump...");
         if (dumpThread == null) {
           dumpThread = new Daemon(new Dumper());
           dumpThread.start();
@@ -312,7 +306,7 @@ class OpenFileCtx {
     private void dump() {
       // Create dump outputstream for the first time
       if (dumpOut == null) {
-        LOG.info("Create dump file: " + dumpFilePath);
+        LOG.info("Create dump file: {}", dumpFilePath);
         File dumpFile = new File(dumpFilePath);
         try {
           synchronized (this) {
@@ -322,13 +316,14 @@ class OpenFileCtx {
             dumpOut = new FileOutputStream(dumpFile);
           }
         } catch (IOException e) {
-          LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
+          LOG.error("Got failure when creating dump stream {}",
+              dumpFilePath, e);
           enabledDump = false;
           if (dumpOut != null) {
             try {
               dumpOut.close();
             } catch (IOException e1) {
-              LOG.error("Can't close dump stream " + dumpFilePath, e);
+              LOG.error("Can't close dump stream {}", dumpFilePath, e);
             }
           }
           return;
@@ -340,17 +335,15 @@ class OpenFileCtx {
         try {
           raf = new RandomAccessFile(dumpFilePath, "r");
         } catch (FileNotFoundException e) {
-          LOG.error("Can't get random access to file " + dumpFilePath);
+          LOG.error("Can't get random access to file {}", dumpFilePath);
           // Disable dump
           enabledDump = false;
           return;
         }
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
-            + nonSequentialWriteInMemory.get());
-      }
+      LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == {}",
+          nonSequentialWriteInMemory.get());
 
       Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
       while (activeState && it.hasNext()
@@ -367,18 +360,16 @@ class OpenFileCtx {
             updateNonSequentialWriteInMemory(-dumpedDataSize);
           }
         } catch (IOException e) {
-          LOG.error("Dump data failed: " + writeCtx + " with error: " + e
-              + " OpenFileCtx state: " + activeState);
+          LOG.error("Dump data failed: {} OpenFileCtx state: {}",
+              writeCtx, activeState, e);
           // Disable dump
           enabledDump = false;
           return;
         }
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("After dump, nonSequentialWriteInMemory == "
-            + nonSequentialWriteInMemory.get());
-      }
+      LOG.debug("After dump, nonSequentialWriteInMemory == {}",
+          nonSequentialWriteInMemory.get());
     }
 
     @Override
@@ -393,26 +384,22 @@ class OpenFileCtx {
               OpenFileCtx.this.notifyAll();
               try {
                 OpenFileCtx.this.wait();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Dumper woke up");
-                }
+                LOG.debug("Dumper woke up");
               } catch (InterruptedException e) {
-                LOG.info("Dumper is interrupted, dumpFilePath= "
-                    + OpenFileCtx.this.dumpFilePath);
+                LOG.info("Dumper is interrupted, dumpFilePath = {}",
+                    OpenFileCtx.this.dumpFilePath);
               }
             }
           }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
-                + " enabledDump: " + enabledDump);
-          }
+          LOG.debug("Dumper checking OpenFileCtx activeState: {} " +
+              "enabledDump: {}", activeState, enabledDump);
         } catch (Throwable t) {
           // unblock threads with new request
           synchronized (OpenFileCtx.this) {
            OpenFileCtx.this.notifyAll();
          }
-          LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
-              + OpenFileCtx.this.dumpFilePath, t);
+          LOG.info("Dumper got Throwable. dumpFilePath: {}",
+              OpenFileCtx.this.dumpFilePath, t);
          activeState = false;
        }
      }
@@ -428,8 +415,8 @@ class OpenFileCtx {
       return null;
     } else {
       if (xid != writeCtx.getXid()) {
-        LOG.warn("Got a repeated request, same range, with a different xid: "
-            + xid + " xid in old request: " + writeCtx.getXid());
+        LOG.warn("Got a repeated request, same range, with a different xid: " +
+            "{} xid in old request: {}", xid, writeCtx.getXid());
         //TODO: better handling.
       }
       return writeCtx;
@@ -441,8 +428,8 @@ class OpenFileCtx {
       IdMappingServiceProvider iug) {
 
     if (!activeState) {
-      LOG.info("OpenFileCtx is inactive, fileId: "
-          + request.getHandle().dumpFileHandle());
+      LOG.info("OpenFileCtx is inactive, fileId: {}",
+          request.getHandle().dumpFileHandle());
       WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
           fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
@@ -460,15 +447,11 @@ class OpenFileCtx {
           xid);
       if (existantWriteCtx != null) {
         if (!existantWriteCtx.getReplied()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Repeated write request which hasn't been served: xid="
-                + xid + ", drop it.");
-          }
+          LOG.debug("Repeated write request which hasn't been served: " +
+              "xid={}, drop it.", xid);
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Repeated write request which is already served: xid="
-                + xid + ", resend response.");
-          }
+          LOG.debug("Repeated write request which is already served: xid={}" +
+              ", resend response.", xid);
           WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
           WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
               fileWcc, request.getCount(), request.getStableHow(),
@@ -489,13 +472,11 @@ class OpenFileCtx {
     long offset = request.getOffset();
     int count = request.getCount();
     long smallerCount = offset + count - cachedOffset;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Got overwrite with appended data [%d-%d),"
-          + " current offset %d," + " drop the overlapped section [%d-%d)"
-          + " and append new data [%d-%d).", offset, (offset + count),
-          cachedOffset, offset, cachedOffset, cachedOffset, (offset
-          + count)));
-    }
+    LOG.debug("Got overwrite with appended data [{}-{}),"
+        + " current offset {}," + " drop the overlapped section [{}-{})"
+        + " and append new data [{}-{}).", offset, (offset + count),
+        cachedOffset, offset, cachedOffset, cachedOffset,
+        (offset + count));
 
     ByteBuffer data = request.getData();
     Preconditions.checkState(data.position() == 0,
@@ -538,10 +519,8 @@ class OpenFileCtx {
     long cachedOffset = nextOffset.get();
     int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("requested offset=" + offset + " and current offset="
-          + cachedOffset);
-    }
+    LOG.debug("requested offset={} and current offset={}",
+        offset, cachedOffset);
 
     // Ignore write request with range below the current offset
     if (offset + count <= cachedOffset) {
@@ -576,8 +555,8 @@ class OpenFileCtx {
 
     // Fail non-append call
    if (offset < cachedOffset) {
-      LOG.warn("(offset,count,nextOffset): " + "(" + offset + "," + count + ","
-          + nextOffset + ")");
+      LOG.warn("(offset,count,nextOffset): ({},{},{})",
+          offset, count, nextOffset);
      return null;
    } else {
      DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
@@ -586,10 +565,8 @@ class OpenFileCtx {
           request.getOffset(), request.getCount(), originalCount,
           request.getStableHow(), request.getData(), channel, xid, false,
           dataState);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Add new write to the list with nextOffset " + cachedOffset
-            + " and requested offset=" + offset);
-      }
+      LOG.debug("Add new write to the list with nextOffset {}" +
+          " and requested offset={}", cachedOffset, offset);
       if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
         // update the memory size
         updateNonSequentialWriteInMemory(count);
@@ -598,14 +575,12 @@ class OpenFileCtx {
       WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
       if (oldWriteCtx == null) {
         pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("New write buffered with xid " + xid + " nextOffset "
-              + cachedOffset + " req offset=" + offset + " mapsize="
-              + pendingWrites.size());
-        }
+        LOG.debug("New write buffered with xid {} nextOffset {} " +
+            "req offset={} mapsize={}",
+            xid, cachedOffset, offset, pendingWrites.size());
       } else {
-        LOG.warn("Got a repeated request, same range, with xid: " + xid
-            + " nextOffset " + +cachedOffset + " req offset=" + offset);
+        LOG.warn("Got a repeated request, same range, with xid: " +
+            "{} nextOffset {} req offset={}", xid, cachedOffset, offset);
       }
       return writeCtx;
     }
@@ -625,9 +600,7 @@ class OpenFileCtx {
         response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
             WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Process perfectOverWrite");
-        }
+        LOG.debug("Process perfectOverWrite");
         // TODO: let executor handle perfect overwrite
         response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
             request.getData().array(),
@@ -652,17 +625,13 @@ class OpenFileCtx {
 
     if (writeCtx.getOffset() == nextOffset.get()) {
       if (!asyncStatus) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Trigger the write back task. Current nextOffset: "
-              + nextOffset.get());
-        }
+        LOG.debug("Trigger the write back task. Current nextOffset: {}",
+            nextOffset.get());
         asyncStatus = true;
         asyncWriteBackStartOffset = writeCtx.getOffset();
         asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The write back thread is working.");
-        }
+        LOG.debug("The write back thread is working.");
       }
       return true;
     } else {
@@ -694,15 +663,13 @@ class OpenFileCtx {
       // responses of the previous batch. So here send response immediately
       // for unstable non-sequential write
       if (stableHow != WriteStableHow.UNSTABLE) {
-        LOG.info("Have to change stable write to unstable write: "
-            + request.getStableHow());
+        LOG.info("Have to change stable write to unstable write: {}",
+            request.getStableHow());
         stableHow = WriteStableHow.UNSTABLE;
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("UNSTABLE write request, send response for offset: "
-            + writeCtx.getOffset());
-      }
+      LOG.debug("UNSTABLE write request, send response for offset: {}",
+          writeCtx.getOffset());
       WccData fileWcc = new WccData(preOpAttr, latestAttr);
       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
           fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
@@ -738,8 +705,8 @@ class OpenFileCtx {
       LOG.info("The FSDataOutputStream has been closed. "
           + "Continue processing the perfect overwrite.");
     } catch (IOException e) {
-      LOG.info("hsync failed when processing possible perfect overwrite, path="
-          + path + " error: " + e);
+      LOG.info("hsync failed when processing possible perfect overwrite, " +
+          "path={} error: {}", path, e.toString());
       return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
           Nfs3Constant.WRITE_COMMIT_VERF);
     }
@@ -748,18 +715,18 @@ class OpenFileCtx {
       fis = dfsClient.createWrappedInputStream(dfsClient.open(path));
       readCount = fis.read(offset, readbuffer, 0, count);
       if (readCount < count) {
-        LOG.error("Can't read back " + count + " bytes, partial read size: "
-            + readCount);
+        LOG.error("Can't read back {} bytes, partial read size: {}",
+            count, readCount);
         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
     } catch (IOException e) {
-      LOG.info("Read failed when processing possible perfect overwrite, path="
-          + path, e);
+      LOG.info("Read failed when processing possible perfect overwrite, " +
+          "path={}", path, e);
       return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
           Nfs3Constant.WRITE_COMMIT_VERF);
     } finally {
-      IOUtils.cleanup(LOG, fis);
+      IOUtils.cleanupWithLogger(LOG, fis);
     }
 
     // Compare with the request
@@ -776,8 +743,8 @@ class OpenFileCtx {
         dfsClient.setTimes(path, Time.monotonicNow(), -1);
         postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
       } catch (IOException e) {
-        LOG.info("Got error when processing perfect overwrite, path=" + path
-            + " error: " + e);
+        LOG.info("Got error when processing perfect overwrite, path={} " +
+            "error: {}", path, e.toString());
         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
       }
@@ -810,9 +777,7 @@ class OpenFileCtx {
 
     COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
         preOpAttr, fromRead);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Got commit status: " + ret.name());
-    }
+    LOG.debug("Got commit status: {}", ret.name());
     // Do the sync outside the lock
     if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
         || ret == COMMIT_STATUS.COMMIT_FINISHED) {
@@ -828,7 +793,7 @@ class OpenFileCtx {
           ret = COMMIT_STATUS.COMMIT_ERROR;
         }
       } catch (IOException e) {
-        LOG.error("Got stream error during data sync: " + e);
+        LOG.error("Got stream error during data sync", e);
         // Do nothing. Stream will be closed eventually by StreamMonitor.
         // status = Nfs3Status.NFS3ERR_IO;
         ret = COMMIT_STATUS.COMMIT_ERROR;
@@ -867,9 +832,7 @@ class OpenFileCtx {
       CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
       pendingCommits.put(commitOffset, commitCtx);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("return COMMIT_SPECIAL_WAIT");
-    }
+    LOG.debug("return COMMIT_SPECIAL_WAIT");
     return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
   }
 
@@ -886,10 +849,8 @@ class OpenFileCtx {
     }
 
     long flushed = getFlushedOffset();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset
-          + "nextOffset=" + nextOffset.get());
-    }
+    LOG.debug("getFlushedOffset={} commitOffset={} nextOffset={}",
+        flushed, commitOffset, nextOffset.get());
 
     if (pendingWrites.isEmpty()) {
       if (aixCompatMode) {
@@ -898,10 +859,8 @@ class OpenFileCtx {
         return COMMIT_STATUS.COMMIT_FINISHED;
       } else {
         if (flushed < nextOffset.get()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("get commit while still writing to the requested offset,"
-                + " with empty queue");
-          }
+          LOG.debug("get commit while still writing to the requested offset,"
+              + " with empty queue");
           return handleSpecialWait(fromRead, nextOffset.get(), channel, xid,
               preOpAttr);
         } else {
@@ -920,18 +879,14 @@ class OpenFileCtx {
       if (co <= flushed) {
         return COMMIT_STATUS.COMMIT_DO_SYNC;
       } else if (co < nextOffset.get()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("get commit while still writing to the requested offset");
-        }
+        LOG.debug("get commit while still writing to the requested offset");
         return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
       } else {
         // co >= nextOffset
         if (checkSequential(co, nextOffset.get())) {
           return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("return COMMIT_SPECIAL_SUCCESS");
-          }
+          LOG.debug("return COMMIT_SPECIAL_SUCCESS");
           return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
         }
       }
@@ -993,8 +948,8 @@ class OpenFileCtx {
     // Check the stream timeout
     if (checkStreamTimeout(streamTimeout)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("stream can be closed for fileId: "
-            + handle.dumpFileHandle());
+        LOG.debug("stream can be closed for fileId: {}",
+            handle.dumpFileHandle());
       }
       flag = true;
     }
@@ -1009,10 +964,8 @@ class OpenFileCtx {
    */
   private synchronized WriteCtx offerNextToWrite() {
     if (pendingWrites.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The async write task has no pending writes, fileId: "
-            + latestAttr.getFileId());
-      }
+      LOG.debug("The async write task has no pending writes, fileId: {}",
+          latestAttr.getFileId());
       // process pending commit again to handle this race: a commit is added
       // to pendingCommits map just after the last doSingleWrite returns.
       // There is no pending write and the commit should be handled by the
@@ -1029,49 +982,35 @@ class OpenFileCtx {
     OffsetRange range = lastEntry.getKey();
     WriteCtx toWrite = lastEntry.getValue();
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
-          + nextOffset);
-    }
+    LOG.trace("range.getMin()={} nextOffset={}",
+        range.getMin(), nextOffset);
 
     long offset = nextOffset.get();
     if (range.getMin() > offset) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The next sequential write has not arrived yet");
-      }
+      LOG.debug("The next sequential write has not arrived yet");
       processCommits(nextOffset.get()); // handle race
       this.asyncStatus = false;
     } else if (range.getMax() <= offset) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Remove write " + range.toString()
-            + " which is already written from the list");
-      }
+      LOG.debug("Remove write {} which is already written from the list",
+          range);
       // remove the WriteCtx from cache
       pendingWrites.remove(range);
     } else if (range.getMin() < offset && range.getMax() > offset) {
-      LOG.warn("Got an overlapping write " + range.toString()
-          + ", nextOffset=" + offset
-          + ". Remove and trim it");
+      LOG.warn("Got an overlapping write {}, nextOffset={}. " +
+          "Remove and trim it", range, offset);
       pendingWrites.remove(range);
       trimWriteRequest(toWrite, offset);
       // update nextOffset
       nextOffset.addAndGet(toWrite.getCount());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Change nextOffset (after trim) to " + nextOffset.get());
-      }
+      LOG.debug("Change nextOffset (after trim) to {}", nextOffset.get());
      return toWrite;
    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Remove write " + range.toString()
-            + " from the list");
-      }
+      LOG.debug("Remove write {} from the list", range);
      // after writing, remove the WriteCtx from cache
      pendingWrites.remove(range);
      // update nextOffset
      nextOffset.addAndGet(toWrite.getCount());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Change nextOffset to " + nextOffset.get());
-      }
+      LOG.debug("Change nextOffset to {}", nextOffset.get());
      return toWrite;
    }
    return null;
@@ -1095,9 +1034,9 @@ class OpenFileCtx {
         }
       }
 
-      if (!activeState && LOG.isDebugEnabled()) {
-        LOG.debug("The openFileCtx is not active anymore, fileId: "
-            + latestAttr.getFileId());
+      if (!activeState) {
+        LOG.debug("The openFileCtx is not active anymore, fileId: {}",
+            latestAttr.getFileId());
       }
     } finally {
       // Make sure to reset asyncStatus to false unless a race happens
@@ -1105,11 +1044,12 @@ class OpenFileCtx {
        if (startOffset == asyncWriteBackStartOffset) {
          asyncStatus = false;
        } else {
-          LOG.info("Another async task is already started before this one"
-              + " is finalized. fileId: " + latestAttr.getFileId()
-              + " asyncStatus: " + asyncStatus + " original startOffset: "
-              + startOffset + " new startOffset: " + asyncWriteBackStartOffset
-              + ". Won't change asyncStatus here.");
+          LOG.info("Another async task is already started before this one " +
+              "is finalized. fileId: {} asyncStatus: {} " +
+              "original startOffset: {} " +
+              "new startOffset: {}. Won't change asyncStatus here.",
+              latestAttr.getFileId(), asyncStatus,
+              startOffset, asyncWriteBackStartOffset);
        }
      }
    }
@@ -1132,8 +1072,8 @@ class OpenFileCtx {
       status = Nfs3Status.NFS3_OK;
     } catch (ClosedChannelException cce) {
       if (!pendingWrites.isEmpty()) {
-        LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
-            + ". Channel closed with writes pending.", cce);
+        LOG.error("Can't sync for fileId: {}. " +
+            "Channel closed with writes pending", latestAttr.getFileId(), cce);
       }
       status = Nfs3Status.NFS3ERR_IO;
     } catch (IOException e) {
@@ -1152,8 +1092,8 @@ class OpenFileCtx {
     }
 
     if (latestAttr.getSize() != offset) {
-      LOG.error("After sync, the expect file size: " + offset
-          + ", however actual file size is: " + latestAttr.getSize());
+      LOG.error("After sync, the expected file size: {}, " +
+          "however actual file size is: {}", offset, latestAttr.getSize());
       status = Nfs3Status.NFS3ERR_IO;
     }
     WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
@@ -1170,11 +1110,11 @@ class OpenFileCtx {
       Nfs3Utils.writeChannelCommit(commit.getChannel(), response
           .serialize(new XDR(), commit.getXid(),
               new VerifierNone()), commit.getXid());
-
+
       if (LOG.isDebugEnabled()) {
-        LOG.debug("FileId: " + latestAttr.getFileId() + " Service time: "
-            + Nfs3Utils.getElapsedTime(commit.startTime)
-            + "ns. Sent response for commit: " + commit);
+        LOG.debug("FileId: {} Service time: {}ns. " +
+            "Sent response for commit: {}", latestAttr.getFileId(),
+            Nfs3Utils.getElapsedTime(commit.startTime), commit);
       }
       entry = pendingCommits.firstEntry();
     }
@@ -1190,8 +1130,8 @@ class OpenFileCtx {
 
     FileHandle handle = writeCtx.getHandle();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("do write, fileHandle " + handle.dumpFileHandle() + " offset: "
-          + offset + " length: " + count + " stableHow: " + stableHow.name());
+      LOG.debug("do write, fileHandle {} offset: {} length: {} stableHow: {}",
+          handle.dumpFileHandle(), offset, count, stableHow.name());
     }
 
     try {
@@ -1215,10 +1155,10 @@ class OpenFileCtx {
             writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
             updateNonSequentialWriteInMemory(-count);
             if (LOG.isDebugEnabled()) {
-              LOG.debug("After writing " + handle.dumpFileHandle()
-                  + " at offset " + offset
-                  + ", updated the memory count, new value: "
-                  + nonSequentialWriteInMemory.get());
+              LOG.debug("After writing {} at offset {}, " +
+                  "updated the memory count, new value: {}",
+                  handle.dumpFileHandle(), offset,
+                  nonSequentialWriteInMemory.get());
             }
           }
         }
@@ -1226,7 +1166,7 @@ class OpenFileCtx {
 
       if (!writeCtx.getReplied()) {
         if (stableHow != WriteStableHow.UNSTABLE) {
-          LOG.info("Do sync for stable write: " + writeCtx);
+          LOG.info("Do sync for stable write: {}", writeCtx);
           try {
             if (stableHow == WriteStableHow.DATA_SYNC) {
               fos.hsync();
@@ -1237,7 +1177,7 @@ class OpenFileCtx {
               fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
             }
           } catch (IOException e) {
-            LOG.error("hsync failed with writeCtx: " + writeCtx, e);
+            LOG.error("hsync failed with writeCtx: {}", writeCtx, e);
            throw e;
          }
        }
@@ -1245,8 +1185,8 @@ class OpenFileCtx {
        WccAttr preOpAttr = latestAttr.getWccAttr();
        WccData fileWcc = new WccData(preOpAttr, latestAttr);
        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
-          LOG.warn("Return original count: " + writeCtx.getOriginalCount()
-              + " instead of real data count: " + count);
+          LOG.warn("Return original count: {} instead of real data count: {}",
+              writeCtx.getOriginalCount(), count);
          count = writeCtx.getOriginalCount();
        }
        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
@@ -1260,8 +1200,8 @@ class OpenFileCtx {
       processCommits(writeCtx.getOffset() + writeCtx.getCount());
 
     } catch (IOException e) {
-      LOG.error("Error writing to fileHandle " + handle.dumpFileHandle()
-          + " at offset " + offset + " and length " + count, e);
+      LOG.error("Error writing to fileHandle {} at offset {} and length {}",
+          handle.dumpFileHandle(), offset, count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
         Nfs3Utils.writeChannel(channel, response.serialize(
@@ -1269,8 +1209,8 @@ class OpenFileCtx {
         // Keep stream open. Either client retries or SteamMonitor closes it.
       }
 
-      LOG.info("Clean up open file context for fileId: "
-          + latestAttr.getFileId());
+      LOG.info("Clean up open file context for fileId: {}",
+          latestAttr.getFileId());
       cleanup();
     }
   }
@@ -1297,17 +1237,16 @@ class OpenFileCtx {
         fos.close();
       }
     } catch (IOException e) {
-      LOG.info("Can't close stream for fileId: " + latestAttr.getFileId()
-          + ", error: " + e);
+      LOG.info("Can't close stream for fileId: {}, error: {}",
+          latestAttr.getFileId(), e.toString());
     }
 
     // Reply error for pending writes
-    LOG.info("There are " + pendingWrites.size() + " pending writes.");
+    LOG.info("There are {} pending writes.", pendingWrites.size());
     WccAttr preOpAttr = latestAttr.getWccAttr();
     while (!pendingWrites.isEmpty()) {
       OffsetRange key = pendingWrites.firstKey();
-      LOG.info("Fail pending write: " + key.toString()
-          + ", nextOffset=" + nextOffset.get());
+      LOG.info("Fail pending write: {}, nextOffset={}", key, nextOffset.get());
 
       WriteCtx writeCtx = pendingWrites.remove(key);
       if (!writeCtx.getReplied()) {
@@ -1325,11 +1264,12 @@ class OpenFileCtx {
       try {
         dumpOut.close();
       } catch (IOException e) {
-        LOG.error("Failed to close outputstream of dump file" + dumpFilePath, e);
+        LOG.error("Failed to close outputstream of dump file {}",
+            dumpFilePath, e);
       }
       File dumpFile = new File(dumpFilePath);
       if (dumpFile.exists() && !dumpFile.delete()) {
-        LOG.error("Failed to delete dumpfile: " + dumpFile);
+        LOG.error("Failed to delete dumpfile: {}", dumpFile);
       }
     }
     if (raf != null) {
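
For reference, a minimal, self-contained sketch of the SLF4J idiom this patch
applies throughout (the class name LoggingSketch and the sample values are
illustrative assumptions, not part of the patch): "{}" placeholders replace
string concatenation, simple messages no longer need an isDebugEnabled()
guard because arguments are only formatted when the level is enabled, and a
trailing Throwable after the placeholder arguments is printed with its full
stack trace.

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
  public static final Logger LOG =
      LoggerFactory.getLogger(LoggingSketch.class);

  public static void main(String[] args) {
    long offset = 1024;
    int count = 512;
    // No guard needed: the arguments are cheap, and SLF4J only formats
    // the message when debug logging is enabled.
    LOG.debug("requested offset={} and current offset={}", offset, count);

    // A guard is still worthwhile when building an argument is expensive,
    // which is why the patch keeps some isDebugEnabled() checks.
    try {
      throw new IOException("simulated failure");
    } catch (IOException e) {
      // The trailing exception matches no placeholder and is logged
      // with its stack trace.
      LOG.error("Error writing at offset {} and length {}", offset, count, e);
    }
  }
}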