|
@@ -129,8 +129,6 @@ import java.util.Map;
|
|
|
import java.util.NoSuchElementException;
|
|
|
import java.util.Optional;
|
|
|
|
|
|
-import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
|
|
-
|
|
|
/****************************************************************
|
|
|
* Implementation of the abstract FileSystem for the DFS system.
|
|
|
* This object is the way end-user code interacts with a Hadoop
|
|
@@ -329,7 +327,12 @@ public class DistributedFileSystem extends FileSystem
|
|
|
public FSDataInputStream doCall(final Path p) throws IOException {
|
|
|
final DFSInputStream dfsis =
|
|
|
dfs.open(getPathName(p), bufferSize, verifyChecksum);
|
|
|
- return dfs.createWrappedInputStream(dfsis);
|
|
|
+ try {
|
|
|
+ return dfs.createWrappedInputStream(dfsis);
|
|
|
+ } catch (IOException ex){
|
|
|
+ dfsis.close();
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
}
|
|
|
@Override
|
|
|
public FSDataInputStream next(final FileSystem fs, final Path p)
|
|
@@ -510,7 +513,7 @@ public class DistributedFileSystem extends FileSystem
|
|
|
: EnumSet.of(CreateFlag.CREATE),
|
|
|
true, replication, blockSize, progress, bufferSize, null,
|
|
|
favoredNodes);
|
|
|
- return dfs.createWrappedOutputStream(out, statistics);
|
|
|
+ return safelyCreateWrappedOutputStream(out);
|
|
|
}
|
|
|
@Override
|
|
|
public HdfsDataOutputStream next(final FileSystem fs, final Path p)
|
|
@@ -542,7 +545,7 @@ public class DistributedFileSystem extends FileSystem
|
|
|
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
|
|
|
cflags, replication, blockSize, progress, bufferSize,
|
|
|
checksumOpt);
|
|
|
- return dfs.createWrappedOutputStream(dfsos, statistics);
|
|
|
+ return safelyCreateWrappedOutputStream(dfsos);
|
|
|
}
|
|
|
@Override
|
|
|
public FSDataOutputStream next(final FileSystem fs, final Path p)
|
|
@@ -590,7 +593,7 @@ public class DistributedFileSystem extends FileSystem
|
|
|
final DFSOutputStream out = dfs.create(getPathName(f), permission,
|
|
|
flag, true, replication, blockSize, progress, bufferSize,
|
|
|
checksumOpt, favoredNodes, ecPolicyName, storagePolicy);
|
|
|
- return dfs.createWrappedOutputStream(out, statistics);
|
|
|
+ return safelyCreateWrappedOutputStream(out);
|
|
|
}
|
|
|
@Override
|
|
|
public HdfsDataOutputStream next(final FileSystem fs, final Path p)
|
|
@@ -619,7 +622,7 @@ public class DistributedFileSystem extends FileSystem
|
|
|
getPathName(fixRelativePart(f)),
|
|
|
absolutePermission, flag, true, replication, blockSize,
|
|
|
progress, bufferSize, checksumOpt);
|
|
|
- return dfs.createWrappedOutputStream(dfsos, statistics);
|
|
|
+ return safelyCreateWrappedOutputStream(dfsos);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -648,7 +651,7 @@ public class DistributedFileSystem extends FileSystem
|
|
|
final DFSOutputStream out = dfs.create(getPathName(f), permission,
|
|
|
flag, false, replication, blockSize, progress, bufferSize,
|
|
|
checksumOpt, favoredNodes, ecPolicyName, storagePolicyName);
|
|
|
- return dfs.createWrappedOutputStream(out, statistics);
|
|
|
+ return safelyCreateWrappedOutputStream(out);
|
|
|
}
|
|
|
@Override
|
|
|
public HdfsDataOutputStream next(final FileSystem fs, final Path p)
|
|
@@ -685,7 +688,7 @@ public class DistributedFileSystem extends FileSystem
|
|
|
public FSDataOutputStream doCall(final Path p) throws IOException {
|
|
|
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
|
|
|
flag, false, replication, blockSize, progress, bufferSize, null);
|
|
|
- return dfs.createWrappedOutputStream(dfsos, statistics);
|
|
|
+ return safelyCreateWrappedOutputStream(dfsos);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -697,6 +700,20 @@ public class DistributedFileSystem extends FileSystem
|
|
|
}.resolve(this, absF);
|
|
|
}
|
|
|
|
|
|
+ // Private helper to ensure the wrapped inner stream is closed safely
|
|
|
+ // upon IOException throw during wrap.
|
|
|
+ // Assuming the caller owns the inner stream which needs to be closed upon
|
|
|
+ // wrap failure.
|
|
|
+ private HdfsDataOutputStream safelyCreateWrappedOutputStream(
|
|
|
+ DFSOutputStream dfsos) throws IOException {
|
|
|
+ try {
|
|
|
+ return dfs.createWrappedOutputStream(dfsos, statistics);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ dfsos.close();
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public boolean setReplication(Path src, final short replication)
|
|
|
throws IOException {
|