|
@@ -34,6 +34,8 @@ import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.junit.AfterClass;
|
|
|
import org.junit.Assert;
|
|
@@ -41,7 +43,6 @@ import org.junit.Assume;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.Test;
|
|
|
-
|
|
|
import org.apache.commons.lang.exception.ExceptionUtils;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.net.unix.DomainSocket.DomainChannel;
|
|
@@ -727,4 +728,38 @@ public class TestDomainSocket {
|
|
|
tmp.close();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout=180000)
|
|
|
+ public void testShutdown() throws Exception {
|
|
|
+ final AtomicInteger bytesRead = new AtomicInteger(0);
|
|
|
+ final AtomicBoolean failed = new AtomicBoolean(false);
|
|
|
+ final DomainSocket[] socks = DomainSocket.socketpair();
|
|
|
+ Runnable reader = new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ int ret = socks[1].getInputStream().read();
|
|
|
+ if (ret == -1) return;
|
|
|
+ bytesRead.addAndGet(1);
|
|
|
+ } catch (IOException e) {
|
|
|
+ DomainSocket.LOG.error("reader error", e);
|
|
|
+ failed.set(true);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ Thread readerThread = new Thread(reader);
|
|
|
+ readerThread.start();
|
|
|
+ socks[0].getOutputStream().write(1);
|
|
|
+ socks[0].getOutputStream().write(2);
|
|
|
+ socks[0].getOutputStream().write(3);
|
|
|
+ Assert.assertTrue(readerThread.isAlive());
|
|
|
+ socks[0].shutdown();
|
|
|
+ readerThread.join();
|
|
|
+ Assert.assertFalse(failed.get());
|
|
|
+ Assert.assertEquals(3, bytesRead.get());
|
|
|
+ IOUtils.cleanup(null, socks);
|
|
|
+ }
|
|
|
}
|