Browse code

HDFS-13956. iNotify should include information to identify a file as either replicated or erasure coded. Contributed by Hrishikesh Gadre.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit bf3d591f0cb0fedeab5d89cc8d2270d3b9a70313)
Hrishikesh Gadre, 6 years ago
parent
commit
90a9837c9d

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Events sent by the inotify system. Note that no events are necessarily sent
@@ -112,6 +113,7 @@ public abstract class Event {
     private String symlinkTarget;
     private boolean overwrite;
     private long defaultBlockSize;
+    private Optional<Boolean> erasureCoded;
 
     public static class Builder {
       private INodeType iNodeType;
@@ -124,6 +126,7 @@ public abstract class Event {
       private String symlinkTarget;
       private boolean overwrite;
       private long defaultBlockSize = 0;
+      private Optional<Boolean> erasureCoded = Optional.empty();
 
       public Builder iNodeType(INodeType type) {
         this.iNodeType = type;
@@ -175,6 +178,11 @@ public abstract class Event {
         return this;
       }
 
+      public Builder erasureCoded(boolean ecCoded) {
+        this.erasureCoded = Optional.of(ecCoded);
+        return this;
+      }
+
       public CreateEvent build() {
         return new CreateEvent(this);
       }
@@ -192,6 +200,7 @@ public abstract class Event {
       this.symlinkTarget = b.symlinkTarget;
       this.overwrite = b.overwrite;
       this.defaultBlockSize = b.defaultBlockSize;
+      this.erasureCoded = b.erasureCoded;
     }
 
     public INodeType getiNodeType() {
@@ -243,6 +252,10 @@ public abstract class Event {
       return defaultBlockSize;
     }
 
+    public Optional<Boolean> isErasureCoded() {
+      return erasureCoded;
+    }
+
     @Override
     @InterfaceStability.Unstable
     public String toString() {
@@ -261,6 +274,7 @@ public abstract class Event {
 
       content.append("overwrite=").append(overwrite)
           .append(", defaultBlockSize=").append(defaultBlockSize)
+          .append(", erasureCoded=").append(erasureCoded)
           .append("]");
       return content.toString();
     }

+ 34 - 26
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -908,18 +908,22 @@ public class PBHelperClient {
         case EVENT_CREATE:
           InotifyProtos.CreateEventProto create =
               InotifyProtos.CreateEventProto.parseFrom(p.getContents());
-          events.add(new Event.CreateEvent.Builder()
-              .iNodeType(createTypeConvert(create.getType()))
-              .path(create.getPath())
-              .ctime(create.getCtime())
-              .ownerName(create.getOwnerName())
-              .groupName(create.getGroupName())
-              .perms(convert(create.getPerms()))
-              .replication(create.getReplication())
-              .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
-                  create.getSymlinkTarget())
-              .defaultBlockSize(create.getDefaultBlockSize())
-              .overwrite(create.getOverwrite()).build());
+          Event.CreateEvent.Builder builder = new Event.CreateEvent.Builder()
+                  .iNodeType(createTypeConvert(create.getType()))
+                  .path(create.getPath())
+                  .ctime(create.getCtime())
+                  .ownerName(create.getOwnerName())
+                  .groupName(create.getGroupName())
+                  .perms(convert(create.getPerms()))
+                  .replication(create.getReplication())
+                  .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
+                          create.getSymlinkTarget())
+                  .defaultBlockSize(create.getDefaultBlockSize())
+                  .overwrite(create.getOverwrite());
+          if (create.hasErasureCoded()) {
+            builder.erasureCoded(create.getErasureCoded());
+          }
+          events.add(builder.build());
           break;
         case EVENT_METADATA:
           InotifyProtos.MetadataUpdateEventProto meta =
@@ -2909,22 +2913,26 @@ public class PBHelperClient {
           break;
         case CREATE:
           Event.CreateEvent ce2 = (Event.CreateEvent) e;
+          InotifyProtos.CreateEventProto.Builder pB =
+                  (InotifyProtos.CreateEventProto.newBuilder());
+          pB.setType(createTypeConvert(ce2.getiNodeType()))
+             .setPath(ce2.getPath())
+             .setCtime(ce2.getCtime())
+             .setOwnerName(ce2.getOwnerName())
+             .setGroupName(ce2.getGroupName())
+             .setPerms(convert(ce2.getPerms()))
+             .setReplication(ce2.getReplication())
+             .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
+                        "" : ce2.getSymlinkTarget())
+             .setDefaultBlockSize(ce2.getDefaultBlockSize())
+             .setOverwrite(ce2.getOverwrite());
+          if (ce2.isErasureCoded().isPresent()) {
+            pB.setErasureCoded(ce2.isErasureCoded().get());
+          }
           events.add(InotifyProtos.EventProto.newBuilder()
               .setType(InotifyProtos.EventType.EVENT_CREATE)
-              .setContents(
-                  InotifyProtos.CreateEventProto.newBuilder()
-                      .setType(createTypeConvert(ce2.getiNodeType()))
-                      .setPath(ce2.getPath())
-                      .setCtime(ce2.getCtime())
-                      .setOwnerName(ce2.getOwnerName())
-                      .setGroupName(ce2.getGroupName())
-                      .setPerms(convert(ce2.getPerms()))
-                      .setReplication(ce2.getReplication())
-                      .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
-                          "" : ce2.getSymlinkTarget())
-                      .setDefaultBlockSize(ce2.getDefaultBlockSize())
-                      .setOverwrite(ce2.getOverwrite()).build().toByteString()
-              ).build());
+              .setContents(pB.build().toByteString())
+              .build());
           break;
         case METADATA:
           Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto

@@ -80,6 +80,7 @@ message CreateEventProto {
   optional string symlinkTarget = 8;
   optional bool overwrite = 9;
   optional int64 defaultBlockSize = 10 [default=0];
+  optional bool erasureCoded = 11;
 }
 
 message CloseEventProto {

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.inotify.Event;
 import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
 
 import java.util.List;
 
@@ -54,6 +55,8 @@ public class InotifyFSEditLogOpTranslator {
             .perms(addOp.permissions.getPermission())
             .overwrite(addOp.overwrite)
             .defaultBlockSize(addOp.blockSize)
+            .erasureCoded(addOp.erasureCodingPolicyId
+                    != ErasureCodeConstants.REPLICATION_POLICY_ID)
             .iNodeType(Event.CreateEvent.INodeType.FILE).build() });
       } else { // append
         return new EventBatch(op.txid,

+ 100 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -140,6 +141,7 @@ public class TestDFSInotifyEventInputStream {
       client.rename("/file5", "/dir"); // RenameOldOp -> RenameEvent
       //TruncateOp -> TruncateEvent
       client.truncate("/truncate_file", BLOCK_SIZE);
+      client.create("/file_ec_test1", false);
       EventBatch batch = null;
 
       // RenameOp
@@ -180,6 +182,8 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(ce.getSymlinkTarget() == null);
       Assert.assertTrue(ce.getOverwrite());
       Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
+      Assert.assertTrue(ce.isErasureCoded().isPresent());
+      Assert.assertFalse(ce.isErasureCoded().get());
       LOG.info(ce.toString());
       Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
 
@@ -395,6 +399,25 @@ public class TestDFSInotifyEventInputStream {
       LOG.info(et.toString());
       Assert.assertTrue(et.toString().startsWith("TruncateEvent [path="));
 
+      // CreateEvent without overwrite
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType()
+              == Event.EventType.CREATE);
+      ce = (Event.CreateEvent) batch.getEvents()[0];
+      Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
+      Assert.assertTrue(ce.getPath().equals("/file_ec_test1"));
+      Assert.assertTrue(ce.getCtime() > 0);
+      Assert.assertTrue(ce.getReplication() > 0);
+      Assert.assertTrue(ce.getSymlinkTarget() == null);
+      Assert.assertFalse(ce.getOverwrite());
+      Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
+      Assert.assertTrue(ce.isErasureCoded().isPresent());
+      Assert.assertFalse(ce.isErasureCoded().get());
+      LOG.info(ce.toString());
+      Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
+
       // Returns null when there are no further events
       Assert.assertTrue(eis.poll() == null);
 
@@ -410,6 +433,83 @@ public class TestDFSInotifyEventInputStream {
     }
   }
 
+  @Test(timeout = 120000)
+  public void testErasureCodedFiles() throws Exception {
+    ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
+    final int dataUnits = ecPolicy.getNumDataUnits();
+    final int parityUnits = ecPolicy.getNumParityUnits();
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, ecPolicy.getCellSize());
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // so that we can get an atime change
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1);
+
+    MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+    builder.getDfsBuilder().numDataNodes(dataUnits + parityUnits);
+    MiniQJMHACluster cluster = builder.build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0)
+              .getNameNodeAddress(), conf);
+      DistributedFileSystem fs =
+              (DistributedFileSystem)cluster.getDfsCluster().getFileSystem(0);
+
+      Path ecDir = new Path("/ecdir");
+      fs.mkdirs(ecDir);
+      fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
+
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+
+      int sz = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
+      byte[] contents = new byte[sz];
+      DFSTestUtil.writeFile(fs, new Path("/ecdir/file_ec_test2"), contents);
+
+      EventBatch batch = null;
+
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      long txid = batch.getTxid();
+      long eventsBehind = eis.getTxidsBehindEstimate();
+      Assert.assertTrue(batch.getEvents()[0].getEventType()
+              == Event.EventType.CREATE);
+      Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
+      Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
+      Assert.assertTrue(ce.getPath().equals("/ecdir/file_ec_test2"));
+      Assert.assertTrue(ce.getCtime() > 0);
+      Assert.assertEquals(1, ce.getReplication());
+      Assert.assertTrue(ce.getSymlinkTarget() == null);
+      Assert.assertTrue(ce.getOverwrite());
+      Assert.assertEquals(ecPolicy.getCellSize(), ce.getDefaultBlockSize());
+      Assert.assertTrue(ce.isErasureCoded().isPresent());
+      Assert.assertTrue(ce.isErasureCoded().get());
+      LOG.info(ce.toString());
+      Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
+
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType()
+              == Event.EventType.CLOSE);
+      Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath()
+              .equals("/ecdir/file_ec_test2"));
+
+      // Returns null when there are no further events
+      Assert.assertTrue(eis.poll() == null);
+
+      // make sure the estimate hasn't changed since the above assertion
+      // tells us that we are fully caught up to the current namesystem state
+      // and we should not have been behind at all when eventsBehind was set
+      // either, since there were few enough events that they should have all
+      // been read to the client during the first poll() call
+      Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 120000)
   public void testNNFailover() throws IOException, URISyntaxException,
       MissingEventsException {