The ChainMapper class makes it possible to use multiple Mapper classes within a single Map task.
The ChainReducer class makes it possible to chain multiple Mapper classes after a Reducer within the Reducer task.
通过ChainMapper可以将多个map类合并成一个map任务。
下面这个例子没什么实际意义,但是很好地演示了ChainMapper的作用。
源文件
100 tom 90
101 mary 85
102 kate 60

map00的结果,过滤掉100的记录:
101 mary 85
102 kate 60

map01的结果,过滤掉101的记录:
102 kate 60

reduce结果:
102 kate 60

package org.myorg;
import java.io.IOException; import java.util. * ; import java.lang.String; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf. * ; import org.apache.hadoop.io. * ; import org.apache.hadoop.mapred. * ; import org.apache.hadoop.util. * ; import org.apache.hadoop.mapred.lib. * ; public class WordCount{ public static class Map00 extends MapReduceBase implements Mapper { public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException { Text ft = new Text(“ 100 ″); if ( ! key.equals(ft)) { output.collect(key, value); } } } public static class Map01 extends MapReduceBase implements Mapper { public void map(Text key, Text value, OutputCollector output, Reporter reporter) throws IOException { Text ft = new Text(“ 101 ″); if ( ! key.equals(ft)) { output.collect(key, value); } } } public static class Reduce extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { while (values.hasNext()) { output.collect(key, values.next()); } } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount. class ); conf.setJobName(“wordcount00″); conf.setInputFormat(KeyValueTextInputFormat. class ); conf.setOutputFormat(TextOutputFormat. class ); ChainMapper cm = new ChainMapper(); JobConf mapAConf = new JobConf( false ); cm.addMapper(conf, Map00. class , Text. class , Text. class , Text. class , Text. class , true , mapAConf); JobConf mapBConf = new JobConf( false ); cm.addMapper(conf, Map01. class , Text. class , Text. class , Text. class , Text. class , true , mapBConf); conf.setReducerClass(Reduce. class ); conf00.setOutputKeyClass(Text. class ); conf00.setOutputValueClass(Text. class ); FileInputFormat.setInputPaths(conf, new Path(args[ 0 ])); FileOutputFormat.setOutputPath(conf, new Path(args[ 1 ])); JobClient.runJob(conf); }}
另外一个例子,代码很多,其实很简单,Conn几个类都是相同的
http://yixiaohuamax.iteye.com/blog/684244
package com.oncedq.code;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;
import com.oncedq.code.util.DateUtil;

/**
 * A four-stage map-only ETL pipeline built with {@code ChainMapper}:
 * extract (parse ';'-separated order lines) -> filter (keep state "F") ->
 * transform (rename state "F" to "Find") -> load (identity pass-through).
 *
 * <p>Fixes applied to the pasted original: {@code Conn2}/{@code Conn3}
 * implemented {@code WritableComparable<Conn1>} instead of their own type
 * (copy-paste bug that breaks the compareTo contract), {@code compareTo}
 * was a stub returning 0, the static {@code ChainMapper.addMapper} was
 * called on an instance, and the class's closing brace was missing.
 */
public class ProcessSample {

    /** Stage 1: parses one ';'-separated text line into a Conn1 record keyed by orderKey. */
    public static class ExtractMappper extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, Conn1> {

        @Override
        public void map(LongWritable offset, Text line,
                        OutputCollector<LongWritable, Conn1> collector, Reporter reporter)
                throws IOException {
            // Expected layout: orderKey;customer;state;price;orderDate(yyyy-MM-dd)
            String[] fields = line.toString().split(" ; ");
            Conn1 record = new Conn1();
            record.orderKey = Long.parseLong(fields[0]);
            record.customer = Long.parseLong(fields[1]);
            record.state = fields[2];
            record.price = Double.parseDouble(fields[3]);
            record.orderDate = DateUtil.getDateFromString(fields[4], " yyyy-MM-dd ");
            collector.collect(new LongWritable(record.orderKey), record);
        }
    }

