|
@@ -25,7 +25,9 @@ import java.util.concurrent.ConcurrentMap;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
|
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
|
import org.apache.hadoop.nfs.NfsFileType;
|
|
|
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
|
|
import org.apache.hadoop.nfs.nfs3.IdUserGroup;
|
|
@@ -48,6 +50,7 @@ import com.google.common.collect.Maps;
|
|
|
public class WriteManager {
|
|
|
public static final Log LOG = LogFactory.getLog(WriteManager.class);
|
|
|
|
|
|
+ private final Configuration config;
|
|
|
private final IdUserGroup iug;
|
|
|
private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
|
|
|
.newConcurrentMap();
|
|
@@ -76,6 +79,7 @@ public class WriteManager {
|
|
|
|
|
|
WriteManager(IdUserGroup iug, final Configuration config) {
|
|
|
this.iug = iug;
|
|
|
+ this.config = config;
|
|
|
|
|
|
streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
|
|
|
DEFAULT_STREAM_TIMEOUT);
|
|
@@ -129,12 +133,41 @@ public class WriteManager {
|
|
|
OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
|
|
|
if (openFileCtx == null) {
|
|
|
LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
|
|
|
- WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
|
|
|
- WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
|
|
- fileWcc, count, request.getStableHow(),
|
|
|
- Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
- Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
|
|
- return;
|
|
|
+
|
|
|
+ String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
|
|
|
+ HdfsDataOutputStream fos = null;
|
|
|
+ Nfs3FileAttributes latestAttr = null;
|
|
|
+ try {
|
|
|
+ int bufferSize = config.getInt(
|
|
|
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
|
|
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
|
|
|
+
|
|
|
+ fos = dfsClient.append(fileIdPath, bufferSize, null, null);
|
|
|
+
|
|
|
+ latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
|
|
|
+ if (fos != null) {
|
|
|
+ fos.close();
|
|
|
+ }
|
|
|
+ WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
|
|
|
+ preOpAttr);
|
|
|
+ WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
|
|
+ fileWcc, count, request.getStableHow(),
|
|
|
+ Nfs3Constant.WRITE_COMMIT_VERF);
|
|
|
+ Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add open stream
|
|
|
+ String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
|
|
|
+ Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
|
|
|
+ openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
|
|
|
+ + fileHandle.getFileId());
|
|
|
+ addOpenFileStream(fileHandle, openFileCtx);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("opened stream for file:" + fileHandle.getFileId());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Add write into the async job queue
|