|
@@ -156,10 +156,10 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
* them and picks N-1 keys to generate N equally sized partitions.
|
|
|
* @param job the job to sample
|
|
|
* @param partFile where to write the output file to
|
|
|
- * @throws IOException if something goes wrong
|
|
|
+ * @throws Throwable if something goes wrong
|
|
|
*/
|
|
|
public static void writePartitionFile(final JobContext job,
|
|
|
- Path partFile) throws IOException, InterruptedException {
|
|
|
+ Path partFile) throws Throwable {
|
|
|
long t1 = System.currentTimeMillis();
|
|
|
Configuration conf = job.getConfiguration();
|
|
|
final TeraInputFormat inFormat = new TeraInputFormat();
|
|
@@ -174,11 +174,12 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
final long recordsPerSample = sampleSize / samples;
|
|
|
final int sampleStep = splits.size() / samples;
|
|
|
Thread[] samplerReader = new Thread[samples];
|
|
|
+ SamplerThreadGroup threadGroup = new SamplerThreadGroup("Sampler Reader Thread Group");
|
|
|
// take N samples from different parts of the input
|
|
|
for(int i=0; i < samples; ++i) {
|
|
|
final int idx = i;
|
|
|
samplerReader[i] =
|
|
|
- new Thread ("Sampler Reader " + idx) {
|
|
|
+ new Thread (threadGroup,"Sampler Reader " + idx) {
|
|
|
{
|
|
|
setDaemon(true);
|
|
|
}
|
|
@@ -201,7 +202,7 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
} catch (IOException ie){
|
|
|
System.err.println("Got an exception while reading splits " +
|
|
|
StringUtils.stringifyException(ie));
|
|
|
- System.exit(-1);
|
|
|
+ throw new RuntimeException(ie);
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
}
|
|
@@ -215,6 +216,9 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
for (int i = 0; i < samples; i++) {
|
|
|
try {
|
|
|
samplerReader[i].join();
|
|
|
+ if(threadGroup.getThrowable() != null){
|
|
|
+ throw threadGroup.getThrowable();
|
|
|
+ }
|
|
|
} catch (InterruptedException e) {
|
|
|
}
|
|
|
}
|
|
@@ -225,6 +229,25 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
long t3 = System.currentTimeMillis();
|
|
|
System.out.println("Computing parititions took " + (t3 - t2) + "ms");
|
|
|
}
|
|
|
+ /** Thread group that records the throwable handed to uncaughtException so a caller can rethrow it. */
|
|
|
+ static class SamplerThreadGroup extends ThreadGroup{
|
|
|
+ /** Most recently reported throwable, null until one arrives. NOTE(review): plain field shared across threads - consider volatile. */
|
|
|
+ private Throwable throwable;
|
|
|
+ /** @param s name passed to the ThreadGroup constructor */
|
|
|
+ public SamplerThreadGroup(String s) {
|
|
|
+ super(s);
|
|
|
+ }
|
|
|
+ /** Stores the throwable, replacing any earlier one; super is not called, so ThreadGroup's default handling is skipped. */
|
|
|
+ @Override
|
|
|
+ public void uncaughtException(Thread thread, Throwable throwable) {
|
|
|
+ this.throwable = throwable;
|
|
|
+ }
|
|
|
+ /** @return the stored throwable, or null if none has been reported */
|
|
|
+ public Throwable getThrowable() {
|
|
|
+ return this.throwable;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
static class TeraRecordReader extends RecordReader<Text,Text> {
|
|
|
private FSDataInputStream in;
|