|
@@ -1691,6 +1691,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
if(LOG.isDebugEnabled()) {
|
|
|
LOG.debug(src + ": masked=" + masked);
|
|
|
}
|
|
|
+ final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
|
|
+ src, masked, flag, createParent, replication, blockSize, progress,
|
|
|
+ buffersize, dfsClientConf.createChecksum(checksumOpt),
|
|
|
+ getFavoredNodesStr(favoredNodes));
|
|
|
+ beginFileLease(result.getFileId(), result);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private String[] getFavoredNodesStr(InetSocketAddress[] favoredNodes) {
|
|
|
String[] favoredNodeStrs = null;
|
|
|
if (favoredNodes != null) {
|
|
|
favoredNodeStrs = new String[favoredNodes.length];
|
|
@@ -1700,12 +1709,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
+ favoredNodes[i].getPort();
|
|
|
}
|
|
|
}
|
|
|
- final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
|
|
- src, masked, flag, createParent, replication, blockSize, progress,
|
|
|
- buffersize, dfsClientConf.createChecksum(checksumOpt),
|
|
|
- favoredNodeStrs);
|
|
|
- beginFileLease(result.getFileId(), result);
|
|
|
- return result;
|
|
|
+ return favoredNodeStrs;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1723,7 +1727,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
- return callAppend(src, buffersize, flag, progress);
|
|
|
+ return callAppend(src, buffersize, flag, progress, null);
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
@@ -1802,7 +1806,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
|
|
|
/** Method to get stream returned by append call */
|
|
|
private DFSOutputStream callAppend(String src, int buffersize,
|
|
|
- EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
|
|
|
+ EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes)
|
|
|
+ throws IOException {
|
|
|
CreateFlag.validateForAppend(flag);
|
|
|
try {
|
|
|
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
|
|
@@ -1810,7 +1815,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
return DFSOutputStream.newStreamForAppend(this, src,
|
|
|
flag.contains(CreateFlag.NEW_BLOCK),
|
|
|
buffersize, progress, blkWithStatus.getLastBlock(),
|
|
|
- blkWithStatus.getFileStatus(), dfsClientConf.createChecksum());
|
|
|
+ blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes);
|
|
|
} catch(RemoteException re) {
|
|
|
throw re.unwrapRemoteException(AccessControlException.class,
|
|
|
FileNotFoundException.class,
|
|
@@ -1838,14 +1843,38 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|
|
public HdfsDataOutputStream append(final String src, final int buffersize,
|
|
|
EnumSet<CreateFlag> flag, final Progressable progress,
|
|
|
final FileSystem.Statistics statistics) throws IOException {
|
|
|
- final DFSOutputStream out = append(src, buffersize, flag, progress);
|
|
|
+ final DFSOutputStream out = append(src, buffersize, flag, null, progress);
|
|
|
+ return createWrappedOutputStream(out, statistics, out.getInitialLen());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Append to an existing HDFS file.
|
|
|
+ *
|
|
|
+ * @param src file name
|
|
|
+ * @param buffersize buffer size
|
|
|
+ * @param flag indicates whether to append data to a new block instead of the
|
|
|
+ * last block
|
|
|
+ * @param progress for reporting write-progress; null is acceptable.
|
|
|
+ * @param statistics file system statistics; null is acceptable.
|
|
|
+ * @param favoredNodes FavoredNodes for new blocks
|
|
|
+ * @return an output stream for writing into the file
|
|
|
+ * @see ClientProtocol#append(String, String, EnumSetWritable)
|
|
|
+ */
|
|
|
+ public HdfsDataOutputStream append(final String src, final int buffersize,
|
|
|
+ EnumSet<CreateFlag> flag, final Progressable progress,
|
|
|
+ final FileSystem.Statistics statistics,
|
|
|
+ final InetSocketAddress[] favoredNodes) throws IOException {
|
|
|
+ final DFSOutputStream out = append(src, buffersize, flag,
|
|
|
+ getFavoredNodesStr(favoredNodes), progress);
|
|
|
return createWrappedOutputStream(out, statistics, out.getInitialLen());
|
|
|
}
|
|
|
|
|
|
private DFSOutputStream append(String src, int buffersize,
|
|
|
- EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
|
|
|
+ EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress)
|
|
|
+ throws IOException {
|
|
|
checkOpen();
|
|
|
- final DFSOutputStream result = callAppend(src, buffersize, flag, progress);
|
|
|
+ final DFSOutputStream result = callAppend(src, buffersize, flag, progress,
|
|
|
+ favoredNodes);
|
|
|
beginFileLease(result.getFileId(), result);
|
|
|
return result;
|
|
|
}
|