|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.ozone.container.keyvalue.helpers;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
.ContainerCommandRequestProto;
|
|
@@ -36,17 +37,20 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
|
|
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
|
|
|
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
+import org.apache.ratis.util.function.CheckedSupplier;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
-import java.nio.channels.AsynchronousFileChannel;
|
|
|
import java.nio.channels.FileChannel;
|
|
|
import java.nio.channels.FileLock;
|
|
|
+import java.nio.file.Path;
|
|
|
import java.nio.file.StandardOpenOption;
|
|
|
import java.security.NoSuchAlgorithmException;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
|
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
|
|
@@ -56,6 +60,8 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
|
|
|
*/
|
|
|
public final class ChunkUtils {
|
|
|
|
|
|
+ private static final Set<Path> LOCKS = ConcurrentHashMap.newKeySet();
|
|
|
+
|
|
|
/** Never constructed. **/
|
|
|
private ChunkUtils() {
|
|
|
|
|
@@ -67,9 +73,8 @@ public final class ChunkUtils {
|
|
|
* @param chunkFile - File to write data to.
|
|
|
* @param chunkInfo - Data stream to write.
|
|
|
* @param data - The data buffer.
|
|
|
- * @param volumeIOStats
|
|
|
+ * @param volumeIOStats statistics collector
|
|
|
* @param sync whether to do fsync or not
|
|
|
- * @throws StorageContainerException
|
|
|
*/
|
|
|
public static void writeData(File chunkFile, ChunkInfo chunkInfo,
|
|
|
ByteBuffer data, VolumeIOStats volumeIOStats, boolean sync)
|
|
@@ -85,58 +90,43 @@ public final class ChunkUtils {
|
|
|
throw new StorageContainerException(err, INVALID_WRITE_SIZE);
|
|
|
}
|
|
|
|
|
|
- FileChannel file = null;
|
|
|
- FileLock lock = null;
|
|
|
+ Path path = chunkFile.toPath();
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
+ processFileExclusively(path, () -> {
|
|
|
+ FileChannel file = null;
|
|
|
+ try {
|
|
|
+ // skip SYNC and DSYNC to reduce contention on file.lock
|
|
|
+ file = FileChannel.open(path,
|
|
|
+ StandardOpenOption.CREATE,
|
|
|
+ StandardOpenOption.WRITE,
|
|
|
+ StandardOpenOption.SPARSE);
|
|
|
+
|
|
|
+ int size;
|
|
|
+ try (FileLock ignored = file.lock()) {
|
|
|
+ size = file.write(data, chunkInfo.getOffset());
|
|
|
+ }
|
|
|
|
|
|
- try {
|
|
|
- long writeTimeStart = Time.monotonicNow();
|
|
|
-
|
|
|
- // skip SYNC and DSYNC to reduce contention on file.lock
|
|
|
- file = FileChannel.open(chunkFile.toPath(),
|
|
|
- StandardOpenOption.CREATE,
|
|
|
- StandardOpenOption.WRITE,
|
|
|
- StandardOpenOption.SPARSE);
|
|
|
-
|
|
|
- lock = file.lock();
|
|
|
- int size = file.write(data, chunkInfo.getOffset());
|
|
|
- // Increment volumeIO stats here.
|
|
|
- volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
|
|
|
- volumeIOStats.incWriteOpCount();
|
|
|
- volumeIOStats.incWriteBytes(size);
|
|
|
- if (size != bufferSize) {
|
|
|
- log.error("Invalid write size found. Size:{} Expected: {} ", size,
|
|
|
- bufferSize);
|
|
|
- throw new StorageContainerException("Invalid write size found. " +
|
|
|
- "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
|
|
|
+ // Increment volumeIO stats here.
|
|
|
+ volumeIOStats.incWriteTime(Time.monotonicNow() - startTime);
|
|
|
+ volumeIOStats.incWriteOpCount();
|
|
|
+ volumeIOStats.incWriteBytes(size);
|
|
|
+ if (size != bufferSize) {
|
|
|
+ log.error("Invalid write size found. Size:{} Expected: {} ", size,
|
|
|
+ bufferSize);
|
|
|
+ throw new StorageContainerException("Invalid write size found. " +
|
|
|
+ "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
|
|
|
+ }
|
|
|
+ } catch (StorageContainerException ex) {
|
|
|
+ throw ex;
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new StorageContainerException(e, IO_EXCEPTION);
|
|
|
+ } finally {
|
|
|
+ closeFile(file, sync);
|
|
|
}
|
|
|
- } catch (StorageContainerException ex) {
|
|
|
- throw ex;
|
|
|
- } catch(IOException e) {
|
|
|
- throw new StorageContainerException(e, IO_EXCEPTION);
|
|
|
|
|
|
- } finally {
|
|
|
- if (lock != null) {
|
|
|
- try {
|
|
|
- lock.release();
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("Unable to release lock ??, Fatal Error.");
|
|
|
- throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
|
|
|
- }
|
|
|
- }
|
|
|
- if (file != null) {
|
|
|
- try {
|
|
|
- if (sync) {
|
|
|
- // ensure data and metadata is persisted. Outside the lock
|
|
|
- file.force(true);
|
|
|
- }
|
|
|
- file.close();
|
|
|
- } catch (IOException e) {
|
|
|
- throw new StorageContainerException("Error closing chunk file",
|
|
|
- e, CONTAINER_INTERNAL_ERROR);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
log.debug("Write Chunk completed for chunkFile: {}, size {}", chunkFile,
|
|
|
bufferSize);
|
|
|
}
|
|
@@ -146,11 +136,8 @@ public final class ChunkUtils {
|
|
|
*
|
|
|
* @param chunkFile - file where data lives.
|
|
|
* @param data - chunk definition.
|
|
|
- * @param volumeIOStats
|
|
|
+ * @param volumeIOStats statistics collector
|
|
|
* @return ByteBuffer
|
|
|
- * @throws StorageContainerException
|
|
|
- * @throws ExecutionException
|
|
|
- * @throws InterruptedException
|
|
|
*/
|
|
|
public static ByteBuffer readData(File chunkFile, ChunkInfo data,
|
|
|
VolumeIOStats volumeIOStats) throws StorageContainerException,
|
|
@@ -165,38 +152,36 @@ public final class ChunkUtils {
|
|
|
data.toString(), UNABLE_TO_FIND_CHUNK);
|
|
|
}
|
|
|
|
|
|
- AsynchronousFileChannel file = null;
|
|
|
- FileLock lock = null;
|
|
|
- try {
|
|
|
- long readStartTime = Time.monotonicNow();
|
|
|
- file =
|
|
|
- AsynchronousFileChannel.open(chunkFile.toPath(),
|
|
|
- StandardOpenOption.READ);
|
|
|
- lock = file.lock(data.getOffset(), data.getLen(), true).get();
|
|
|
-
|
|
|
- ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
|
|
|
- file.read(buf, data.getOffset()).get();
|
|
|
-
|
|
|
- // Increment volumeIO stats here.
|
|
|
- volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
|
|
|
- volumeIOStats.incReadOpCount();
|
|
|
- volumeIOStats.incReadBytes(data.getLen());
|
|
|
-
|
|
|
- return buf;
|
|
|
- } catch (IOException e) {
|
|
|
- throw new StorageContainerException(e, IO_EXCEPTION);
|
|
|
- } finally {
|
|
|
- if (lock != null) {
|
|
|
- try {
|
|
|
- lock.release();
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("I/O error is lock release.");
|
|
|
+ long offset = data.getOffset();
|
|
|
+ long len = data.getLen();
|
|
|
+ ByteBuffer buf = ByteBuffer.allocate((int) len);
|
|
|
+
|
|
|
+ Path path = chunkFile.toPath();
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
+ return processFileExclusively(path, () -> {
|
|
|
+ FileChannel file = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ file = FileChannel.open(path, StandardOpenOption.READ);
|
|
|
+
|
|
|
+ try (FileLock ignored = file.lock(offset, len, true)) {
|
|
|
+ file.read(buf, offset);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Increment volumeIO stats here.
|
|
|
+ volumeIOStats.incReadTime(Time.monotonicNow() - startTime);
|
|
|
+ volumeIOStats.incReadOpCount();
|
|
|
+ volumeIOStats.incReadBytes(len);
|
|
|
+
|
|
|
+ return buf;
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new StorageContainerException(e, IO_EXCEPTION);
|
|
|
+ } finally {
|
|
|
+ if (file != null) {
|
|
|
+ IOUtils.closeStream(file);
|
|
|
}
|
|
|
}
|
|
|
- if (file != null) {
|
|
|
- IOUtils.closeStream(file);
|
|
|
- }
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -326,4 +311,37 @@ public final class ChunkUtils {
|
|
|
builder.setReadChunk(response);
|
|
|
return builder.build();
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ static <T, E extends Exception> T processFileExclusively(
|
|
|
+ Path path, CheckedSupplier<T, E> op
|
|
|
+ ) throws E {
|
|
|
+ for (;;) {
|
|
|
+ if (LOCKS.add(path)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ return op.get();
|
|
|
+ } finally {
|
|
|
+ LOCKS.remove(path);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void closeFile(FileChannel file, boolean sync)
|
|
|
+ throws StorageContainerException {
|
|
|
+ if (file != null) {
|
|
|
+ try {
|
|
|
+ if (sync) {
|
|
|
+ // ensure data and metadata is persisted
|
|
|
+ file.force(true);
|
|
|
+ }
|
|
|
+ file.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new StorageContainerException("Error closing chunk file",
|
|
|
+ e, CONTAINER_INTERNAL_ERROR);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|