    /** Order record emitted by the extract stage. Serialized field order: key, customer, state, price, date. */
    private static class Conn1 implements WritableComparable<Conn1> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            // Date is serialized as its formatted string, so parse it back the same way.
            orderDate = DateUtil.getDateFromString(Text.readString(in), " yyyy-MM-dd ");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, " yyyy-MM-dd "));
        }

        @Override
        public int compareTo(Conn1 other) {
            // Bug fix: original stub returned 0 for every pair. Order by key.
            return Long.compare(orderKey, other.orderKey);
        }
    }

    /** Stage 2: keeps only records whose state is "F", copied into a Conn2. */
    public static class Filter1Mapper extends MapReduceBase
            implements Mapper<LongWritable, Conn1, LongWritable, Conn2> {

        @Override
        public void map(LongWritable inKey, Conn1 in,
                        OutputCollector<LongWritable, Conn2> collector, Reporter report)
                throws IOException {
            if (in.state.equals(" F ")) {
                Conn2 out = new Conn2();
                out.customer = in.customer;
                out.orderDate = in.orderDate;
                out.orderKey = in.orderKey;
                out.price = in.price;
                out.state = in.state;
                collector.collect(inKey, out);
            }
        }
    }

    /** Order record emitted by the filter stage; same wire format as Conn1. */
    private static class Conn2 implements WritableComparable<Conn2> {
        // Bug fix: original declared WritableComparable<Conn1> (copy-paste error).
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), " yyyy-MM-dd ");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, " yyyy-MM-dd "));
        }

        @Override
        public int compareTo(Conn2 other) {
            return Long.compare(orderKey, other.orderKey);
        }
    }

    /** Stage 3: rewrites state " F " to " Find " and re-emits as a Conn3. */
    public static class RegexMapper extends MapReduceBase
            implements Mapper<LongWritable, Conn2, LongWritable, Conn3> {

        @Override
        public void map(LongWritable inKey, Conn2 in,
                        OutputCollector<LongWritable, Conn3> collector, Reporter report)
                throws IOException {
            in.state = in.state.replaceAll(" F ", " Find ");
            Conn3 out = new Conn3();
            out.customer = in.customer;
            out.orderDate = in.orderDate;
            out.orderKey = in.orderKey;
            out.price = in.price;
            out.state = in.state;
            collector.collect(inKey, out);
        }
    }

    /** Order record emitted by the transform stage; same wire format as Conn1. */
    private static class Conn3 implements WritableComparable<Conn3> {
        // Bug fix: original declared WritableComparable<Conn1> (copy-paste error).
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), " yyyy-MM-dd ");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, " yyyy-MM-dd "));
        }

        @Override
        public int compareTo(Conn3 other) {
            return Long.compare(orderKey, other.orderKey);
        }
    }

    /** Stage 4: identity pass-through ("load"); leaves every pair unchanged. */
    public static class LoadMapper extends MapReduceBase
            implements Mapper<LongWritable, Conn3, LongWritable, Conn3> {

        @Override
        public void map(LongWritable key, Conn3 value,
                        OutputCollector<LongWritable, Conn3> collector, Reporter reporter)
                throws IOException {
            collector.collect(key, value);
        }
    }

    /**
     * Configures the four-mapper chain as a map-only job over "orderData"
     * and runs it through JobControl.
     */
    public static void main(String[] args) {
        JobConf job = new JobConf(ProcessSample.class);
        job.setJobName(" ProcessSample ");
        // Map-only pipeline: no reduce phase.
        job.setNumReduceTasks(0);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        JobConf mapper1 = new JobConf();
        JobConf mapper2 = new JobConf();
        JobConf mapper3 = new JobConf();
        JobConf mapper4 = new JobConf();

        // addMapper is static; the original wrongly called it on an instance.
        // The "true" flag passes key/value by value between chained mappers.
        ChainMapper.addMapper(job, ExtractMappper.class,
                LongWritable.class, Text.class, LongWritable.class, Conn1.class, true, mapper1);
        ChainMapper.addMapper(job, Filter1Mapper.class,
                LongWritable.class, Conn1.class, LongWritable.class, Conn2.class, true, mapper2);
        ChainMapper.addMapper(job, RegexMapper.class,
                LongWritable.class, Conn2.class, LongWritable.class, Conn3.class, true, mapper3);
        ChainMapper.addMapper(job, LoadMapper.class,
                LongWritable.class, Conn3.class, LongWritable.class, Conn3.class, true, mapper4);

        FileInputFormat.setInputPaths(job, new Path(" orderData "));
        FileOutputFormat.setOutputPath(job, new Path(" orderDataOutput "));

        try {
            Job job1 = new Job(job);
            JobControl jc = new JobControl(" test ");
            jc.addJob(job1);
            jc.run();
        } catch (IOException e) {
            // NOTE(review): printing the trace and continuing mirrors the
            // original sample; a real job driver should propagate the failure.
            e.printStackTrace();
        }
    }
}