
Integration of TOS: Add RawFileSystem.

lijinglun 7 months ago
parent
commit
59b0ea5dfe

+ 59 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFSUtils.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class RawFSUtils {
+  private RawFSUtils() {
+  }
+
+  /**
+   * @return true if the node is within the subtree rooted at the given root path.
+   */
+  public static boolean inSubtree(String root, String p) {
+    return inSubtree(new Path(root), new Path(p));
+  }
+
+  /**
+   * @return true if the node is within the subtree rooted at the given root path.
+   */
+  public static boolean inSubtree(Path root, Path node) {
+    Preconditions.checkNotNull(root, "Root cannot be null");
+    Preconditions.checkNotNull(node, "Node cannot be null");
+    if (root.isRoot()) {
+      return true;
+    }
+
+    if (Objects.equals(root, node)) {
+      return true;
+    }
+
+    while (!node.isRoot()) {
+      if (Objects.equals(root, node)) {
+        return true;
+      }
+      node = node.getParent();
+    }
+    return false;
+  }
+}

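RawFSUtils.inSubtree walks node.getParent() upward until it either matches the root path or reaches the filesystem root. A minimal sketch of the expected behavior, assuming hadoop-common on the classpath (paths are illustrative):

```java
import org.apache.hadoop.fs.tosfs.RawFSUtils;

public class RawFSUtilsExample {
  public static void main(String[] args) {
    System.out.println(RawFSUtils.inSubtree("/", "/a/b"));    // true: everything is under the root
    System.out.println(RawFSUtils.inSubtree("/a", "/a/b"));   // true: walking parents of /a/b reaches /a
    System.out.println(RawFSUtils.inSubtree("/a/b", "/a/b")); // true: a node is in its own subtree
    System.out.println(RawFSUtils.inSubtree("/a/b", "/a"));   // false: /a is an ancestor, not a descendant
  }
}
```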
+ 49 - 65
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java

@@ -18,32 +18,8 @@
 
 package org.apache.hadoop.fs.tosfs;
 
-import io.proton.commit.MagicOutputStream;
-import io.proton.common.conf.Conf;
-import io.proton.common.conf.ConfKeys;
-import io.proton.common.object.DirectoryStorage;
-import io.proton.common.object.ObjectInfo;
-import io.proton.common.object.ObjectMultiRangeInputStream;
-import io.proton.common.object.ObjectOutputStream;
-import io.proton.common.object.ObjectRangeInputStream;
-import io.proton.common.object.ObjectStorage;
-import io.proton.common.object.ObjectStorageFactory;
-import io.proton.common.object.ObjectUtils;
-import io.proton.common.object.exceptions.InvalidObjectKeyException;
-import io.proton.common.util.Bytes;
-import io.proton.common.util.Constants;
-import io.proton.common.util.FSUtils;
-import io.proton.common.util.FuseUtils;
-import io.proton.common.util.HadoopUtil;
-import io.proton.common.util.Range;
-import io.proton.common.util.RemoteIterators;
-import io.proton.common.util.ThreadPools;
-import io.proton.fs.ops.DefaultFsOps;
-import io.proton.fs.ops.DirectoryFsOps;
-import io.proton.fs.ops.FsOps;
-import io.proton.shaded.com.google.common.annotations.VisibleForTesting;
-import io.proton.shaded.com.google.common.base.Preconditions;
-import io.proton.shaded.com.google.common.collect.Iterators;
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +38,31 @@ import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.tosfs.commit.MagicOutputStream;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectMultiRangeInputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectRangeInputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
+import org.apache.hadoop.fs.tosfs.ops.DefaultFsOps;
+import org.apache.hadoop.fs.tosfs.ops.DirectoryFsOps;
+import org.apache.hadoop.fs.tosfs.ops.FsOps;
+import org.apache.hadoop.fs.tosfs.util.FSUtils;
+import org.apache.hadoop.fs.tosfs.util.FuseUtils;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.fs.tosfs.util.RemoteIterators;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -79,7 +79,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -99,7 +98,7 @@ public class RawFileSystem extends FileSystem {
   private static final long DFS_BLOCK_SIZE_DEFAULT = 128 << 20;
 
   private String scheme;
-  private Conf protonConf;
+  private Configuration protonConf;
   private String username;
   private Path workingDir;
   private URI uri;
@@ -134,35 +133,24 @@ public class RawFileSystem extends FileSystem {
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     LOG.debug("Opening '{}' for reading.", path);
-    FileStatus status = innerFileStatus(path);
+    RawFileStatus status = innerFileStatus(path);
     if (status.isDirectory()) {
       throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
     }
 
     // Parse the range size from the hadoop conf.
     long rangeSize = getConf().getLong(
-        ConfKeys.OBJECT_STREAM_RANGE_SIZE.key(),
-        ConfKeys.OBJECT_STREAM_RANGE_SIZE.defaultValue());
+        ConfKeys.OBJECT_STREAM_RANGE_SIZE,
+        ConfKeys.OBJECT_STREAM_RANGE_SIZE_DEFAULT);
     Preconditions.checkArgument(rangeSize > 0, "Object storage range size must be positive.");
 
-    FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path, status.getLen(), rangeSize);
+    FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path,
+        status.getLen(), rangeSize, status.checksum());
     return new FSDataInputStream(fsIn);
   }
 
-  public FSDataInputStream open(Path path, String expectedChecksum, Range range) throws IOException {
-    LOG.debug("Opening '{}' for reading.", path);
-    RawFileStatus status = innerFileStatus(path);
-    if (status.isDirectory()) {
-      throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
-    }
-
-    if (expectedChecksum != null && !Objects.equals(status.checksum(), expectedChecksum)) {
-      throw new ChecksumMismatchException(String.format("The requested file has been staled, " +
-          "request version's checksum is %s " +
-          "while current version's checksum is %s", expectedChecksum, status.checksum()));
-    }
-
-    return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range));
+  public FSDataInputStream open(Path path, byte[] expectedChecksum, Range range) throws IOException {
+    return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range, expectedChecksum));
   }
 
   @Override
@@ -547,7 +535,7 @@ public class RawFileSystem extends FileSystem {
 
     // Root directory always exists
     if (key.isEmpty()) {
-      return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.EMPTY_CHECKSUM);
+      return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.MAGIC_CHECKSUM);
     }
 
     try {
@@ -577,7 +565,7 @@ public class RawFileSystem extends FileSystem {
   }
 
   @Override
-  public FsServerDefaults getServerDefaults(Path p) throws IOException {
+  public FsServerDefaults getServerDefaults(Path p) {
     Configuration config = getConf();
     // CRC32 is chosen as default as it is available in all
     // releases that support checksum.
@@ -595,8 +583,8 @@ public class RawFileSystem extends FileSystem {
   }
 
   private void stopAllServices() {
-    HadoopUtil.shutdownHadoopExecutors(uploadThreadPool, LOG, 30, TimeUnit.SECONDS);
-    HadoopUtil.shutdownHadoopExecutors(taskThreadPool, LOG, 30, TimeUnit.SECONDS);
+    ThreadPools.shutdown(uploadThreadPool, 30, TimeUnit.SECONDS);
+    ThreadPools.shutdown(taskThreadPool, 30, TimeUnit.SECONDS);
   }
 
   @Override
@@ -605,10 +593,6 @@ public class RawFileSystem extends FileSystem {
     setConf(conf);
     this.scheme = FSUtils.scheme(conf, uri);
 
-    // Merge the deprecated configure keys with the new configure keys and convert hadoop conf to proton conf.
-    FSUtils.withCompatibleKeys(conf, scheme);
-    this.protonConf = Conf.copyOf(conf);
-
     // Username is the current user at the time the FS was instantiated.
     this.username = UserGroupInformation.getCurrentUser().getShortUserName();
     this.workingDir = new Path("/user", username).makeQualified(uri, null);
@@ -619,14 +603,15 @@ public class RawFileSystem extends FileSystem {
       throw new FileNotFoundException(String.format("Bucket: %s not found.", uri.getAuthority()));
     }
 
-    int taskThreadPoolSize = protonConf.get(ConfKeys.TASK_THREAD_POOL_SIZE.format(scheme));
+    int taskThreadPoolSize =
+        protonConf.getInt(ConfKeys.TASK_THREAD_POOL_SIZE, ConfKeys.TASK_THREAD_POOL_SIZE_DEFAULT);
     this.taskThreadPool = ThreadPools.newWorkerPool(TASK_THREAD_POOL_PREFIX, taskThreadPoolSize);
 
-    int uploadThreadPoolSize = protonConf.get(ConfKeys.MULTIPART_THREAD_POOL_SIZE.format(scheme));
+    int uploadThreadPoolSize = protonConf.getInt(ConfKeys.MULTIPART_THREAD_POOL_SIZE,
+        ConfKeys.MULTIPART_THREAD_POOL_SIZE_DEFAULT);
     this.uploadThreadPool = ThreadPools.newWorkerPool(MULTIPART_THREAD_POOL_PREFIX, uploadThreadPoolSize);
 
     if (storage.bucket().isDirectory()) {
-
       fsOps = new DirectoryFsOps((DirectoryStorage) storage, this::objectToFileStatus);
     } else {
       fsOps = new DefaultFsOps(storage, protonConf, taskThreadPool, this::objectToFileStatus);
@@ -651,26 +636,24 @@ public class RawFileSystem extends FileSystem {
     return uploadThreadPool;
   }
 
-  String username() {
-    return username;
-  }
-
   /**
    * @return null if checksum is not supported.
    */
   @Override
   public FileChecksum getFileChecksum(Path f, long length) throws IOException {
     Preconditions.checkArgument(length >= 0);
+
     RawFileStatus fileStatus = innerFileStatus(f);
     if (fileStatus.isDirectory()) {
       // Compatible with HDFS
       throw new FileNotFoundException(String.format("Path is not a file, %s", f));
     }
-    if (!protonConf.get(ConfKeys.CHECKSUM_ENABLED.format(scheme))) {
+    if (!protonConf.getBoolean(ConfKeys.CHECKSUM_ENABLED, ConfKeys.CHECKSUM_ENABLED_DEFAULT)) {
       return null;
     }
 
-    return BaseChecksum.create(protonConf, fileStatus, length);
+    ChecksumInfo csInfo = storage.checksumInfo();
+    return new TosChecksum(csInfo.algorithm(), fileStatus.checksum());
   }
 
   @Override
@@ -708,7 +691,8 @@ public class RawFileSystem extends FileSystem {
       String key = ObjectUtils.pathToKey(qualifiedPath);
 
       Map<String, String> tags = storage.getTags(key);
-      return tags.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
+      return tags.entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
     }
   }
 
@@ -736,7 +720,7 @@ public class RawFileSystem extends FileSystem {
 
   @Override
   public List<String> listXAttrs(Path path) throws IOException {
-    return getXAttrs(path).keySet().stream().collect(Collectors.toList());
+    return Lists.newArrayList(getXAttrs(path).keySet());
   }
 
   @Override

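Two behavioral changes in RawFileSystem are worth calling out: configuration now flows through plain Hadoop Configuration getters with explicit default constants, and getFileChecksum builds a TosChecksum from the storage's checksum algorithm plus the checksum bytes already carried on RawFileStatus. A hedged client-side sketch (the fs.tos.impl wiring and bucket name are illustrative assumptions, not part of this patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;

import java.io.IOException;
import java.net.URI;

public class GetFileChecksumExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.tos.impl", "org.apache.hadoop.fs.tosfs.RawFileSystem"); // assumed wiring
    conf.setBoolean(ConfKeys.CHECKSUM_ENABLED, true); // default; false makes getFileChecksum return null

    FileSystem fs = FileSystem.get(URI.create("tos://example-bucket/"), conf);
    FileChecksum checksum = fs.getFileChecksum(new Path("/data/part-00000"), Long.MAX_VALUE);
    if (checksum != null) { // null when checksum support is disabled
      System.out.println(checksum.getAlgorithmName() + " (" + checksum.getLength() + " bytes)");
    }
  }
}
```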
+ 30 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawLocatedFileStatus.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import static org.apache.hadoop.util.Preconditions.checkNotNull;
+
+public class RawLocatedFileStatus extends LocatedFileStatus {
+  public RawLocatedFileStatus(RawFileStatus status, BlockLocation[] locations) {
+    super(checkNotNull(status), locations);
+  }
+}

+ 67 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosChecksum.java

@@ -0,0 +1,67 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class TosChecksum extends FileChecksum {
+  private String algorithm;
+  private byte[] checksum;
+
+  public TosChecksum(String algorithm, byte[] checksum) {
+    this.algorithm = algorithm;
+    this.checksum = checksum;
+  }
+
+  @Override
+  public String getAlgorithmName() {
+    return algorithm;
+  }
+
+  @Override
+  public int getLength() {
+    return checksum.length;
+  }
+
+  @Override
+  public byte[] getBytes() {
+    return checksum;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    byte[] algorithmBytes = Bytes.toBytes(algorithm);
+    out.writeInt(algorithmBytes.length);
+    out.write(algorithmBytes);
+    out.writeInt(checksum.length);
+    out.write(checksum);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] algorithmBytes = new byte[in.readInt()];
+    in.readFully(algorithmBytes);
+    algorithm = Bytes.toString(algorithmBytes);
+    checksum = new byte[in.readInt()];
+    in.readFully(checksum);
+  }
+}

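readFields decodes both lengths with readInt(), so write must emit them with writeInt(); DataOutput.write(int) would write only the low-order byte and break the round trip. A quick serialization round-trip sketch using plain java.io streams:

```java
import org.apache.hadoop.fs.tosfs.TosChecksum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TosChecksumRoundTrip {
  public static void main(String[] args) throws IOException {
    TosChecksum original = new TosChecksum("CRC32C", new byte[] {1, 2, 3, 4});

    // Serialize: algorithm length, algorithm bytes, checksum length, checksum bytes.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    // Deserialize into a fresh instance; both fields must survive the round trip.
    TosChecksum copy = new TosChecksum(null, null);
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    System.out.println(copy.getAlgorithmName()); // CRC32C
    System.out.println(copy.getLength());        // 4
  }
}
```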
+ 13 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.tosfs.common;
 import org.apache.hadoop.util.Preconditions;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 public class Bytes {
   private Bytes() {
@@ -63,6 +64,10 @@ public class Bytes {
     return b;
   }
 
+  public static byte[] toBytes(String s) {
+    return s.getBytes(StandardCharsets.UTF_8);
+  }
+
   // Decode big-endian binaries into basic Java types.
 
   public static boolean toBoolean(byte[] b) {
@@ -166,4 +171,12 @@ public class Bytes {
     System.arraycopy(b, off, data, 0, len);
     return data;
   }
+
+  public static String toString(byte[] b) {
+    return new String(b, StandardCharsets.UTF_8);
+  }
+
+  public static String toString(byte[] b, int off, int len) {
+    return new String(b, off, len, StandardCharsets.UTF_8);
+  }
 }

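The new helpers pin UTF-8 on both sides of the String/byte[] conversion, so results do not depend on the platform default charset. A quick round trip:

```java
import org.apache.hadoop.fs.tosfs.common.Bytes;

public class BytesStringExample {
  public static void main(String[] args) {
    byte[] raw = Bytes.toBytes("héllo");           // UTF-8 encoded: 'é' takes two bytes
    System.out.println(Bytes.toString(raw));        // héllo
    System.out.println(Bytes.toString(raw, 0, 1));  // h (decode a sub-range)
  }
}
```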
+ 23 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java

@@ -117,6 +117,29 @@ public class ConfKeys {
   public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size";
   public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE;
 
+  /**
+   * The size of the thread pool used for running tasks in parallel for the given object fs,
+   * e.g. deleting objects or copying files. Example key: fs.tos.task.thread-pool-size.
+   */
+  public static final String TASK_THREAD_POOL_SIZE = "fs.tos.task.thread-pool-size";
+  public static final int TASK_THREAD_POOL_SIZE_DEFAULT =
+      Math.max(2, Runtime.getRuntime().availableProcessors());
+
+  /**
+   * The size of the thread pool used for uploading the parts of a multipart upload in
+   * parallel for the given object storage. Example key: fs.tos.multipart.thread-pool-size.
+   */
+  public static final String MULTIPART_THREAD_POOL_SIZE = "fs.tos.multipart.thread-pool-size";
+  public static final int MULTIPART_THREAD_POOL_SIZE_DEFAULT =
+      Math.max(2, Runtime.getRuntime().availableProcessors());
+
+  /**
+   * Whether to fetch the object checksum while getting the file status of an object.
+   * Example key: fs.tos.checksum.enabled.
+   */
+  public static final String CHECKSUM_ENABLED = "fs.tos.checksum.enabled";
+  public static final boolean CHECKSUM_ENABLED_DEFAULT = true;
+
   public static String defaultDir(String basename) {
     String tmpdir = System.getProperty("java.io.tmpdir");
     Preconditions.checkNotNull(tmpdir, "System property 'java.io.tmpdir' cannot be null");

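The new keys follow the plain key-plus-default pattern used throughout the patch: both pool sizes fall back to max(2, available processors), and the checksum toggle defaults to true. A small sketch of overriding and reading them:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;

public class ConfKeysExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(ConfKeys.TASK_THREAD_POOL_SIZE, 8);    // override the CPU-derived default
    conf.setBoolean(ConfKeys.CHECKSUM_ENABLED, false); // skip checksums in file status

    int taskThreads = conf.getInt(ConfKeys.TASK_THREAD_POOL_SIZE,
        ConfKeys.TASK_THREAD_POOL_SIZE_DEFAULT);
    int uploadThreads = conf.getInt(ConfKeys.MULTIPART_THREAD_POOL_SIZE,
        ConfKeys.MULTIPART_THREAD_POOL_SIZE_DEFAULT);  // unset, so max(2, #cores)
    System.out.println(taskThreads + " task / " + uploadThreads + " upload threads");
  }
}
```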
+ 28 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FuseUtils.java

@@ -0,0 +1,28 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+public class FuseUtils {
+  public static final String ENV_TOS_ENABLE_FUSE = "TOS_ENABLE_FUSE";
+
+  private FuseUtils() {
+  }
+
+  public static boolean fuseEnabled() {
+    return ParseUtils.envAsBoolean(ENV_TOS_ENABLE_FUSE, false);
+  }
+}

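FuseUtils reads the TOS_ENABLE_FUSE environment variable through ParseUtils.envAsBoolean (defined elsewhere in this module) and defaults to false. Checking the toggle is a one-liner:

```java
import org.apache.hadoop.fs.tosfs.util.FuseUtils;

public class FuseToggleExample {
  public static void main(String[] args) {
    // Enable with: export TOS_ENABLE_FUSE=true
    System.out.println("FUSE mode enabled: " + FuseUtils.fuseEnabled());
  }
}
```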
+ 122 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/RemoteIterators.java

@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class RemoteIterators {
+
+  private RemoteIterators() {
+  }
+
+  /**
+   * Create an iterator from a singleton.
+   *
+   * @param singleton instance
+   * @param <T>       type
+   * @return a remote iterator
+   */
+  public static <T> RemoteIterator<T> fromSingleton(@Nullable T singleton) {
+    return new SingletonIterator<>(singleton);
+  }
+
+  /**
+   * Create an iterator from an iterable and a transformation function.
+   *
+   * @param <S>      source type
+   * @param <T>      result type
+   * @param iterator source
+   * @param mapper   transformation
+   * @return a remote iterator
+   */
+  public static <S, T> RemoteIterator<T> fromIterable(Iterable<S> iterable, FunctionRaisingIOE<S, T> mapper) {
+    return new IterableRemoteIterator<>(iterable, mapper);
+  }
+
+  public interface FunctionRaisingIOE<S, T> {
+
+    /**
+     * Apply the function.
+     *
+     * @param s argument 1
+     * @return result
+     * @throws IOException Any IO failure
+     */
+    T apply(S s) throws IOException;
+  }
+
+  private static final class IterableRemoteIterator<S, T> implements RemoteIterator<T> {
+    private final Iterator<S> sourceIterator;
+    private final FunctionRaisingIOE<S, T> mapper;
+
+    private IterableRemoteIterator(Iterable<S> source, FunctionRaisingIOE<S, T> mapper) {
+      this.sourceIterator = source.iterator();
+      this.mapper = mapper;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return sourceIterator.hasNext();
+    }
+
+    @Override
+    public T next() throws IOException {
+      return mapper.apply(sourceIterator.next());
+    }
+  }
+
+  private static final class SingletonIterator<T> implements RemoteIterator<T> {
+    private final T singleton;
+
+    private boolean processed;
+
+    private SingletonIterator(@Nullable T singleton) {
+      this.singleton = singleton;
+      this.processed = singleton == null;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !processed;
+    }
+
+    @Override
+    public T next() {
+      if (hasNext()) {
+        processed = true;
+        return singleton;
+      } else {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("singleton", singleton)
+          .toString();
+    }
+  }
+}
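Both iterator flavors are easy to exercise: fromIterable adapts any Iterable through a mapper that may throw IOException (FunctionRaisingIOE is a functional interface, so a lambda works), and fromSingleton yields one element, or nothing when given null. A short usage sketch:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.tosfs.util.RemoteIterators;

import java.io.IOException;
import java.util.Arrays;

public class RemoteIteratorsExample {
  public static void main(String[] args) throws IOException {
    // Adapt an Iterable<String> into a RemoteIterator<Path>.
    RemoteIterator<Path> paths =
        RemoteIterators.fromIterable(Arrays.asList("a", "b"), s -> new Path("/" + s));
    while (paths.hasNext()) {
      System.out.println(paths.next()); // /a then /b
    }

    RemoteIterator<String> one = RemoteIterators.fromSingleton("only");
    System.out.println(one.hasNext()); // true
    System.out.println(one.next());    // only
    System.out.println(one.hasNext()); // false; next() would throw NoSuchElementException
  }
}
```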