MapReduce 案例实战

案例一 WordCount

需求说明

在给定的文本文件中统计输出每一个单词出现的总次数

  1. 输入数据
  2. 期望输出数据
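    以下是一组假设的示例数据,仅用于说明需求(并非原始测试数据):

    输入示例(hello.txt):
    hadoop hadoop
    spark hello
    hello hadoop

    期望输出示例:
    hadoop	3
    hello	2
    spark	1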

代码

代码目录

  • com.luoteng.mr.wordcount
    • WordcountMapper
    • WordcountReducer
    • WordcountDriver

编写 Mapper 类

package com.luoteng.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
* map 阶段
* KEYIN 输入数据 key 的类型
* VALUEIN 输入数据 value 的类型
* KEYOUT 输出数据 key 的类型
* VALUEOUT 输出数据 value 的类型
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line = value.toString(); // 获取一行
String[] words = line.split(" "); // 切割单词

for (String word: words) {
k.set(word);
context.write(k, v);
}
}
}

编写 Reducer 类

package com.luoteng.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

IntWritable v = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {

int sum = 0;
// 累加求和
for (IntWritable value: values) {
sum += value.get();
}
// 写出
v.set(sum);
context.write(key, v);
}
}

编写 Driver 类

package com.luoteng.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 1 获取 Job 对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 设置 jar 存储位置
job.setJarByClass(WordcountDriver.class);

// 3 关联 Mapper 和 Reducer 类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);

// 4 设置 Mapper 阶段输出数据的 key 和 value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5 设置最终数据输出的 key 和 value 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交 Job
// job.submit();
boolean result = job.waitForCompletion(true); // 传入 true 表示运行过程中打印作业进度等信息
System.exit(result ? 0 : 1); // 以退出码反馈运行结果:成功为 0,失败为 1
}
}
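
代码编写完成后,可以将工程打成 jar 包提交到集群上运行。下面是一个假设的提交命令示例,其中 jar 包名称与输入、输出路径需要按实际环境替换;注意输出目录必须事先不存在,否则 FileOutputFormat 检查时会直接报错:

hadoop jar wc.jar com.luoteng.mr.wordcount.WordcountDriver /user/luoteng/input /user/luoteng/output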

案例二 序列化

需求说明

统计每一个手机号耗费的总上行流量、下行流量、总流量。输入数据每行依次为:序号、手机号、网络 ip、访问网址(可能缺失)、上行流量、下行流量、网络状态码。

  1. 输入数据

    1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
    2 13846544121 192.196.100.2 264 0 200
    3 13956435636 192.196.100.3 132 1512 200
    4 13966251146 192.168.100.1 240 0 404
    5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
    6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
    7 13590439668 192.168.100.4 1116 954 200
    8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
    9 13729199489 192.168.100.6 240 0 200
    10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
    11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
    12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
    13 13560439638 192.168.100.10 918 4938 200
    14 13470253144 192.168.100.11 180 180 200
    15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
    16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
    17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
    18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
    19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
    20 13768778790 192.168.100.17 120 120 200
    21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
    22 13568436656 192.168.100.19 1116 954 200
  2. 期望输出数据

    13470253144	180	180	360
    13509468723 7335 110349 117684
    13560439638 918 4938 5856
    13568436656 3597 25635 29232
    13590439668 1116 954 2070
    13630577991 6960 690 7650
    13682846555 1938 2910 4848
    13729199489 240 0 240
    13736230513 2481 24681 27162
    13768778790 120 120 240
    13846544121 264 0 264
    13956435636 132 1512 1644
    13966251146 240 0 240
    13975057813 11058 48243 59301
    13992314666 3008 3720 6728
    15043685818 3659 3538 7197
    15910133277 3156 2936 6092
    15959002129 1938 180 2118
    18271575951 1527 2106 3633
    18390173782 9531 2412 11943
    84188413 4116 1432 5548

代码

代码目录

  • com.luoteng.mr.flowsum
    • FlowBean
    • FlowCountMapper
    • FlowCountReducer
    • FlowsumDriver

编写流量统计的 Bean 对象

package com.luoteng.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable{

private long upFlow; // 上行流量
private long downFlow; // 下行流量
private long sumFlow; // 总流量

public FlowBean() {
super();
}

public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
sumFlow = upFlow + downFlow;
}

@Override // 序列化
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

@Override // 反序列化
public void readFields(DataInput in) throws IOException {
// 和序列化方法顺序一致
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
sumFlow = upFlow + downFlow;
}
}

编写 Mapper 类

package com.luoteng.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

// 1 获取一行
String line = value.toString();

// 2 切割
String[] fields = line.split("\t");

// 3 封装 bean 对象
k.set(fields[1]);
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
v.setUpFlow(upFlow);
v.setDownFlow(downFlow);

// 4 写出
context.write(k, v);
}
}

编写 Reducer 类

package com.luoteng.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

FlowBean v = new FlowBean();

@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {

long sum_upFlow = 0;
long sum_downFlow = 0;

// 1 累加求和
for(FlowBean flowBean: values) {
sum_upFlow += flowBean.getUpFlow();
sum_downFlow += flowBean.getDownFlow();
}

// 2 写出
v.set(sum_upFlow, sum_downFlow);
context.write(key, v);
}
}

编写 Driver 类

package com.luoteng.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowsumDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args = new String[] {"/root/data/input", "/root/data/output1"};
// 1 获取 Job 对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 设置 jar 存储位置
job.setJarByClass(FlowsumDriver.class);

// 3 关联 Mapper 和 Reducer 类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 4 设置 Mapper 阶段输出数据的 key 和 value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 5 设置最终数据输出的 key 和 value 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交 Job
boolean result = job.waitForCompletion(true); // 传入 true 表示运行过程中打印作业进度等信息
System.exit(result ? 0 : 1); // 以退出码反馈运行结果:成功为 0,失败为 1
}
}

案例三 CombineTextInputFormat 切片机制

需求说明

将输入的大量小文件合并成一个切片统一处理。

  1. 输入数据:同案例一
  2. 期望输出数据:同案例一

代码

不做任何其他处理,在案例一 WordCount 程序的基础上按下述方式修改 Driver 类,观察修改前后运行日志中切片(splits)的个数。

修改 Driver 类

在 WordcountDriver 代码中增加如下设置(需要导入 org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat):

// * 设置 InputFormat 为 CombineTextInputFormat,并指定虚拟存储切片的最大值(此处为 4MB)
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

运行结果说明

待补充

案例四 KeyValueTextInputFormat 的使用

需求说明

统计输入文件中每一行的第一个单词相同的行数。

  1. 输入数据



  2. 期望输出数据
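    以下是一组假设的示例数据,仅用于说明需求(KeyValueTextInputFormat 以每行第一个分隔符切分 key 和 value,这里分隔符为空格):

    输入示例:
    banzhang ni hao
    xihuan hadoop banzhang
    banzhang ni hao
    xihuan hadoop banzhang

    期望输出示例:
    banzhang	2
    xihuan	2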



代码

代码目录

  • KVTextMapper
    • KVTextMapper
    • KVTextReducer
    • KVTextDriver

编写 Mapper 类

package KVTextMapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable>{

IntWritable v = new IntWritable(1);

@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {

// 1 封装对象:KeyValueTextInputFormat 已按分隔符切出每行第一个单词作为 key,这里无需再处理

// 2 写出
context.write(key, v);
}
}

编写 Reducer 类

package KVTextMapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

// 1 累加求和
int sum = 0;
for (IntWritable value: values) {
sum += value.get();
}

// 2 写出
v.set(sum);
context.write(key, v);
}
}

编写 Driver 类

package KVTextMapper;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class KVTextDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args = new String[] {"/root/data/input", "/root/data/output"};
// 1 获取 Job 对象
Configuration conf = new Configuration();
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); // * 设置切割符
Job job = Job.getInstance(conf);

// 2 设置 jar 存储位置
job.setJarByClass(KVTextDriver.class);

// 3 关联 Mapper 和 Reducer 类
job.setMapperClass(KVTextMapper.class);
job.setReducerClass(KVTextReducer.class);

// 4 设置 Mapper 阶段输出数据的 key 和 value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5 设置最终数据输出的 key 和 value 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// * 设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);

// 6 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交 Job
boolean result = job.waitForCompletion(true); // 传入 true 表示运行过程中打印作业进度等信息
System.exit(result ? 0 : 1); // 以退出码反馈运行结果:成功为 0,失败为 1
}
}

案例五 自定义InputFormat

需求说明

将多个小文件合并成一个 SequenceFile 文件(SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对的文件格式),SequenceFile 里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为 value。

  1. 输入数据



  2. 期望输出数据
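    这里可以假设输入目录下有若干个小文件(例如 one.txt、two.txt、three.txt),期望输出为一个 SequenceFile 文件(part-r-00000),其中每条记录的 key 为小文件的路径+名称(Text),value 为该文件的完整内容(BytesWritable)。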



代码

代码目录

  • com.luoteng.mr.inputformat
    • WholeFileInputFormat
    • WholeRecordReader
    • SequenceFileMapper
    • SequenceFileReducer
    • SequenceFileDriver

自定义 InputFormat

package com.luoteng.mr.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable>{

@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {

WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}

自定义 RecordReader 类

package com.luoteng.mr.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

FileSplit split;
Configuration configuration;
Text k = new Text();
BytesWritable v = new BytesWritable();
Boolean isProgress = true;

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// 初始化
this.split = (FileSplit) split;
configuration = context.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// 核心业务逻辑处理
if (isProgress) {
byte[] buf = new byte[(int) split.getLength()];
// 1 获取 fs 对象
Path path = split.getPath();
FileSystem fs = path.getFileSystem(configuration);

// 2 获取输入流
FSDataInputStream fis = fs.open(path);

// 3 拷贝
IOUtils.readFully(fis, buf, 0, buf.length);

// 4 封装 v
v.set(buf, 0, buf.length);

// 5 封装 k
k.set(path.toString());

// 6 关闭资源
IOUtils.closeStream(fis);
isProgress = false;
return true;
}
return false;
}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// 获取当前 key
return k;
}

@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
// 获取当前 value
return v;
}

@Override
public float getProgress() throws IOException, InterruptedException {
// 获取当前进度
return 0;
}

@Override
public void close() throws IOException {
// 关闭资源
}
}

编写 Mapper 类

package com.luoteng.mr.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

@Override
protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context)
throws IOException, InterruptedException {

context.write(key, value);
}
}

编写 Reducer 类

package com.luoteng.mr.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{

@Override
protected void reduce(Text key, Iterable<BytesWritable> values,
Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
// 循环写出
for (BytesWritable value: values) {
context.write(key, value);
}
}

}

编写 Driver 类

package com.luoteng.mr.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

public static void main(String[] args) throws IOException, InterruptedException, Exception {

args = new String[] {"/root/data/input", "/root/data/output"};
// 1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 设置jar包存储位置、关联自定义的mapper和reducer
job.setJarByClass(SequenceFileDriver.class);
job.setMapperClass(SequenceFileMapper.class);
job.setReducerClass(SequenceFileReducer.class);

// * 设置输入的inputFormat
job.setInputFormatClass(WholeFileInputFormat.class);

// * 设置输出的outputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);

// 3 设置map输出端的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);

// 4 设置最终输出端的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);

// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
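
运行结束后,可以用下面的命令查看生成的 SequenceFile(输出路径为示例,按实际输出目录替换;value 是 BytesWritable,会以十六进制字节的形式显示):

hadoop fs -text /root/data/output/part-r-00000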

案例六

需求说明

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

  1. 输入数据

    1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
    2 13846544121 192.196.100.2 264 0 200
    3 13956435636 192.196.100.3 132 1512 200
    4 13966251146 192.168.100.1 240 0 404
    5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
    6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
    7 13590439668 192.168.100.4 1116 954 200
    8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
    9 13729199489 192.168.100.6 240 0 200
    10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
    11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
    12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
    13 13560439638 192.168.100.10 918 4938 200
    14 13470253144 192.168.100.11 180 180 200
    15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
    16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
    17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
    18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
    19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
    20 13768778790 192.168.100.17 120 120 200
    21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
    22 13568436656 192.168.100.19 1116 954 200
  2. 期望输出数据
    手机号以 136、137、138、139 开头的分别放到 4 个独立的文件中,其他开头的放到一个文件中。

代码

在案例二的代码基础上进行修改

代码目录

  • com.luoteng.mr.flowsum
    • FlowBean
    • FlowCountMapper
    • FlowCountReducer
    • FlowsumDriver
    • ProvincePartitioner

增加分区类 ProvincePartitioner

package com.luoteng.mr.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean>{

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 获取手机号前三位
String prePhoneNum = key.toString().substring(0, 3);

int partition = 4;

if ("136".equals(prePhoneNum)) {
partition = 0;
}else if ("137".equals(prePhoneNum)) {
partition = 1;
}else if ("138".equals(prePhoneNum)) {
partition = 2;
}else if ("139".equals(prePhoneNum)) {
partition = 3;
}

return partition;
}
}

修改 Driver 类

在驱动类的 main 方法中增加自定义数据分区设置和 ReduceTask 数量设置,代码如下

// * 设置自定义的 Partitioner 类
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
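
由于 ReduceTask 数量设置为 5,与 ProvincePartitioner 中的 5 个分区一一对应,运行后输出目录下会生成 5 个结果文件,对应关系大致如下(文件名为 MapReduce 的默认命名):

part-r-00000  136 开头的手机号
part-r-00001  137 开头的手机号
part-r-00002  138 开头的手机号
part-r-00003  139 开头的手机号
part-r-00004  其他手机号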

案例七

需求说明

根据案例六产生的结果再次对总流量进行排序。

  1. 输入数据:案例六的输出结果
  2. 期望输出数据
    手机号以 136、137、138、139 开头的分别放到 4 个独立的文件中,其他开头的放到一个文件中,并且每个文件内按总流量降序排列。

代码

  • com.luoteng.mr.sort
    • FlowBean
    • FlowCountSortMapper
    • FlowCountSortReducer
    • FlowCountSortDriver
    • ProvincePartitioner

编写流量统计的 Bean 对象

FlowBean 对象在案例六的基础上增加了比较功能(实现 WritableComparable 接口)

package com.luoteng.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{

private long upFlow; // 上行流量
private long downFlow; // 下行流量
private long sumFlow;

public FlowBean() {
super();
}

public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}

// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

// 反序列化
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

// 比较
@Override
public int compareTo(FlowBean bean) {
int result;

// 核心比较条件判断
if (sumFlow > bean.getSumFlow()) {
result = -1;
}else if (sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}

编写 Mapper 类

package com.luoteng.mr.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();

// 2 切割
String[] fields = line.split("\t");

// 3 封装对象
k.setUpFlow(Long.parseLong((fields[1])));
k.setDownFlow(Long.parseLong((fields[2])));
k.setSumFlow(Long.parseLong((fields[3])));
v.set(fields[0]);

// 4 写出
context.write(k, v);
}

}

编写 Reducer 类

package com.luoteng.mr.sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

@Override
protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context)
throws IOException, InterruptedException {

for (Text value: values) {
context.write(value, key);
}
}
}

编写 Driver 类

package com.luoteng.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSortDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

args = new String[]{"/root/data/output1", "/root/data/output2"};
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowCountSortDriver.class);

// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);

// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// * 关联分区
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);

// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

编写分区类 ProvincePartitioner

package com.luoteng.mr.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text>{

@Override
public int getPartition(FlowBean key, Text value, int numPartitions) {
// 按照手机号前三位进行分区
String prePhoneNum = value.toString().substring(0, 3);

int partition = 4;

if ("136".equals(prePhoneNum)) {
partition = 0;
}else if ("137".equals(prePhoneNum)) {
partition = 1;
}else if ("138".equals(prePhoneNum)) {
partition = 2;
}else if ("139".equals(prePhoneNum)) {
partition = 3;
}

return partition;
}
}

案例八 GroupingComparator 分组

需求说明

现有一批订单数据,需要求出每一个订单中最贵的商品。整体思路:Map 端将订单 id 和价格封装成 OrderBean 作为 key,使其先按订单 id 升序、同一订单内按价格降序排序;Reduce 端通过 GroupingComparator 让订单 id 相同的记录进入同一个分组,则每组的第一条记录即为该订单中最贵的商品。

  1. 输入数据

    000001 Pdt_01 222.8
    000002 Pdt_05 722.4
    000001 Pdt_02 33.8
    000003 Pdt_06 232.8
    000003 Pdt_02 33.8
    000002 Pdt_03 522.8
    000002 Pdt_04 122.4
  2. 期望输出数据

    1	222.8
    2 722.4
    3 232.8

代码

  • com.luoteng.mr.order
    • OrderBean
    • OrderMapper
    • OrderGroupingComparator
    • OrderReducer
    • OrderDriver

定义订单信息 OrderBean 类

package com.luoteng.mr.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean>{

private int order_id; // 订单id
private double price; // 订单价格

public OrderBean() {
super();
}

public OrderBean(int order_id, double price) {
super();
this.order_id = order_id;
this.price = price;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_id);
out.writeDouble(price);
}

@Override
public void readFields(DataInput in) throws IOException {
order_id = in.readInt();
price = in.readDouble();
}

@Override
public int compareTo(OrderBean bean) {
// 先按照订单 id 升序排序,如果相同则按照价格降序排序
int result;

if (order_id > bean.getOrder_id()) {
result = 1;
}else if (order_id < bean.getOrder_id()) {
result = -1;
}else {
if (price > bean.getPrice()) {
result = -1;
}else if (price < bean.getPrice()) {
result = 1;
}else {
result = 0;
}
}
return result;
}

public int getOrder_id() {
return order_id;
}

public void setOrder_id(int order_id) {
this.order_id = order_id;
}

public double getPrice() {
return price;
}

public void setPrice(double price) {
this.price = price;
}

@Override
public String toString() {
return order_id + "\t" + price;
}
}

编写 Mapper 类

package com.luoteng.mr.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

OrderBean k = new OrderBean();

@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
throws IOException, InterruptedException {

// 1 获取一行
String line = value.toString();

// 2 切割
String[] fields = line.split(" ");

// 3 封装对象
k.setOrder_id(Integer.parseInt(fields[0]));
k.setPrice(Double.parseDouble(fields[2]));

// 4 写出
context.write(k, NullWritable.get());
}
}

编写 GroupingComparator 类

package com.luoteng.mr.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator{

// * 必须调用父类构造方法并传入 true(表示创建 key 对象的实例用于反序列化),否则运行时会报空指针异常
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {

// 要求只要id相同,就认为是相同的 key
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;

int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
}else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
}else {
result = 0;
}

return result;
}
}

编写 Reducer 类

package com.luoteng.mr.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values,
Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
throws IOException, InterruptedException {

context.write(key, NullWritable.get());
}

}

编写 Driver 类

package com.luoteng.mr.order;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

args = new String[] {"/root/data/input", "/root/data/output"};
// 1 获取配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 设置jar包加载路径
job.setJarByClass(OrderDriver.class);

// 3 加载map/reduce类
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);

// 4 设置map输出数据key和value类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);

// 5 设置最终输出数据的key和value类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);

// 6 设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// * 设置reduce端的分组
job.setGroupingComparatorClass(OrderGroupingComparator.class);

// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}

}

案例九 自定义OutputFormat

需求说明

过滤输入的 log 日志:网址中包含 luoteng 的输出到 /root/data/luoteng.log,不包含 luoteng 的输出到 /root/data/other.log。

  1. 输入数据

    http://www.baidu.com
    http://luoteng.com
    http:/www.sohu.com
    http://www.sin2a.com
    http://atguigu.com
  2. 期望输出数据
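    按照下面代码的过滤逻辑(以网址中是否包含 luoteng 为准),期望得到类似如下的两个文件:

    luoteng.log:
    http://luoteng.com

    other.log:
    http://atguigu.com
    http://www.baidu.com
    http://www.sin2a.com
    http:/www.sohu.com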



代码

  • com.luoteng.mr.outputformat
    • FilterMapper
    • FilterReducer
    • FilterOutputFormat
    • FRecordWriter
    • FilterDriver

编写 Mapper 类

package com.luoteng.mr.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {

context.write(value, NullWritable.get());
}
}

编写 Reducer 类

package com.luoteng.mr.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable>{

Text k = new Text();

@Override
protected void reduce(Text key, Iterable<NullWritable> values,
Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {

String line = key.toString();
line = line + "\r\n";
k.set(line);
for (NullWritable nullWritable: values) {
context.write(k, nullWritable);
}
}
}

自定义一个 OutputFormat 类

package com.luoteng.mr.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {

return new FRecordWriter(job);
}

}

编写 RecordWriter 类

package com.luoteng.mr.outputformat;

import java.io.IOException;


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FRecordWriter extends RecordWriter<Text, NullWritable> {

FSDataOutputStream fosluoteng;
FSDataOutputStream fosother;
public FRecordWriter(TaskAttemptContext job) {

try {
// 获取文件系统
FileSystem fs = FileSystem.get(job.getConfiguration());

// 创建输出到 luoteng.log 的输出流
fosluoteng = fs.create(new Path("/root/data/luoteng.log"));

// 创建输出到 other.log 的输出流
fosother = fs.create(new Path("/root/data/other.log"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 判断 key 中是否有 luoteng, 如果有写出到 luoteng.log,否则写出到 other.log

if (key.toString().contains("luoteng")) {
fosluoteng.write(key.toString().getBytes());
}else {
fosother.write(key.toString().getBytes());
}

}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {

IOUtils.closeStream(fosluoteng);
IOUtils.closeStream(fosother);
}
}

编写 Driver 类

package com.luoteng.mr.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {

public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "/root/data/input/", "/root/data/output2" };
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(FilterDriver.class);
job.setMapperClass(FilterMapper.class);
job.setReducerClass(FilterReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 要将自定义的输出格式组件设置到job中
job.setOutputFormatClass(FilterOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

案例十 Reduce Join

需求说明

将商品信息表中数据根据商品pid合并到订单数据表中。

订单数据:
| id | pid | amount |
|------|-----|--------|
| 1001 | 01 | 1 |
| 1002 | 02 | 2 |
| 1003 | 03 | 3 |
| 1004 | 01 | 4 |
| 1005 | 02 | 5 |
| 1006 | 03 | 6 |

商品信息:
| pid | pname |
|-----|-------|
| 01 | 小米 |
| 02 | 华为 |
| 03 | 格力 |

  1. 输入数据

    1001	01	1
    1002 02 2
    1003 03 3
    1001 01 1
    1002 02 2
    1003 03 3

    01 小米
    02 华为
    03 格力
  2. 期望输出数据
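    根据上面给出的输入数据和 join 逻辑,期望输出大致如下(同一 pid 下各订单的先后顺序以实际运行结果为准):

    1001	小米	1
    1001	小米	1
    1002	华为	2
    1002	华为	2
    1003	格力	3
    1003	格力	3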



代码

  • com.luoteng.mr.table
    • TableBean
    • TableMapper
    • TableReducer
    • TableDriver

创建商品和订单合并后的 Bean 类

package com.luoteng.mr.table;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class TableBean implements Writable{

private String order_id; // 订单id
private String p_id; // 产品id
private int amount; // 产品数量
private String pname; // 产品名称
private String flag; // 表的标记

public TableBean() {
super();
}

public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
super();
this.order_id = order_id;
this.p_id = p_id;
this.amount = amount;
this.pname = pname;
this.flag = flag;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(order_id);
out.writeUTF(p_id);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}

@Override
public void readFields(DataInput in) throws IOException {
this.order_id = in.readUTF();
this.p_id = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}

public String getOrder_id() {
return order_id;
}

public void setOrder_id(String order_id) {
this.order_id = order_id;
}

public String getP_id() {
return p_id;
}

public void setP_id(String p_id) {
this.p_id = p_id;
}

public int getAmount() {
return amount;
}

public void setAmount(int amount) {
this.amount = amount;
}

public String getPname() {
return pname;
}

public void setPname(String pname) {
this.pname = pname;
}

public String getFlag() {
return flag;
}

public void setFlag(String flag) {
this.flag = flag;
}

@Override
public String toString() {
return order_id + "\t" + pname + "\t" + amount + "\t" ;
}
}

编写 Mapper 类

package com.luoteng.mr.table;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

String name;
TableBean tableBean = new TableBean();
Text k = new Text();

@Override
protected void setup(Context context)
throws IOException, InterruptedException {

// 获取文件的名称
FileSplit inputSplit = (FileSplit) context.getInputSplit();
name = inputSplit.getPath().getName();
}

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 获取一行
String line = value.toString();

if (name.startsWith("order")) {

String[] fields = line.split("\t");

// 封装 key 和 value(bean 对象)
tableBean.setOrder_id(fields[0]);
tableBean.setP_id(fields[1]);
tableBean.setAmount(Integer.parseInt(fields[2]));
tableBean.setPname("");
tableBean.setFlag("order");
k.set(fields[1]);

}else {
String[] fields = line.split("\t");
// 封装 key 和 value(bean 对象)
tableBean.setOrder_id("");
tableBean.setP_id(fields[0]);
tableBean.setAmount(0);
tableBean.setPname(fields[1]);
tableBean.setFlag("pd");
k.set(fields[0]);
}

// 写出
context.write(k, tableBean);
}
}

编写 Reducer 类

package com.luoteng.mr.table;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//import com.luoteng.mr.text.TableBean;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable>{

@Override
protected void reduce(Text key, Iterable<TableBean> values,
Context context) throws IOException, InterruptedException {

// 存储所有订单集合
ArrayList<TableBean> orderBeans = new ArrayList<>();
// 存储产品信息
TableBean pdBean = new TableBean();

for (TableBean tableBean: values) {
if ("order".equals(tableBean.getFlag())) {

TableBean tmpBean = new TableBean();

try {
BeanUtils.copyProperties(tmpBean, tableBean);
orderBeans.add(tmpBean);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}else {
try {
BeanUtils.copyProperties(pdBean, tableBean);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}

for (TableBean tableBean: orderBeans) {
tableBean.setPname(pdBean.getPname());
context.write(tableBean, NullWritable.get());
}
}
}

编写 Driver 类

package com.luoteng.mr.table;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableDriver {

public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

// 0 根据自己电脑路径重新配置
args = new String[] { "/root/data/input/", "/root/data/output1" };

// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(TableDriver.class);

// 3 指定本业务job要使用的Mapper/Reducer业务类
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);

// 4 指定Mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);

// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);

// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

案例十一 Map Join

需求说明

同案例十

代码

  • com.luoteng.mr.cache
    • DistributedCacheMapper
    • DistributedCacheDriver

编写 Mapper 类

package com.luoteng.mr.cache;


import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

HashMap<String, String> pdMap = new HashMap<>();
Text k = new Text();
@Override
protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// 缓存小表
URI[] cacheFiles = context.getCacheFiles();
String path = cacheFiles[0].getPath().toString();

BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

String line;
while(StringUtils.isNotEmpty(line = reader.readLine())) {
// 切割
String[] fileds = line.split("\t");

pdMap.put(fileds[0], fileds[1]);
}

// 关闭资源
IOUtils.closeStream(reader);
}

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();

// 2 切割
String[] fields = line.split("\t");

// 3 获取 pid;
String pid = fields[1];

// 4 获取pname
String pname = pdMap.get(pid);

// 5 拼接
line = line + "\t" + pname;

// 6 写出
k.set(line);
context.write(k, NullWritable.get());
}

}

编写 Driver 类

package com.luoteng.mr.cache;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

public static void main(String[] args) throws IllegalArgumentException, IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 0 根据自己电脑路径重新配置
args = new String[] { "/root/data/input/", "/root/data/output1" };

// 1 获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 设置加载jar包路径
job.setJarByClass(DistributedCacheDriver.class);

// 3 关联map
job.setMapperClass(DistributedCacheMapper.class);

// 4 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 5 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// * 加载缓存数据
job.addCacheFile(new URI("/root/data/input/pd.txt"));

// * Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);

// 8 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

案例十二 数据清洗案例实操

需求说明

去除日志中字段个数小于等于 11 的行。

  1. 输入数据



  2. 期望输出数据:每行字段个数都大于 11。



代码

  • com.luoteng.mr.log
    • LogMapper
    • LogDriver

编写 Mapper 类

package com.luoteng.mr.log;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();

// 2 解析数据
boolean result = parseLog(line, context);

if (!result) {
return; // 解析失败,返回
}

// 3 写出
context.write(value, NullWritable.get());
}

private boolean parseLog(String line, Context context) {

String[] fields = line.split(" ");

if (fields.length > 11) {
context.getCounter("map", "true").increment(1);
return true;
}else {
context.getCounter("map", "false").increment(1);
return false;
}
}
}

编写 Driver 类

package com.luoteng.mr.log;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

args = new String[] {"/root/data/input", "/root/data/output1"};
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 加载jar包
job.setJarByClass(LogDriver.class);

// 3 关联map
job.setMapperClass(LogMapper.class);

// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// * 设置 reducetask 个数为 0
job.setNumReduceTasks(0);

// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交
job.waitForCompletion(true);
}
}

模板

案例三

需求说明

统计每一个手机号耗费的总上行流量、下行流量、总流量

  1. 输入数据



  2. 期望输出数据



代码

编写流量统计的 Bean 对象



编写 Mapper 类



编写 Reducer 类



编写 Driver 类


