|
@@ -631,28 +631,11 @@ public class RouterRpcServer extends AbstractService
|
|
|
RemoteLocation createLocation = locations.get(0);
|
|
|
if (locations.size() > 1) {
|
|
|
try {
|
|
|
- // Check if this file already exists in other subclusters
|
|
|
- LocatedBlocks existingLocation = getBlockLocations(src, 0, 1);
|
|
|
+ RemoteLocation existingLocation = getExistingLocation(src, locations);
|
|
|
+ // Forward to the existing location and let the NN handle the error
|
|
|
if (existingLocation != null) {
|
|
|
- // Forward to the existing location and let the NN handle the error
|
|
|
- LocatedBlock existingLocationLastLocatedBlock =
|
|
|
- existingLocation.getLastLocatedBlock();
|
|
|
- if (existingLocationLastLocatedBlock == null) {
|
|
|
- // The block has no blocks yet, check for the meta data
|
|
|
- for (RemoteLocation location : locations) {
|
|
|
- RemoteMethod method = new RemoteMethod("getFileInfo",
|
|
|
- new Class<?>[] {String.class}, new RemoteParam());
|
|
|
- if (rpcClient.invokeSingle(location, method) != null) {
|
|
|
- createLocation = location;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- ExtendedBlock existingLocationLastBlock =
|
|
|
- existingLocationLastLocatedBlock.getBlock();
|
|
|
- String blockPoolId = existingLocationLastBlock.getBlockPoolId();
|
|
|
- createLocation = getLocationForPath(src, true, blockPoolId);
|
|
|
- }
|
|
|
+ LOG.debug("{} already exists in {}.", src, existingLocation);
|
|
|
+ createLocation = existingLocation;
|
|
|
}
|
|
|
} catch (FileNotFoundException fne) {
|
|
|
// Ignore if the file is not found
|
|
@@ -661,6 +644,27 @@ public class RouterRpcServer extends AbstractService
|
|
|
return createLocation;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Gets the remote location where the file exists.
|
|
|
+ * @param src the name of file.
|
|
|
+ * @param locations all the remote locations.
|
|
|
+ * @return the remote location of the file if it exists, else null.
|
|
|
+ * @throws IOException in case of any exception.
|
|
|
+ */
|
|
|
+ private RemoteLocation getExistingLocation(String src,
|
|
|
+ List<RemoteLocation> locations) throws IOException {
|
|
|
+ RemoteMethod method = new RemoteMethod("getFileInfo",
|
|
|
+ new Class<?>[] {String.class}, new RemoteParam());
|
|
|
+ Map<RemoteLocation, HdfsFileStatus> results = rpcClient.invokeConcurrent(
|
|
|
+ locations, method, false, false, HdfsFileStatus.class);
|
|
|
+ for (RemoteLocation loc : locations) {
|
|
|
+ if (results.get(loc) != null) {
|
|
|
+ return loc;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
@Override // ClientProtocol
|
|
|
public LastBlockWithStatus append(String src, final String clientName,
|
|
|
final EnumSetWritable<CreateFlag> flag) throws IOException {
|