ChainMapper and ChainReducer
Published: 2019-06-13


The ChainMapper class allows multiple Mapper classes to be used within a single Map task.

The ChainReducer class allows multiple Mapper classes to be chained after a Reducer within the Reducer task.
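To make the two descriptions above concrete, here is a minimal plain-Java model of the job shape that ChainMapper and ChainReducer produce, which the Hadoop Javadoc writes as [MAP+ / REDUCE MAP*]: a chain of mappers applied to each input record, a shuffle that groups and sorts by key, one reducer call per key, and then a chain of mappers applied to each reducer output. This is a sketch under stated assumptions, not Hadoop code: it uses no Hadoop classes, and the names `ChainModel`, `Step`, `runChain` and `runJob` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java model (not Hadoop's API) of a [MAP+ / REDUCE MAP*] job.
public class ChainModel {
    // One chained step: takes a key/value pair and emits zero or more pairs.
    public interface Step extends Function<Map.Entry<String, String>, List<Map.Entry<String, String>>> {}

    // Hand one record to every step in order; each step's output feeds the next in memory.
    public static List<Map.Entry<String, String>> runChain(List<Step> chain, Map.Entry<String, String> input) {
        List<Map.Entry<String, String>> current = List.of(input);
        for (Step step : chain) {
            List<Map.Entry<String, String>> next = new ArrayList<>();
            for (Map.Entry<String, String> kv : current) {
                next.addAll(step.apply(kv));
            }
            current = next;
        }
        return current;
    }

    // Mapper chain per record, shuffle (group + sort by key), reducer, then post-reduce chain.
    public static List<Map.Entry<String, String>> runJob(List<Map.Entry<String, String>> input,
            List<Step> mapChain,
            Function<Map.Entry<String, List<String>>, Map.Entry<String, String>> reducer,
            List<Step> postReduceChain) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, String> record : input) {
            for (Map.Entry<String, String> out : runChain(mapChain, record)) {
                groups.computeIfAbsent(out.getKey(), k -> new ArrayList<>()).add(out.getValue());
            }
        }
        List<Map.Entry<String, String>> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            result.addAll(runChain(postReduceChain, reducer.apply(group)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("b", "2"), Map.entry("a", "1"), Map.entry("b", "-5"), Map.entry("a", "4"));
        // Map chain: drop negative values, then upper-case the key.
        List<Step> mapChain = List.of(
                kv -> Integer.parseInt(kv.getValue()) < 0 ? List.of() : List.of(kv),
                kv -> List.of(Map.entry(kv.getKey().toUpperCase(), kv.getValue())));
        // Reducer: sum the values of each key.
        Function<Map.Entry<String, List<String>>, Map.Entry<String, String>> sum = g ->
                Map.entry(g.getKey(), String.valueOf(
                        g.getValue().stream().mapToInt(Integer::parseInt).sum()));
        // Post-reduce chain (the ChainReducer part): tag each total.
        List<Step> post = List.of(kv -> List.of(Map.entry(kv.getKey(), "total=" + kv.getValue())));
        System.out.println(runJob(input, mapChain, sum, post)); // prints [A=total=5, B=total=2]
    }
}
```

In Hadoop itself the chain is configured with ChainMapper.addMapper, ChainReducer.setReducer and ChainReducer.addMapper on a JobConf; the benefit the Javadoc points to is that the chained steps run inside one task, so no intermediate output is written between them, which the model mirrors by passing each step's output straight to the next.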

 

 

ChainMapper lets you combine several map classes into a single map task.

The example below has no practical use, but it demonstrates clearly what ChainMapper does.

Input file:

100 tom 90
101 mary 85
102 kate 60

Output of Map00 (the record with key 100 is filtered out):

101 mary 85
102 kate 60

Output of Map01 (the record with key 101 is filtered out):

102 kate 60

Reduce output:

102 kate 60
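The data flow above can be replayed in plain Java to check the expected output. This is a sketch, not Hadoop code; `ChainMapperDemo`, `parse`, `dropKey` and `identityReduce` are hypothetical names. It assumes the input lines are tab-separated (key, tab, rest of the line), because KeyValueTextInputFormat splits each line at the first tab by default, whereas the listing above shows spaces. For readability it applies Map00 to every record and then Map01; Hadoop instead pushes each record through the whole chain before reading the next one, which gives the same result here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ChainMapperDemo {
    // Parse "key<TAB>value"; in this sketch a line without a tab becomes (whole line, "").
    static String[] parse(String line) {
        String[] kv = line.split("\t", 2);
        return kv.length == 2 ? kv : new String[] {kv[0], ""};
    }

    // One chained "mapper" from the example: pass records through unless the key matches.
    static List<String[]> dropKey(List<String[]> records, String key) {
        List<String[]> out = new ArrayList<>();
        for (String[] kv : records) {
            if (!kv[0].equals(key)) {
                out.add(kv);
            }
        }
        return out;
    }

    // Identity reduce after the shuffle: keys sorted, every value written as "key<TAB>value".
    static List<String> identityReduce(List<String[]> records) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : records) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        List<String> out = new ArrayList<>();
        groups.forEach((key, values) -> values.forEach(v -> out.add(key + "\t" + v)));
        return out;
    }

    static List<String> run(List<String> lines) {
        List<String[]> records = new ArrayList<>();
        for (String line : lines) {
            records.add(parse(line));
        }
        List<String[]> afterMap00 = dropKey(records, "100");    // 101 and 102 remain
        List<String[]> afterMap01 = dropKey(afterMap00, "101"); // only 102 remains
        return identityReduce(afterMap01);
    }

    public static void main(String[] args) {
        List<String> input = List.of("100\ttom 90", "101\tmary 85", "102\tkate 60");
        run(input).forEach(System.out::println); // prints 102<TAB>kate 60
    }
}
```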

package org.myorg;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;

public class WordCount {

    // First mapper in the chain: drops the record whose key is "100".
    public static class Map00 extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            Text ft = new Text("100");
            if (!key.equals(ft)) {
                output.collect(key, value);
            }
        }
    }

    // Second mapper in the chain: receives Map00's output and drops the record whose key is "101".
    public static class Map01 extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            Text ft = new Text("101");
            if (!key.equals(ft)) {
                output.collect(key, value);
            }
        }
    }

    // Identity reducer: writes every value it receives.
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount00");
        // KeyValueTextInputFormat splits each line at the first tab by default,
        // so the input lines are expected as "100<TAB>tom 90".
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper is static; each mapper gets its own JobConf.
        // true = keys/values are passed by value to the next mapper in the chain.
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map00.class, Text.class, Text.class,
                Text.class, Text.class, true, mapAConf);
        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map01.class, Text.class, Text.class,
                Text.class, Text.class, true, mapBConf);

        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
 

 

Another example. It is a lot of code, but it is actually simple: the Conn classes are all identical.

http://yixiaohuamax.iteye.com/blog/684244 
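The four mappers chained in the ProcessSample listing amount to a per-record pipeline: parse the line, keep only state "F", rename that state to "Find", and pass the record through. Here is a plain-Java sketch of that pipeline, not the author's code: `OrderPipeline`, `Order`, the sample lines and the `orderKey<TAB>state` output format are hypothetical, and the date is kept as a plain string so the sketch does not depend on the author's unpublished DateUtil helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class OrderPipeline {
    // Hypothetical stand-in for the identical Conn classes; the date stays a String here.
    static final class Order {
        final long orderKey;
        final long customer;
        final String state;
        final double price;
        final String orderDate;

        Order(long orderKey, long customer, String state, double price, String orderDate) {
            this.orderKey = orderKey;
            this.customer = customer;
            this.state = state;
            this.price = price;
            this.orderDate = orderDate;
        }
    }

    // Step 1 (extract): split "orderKey;customer;state;price;date" on ';'.
    static Order extract(String line) {
        String[] f = line.split(";");
        return new Order(Long.parseLong(f[0]), Long.parseLong(f[1]), f[2],
                Double.parseDouble(f[3]), f[4]);
    }

    // Step 2 (filter): only orders in state "F" continue down the chain.
    static Optional<Order> filter(Order o) {
        return o.state.equals("F") ? Optional.of(o) : Optional.empty();
    }

    // Step 3 (regex): replaceAll treats "F" as a regular expression and rewrites every match.
    static Order regex(Order o) {
        return new Order(o.orderKey, o.customer, o.state.replaceAll("F", "Find"), o.price, o.orderDate);
    }

    // Step 4 (load): identity.
    static Order load(Order o) {
        return o;
    }

    static List<String> run(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            // Each line goes through all four steps before the next line is read.
            filter(extract(line))
                    .map(OrderPipeline::regex)
                    .map(OrderPipeline::load)
                    .ifPresent(o -> out.add(o.orderKey + "\t" + o.state));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1;100;F;12.5;2011-09-01", "2;101;O;8.0;2011-09-02")));
    }
}
```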

package com.oncedq.code;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;

// DateUtil is the author's own helper class; its source is not included in the post.
import com.oncedq.code.util.DateUtil;

public class ProcessSample {

    // Step 1: parse a line "orderKey;customer;state;price;yyyy-MM-dd" into a Conn1 record.
    public static class ExtractMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, Conn1> {
        @Override
        public void map(LongWritable offset, Text line,
                OutputCollector<LongWritable, Conn1> output, Reporter reporter)
                throws IOException {
            String[] strs = line.toString().split(";");
            Conn1 conn1 = new Conn1();
            conn1.orderKey = Long.parseLong(strs[0]);
            conn1.customer = Long.parseLong(strs[1]);
            conn1.state = strs[2];
            conn1.price = Double.parseDouble(strs[3]);
            conn1.orderDate = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");
            // Key each record by its order key.
            output.collect(new LongWritable(conn1.orderKey), conn1);
        }
    }

    // Conn1, Conn2 and Conn3 share the same fields and serialization; only the type differs.
    public static class Conn1 implements WritableComparable<Conn1> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            // Read the fields in exactly the order write() wrote them.
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), "yyyy-MM-dd");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        @Override
        public int compareTo(Conn1 other) {
            // Order records by order key.
            return Long.compare(orderKey, other.orderKey);
        }
    }

    // Step 2: keep only orders whose state is "F", copied into a Conn2.
    public static class Filter1Mapper extends MapReduceBase
            implements Mapper<LongWritable, Conn1, LongWritable, Conn2> {
        @Override
        public void map(LongWritable key, Conn1 src,
                OutputCollector<LongWritable, Conn2> output, Reporter reporter)
                throws IOException {
            if (src.state.equals("F")) {
                Conn2 dst = new Conn2();
                dst.customer = src.customer;
                dst.orderDate = src.orderDate;
                dst.orderKey = src.orderKey;
                dst.price = src.price;
                dst.state = src.state;
                output.collect(key, dst);
            }
        }
    }

    public static class Conn2 implements WritableComparable<Conn2> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), "yyyy-MM-dd");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        @Override
        public int compareTo(Conn2 other) {
            return Long.compare(orderKey, other.orderKey);
        }
    }

    // Step 3: rewrite the state "F" as "Find", copied into a Conn3.
    public static class RegexMapper extends MapReduceBase
            implements Mapper<LongWritable, Conn2, LongWritable, Conn3> {
        @Override
        public void map(LongWritable key, Conn2 src,
                OutputCollector<LongWritable, Conn3> output, Reporter reporter)
                throws IOException {
            Conn3 dst = new Conn3();
            dst.customer = src.customer;
            dst.orderDate = src.orderDate;
            dst.orderKey = src.orderKey;
            dst.price = src.price;
            dst.state = src.state.replaceAll("F", "Find");
            output.collect(key, dst);
        }
    }

    public static class Conn3 implements WritableComparable<Conn3> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), "yyyy-MM-dd");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        @Override
        public int compareTo(Conn3 other) {
            return Long.compare(orderKey, other.orderKey);
        }

        // TextOutputFormat writes value.toString(), so give the final records a readable form.
        @Override
        public String toString() {
            return orderKey + ";" + customer + ";" + state + ";" + price + ";"
                    + DateUtil.getDateStr(orderDate, "yyyy-MM-dd");
        }
    }

    // Step 4: identity mapper; with zero reducers its output is the job's output.
    public static class LoadMapper extends MapReduceBase
            implements Mapper<LongWritable, Conn3, LongWritable, Conn3> {
        @Override
        public void map(LongWritable key, Conn3 value,
                OutputCollector<LongWritable, Conn3> output, Reporter reporter)
                throws IOException {
            output.collect(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ProcessSample.class);
        job.setJobName("ProcessSample");
        // Map-only job: the output of the last mapper in the chain is the job output.
        job.setNumReduceTasks(0);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        // Each mapper in the chain gets its own configuration object.
        JobConf mapper1 = new JobConf(false);
        JobConf mapper2 = new JobConf(false);
        JobConf mapper3 = new JobConf(false);
        JobConf mapper4 = new JobConf(false);

        // addMapper is static. Each mapper's input types must match the previous
        // mapper's output types; true = pass keys/values by value.
        ChainMapper.addMapper(job, ExtractMapper.class, LongWritable.class, Text.class,
                LongWritable.class, Conn1.class, true, mapper1);
        ChainMapper.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,
                LongWritable.class, Conn2.class, true, mapper2);
        ChainMapper.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,
                LongWritable.class, Conn3.class, true, mapper3);
        ChainMapper.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,
                LongWritable.class, Conn3.class, true, mapper4);

        FileInputFormat.setInputPaths(job, new Path("orderData"));
        FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));

        JobControl jc = new JobControl("test");
        jc.addJob(new Job(job));
        // JobControl.run() keeps polling until stop() is called, so run it in its
        // own thread and stop it once all jobs have finished.
        Thread runner = new Thread(jc);
        runner.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();
    }
} 
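The Conn classes only work because write() and readFields() handle the same fields in the same order: Hadoop typically creates (or reuses) an instance of the value class and fills it through readFields(). That round trip can be checked without Hadoop using java.io streams. In this sketch `OrderRecord` is a hypothetical stand-in for Conn1, writeUTF replaces Text.writeString (the two use different length encodings, so the bytes differ, but the ordering rule is the same), and the date is a java.time.LocalDate stored as its ISO yyyy-MM-dd string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.time.LocalDate;

public class OrderRecord {
    long orderKey;
    long customer;
    String state;
    double price;
    LocalDate orderDate;

    // Same field order as Conn1.write(); writeUTF stands in for Text.writeString.
    void write(DataOutput out) throws IOException {
        out.writeLong(orderKey);
        out.writeLong(customer);
        out.writeUTF(state);
        out.writeDouble(price);
        out.writeUTF(orderDate.toString()); // LocalDate.toString() is ISO yyyy-MM-dd
    }

    // Must read back exactly what write() produced, in the same order.
    void readFields(DataInput in) throws IOException {
        orderKey = in.readLong();
        customer = in.readLong();
        state = in.readUTF();
        price = in.readDouble();
        orderDate = LocalDate.parse(in.readUTF());
    }

    // Serialize to bytes, then fill a fresh, empty instance from those bytes.
    static OrderRecord roundTrip(OrderRecord original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(bytes);
        original.write(dataOut);
        dataOut.flush();
        OrderRecord copy = new OrderRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        OrderRecord r = new OrderRecord();
        r.orderKey = 1;
        r.customer = 100;
        r.state = "F";
        r.price = 12.5;
        r.orderDate = LocalDate.of(2011, 9, 3);
        OrderRecord copy = roundTrip(r);
        // prints: 1 100 F 12.5 2011-09-03
        System.out.println(copy.orderKey + " " + copy.customer + " " + copy.state + " "
                + copy.price + " " + copy.orderDate);
    }
}
```

Swapping any two lines in readFields() (for example reading price before state) would make the copy come back garbled or throw, which is the failure mode to watch for when editing the Conn classes.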

Reposted from: https://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165774.html
