|
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER
|
|
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
|
|
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
|
|
|
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
|
|
|
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
|
|
|
import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
@@ -291,64 +293,83 @@ class DataXceiver extends Receiver implements Runnable {
|
|
|
@Override
|
|
|
public void requestShortCircuitFds(final ExtendedBlock blk,
|
|
|
final Token<BlockTokenIdentifier> token,
|
|
|
- SlotId slotId, int maxVersion) throws IOException {
|
|
|
+ SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
|
|
|
+ throws IOException {
|
|
|
updateCurrentThreadName("Passing file descriptors for block " + blk);
|
|
|
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
|
|
|
FileInputStream fis[] = null;
|
|
|
+ SlotId registeredSlotId = null;
|
|
|
+ boolean success = false;
|
|
|
try {
|
|
|
- if (peer.getDomainSocket() == null) {
|
|
|
- throw new IOException("You cannot pass file descriptors over " +
|
|
|
- "anything but a UNIX domain socket.");
|
|
|
- }
|
|
|
- if (slotId != null) {
|
|
|
- boolean isCached = datanode.data.
|
|
|
- isCached(blk.getBlockPoolId(), blk.getBlockId());
|
|
|
- datanode.shortCircuitRegistry.registerSlot(
|
|
|
- ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
|
|
|
- }
|
|
|
try {
|
|
|
- fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
|
|
|
- } finally {
|
|
|
- if ((fis == null) && (slotId != null)) {
|
|
|
- datanode.shortCircuitRegistry.unregisterSlot(slotId);
|
|
|
+ if (peer.getDomainSocket() == null) {
|
|
|
+ throw new IOException("You cannot pass file descriptors over " +
|
|
|
+ "anything but a UNIX domain socket.");
|
|
|
}
|
|
|
+ if (slotId != null) {
|
|
|
+ boolean isCached = datanode.data.
|
|
|
+ isCached(blk.getBlockPoolId(), blk.getBlockId());
|
|
|
+ datanode.shortCircuitRegistry.registerSlot(
|
|
|
+ ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
|
|
|
+ registeredSlotId = slotId;
|
|
|
+ }
|
|
|
+ fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
|
|
|
+ Preconditions.checkState(fis != null);
|
|
|
+ bld.setStatus(SUCCESS);
|
|
|
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
|
|
|
+ } catch (ShortCircuitFdsVersionException e) {
|
|
|
+ bld.setStatus(ERROR_UNSUPPORTED);
|
|
|
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
|
|
|
+ bld.setMessage(e.getMessage());
|
|
|
+ } catch (ShortCircuitFdsUnsupportedException e) {
|
|
|
+ bld.setStatus(ERROR_UNSUPPORTED);
|
|
|
+ bld.setMessage(e.getMessage());
|
|
|
+ } catch (InvalidToken e) {
|
|
|
+ bld.setStatus(ERROR_ACCESS_TOKEN);
|
|
|
+ bld.setMessage(e.getMessage());
|
|
|
+ } catch (IOException e) {
|
|
|
+ bld.setStatus(ERROR);
|
|
|
+ bld.setMessage(e.getMessage());
|
|
|
}
|
|
|
- bld.setStatus(SUCCESS);
|
|
|
- bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
|
|
|
- } catch (ShortCircuitFdsVersionException e) {
|
|
|
- bld.setStatus(ERROR_UNSUPPORTED);
|
|
|
- bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
|
|
|
- bld.setMessage(e.getMessage());
|
|
|
- } catch (ShortCircuitFdsUnsupportedException e) {
|
|
|
- bld.setStatus(ERROR_UNSUPPORTED);
|
|
|
- bld.setMessage(e.getMessage());
|
|
|
- } catch (InvalidToken e) {
|
|
|
- bld.setStatus(ERROR_ACCESS_TOKEN);
|
|
|
- bld.setMessage(e.getMessage());
|
|
|
- } catch (IOException e) {
|
|
|
- bld.setStatus(ERROR);
|
|
|
- bld.setMessage(e.getMessage());
|
|
|
- }
|
|
|
- try {
|
|
|
bld.build().writeDelimitedTo(socketOut);
|
|
|
if (fis != null) {
|
|
|
FileDescriptor fds[] = new FileDescriptor[fis.length];
|
|
|
for (int i = 0; i < fds.length; i++) {
|
|
|
fds[i] = fis[i].getFD();
|
|
|
}
|
|
|
- byte buf[] = new byte[] { (byte)0 };
|
|
|
- peer.getDomainSocket().
|
|
|
- sendFileDescriptors(fds, buf, 0, buf.length);
|
|
|
+ byte buf[] = new byte[1];
|
|
|
+ if (supportsReceiptVerification) {
|
|
|
+ buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber();
|
|
|
+ } else {
|
|
|
+ buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
|
|
|
+ }
|
|
|
+ DomainSocket sock = peer.getDomainSocket();
|
|
|
+ sock.sendFileDescriptors(fds, buf, 0, buf.length);
|
|
|
+ if (supportsReceiptVerification) {
|
|
|
+ LOG.trace("Reading receipt verification byte for " + slotId);
|
|
|
+ int val = sock.getInputStream().read();
|
|
|
+ if (val < 0) {
|
|
|
+ throw new EOFException();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.trace("Receipt verification is not enabled on the DataNode. " +
|
|
|
+ "Not verifying " + slotId);
|
|
|
+ }
|
|
|
+ success = true;
|
|
|
}
|
|
|
} finally {
|
|
|
+ if ((!success) && (registeredSlotId != null)) {
|
|
|
+ LOG.info("Unregistering " + registeredSlotId + " because the " +
|
|
|
+ "requestShortCircuitFdsForRead operation failed.");
|
|
|
+ datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
|
|
|
+ }
|
|
|
if (ClientTraceLog.isInfoEnabled()) {
|
|
|
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
|
|
|
.getBlockPoolId());
|
|
|
BlockSender.ClientTraceLog.info(String.format(
|
|
|
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
|
|
|
" blockid: %s, srvID: %s, success: %b",
|
|
|
- blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
|
|
|
- ));
|
|
|
+ blk.getBlockId(), dnR.getDatanodeUuid(), success));
|
|
|
}
|
|
|
if (fis != null) {
|
|
|
IOUtils.cleanup(LOG, fis);
|