浏览代码

HDFS-12636. Ozone: OzoneFileSystem: Implement seek functionality for rpc client. Contributed by Lokesh Jain.

Mukul Kumar Singh 7 年之前
父节点
当前提交
8bbc28a974
共有 14 个文件被更改,包括 559 次插入475 次删除
  1. 131 17
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
  2. 25 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  3. 8 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
  4. 4 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
  5. 92 20
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
  6. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java
  7. 68 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
  8. 74 0
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
  9. 59 0
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java
  10. 83 126
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
  11. 0 191
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java
  12. 0 113
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java
  13. 3 1
      hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
  14. 8 2
      hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/contract/OzoneContract.java

+ 131 - 17
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@@ -27,18 +29,21 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.storage.ChunkInputStream;
 import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
  * Maintaining a list of ChunkInputStream. Read based on offset.
  */
-public class ChunkGroupInputStream extends InputStream {
+public class ChunkGroupInputStream extends InputStream implements Seekable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ChunkGroupInputStream.class);
@@ -46,7 +51,13 @@ public class ChunkGroupInputStream extends InputStream {
   private static final int EOF = -1;
 
   private final ArrayList<ChunkInputStreamEntry> streamEntries;
+  // streamOffset[i] stores the offset at which chunkInputStream i stores
+  // data in the key
+  private long[] streamOffset = null;
   private int currentStreamIndex;
+  private long length = 0;
+  private boolean closed = false;
+  private String key;
 
   public ChunkGroupInputStream() {
     streamEntries = new ArrayList<>();
@@ -66,19 +77,21 @@ public class ChunkGroupInputStream extends InputStream {
   /**
    * Append another stream to the end of the list.
    *
-   * @param stream the stream instance.
-   * @param length the max number of bytes that should be written to this
-   *               stream.
+   * @param stream       the stream instance.
+   * @param streamLength the max number of bytes that should be written to this
+   *                     stream.
    */
-  public synchronized void addStream(InputStream stream, long length) {
-    streamEntries.add(new ChunkInputStreamEntry(stream, length));
+  public synchronized void addStream(ChunkInputStream stream,
+      long streamLength) {
+    streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
   }
 
 
   @Override
   public synchronized int read() throws IOException {
+    checkNotClosed();
     if (streamEntries.size() <= currentStreamIndex) {
-      throw new IndexOutOfBoundsException();
+      return EOF;
     }
     ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
     int data = entry.read();
@@ -87,6 +100,7 @@ public class ChunkGroupInputStream extends InputStream {
 
   @Override
   public synchronized int read(byte[] b, int off, int len) throws IOException {
+    checkNotClosed();
     if (b == null) {
       throw new NullPointerException();
     }
@@ -122,15 +136,82 @@ public class ChunkGroupInputStream extends InputStream {
     return totalReadLen;
   }
 
-  private static class ChunkInputStreamEntry extends InputStream {
+  @Override
+  public void seek(long pos) throws IOException {
+    checkNotClosed();
+    if (pos < 0 || pos >= length) {
+      if (pos == 0) {
+        // It is possible for length and pos to be zero in which case
+        // seek should return instead of throwing exception
+        return;
+      }
+      throw new EOFException(
+          "EOF encountered at pos: " + pos + " for key: " + key);
+    }
+    Preconditions.assertTrue(currentStreamIndex >= 0);
+    if (currentStreamIndex >= streamEntries.size()) {
+      currentStreamIndex = Arrays.binarySearch(streamOffset, pos);
+    } else if (pos < streamOffset[currentStreamIndex]) {
+      currentStreamIndex =
+          Arrays.binarySearch(streamOffset, 0, currentStreamIndex, pos);
+    } else if (pos >= streamOffset[currentStreamIndex] + streamEntries
+        .get(currentStreamIndex).length) {
+      currentStreamIndex = Arrays
+          .binarySearch(streamOffset, currentStreamIndex + 1,
+              streamEntries.size(), pos);
+    }
+    if (currentStreamIndex < 0) {
+      // Binary search returns -insertionPoint - 1  if element is not present
+      // in the array. insertionPoint is the point at which element would be
+      // inserted in the sorted array. We need to adjust the currentStreamIndex
+      // accordingly so that currentStreamIndex = insertionPoint - 1
+      currentStreamIndex = -currentStreamIndex - 2;
+    }
+    // seek to the proper offset in the ChunkInputStream
+    streamEntries.get(currentStreamIndex)
+        .seek(pos - streamOffset[currentStreamIndex]);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return length == 0 ? 0 :
+        streamOffset[currentStreamIndex] + streamEntries.get(currentStreamIndex)
+            .getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    checkNotClosed();
+    long remaining = length - getPos();
+    return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
+  }
+
+  @Override
+  public void close() throws IOException {
+    closed = true;
+    for (int i = 0; i < streamEntries.size(); i++) {
+      streamEntries.get(i).close();
+    }
+  }
+
+  /**
+   * Encapsulates ChunkInputStream.
+   */
+  public static class ChunkInputStreamEntry extends InputStream
+      implements Seekable {
 
-    private final InputStream inputStream;
+    private final ChunkInputStream chunkInputStream;
     private final long length;
     private long currentPosition;
 
-
-    ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
-      this.inputStream = chunkInputStream;
+    public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
+        long length) {
+      this.chunkInputStream = chunkInputStream;
       this.length = length;
       this.currentPosition = 0;
     }
@@ -142,21 +223,36 @@ public class ChunkGroupInputStream extends InputStream {
     @Override
     public synchronized int read(byte[] b, int off, int len)
         throws IOException {
-      int readLen = inputStream.read(b, off, len);
+      int readLen = chunkInputStream.read(b, off, len);
       currentPosition += readLen;
       return readLen;
     }
 
     @Override
     public synchronized int read() throws IOException {
-      int data = inputStream.read();
+      int data = chunkInputStream.read();
       currentPosition += 1;
       return data;
     }
 
     @Override
     public synchronized void close() throws IOException {
-      inputStream.close();
+      chunkInputStream.close();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      chunkInputStream.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return chunkInputStream.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
     }
   }
 
@@ -168,8 +264,12 @@ public class ChunkGroupInputStream extends InputStream {
     long length = 0;
     String containerKey;
     ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
-    for (KsmKeyLocationInfo ksmKeyLocationInfo :
-        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
+    groupInputStream.key = keyInfo.getKeyName();
+    List<KsmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
+    groupInputStream.streamOffset = new long[keyLocationInfos.size()];
+    for (int i = 0; i < keyLocationInfos.size(); i++) {
+      KsmKeyLocationInfo ksmKeyLocationInfo = keyLocationInfos.get(i);
       String containerName = ksmKeyLocationInfo.getContainerName();
       Pipeline pipeline =
           storageContainerLocationClient.getContainer(containerName);
@@ -180,6 +280,7 @@ public class ChunkGroupInputStream extends InputStream {
       try {
         LOG.debug("get key accessing {} {}",
             xceiverClient.getPipeline().getContainerName(), containerKey);
+        groupInputStream.streamOffset[i] = length;
         ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
             .containerKeyDataForRead(
                 xceiverClient.getPipeline().getContainerName(), containerKey);
@@ -202,6 +303,19 @@ public class ChunkGroupInputStream extends InputStream {
         }
       }
     }
+    groupInputStream.length = length;
     return new LengthInputStream(groupInputStream, length);
   }
+
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
+    }
+  }
 }

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@@ -72,6 +73,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
   private final String requestID;
+  private boolean closed;
 
   /**
    * A constructor for testing purpose only.
@@ -86,6 +88,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     xceiverClientManager = null;
     chunkSize = 0;
     requestID = null;
+    closed = false;
   }
 
   /**
@@ -196,6 +199,8 @@ public class ChunkGroupOutputStream extends OutputStream {
 
   @Override
   public synchronized void write(int b) throws IOException {
+    checkNotClosed();
+
     if (streamEntries.size() <= currentStreamIndex) {
       Preconditions.checkNotNull(ksmClient);
       // allocate a new block, if a exception happens, log an error and
@@ -230,6 +235,8 @@ public class ChunkGroupOutputStream extends OutputStream {
   @Override
   public synchronized void write(byte[] b, int off, int len)
       throws IOException {
+    checkNotClosed();
+
     if (b == null) {
       throw new NullPointerException();
     }
@@ -286,6 +293,7 @@ public class ChunkGroupOutputStream extends OutputStream {
 
   @Override
   public synchronized void flush() throws IOException {
+    checkNotClosed();
     for (int i = 0; i <= currentStreamIndex; i++) {
       streamEntries.get(i).flush();
     }
@@ -298,6 +306,10 @@ public class ChunkGroupOutputStream extends OutputStream {
    */
   @Override
   public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
     for (ChunkOutputStreamEntry entry : streamEntries) {
       if (entry != null) {
         entry.close();
@@ -464,4 +476,17 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
   }
+
+  /**
+   * Verify that the output stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
+              .getKeyName());
+    }
+  }
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java

@@ -49,4 +49,12 @@ public class OzoneInputStream extends InputStream {
     inputStream.close();
   }
 
+  @Override
+  public int available() throws IOException {
+    return inputStream.available();
+  }
+
+  public InputStream getInputStream() {
+    return inputStream;
+  }
 }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java

@@ -57,4 +57,8 @@ public class OzoneOutputStream extends OutputStream {
     //commitKey can be done here, if needed.
     outputStream.close();
   }
+
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
 }

+ 92 - 20
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java

@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.scm.storage;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.scm.XceiverClientSpi;
@@ -38,7 +41,7 @@ import org.apache.hadoop.scm.XceiverClientManager;
  * instances.  This class encapsulates all state management for iterating
  * through the sequence of chunks and the sequence of buffers within each chunk.
  */
-public class ChunkInputStream extends InputStream {
+public class ChunkInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
@@ -47,9 +50,10 @@ public class ChunkInputStream extends InputStream {
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private List<ChunkInfo> chunks;
-  private int chunkOffset;
+  private int chunkIndex;
+  private long[] chunkOffset;
   private List<ByteBuffer> buffers;
-  private int bufferOffset;
+  private int bufferIndex;
 
   /**
    * Creates a new ChunkInputStream.
@@ -67,9 +71,21 @@ public class ChunkInputStream extends InputStream {
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
     this.chunks = chunks;
-    this.chunkOffset = 0;
+    this.chunkIndex = -1;
+    // chunkOffset[i] stores offset at which chunk i stores data in
+    // ChunkInputStream
+    this.chunkOffset = new long[this.chunks.size()];
+    initializeChunkOffset();
     this.buffers = null;
-    this.bufferOffset = 0;
+    this.bufferIndex = 0;
+  }
+
+  private void initializeChunkOffset() {
+    int tempOffset = 0;
+    for (int i = 0; i < chunks.size(); i++) {
+      chunkOffset[i] = tempOffset;
+      tempOffset += chunks.get(i).getLen();
+    }
   }
 
   @Override
@@ -77,7 +93,8 @@ public class ChunkInputStream extends InputStream {
       throws IOException {
     checkOpen();
     int available = prepareRead(1);
-    return available == EOF ? EOF : buffers.get(bufferOffset).get();
+    return available == EOF ? EOF :
+        Byte.toUnsignedInt(buffers.get(bufferIndex).get());
   }
 
   @Override
@@ -106,7 +123,7 @@ public class ChunkInputStream extends InputStream {
     if (available == EOF) {
       return EOF;
     }
-    buffers.get(bufferOffset).get(b, off, available);
+    buffers.get(bufferIndex).get(b, off, available);
     return available;
   }
 
@@ -144,20 +161,20 @@ public class ChunkInputStream extends InputStream {
         return EOF;
       } else if (buffers == null) {
         // The first read triggers fetching the first chunk.
-        readChunkFromContainer(0);
+        readChunkFromContainer();
       } else if (!buffers.isEmpty() &&
-          buffers.get(bufferOffset).hasRemaining()) {
+          buffers.get(bufferIndex).hasRemaining()) {
         // Data is available from the current buffer.
-        ByteBuffer bb = buffers.get(bufferOffset);
+        ByteBuffer bb = buffers.get(bufferIndex);
         return len > bb.remaining() ? bb.remaining() : len;
       } else if (!buffers.isEmpty() &&
-          !buffers.get(bufferOffset).hasRemaining() &&
-          bufferOffset < buffers.size() - 1) {
+          !buffers.get(bufferIndex).hasRemaining() &&
+          bufferIndex < buffers.size() - 1) {
         // There are additional buffers available.
-        ++bufferOffset;
-      } else if (chunkOffset < chunks.size() - 1) {
+        ++bufferIndex;
+      } else if (chunkIndex < chunks.size() - 1) {
         // There are additional chunks available.
-        readChunkFromContainer(chunkOffset + 1);
+        readChunkFromContainer();
       } else {
         // All available input has been consumed.
         return EOF;
@@ -170,20 +187,75 @@ public class ChunkInputStream extends InputStream {
    * successful, then the data of the read chunk is saved so that its bytes can
    * be returned from subsequent read calls.
    *
-   * @param readChunkOffset offset in the chunk list of which chunk to read
    * @throws IOException if there is an I/O error while performing the call
    */
-  private synchronized void readChunkFromContainer(int readChunkOffset)
-      throws IOException {
+  private synchronized void readChunkFromContainer() throws IOException {
+    // On every chunk read chunkIndex should be increased so as to read the
+    // next chunk
+    chunkIndex += 1;
     final ReadChunkResponseProto readChunkResponse;
     try {
       readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-          chunks.get(readChunkOffset), key, traceID);
+          chunks.get(chunkIndex), key, traceID);
     } catch (IOException e) {
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
-    chunkOffset = readChunkOffset;
     ByteString byteString = readChunkResponse.getData();
     buffers = byteString.asReadOnlyByteBufferList();
+    bufferIndex = 0;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos < 0 || (chunks.size() == 0 && pos > 0)
+        || pos >= chunkOffset[chunks.size() - 1] + chunks.get(chunks.size() - 1)
+        .getLen()) {
+      throw new EOFException(
+          "EOF encountered pos: " + pos + " container key: " + key);
+    }
+    if (chunkIndex == -1) {
+      chunkIndex = Arrays.binarySearch(chunkOffset, pos);
+    } else if (pos < chunkOffset[chunkIndex]) {
+      chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
+    } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
+        .getLen()) {
+      chunkIndex =
+          Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
+    }
+    if (chunkIndex < 0) {
+      // Binary search returns -insertionPoint - 1  if element is not present
+      // in the array. insertionPoint is the point at which element would be
+      // inserted in the sorted array. We need to adjust the chunkIndex
+      // accordingly so that chunkIndex = insertionPoint - 1
+      chunkIndex = -chunkIndex -2;
+    }
+    // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
+    chunkIndex -= 1;
+    readChunkFromContainer();
+    adjustBufferIndex(pos);
+  }
+
+  private void adjustBufferIndex(long pos) {
+    long tempOffest = chunkOffset[chunkIndex];
+    for (int i = 0; i < buffers.size(); i++) {
+      if (pos - tempOffest >= buffers.get(i).capacity()) {
+        tempOffest += buffers.get(i).capacity();
+      } else {
+        bufferIndex = i;
+        break;
+      }
+    }
+    buffers.get(bufferIndex).position((int) (pos - tempOffest));
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return chunkIndex == -1 ? 0 :
+        chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
   }
 }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java

@@ -206,6 +206,10 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
     return this.scm;
   }
 
+  public OzoneConfiguration getConf() {
+    return conf;
+  }
+
   @Override
   public KeySpaceManager getKeySpaceManager() {
     return this.ksm;

+ 68 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java

@@ -19,14 +19,15 @@ package org.apache.hadoop.ozone.ksm;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 
 import static org.junit.Assert.assertEquals;
@@ -111,13 +112,44 @@ public class TestChunkStreams {
   @Test
   public void testReadGroupInputStream() throws Exception {
     try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
-      ArrayList<InputStream> inputStreams = new ArrayList<>();
+      ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes();
       int offset = 0;
       for (int i = 0; i < 5; i++) {
-        ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+        int tempOffset = offset;
+        ChunkInputStream in =
+            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+              private ByteArrayInputStream in =
+                  new ByteArrayInputStream(buf, tempOffset, 100);
+
+              @Override
+              public void seek(long pos) throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public long getPos() throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public boolean seekToNewSource(long targetPos)
+                  throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public int read() throws IOException {
+                return in.read();
+              }
+
+              @Override
+              public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+              }
+            };
         inputStreams.add(in);
         offset += 100;
         groupInputStream.addStream(in, 100);
@@ -134,13 +166,44 @@ public class TestChunkStreams {
   @Test
   public void testErrorReadGroupInputStream() throws Exception {
     try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
-      ArrayList<InputStream> inputStreams = new ArrayList<>();
+      ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes();
       int offset = 0;
       for (int i = 0; i < 5; i++) {
-        ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+        int tempOffset = offset;
+        ChunkInputStream in =
+            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+              private ByteArrayInputStream in =
+                  new ByteArrayInputStream(buf, tempOffset, 100);
+
+              @Override
+              public void seek(long pos) throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public long getPos() throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public boolean seekToNewSource(long targetPos)
+                  throws IOException {
+                throw new UnsupportedOperationException();
+              }
+
+              @Override
+              public int read() throws IOException {
+                return in.read();
+              }
+
+              @Override
+              public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+              }
+            };
         inputStreams.add(in);
         offset += 100;
         groupInputStream.addStream(in, 100);

+ 74 - 0
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The input stream for Ozone file system.
+ *
+ * TODO: Make inputStream generic for both rest and rpc clients
+ * This class is not thread safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class OzoneFSInputStream extends FSInputStream {
+
+  private final ChunkGroupInputStream inputStream;
+
+  public OzoneFSInputStream(InputStream inputStream) {
+    this.inputStream = (ChunkGroupInputStream)inputStream;
+  }
+
+  @Override
+  public int read() throws IOException {
+    return inputStream.read();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    inputStream.close();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    inputStream.seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return inputStream.getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return inputStream.available();
+  }
+}

+ 59 - 0
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+
+
+/**
+ * The output stream for Ozone file system.
+ *
+ * TODO: Make outputStream generic for both rest and rpc clients
+ * This class is not thread safe.
+ */
+public class OzoneFSOutputStream extends OutputStream {
+
+  private final ChunkGroupOutputStream outputStream;
+
+  public OzoneFSOutputStream(OutputStream outputStream) {
+    this.outputStream = (ChunkGroupOutputStream)outputStream;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    outputStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputStream.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    outputStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    outputStream.close();
+  }
+}

+ 83 - 126
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java

@@ -18,16 +18,15 @@
 
 package org.apache.hadoop.fs.ozone;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Iterator;
 
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -35,12 +34,18 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.ozone.web.client.OzoneKey;
-import org.apache.hadoop.ozone.web.client.OzoneRestClient;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.ReplicationFactor;
+import org.apache.hadoop.ozone.client.ReplicationType;
 import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,19 +55,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.ozone.web.client.OzoneBucket;
-import org.apache.hadoop.ozone.web.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
-import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
 
 /**
  * The Ozone Filesystem implementation.
@@ -78,11 +80,15 @@ public class OzoneFileSystem extends FileSystem {
   static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
 
   /** The Ozone client for connecting to Ozone server. */
-  private OzoneRestClient ozone;
+  private OzoneClient ozoneClient;
+  private ObjectStore objectStore;
+  private OzoneVolume volume;
   private OzoneBucket bucket;
   private URI uri;
   private String userName;
   private Path workingDir;
+  private ReplicationType replicationType;
+  private ReplicationFactor replicationFactor;
 
   @Override
   public void initialize(URI name, Configuration conf) throws IOException {
@@ -115,23 +121,24 @@ public class OzoneFileSystem extends FileSystem {
           .setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
               + bucketStr + OZONE_URI_DELIMITER).build();
       LOG.trace("Ozone URI for ozfs initialization is " + uri);
-      this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr);
+      this.ozoneClient = OzoneClientFactory.getRpcClient(conf);
+      objectStore = ozoneClient.getObjectStore();
+      this.volume = objectStore.getVolume(volumeStr);
+      this.bucket = volume.getBucket(bucketStr);
+      this.replicationType = ReplicationType.valueOf(
+          conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+              OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+      this.replicationFactor = ReplicationFactor.valueOf(
+          conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
+              OzoneConfigKeys.OZONE_REPLICATION_DEFAULT));
       try {
         this.userName =
             UserGroupInformation.getCurrentUser().getShortUserName();
       } catch (IOException e) {
         this.userName = OZONE_DEFAULT_USER;
       }
-      this.ozone.setUserAuth(userName);
-
-      OzoneVolume volume = ozone.getVolume(volumeStr);
-      this.bucket = volume.getBucket(bucketStr);
       this.workingDir = new Path(OZONE_USER_DIR, this.userName)
               .makeQualified(this.uri, this.workingDir);
-    } catch (OzoneException oe) {
-      final String msg = "Ozone server exception when initializing file system";
-      LOG.error(msg, oe);
-      throw new IOException(msg, oe);
     } catch (URISyntaxException ue) {
       final String msg = "Invalid Ozone endpoint " + name;
       LOG.error(msg, ue);
@@ -142,7 +149,7 @@ public class OzoneFileSystem extends FileSystem {
   @Override
   public void close() throws IOException {
     try {
-      ozone.close();
+      ozoneClient.close();
     } finally {
       super.close();
     }
@@ -162,14 +169,13 @@ public class OzoneFileSystem extends FileSystem {
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     LOG.trace("open() path:{}", f);
     final FileStatus fileStatus = getFileStatus(f);
-
+    final String key = pathToKey(f);
     if (fileStatus.isDirectory()) {
       throw new FileNotFoundException("Can't open directory " + f + " to read");
     }
 
     return new FSDataInputStream(
-        new OzoneInputStream(getConf(), uri, bucket, pathToKey(f),
-            fileStatus.getLen(), bufferSize, statistics));
+        new OzoneFSInputStream(bucket.readKey(key).getInputStream()));
   }
 
   @Override
@@ -206,11 +212,12 @@ public class OzoneFileSystem extends FileSystem {
       // does not exists and a new file can thus be created.
     }
 
-    final OzoneOutputStream stream =
-        new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics);
+    OzoneOutputStream ozoneOutputStream =
+        bucket.createKey(key, 0, replicationType, replicationFactor);
     // We pass null to FSDataOutputStream so it won't count writes that
     // are being buffered to a file
-    return new FSDataOutputStream(stream, null);
+    return new FSDataOutputStream(
+        new OzoneFSOutputStream(ozoneOutputStream.getOutputStream()), null);
   }
 
   @Override
@@ -245,7 +252,7 @@ public class OzoneFileSystem extends FileSystem {
 
     RenameIterator(Path srcPath, Path dstPath)
         throws IOException {
-      super(srcPath, true);
+      super(srcPath);
       srcKey = pathToKey(srcPath);
       dstKey = pathToKey(dstPath);
       LOG.trace("rename from:{} to:{}", srcKey, dstKey);
@@ -253,30 +260,17 @@ public class OzoneFileSystem extends FileSystem {
 
     boolean processKey(String key) throws IOException {
       String newKeyName = dstKey.concat(key.substring(srcKey.length()));
-      return rename(key, newKeyName);
+      rename(key, newKeyName);
+      return true;
     }
 
-    // TODO: currently rename work by copying the file, with changes in KSM,
-    // this operation can be made improved by renaming the keys in KSM directly.
-    private boolean rename(String src, String dst) throws IOException {
-      final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-      final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
-          LocalDirAllocator.SIZE_UNKNOWN, getConf());
-
-      try {
-        LOG.trace("rename by copying file from:{} to:{}", src, dst);
-        bucket.getKey(src, tmpFile.toPath());
-        bucket.putKey(dst, tmpFile);
-        return true;
-      } catch (OzoneException oe) {
-        String msg = String.format("Error when renaming key from:%s to:%s",
-            src, dst);
-        LOG.error(msg, oe);
-        throw new IOException(msg, oe);
-      } finally {
-        if (!tmpFile.delete()) {
-          LOG.warn("Can not delete tmpFile: " + tmpFile);
-        }
+    // TODO: currently rename work by copying the streams, with changes in KSM,
+    // this operation can be improved by renaming the keys in KSM directly.
+    private void rename(String src, String dst) throws IOException {
+      try (OzoneInputStream inputStream = bucket.readKey(src);
+          OzoneOutputStream outputStream = bucket
+              .createKey(dst, 0, replicationType, replicationFactor)) {
+        IOUtils.copyBytes(inputStream, outputStream, getConf());
       }
     }
   }
@@ -386,8 +380,13 @@ public class OzoneFileSystem extends FileSystem {
     private boolean recursive;
     DeleteIterator(Path f, boolean recursive)
         throws IOException {
-      super(f, recursive);
+      super(f);
       this.recursive = recursive;
+      if (getStatus().isDirectory()
+          && !this.recursive
+          && listStatus(f).length != 0) {
+        throw new PathIsNotEmptyDirectoryException(f.toString());
+      }
     }
 
     boolean processKey(String key) throws IOException {
@@ -421,7 +420,7 @@ public class OzoneFileSystem extends FileSystem {
     private Path f;
 
     ListStatusIterator(Path f) throws IOException  {
-      super(f, true);
+      super(f);
       this.f = f;
     }
 
@@ -532,8 +531,7 @@ public class OzoneFileSystem extends FileSystem {
 
     if (key.length() == 0) {
       return new FileStatus(0, true, 1, 0,
-          getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER),
-          qualifiedPath);
+          bucket.getCreationTime(), qualifiedPath);
     }
 
     // consider this a file and get key status
@@ -548,14 +546,11 @@ public class OzoneFileSystem extends FileSystem {
       throw new FileNotFoundException(f + ": No such file or directory!");
     } else if (isDirectory(meta)) {
       return new FileStatus(0, true, 1, 0,
-          getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
-          qualifiedPath);
+          meta.getModificationTime(), qualifiedPath);
     } else {
       //TODO: Fetch replication count from ratis config
-      return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
-            getDefaultBlockSize(f),
-          getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
-          qualifiedPath);
+      return new FileStatus(meta.getDataSize(), false, 1,
+            getDefaultBlockSize(f), meta.getModificationTime(), qualifiedPath);
     }
   }
 
@@ -566,54 +561,23 @@ public class OzoneFileSystem extends FileSystem {
    */
   private OzoneKey getKeyInfo(String key) {
     try {
-      return bucket.getKeyInfo(key);
-    } catch (OzoneException e) {
+      return bucket.getKey(key);
+    } catch (IOException e) {
       LOG.trace("Key:{} does not exists", key);
       return null;
     }
   }
 
-  /**
-   * Helper method to get the modified time of the key.
-   * @param key key to fetch the modified time
-   * @return last modified time of the key
-   */
-  private long getModifiedTime(String modifiedTime, String key) {
-    try {
-      return OzoneUtils.formatDate(modifiedTime);
-    } catch (ParseException pe) {
-      LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
-      return 0;
-    }
-  }
-
   /**
    * Helper method to check if an Ozone key is representing a directory.
    * @param key key to be checked as a directory
    * @return true if key is a directory, false otherwise
    */
   private boolean isDirectory(OzoneKey key) {
-    LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
-        key.getObjectInfo().getSize());
-    return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
-        && (key.getObjectInfo().getSize() == 0);
-  }
-
-  /**
-   * Helper method to list entries matching the key name in bucket.
-   * @param dirKey key prefix for listing the keys
-   * @param lastKey last iterated key
-   * @return List of Keys
-   */
-  List<OzoneKey> listKeys(String dirKey, String lastKey)
-      throws IOException {
-    LOG.trace("list keys dirKey:{} lastKey:{}", dirKey, lastKey);
-    try {
-      return bucket.listKeys(dirKey, LISTING_PAGE_SIZE, lastKey);
-    } catch (OzoneException oe) {
-      LOG.error("list keys failed dirKey:{} lastKey:{}", dirKey, lastKey, oe);
-      throw new IOException("List keys failed " + oe.getMessage());
-    }
+    LOG.trace("key name:{} size:{}", key.getName(),
+        key.getDataSize());
+    return key.getName().endsWith(OZONE_URI_DELIMITER)
+        && (key.getDataSize() == 0);
   }
 
   /**
@@ -623,11 +587,11 @@ public class OzoneFileSystem extends FileSystem {
    */
   private boolean createDirectory(String keyName) {
     try {
-      LOG.trace("creating dir for key:{}", keyName);
-      bucket.putKey(keyName, "");
+      LOG.info("creating dir for key:{}", keyName);
+      bucket.createKey(keyName, 0, replicationType, replicationFactor).close();
       return true;
-    } catch (OzoneException oe) {
-      LOG.error("create key failed for key:{}", keyName, oe);
+    } catch (IOException ioe) {
+      LOG.error("create key failed for key:{}", keyName, ioe);
       return false;
     }
   }
@@ -642,8 +606,8 @@ public class OzoneFileSystem extends FileSystem {
     try {
       bucket.deleteKey(keyName);
       return true;
-    } catch (OzoneException oe) {
-      LOG.error("delete key failed " + oe.getMessage());
+    } catch (IOException ioe) {
+      LOG.error("delete key failed " + ioe.getMessage());
       return false;
     }
   }
@@ -671,7 +635,7 @@ public class OzoneFileSystem extends FileSystem {
    * @param key the ozone Key which needs to be appended
    * @return delimiter appended key
    */
-  String addTrailingSlashIfNeeded(String key) {
+  private String addTrailingSlashIfNeeded(String key) {
     if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
       return key + OZONE_URI_DELIMITER;
     } else {
@@ -690,47 +654,36 @@ public class OzoneFileSystem extends FileSystem {
 
   private abstract class OzoneListingIterator {
     private final Path path;
-    private final boolean recursive;
     private final FileStatus status;
     private String pathKey;
+    private Iterator<OzoneKey> keyIterator;
 
-    OzoneListingIterator(Path path, boolean recursive)
+    OzoneListingIterator(Path path)
         throws IOException {
       this.path = path;
-      this.recursive = recursive;
       this.status = getFileStatus(path);
       this.pathKey = pathToKey(path);
       if (status.isDirectory()) {
         this.pathKey = addTrailingSlashIfNeeded(pathKey);
       }
+      keyIterator = bucket.listKeys(pathKey);
     }
 
     abstract boolean processKey(String key) throws IOException;
 
     // iterates all the keys in the particular path
     boolean iterate() throws IOException {
-      LOG.trace("Iterating path {} - recursive {}", path, recursive);
+      LOG.trace("Iterating path {}", path);
       if (status.isDirectory()) {
         LOG.trace("Iterating directory:{}", pathKey);
-        String lastKey = pathKey;
-        while (true) {
-          List<OzoneKey> ozoneKeys = listKeys(pathKey, lastKey);
-          LOG.trace("number of sub keys:{}", ozoneKeys.size());
-          if (ozoneKeys.size() == 0) {
-            return processKey(pathKey);
-          } else {
-            if (!recursive) {
-              throw new PathIsNotEmptyDirectoryException(path.toString());
-            } else {
-              for (OzoneKey ozoneKey : ozoneKeys) {
-                lastKey = ozoneKey.getObjectInfo().getKeyName();
-                if (!processKey(lastKey)) {
-                  return false;
-                }
-              }
-            }
+        while (keyIterator.hasNext()) {
+          OzoneKey key = keyIterator.next();
+          LOG.info("iterating key:{}", key.getName());
+          if (!processKey(key.getName())) {
+            return false;
           }
         }
+        return true;
       } else {
         LOG.trace("iterating file:{}", path);
         return processKey(pathKey);
@@ -744,5 +697,9 @@ public class OzoneFileSystem extends FileSystem {
     boolean pathIsDirectory() {
       return status.isDirectory();
     }
+
+    FileStatus getStatus() {
+      return status;
+    }
   }
 }

+ 0 - 191
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java

@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.ozone;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.util.Objects;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.ozone.web.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
-
-/**
- * Wraps OzoneInputStream implementation.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public final class OzoneInputStream extends FSInputStream {
-  private static final Log LOG = LogFactory.getLog(OzoneInputStream.class);
-
-  private final RandomAccessFile in;
-
-  /** Closed bit. Volatile so reads are non-blocking. */
-  private volatile boolean closed = false;
-
-  /** the ozone bucket client. */
-  private final OzoneBucket bucket;
-
-  /** The object key. */
-  private final String key;
-
-  /** Object content length. */
-  private final long contentLen;
-
-  /** file system stats. */
-  private final Statistics stats;
-
-  private final URI keyUri;
-
-  OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
-      String key, long contentLen, int bufferSize, Statistics statistics)
-      throws IOException {
-    Objects.requireNonNull(bucket, "bucket can not be null!");
-    Objects.requireNonNull(key, "kenName can not be null!");
-    this.bucket = bucket;
-    this.key = key;
-    this.contentLen = contentLen;
-    this.stats = statistics;
-    this.keyUri = fsUri.resolve(key);
-
-    if (conf.get(BUFFER_DIR_KEY) == null) {
-      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
-    }
-    final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-    final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
-        LocalDirAllocator.SIZE_UNKNOWN, conf);
-    try {
-      LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath());
-      bucket.getKey(this.key, tmpFile.toPath());
-      in = new RandomAccessFile(tmpFile, "r");
-      statistics.incrementReadOps(1);
-    } catch (OzoneException oe) {
-      final String msg = "Error when getBytes for key = " + key;
-      LOG.error(msg, oe);
-      throw new IOException(msg, oe);
-    }
-  }
-
-  @Override
-  public synchronized void seek(long targetPos) throws IOException {
-    checkNotClosed();
-    // Do not allow negative seek
-    if (targetPos < 0) {
-      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos);
-    }
-
-    if (this.contentLen <= 0) {
-      return;
-    }
-
-    in.seek(targetPos);
-  }
-
-  @Override
-  public synchronized long getPos() throws IOException {
-    checkNotClosed();
-    return in.getFilePointer();
-  }
-
-  @Override
-  public boolean seekToNewSource(long l) throws IOException {
-    return false;
-  }
-
-  @Override
-  public synchronized int read() throws IOException {
-    int ch = in.read();
-    if (stats != null && ch != -1) {
-      stats.incrementBytesRead(1);
-    }
-    return ch;
-  }
-
-  @Override
-  public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-    Preconditions.checkArgument(buffer != null, "buffer can not be null");
-    int numberOfByteRead = super.read(position, buffer, offset, length);
-
-    if (stats != null && numberOfByteRead > 0) {
-      stats.incrementBytesRead(numberOfByteRead);
-    }
-    return numberOfByteRead;
-  }
-
-  @Override
-  public synchronized int read(byte[] buffer, int offset, int length)
-      throws IOException {
-    Preconditions.checkArgument(buffer != null, "buffer can not be null");
-    int numberOfByteRead = in.read(buffer, offset, length);
-    if (stats != null && numberOfByteRead > 0) {
-      stats.incrementBytesRead(numberOfByteRead);
-    }
-    return numberOfByteRead;
-  }
-
-  @Override
-  public synchronized int available() throws IOException {
-    checkNotClosed();
-
-    final long remainingInWrapped = contentLen - in.getFilePointer();
-    return (remainingInWrapped < Integer.MAX_VALUE)
-        ? (int)remainingInWrapped
-        : Integer.MAX_VALUE;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    in.close();
-  }
-
-  @Override
-  public synchronized long skip(long pos) throws IOException {
-    return in.skipBytes((int) pos);
-  }
-
-  /**
-   * Verify that the input stream is open. Non blocking; this gives
-   * the last state of the volatile {@link #closed} field.
-   * @throws IOException if the connection is closed.
-   */
-  private void checkNotClosed() throws IOException {
-    if (closed) {
-      throw new IOException(this.keyUri + ": "
-          + FSExceptionMessages.STREAM_IS_CLOSED);
-    }
-  }
-
-}

+ 0 - 113
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java

@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.ozone;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.ozone.web.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
-import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
-
-
-/**
- * The output stream for Ozone file system.
- *
- * Data will be buffered on local disk, then uploaded to Ozone in
- * {@link #close()} method.
- *
- * This class is not thread safe.
- */
-public class OzoneOutputStream extends OutputStream {
-  private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class);
-  private OzoneBucket bucket;
-  private final String key;
-  private final URI keyUri;
-  private Statistics statistics;
-  private LocalDirAllocator dirAlloc;
-  private boolean closed;
-  private File tmpFile;
-  private BufferedOutputStream backupStream;
-
-  OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
-      String key, Statistics statistics) throws IOException {
-    this.bucket = bucket;
-    this.key = key;
-    this.keyUri = fsUri.resolve(key);
-    this.statistics = statistics;
-
-    if (conf.get(BUFFER_DIR_KEY) == null) {
-      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
-    }
-    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-    tmpFile = dirAlloc.createTmpFileForWrite("output-",
-        LocalDirAllocator.SIZE_UNKNOWN, conf);
-    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
-
-    closed = false;
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    if (backupStream != null) {
-      backupStream.close();
-    }
-    try {
-      LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri);
-      bucket.putKey(key, tmpFile);
-      statistics.incrementWriteOps(1);
-    } catch (OzoneException oe) {
-      final String msg = "Uploading error: file=" + tmpFile + ", key=" + key;
-      LOG.error(msg, oe);
-      throw new IOException(msg, oe);
-    } finally {
-      if (!tmpFile.delete()) {
-        LOG.warn("Can not delete tmpFile: " + tmpFile);
-      }
-    }
-  }
-
-  @Override
-  public synchronized void flush() throws IOException {
-    backupStream.flush();
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    backupStream.write(b);
-    statistics.incrementBytesWritten(1);
-  }
-
-}

+ 3 - 1
hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java

@@ -114,7 +114,9 @@ public class TestOzoneFileInterfaces {
     }
 
     FileStatus status = fs.getFileStatus(path);
-    Assert.assertTrue(status.getModificationTime() < currentTime);
+    // The timestamp of the newly created file should always be greater than
+    // the time when the test was started
+    Assert.assertTrue(status.getModificationTime() > currentTime);
 
     try (FSDataInputStream inputStream = fs.open(path)) {
       byte[] buffer = new byte[stringLen];

+ 8 - 2
hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/contract/OzoneContract.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.junit.Assert;
 
 import java.io.IOException;
@@ -76,6 +78,10 @@ class OzoneContract extends AbstractFSContract {
     storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
   }
 
+  private void copyClusterConfigs(String configKey) {
+    getConf().set(configKey, cluster.getConf().get(configKey));
+  }
+
   @Override
   public FileSystem getTestFileSystem() throws IOException {
     //assumes cluster is not null
@@ -95,8 +101,6 @@ class OzoneContract extends AbstractFSContract {
     BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
     try {
       storageHandler.createVolume(volumeArgs);
-
-
       storageHandler.createBucket(bucketArgs);
     } catch (OzoneException e) {
       throw new IOException(e.getMessage());
@@ -107,6 +111,8 @@ class OzoneContract extends AbstractFSContract {
     String uri = String.format("%s://localhost:%d/%s/%s",
         Constants.OZONE_URI_SCHEME, port, volumeName, bucketName);
     getConf().set("fs.defaultFS", uri);
+    copyClusterConfigs(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY);
+    copyClusterConfigs(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
     return FileSystem.get(getConf());
   }