Tuesday, June 18, 2013

ChainMapper


ChainMapper lets you run multiple Mapper classes within a single map task, where the output of one mapper is passed on as the input of the next. This can be useful when stages have to run in a fixed order, loosely comparable to a barrier in OpenCL.

This enables having reusable specialized Mappers that can be combined to perform composite operations within a single task.
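To make the chaining idea concrete, here is a minimal plain-Java sketch. It does not use the Hadoop API: MiniMapper, then and ChainSketch are hypothetical names invented for this illustration, and Long/String stand in for LongWritable/Text. It only shows how every record emitted by one stage can be handed directly to the next stage in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a mapper: consumes one record, emits zero or more. */
interface MiniMapper<KI, VI, KO, VO> {
    void map(KI key, VI value, BiConsumer<KO, VO> emit);

    /** Chains this mapper to the next: each record emitted here is fed straight into next. */
    default <KN, VN> MiniMapper<KI, VI, KN, VN> then(MiniMapper<KO, VO, KN, VN> next) {
        return (key, value, emit) -> map(key, value, (k, v) -> next.map(k, v, emit));
    }
}

final class ChainSketch {
    /** Runs one line through tokenizer -> uppercase and collects "WORD=count" strings. */
    static List<String> run(String line) {
        // Stage 1: split the line into words and emit (word, 1) for each non-empty token.
        MiniMapper<Long, String, String, Long> tokenizer = (offset, text, emit) -> {
            for (String token : text.split(" ")) {
                if (!token.isEmpty()) {
                    emit.accept(token, 1L);
                }
            }
        };
        // Stage 2: uppercase the word and pass the count through unchanged.
        MiniMapper<String, Long, String, Long> uppercase =
                (word, count, emit) -> emit.accept(word.toUpperCase(), count);

        List<String> out = new ArrayList<>();
        tokenizer.then(uppercase).map(0L, line, (k, v) -> out.add(k + "=" + v));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("the mapper classes")); // prints [THE=1, MAPPER=1, CLASSES=1]
    }
}
```

Because each stage only sees the records emitted by the previous one, a stage such as the uppercase step can be reused in other chains without changes.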

Special care has to be taken when creating chains so that the key/values output by a Mapper are valid for the following Mapper in the chain. It is assumed that all Mappers and the Reducer in the chain use matching output and input key and value classes, as no conversion is done by the chaining code.
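One way to picture this requirement is a small pre-flight check that walks the chain and compares each stage's output classes with the next stage's input classes. The sketch below is plain Java and is not how Hadoop itself is implemented: StageSpec, ChainTypeCheck and firstMismatch are hypothetical names, and Long/String again stand in for LongWritable/Text.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical description of one chain stage: the classes it reads and writes. */
final class StageSpec {
    final String name;
    final Class<?> inKey, inValue, outKey, outValue;

    StageSpec(String name, Class<?> inKey, Class<?> inValue,
              Class<?> outKey, Class<?> outValue) {
        this.name = name;
        this.inKey = inKey;
        this.inValue = inValue;
        this.outKey = outKey;
        this.outValue = outValue;
    }
}

final class ChainTypeCheck {
    /**
     * Returns null when every stage's output classes can be consumed by the
     * next stage, otherwise "a -> b" naming the first incompatible pair.
     */
    static String firstMismatch(List<StageSpec> chain) {
        for (int i = 0; i + 1 < chain.size(); i++) {
            StageSpec current = chain.get(i);
            StageSpec next = chain.get(i + 1);
            boolean keyOk = next.inKey.isAssignableFrom(current.outKey);
            boolean valueOk = next.inValue.isAssignableFrom(current.outValue);
            if (!keyOk || !valueOk) {
                return current.name + " -> " + next.name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Long and String stand in for LongWritable and Text.
        StageSpec tokenizer = new StageSpec("tokenizer", Long.class, String.class, String.class, Long.class);
        StageSpec uppercase = new StageSpec("uppercase", String.class, Long.class, String.class, Long.class);
        String problem = firstMismatch(Arrays.asList(tokenizer, uppercase));
        System.out.println(problem == null ? "chain is consistent" : "mismatch at " + problem);
    }
}
```

Appending a stage that expects (Long, String) after the uppercase stage would make firstMismatch report that pair, which is the kind of mistake the paragraph above warns about.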

Using the ChainMapper and the ChainReducer classes, it is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]: one or more Mappers, followed by a Reducer, followed by zero or more Mappers. An immediate benefit of this pattern is a dramatic reduction in disk IO.
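The shape of such a job can be sketched in plain Java without a cluster. This is only a simulation under stated assumptions: MapReduceMapSketch is a hypothetical name, a TreeMap stands in for the shuffle that groups and sorts records by key, and the mapper after the reducer (keep only words seen at least twice) is made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MapReduceMapSketch {
    static List<String> run(List<String> lines) {
        // MAP+ : mapper 1 tokenizes, mapper 2 uppercases; records are passed in memory.
        // The TreeMap stands in for the shuffle, which groups and sorts by key.
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.split(" ")) {
                if (token.isEmpty()) {
                    continue;
                }
                String word = token.toUpperCase();
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1L);
            }
        }

        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            // REDUCE: sum the counts for one word.
            long sum = 0;
            for (long count : entry.getValue()) {
                sum += count;
            }
            // MAP* after the reducer: keep only words that occur at least twice.
            if (sum >= 2) {
                out.add(entry.getKey() + "\t" + sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("the output of the first", "the input of the second");
        for (String record : run(lines)) {
            System.out.println(record);
        }
    }
}
```

In this sketch nothing is written out between the tokenize and uppercase steps, or between the reduce and the final filter, which is where the saving in disk IO comes from compared with running each step as its own job.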

IMPORTANT: There is no need to specify the output key/value classes for the ChainMapper; this is done by the addMapper call for the last mapper in the chain.

Now we will see how to use ChainMapper in our example, which converts the input to uppercase and counts the number of occurrences of each word in the input.
Sample input:

sample.txt
The ChainMapper class allows to use multiple Mapper
classes within a single Map task.
The Mapper classes are invoked in a chained ( or piped ) fashion,
the output of the first becomes the input of the second,
and so on until the last Mapper,
the output of the last Mapper will be written to the task's output.
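
TokenizerMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// First mapper in the chain: splits each input line into words and emits (word, 1).
public class TokenizerMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text word = new Text();
    private final LongWritable count = new LongWritable(1L);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        for (String token : tokens) {
            if (token.isEmpty()) {
                continue; // skip empty tokens (e.g. from repeated spaces) but keep processing the line
            }
            word.set(token);
            context.write(word, count);
        }
    }
}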

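UppercaseMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Second mapper in the chain: receives (word, count) from TokenizerMapper and uppercases the word.
public class UppercaseMapper extends Mapper<Text, LongWritable, Text, LongWritable> {

    @Override
    public void map(Text word, LongWritable count, Context context) throws IOException, InterruptedException {
        String uppercaseString = word.toString().toUpperCase();
        word.set(uppercaseString);
        context.write(word, count);
    }
}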

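ChainMapperDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainMapperDriver extends Configured implements Tool {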

    static int printUsageAndExit() {
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    @Override
    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            return printUsageAndExit(); // stop here instead of continuing with missing arguments
        }

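        Job job = Job.getInstance(getConf());
        job.setJobName("Word_ToUppercase_Count");
        job.setJarByClass(ChainMapperDriver.class); // lets Hadoop locate the jar that contains the job classes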

        Path inPath = new Path(args[0]);
        Path outPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(outPath.toUri(), getConf());
        fileSystem.delete(outPath, true);

        TextInputFormat.setInputPaths(job, inPath);
        TextOutputFormat.setOutputPath(job, outPath);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Configuration tokenizerMapperConfig = new Configuration(false);
        ChainMapper.addMapper(job, TokenizerMapper.class,
                LongWritable.class, Text.class,
                Text.class, LongWritable.class,
                tokenizerMapperConfig);

        Configuration uppercaseMapperConfig = new Configuration(false);
        ChainMapper.addMapper(job, UppercaseMapper.class,
                Text.class, LongWritable.class,
                Text.class, LongWritable.class,
                uppercaseMapperConfig);

        job.setReducerClass(LongSumReducer.class); // a class literal is enough; no need to instantiate the reducer

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ChainMapperDriver(), args));
    }
}

Output:
(    1
)    1
A    2
ALLOWS    1
AND    1
ARE    1
BE    1
BECOMES    1
CHAINED    1
CHAINMAPPER    1
CLASS    1
CLASSES    2
FASHION,    1
FIRST    1
IN    1
INPUT    1
INVOKED    1
LAST    2
MAP    1
MAPPER    3
MAPPER,    1
MULTIPLE    1
OF    3
ON    1
OR    1
OUTPUT    2
OUTPUT.    1
PIPED    1
SECOND,    1
SINGLE    1
SO    1
TASK'S    1
TASK.    1
THE    10
TO    2
UNTIL    1
USE    1
WILL    1
WITHIN    1
WRITTEN    1
