|
@@ -0,0 +1,188 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.fs.shell;
|
|
|
+
|
|
|
+import static org.junit.Assert.*;
|
|
|
+import static org.mockito.Mockito.*;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.InterruptedIOException;
|
|
|
+import java.net.URI;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FSInputStream;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FilterFileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.fs.shell.CopyCommands.Put;
|
|
|
+import org.apache.hadoop.util.Progressable;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.BeforeClass;
|
|
|
+import org.junit.Test;
|
|
|
+import org.mockito.stubbing.OngoingStubbing;
|
|
|
+
|
|
|
+public class TestCopy {
|
|
|
+ static Configuration conf;
|
|
|
+ static Path path = new Path("mockfs:/file");
|
|
|
+ static Path tmpPath = new Path("mockfs:/file._COPYING_");
|
|
|
+ static Put cmd;
|
|
|
+ static FileSystem mockFs;
|
|
|
+ static PathData target;
|
|
|
+ static FileStatus fileStat;
|
|
|
+
|
|
|
+ @BeforeClass
|
|
|
+ public static void setup() throws IOException {
|
|
|
+ conf = new Configuration();
|
|
|
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
+ mockFs = mock(FileSystem.class);
|
|
|
+ fileStat = mock(FileStatus.class);
|
|
|
+ when(fileStat.isDirectory()).thenReturn(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void resetMock() throws IOException {
|
|
|
+ reset(mockFs);
|
|
|
+ target = new PathData(path.toString(), conf);
|
|
|
+ cmd = new CopyCommands.Put();
|
|
|
+ cmd.setConf(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCopyStreamTarget() throws Exception {
|
|
|
+ FSDataOutputStream out = mock(FSDataOutputStream.class);
|
|
|
+ whenFsCreate().thenReturn(out);
|
|
|
+ when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
|
|
+ when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
|
|
|
+ FSInputStream in = mock(FSInputStream.class);
|
|
|
+ when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
|
|
+
|
|
|
+ tryCopyStream(in, true);
|
|
|
+ verify(mockFs, never()).delete(eq(path), anyBoolean());
|
|
|
+ verify(mockFs).rename(eq(tmpPath), eq(path));
|
|
|
+ verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
|
|
|
+ verify(mockFs, never()).close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCopyStreamTargetExists() throws Exception {
|
|
|
+ FSDataOutputStream out = mock(FSDataOutputStream.class);
|
|
|
+ whenFsCreate().thenReturn(out);
|
|
|
+ when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
|
|
|
+ target.refreshStatus(); // so it's updated as existing
|
|
|
+ cmd.setOverwrite(true);
|
|
|
+ when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
|
|
+ when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
|
|
|
+ when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
|
|
|
+ FSInputStream in = mock(FSInputStream.class);
|
|
|
+ when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
|
|
+
|
|
|
+ tryCopyStream(in, true);
|
|
|
+ verify(mockFs).delete(eq(path), anyBoolean());
|
|
|
+ verify(mockFs).rename(eq(tmpPath), eq(path));
|
|
|
+ verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
|
|
|
+ verify(mockFs, never()).close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInterruptedCreate() throws Exception {
|
|
|
+ whenFsCreate().thenThrow(new InterruptedIOException());
|
|
|
+ when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
|
|
+ FSDataInputStream in = mock(FSDataInputStream.class);
|
|
|
+
|
|
|
+ tryCopyStream(in, false);
|
|
|
+ verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
|
|
+ verify(mockFs, never()).rename(any(Path.class), any(Path.class));
|
|
|
+ verify(mockFs, never()).delete(eq(path), anyBoolean());
|
|
|
+ verify(mockFs, never()).close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInterruptedCopyBytes() throws Exception {
|
|
|
+ FSDataOutputStream out = mock(FSDataOutputStream.class);
|
|
|
+ whenFsCreate().thenReturn(out);
|
|
|
+ when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
|
|
+ FSInputStream in = mock(FSInputStream.class);
|
|
|
+ // make IOUtils.copyBytes fail
|
|
|
+ when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
|
|
|
+ new InterruptedIOException());
|
|
|
+
|
|
|
+ tryCopyStream(in, false);
|
|
|
+ verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
|
|
+ verify(mockFs, never()).rename(any(Path.class), any(Path.class));
|
|
|
+ verify(mockFs, never()).delete(eq(path), anyBoolean());
|
|
|
+ verify(mockFs, never()).close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInterruptedRename() throws Exception {
|
|
|
+ FSDataOutputStream out = mock(FSDataOutputStream.class);
|
|
|
+ whenFsCreate().thenReturn(out);
|
|
|
+ when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
|
|
|
+ when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
|
|
|
+ new InterruptedIOException());
|
|
|
+ FSInputStream in = mock(FSInputStream.class);
|
|
|
+ when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
|
|
+
|
|
|
+ tryCopyStream(in, false);
|
|
|
+ verify(mockFs).delete(eq(tmpPath), anyBoolean());
|
|
|
+ verify(mockFs).rename(eq(tmpPath), eq(path));
|
|
|
+ verify(mockFs, never()).delete(eq(path), anyBoolean());
|
|
|
+ verify(mockFs, never()).close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private OngoingStubbing<FSDataOutputStream> whenFsCreate() throws IOException {
|
|
|
+ return when(mockFs.create(eq(tmpPath), any(FsPermission.class),
|
|
|
+ anyBoolean(), anyInt(), anyShort(), anyLong(),
|
|
|
+ any(Progressable.class)));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void tryCopyStream(InputStream in, boolean shouldPass) {
|
|
|
+ try {
|
|
|
+ cmd.copyStreamToTarget(new FSDataInputStream(in), target);
|
|
|
+ } catch (InterruptedIOException e) {
|
|
|
+ assertFalse("copy failed", shouldPass);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ assertFalse(e.getMessage(), shouldPass);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class MockFileSystem extends FilterFileSystem {
|
|
|
+ Configuration conf;
|
|
|
+ MockFileSystem() {
|
|
|
+ super(mockFs);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void initialize(URI uri, Configuration conf) {
|
|
|
+ this.conf = conf;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public Path makeQualified(Path path) {
|
|
|
+ return path;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public Configuration getConf() {
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|