|
@@ -15,7 +15,6 @@
|
|
* See the License for the specific language governing permissions and
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
* limitations under the License.
|
|
*/
|
|
*/
|
|
-
|
|
|
|
package org.apache.hadoop.hdds.scm.container;
|
|
package org.apache.hadoop.hdds.scm.container;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
@@ -23,18 +22,16 @@ import java.util.Set;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
-import org.apache.hadoop.hdds.protocol.proto
|
|
|
|
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
|
|
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
|
-import org.apache.hadoop.hdds.scm.container.replication
|
|
|
|
- .ReplicationActivityStatus;
|
|
|
|
|
|
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
|
|
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
|
|
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
|
|
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
|
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
|
|
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
|
|
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
|
|
import org.apache.hadoop.hdds.scm.node.states.ReportResult;
|
|
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
|
|
|
- .ContainerReportFromDatanode;
|
|
|
|
|
|
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
|
|
import org.apache.hadoop.hdds.server.events.EventHandler;
|
|
import org.apache.hadoop.hdds.server.events.EventHandler;
|
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
|
|
|
|
|
@@ -59,22 +56,21 @@ public class ContainerReportHandler implements
|
|
|
|
|
|
private ReplicationActivityStatus replicationStatus;
|
|
private ReplicationActivityStatus replicationStatus;
|
|
|
|
|
|
-
|
|
|
|
public ContainerReportHandler(Mapping containerMapping,
|
|
public ContainerReportHandler(Mapping containerMapping,
|
|
Node2ContainerMap node2ContainerMap,
|
|
Node2ContainerMap node2ContainerMap,
|
|
ReplicationActivityStatus replicationActivityStatus) {
|
|
ReplicationActivityStatus replicationActivityStatus) {
|
|
Preconditions.checkNotNull(containerMapping);
|
|
Preconditions.checkNotNull(containerMapping);
|
|
Preconditions.checkNotNull(node2ContainerMap);
|
|
Preconditions.checkNotNull(node2ContainerMap);
|
|
Preconditions.checkNotNull(replicationActivityStatus);
|
|
Preconditions.checkNotNull(replicationActivityStatus);
|
|
|
|
+ this.containerStateManager = containerMapping.getStateManager();
|
|
this.containerMapping = containerMapping;
|
|
this.containerMapping = containerMapping;
|
|
this.node2ContainerMap = node2ContainerMap;
|
|
this.node2ContainerMap = node2ContainerMap;
|
|
- this.containerStateManager = containerMapping.getStateManager();
|
|
|
|
this.replicationStatus = replicationActivityStatus;
|
|
this.replicationStatus = replicationActivityStatus;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
|
|
public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
|
|
- EventPublisher publisher) {
|
|
|
|
|
|
+ EventPublisher publisher) {
|
|
|
|
|
|
DatanodeDetails datanodeOrigin =
|
|
DatanodeDetails datanodeOrigin =
|
|
containerReportFromDatanode.getDatanodeDetails();
|
|
containerReportFromDatanode.getDatanodeDetails();
|
|
@@ -88,7 +84,8 @@ public class ContainerReportHandler implements
|
|
.processContainerReports(datanodeOrigin, containerReport, false);
|
|
.processContainerReports(datanodeOrigin, containerReport, false);
|
|
|
|
|
|
Set<ContainerID> containerIds = containerReport.getReportsList().stream()
|
|
Set<ContainerID> containerIds = containerReport.getReportsList().stream()
|
|
- .map(containerProto -> containerProto.getContainerID())
|
|
|
|
|
|
+ .map(StorageContainerDatanodeProtocolProtos
|
|
|
|
+ .ContainerInfo::getContainerID)
|
|
.map(ContainerID::new)
|
|
.map(ContainerID::new)
|
|
.collect(Collectors.toSet());
|
|
.collect(Collectors.toSet());
|
|
|
|
|
|
@@ -102,13 +99,12 @@ public class ContainerReportHandler implements
|
|
for (ContainerID containerID : reportResult.getMissingContainers()) {
|
|
for (ContainerID containerID : reportResult.getMissingContainers()) {
|
|
containerStateManager
|
|
containerStateManager
|
|
.removeContainerReplica(containerID, datanodeOrigin);
|
|
.removeContainerReplica(containerID, datanodeOrigin);
|
|
- emitReplicationRequestEvent(containerID, publisher);
|
|
|
|
|
|
+ checkReplicationState(containerID, publisher);
|
|
}
|
|
}
|
|
|
|
|
|
for (ContainerID containerID : reportResult.getNewContainers()) {
|
|
for (ContainerID containerID : reportResult.getNewContainers()) {
|
|
containerStateManager.addContainerReplica(containerID, datanodeOrigin);
|
|
containerStateManager.addContainerReplica(containerID, datanodeOrigin);
|
|
-
|
|
|
|
- emitReplicationRequestEvent(containerID, publisher);
|
|
|
|
|
|
+ checkReplicationState(containerID, publisher);
|
|
}
|
|
}
|
|
|
|
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -119,8 +115,9 @@ public class ContainerReportHandler implements
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- private void emitReplicationRequestEvent(ContainerID containerID,
|
|
|
|
- EventPublisher publisher) throws SCMException {
|
|
|
|
|
|
+ private void checkReplicationState(ContainerID containerID,
|
|
|
|
+ EventPublisher publisher)
|
|
|
|
+ throws SCMException {
|
|
ContainerInfo container = containerStateManager.getContainer(containerID);
|
|
ContainerInfo container = containerStateManager.getContainer(containerID);
|
|
|
|
|
|
if (container == null) {
|
|
if (container == null) {
|
|
@@ -134,18 +131,18 @@ public class ContainerReportHandler implements
|
|
if (container.isContainerOpen()) {
|
|
if (container.isContainerOpen()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- if (replicationStatus.isReplicationEnabled()) {
|
|
|
|
-
|
|
|
|
- int existingReplicas =
|
|
|
|
- containerStateManager.getContainerReplicas(containerID).size();
|
|
|
|
-
|
|
|
|
- int expectedReplicas = container.getReplicationFactor().getNumber();
|
|
|
|
-
|
|
|
|
- if (existingReplicas != expectedReplicas) {
|
|
|
|
|
|
|
|
|
|
+ ReplicationRequest replicationState =
|
|
|
|
+ containerStateManager.checkReplicationState(containerID);
|
|
|
|
+ if (replicationState != null) {
|
|
|
|
+ if (replicationStatus.isReplicationEnabled()) {
|
|
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
|
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
|
|
- new ReplicationRequest(containerID.getId(), existingReplicas,
|
|
|
|
- container.getReplicationFactor().getNumber()));
|
|
|
|
|
|
+ replicationState);
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn(
|
|
|
|
+ "Over/under replicated container but the replication is not "
|
|
|
|
+ + "(yet) enabled: "
|
|
|
|
+ + replicationState.toString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|