|
@@ -103,6 +103,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
import org.apache.commons.text.CaseUtils;
|
|
|
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
|
|
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
|
|
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
|
|
@@ -2202,14 +2203,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- } else if (haEnabled && haContext != null &&
|
|
|
- haContext.getState().getServiceState() == OBSERVER) {
|
|
|
- for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
|
|
|
- if (b.getLocations() == null || b.getLocations().length == 0) {
|
|
|
- throw new ObserverRetryOnActiveException("Zero blocklocations "
|
|
|
- + "for " + srcArg);
|
|
|
- }
|
|
|
- }
|
|
|
+ } else if (isObserver()) {
|
|
|
+ checkBlockLocationsWhenObserver(res.blocks, srcArg);
|
|
|
}
|
|
|
} finally {
|
|
|
readUnlock(operationName, getLockReportInfoSupplier(srcArg));
|
|
@@ -3470,6 +3465,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
logAuditEvent(false, operationName, src);
|
|
|
throw e;
|
|
|
}
|
|
|
+ if (needLocation && isObserver() && stat instanceof HdfsLocatedFileStatus) {
|
|
|
+ LocatedBlocks lbs = ((HdfsLocatedFileStatus) stat).getLocatedBlocks();
|
|
|
+ checkBlockLocationsWhenObserver(lbs, src);
|
|
|
+ }
|
|
|
logAuditEvent(true, operationName, src);
|
|
|
return stat;
|
|
|
}
|
|
@@ -4175,6 +4174,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
logAuditEvent(false, operationName, src);
|
|
|
throw e;
|
|
|
}
|
|
|
+ if (needLocation && isObserver()) {
|
|
|
+ for (HdfsFileStatus fs : dl.getPartialListing()) {
|
|
|
+ if (fs instanceof HdfsLocatedFileStatus) {
|
|
|
+ LocatedBlocks lbs = ((HdfsLocatedFileStatus) fs).getLocatedBlocks();
|
|
|
+ checkBlockLocationsWhenObserver(lbs, fs.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
logAuditEvent(true, operationName, src);
|
|
|
return dl;
|
|
|
}
|
|
@@ -9020,4 +9027,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|
|
throw new UnsupportedActionException(operationName + " not supported.");
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private boolean isObserver() {
|
|
|
+ return haEnabled && haContext != null && haContext.getState().getServiceState() == OBSERVER;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkBlockLocationsWhenObserver(LocatedBlocks blocks, String src)
|
|
|
+ throws ObserverRetryOnActiveException {
|
|
|
+ for (LocatedBlock b : blocks.getLocatedBlocks()) {
|
|
|
+ if (b.getLocations() == null || b.getLocations().length == 0) {
|
|
|
+ throw new ObserverRetryOnActiveException("Zero blocklocations for " + src);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|