Job Committer

关于什么是Job committer,这里不细说,可以参考一下这里:https://baifachuan.com/posts/93d12073.html

这里说的不走job committer是指的是不支持通过scheme的方式去产生committer,而是走了默认的hdfs的committer,这对于对象存储来说,是有影响的。

当然distcp的实现,本身逻辑还是很严谨的,例如它都提供了断点续传,distcp有个overwrite,如果这个开关没有打开,对于同一个path进行多次distcp的时候,它自动会检查target是否已经存在,如果存在且和source一致会skip。

而skip的逻辑是sameLength && sameBlockSize && checksums (这一个可以通过skipCrc=true来跳过,只检查前者,可以提升速度)。

不过这个功能实际上与文件语义是冲突的,这个后面会提到。

对于Hadoop来说,可以通过put和distcp进行文件的传输,put是把本地文件丢到分布式文件系统上,distcp是在不同的scheme之间拷贝。

他俩的逻辑类似,基本上是先产生临时文件,再rename过去,例如:

hadoop fs -put a hdfs://xx/dir

实际的执行步骤是在hdfs://xx/dir下产生a.copy,再rename成a,对于hdfs来说,rename是个成本较低的操作,但是对于对象存储来说,这个操作成本非常高,而put和distcp各自提供了一个能力。

put可以通-d,distcp可以用-direct,例如:

hadoop distcp -direct hdfs://emr-master-1:8020/test40G tos://fcbai/hive/distcp

这样的话,它便不会产生临时文件,也就不会有rename过程,可以大幅提高写入速度。对于distcp来说,如果不添加-direct参数,那么会直接创建临时文件,再rename:


如果使用-direct 则会直接写目标地方:

所以对于distcp来说,job committer 是无效的。

DistCp在构造Job的时候,使用的CopyOutputFormat,在new API里面committer是通过OutputFormat给创造出来的:

CopyOutputFormat是直接返回了FileOutput committer:

所以 没有走那个基于schema的初始化committer的逻辑,比如FileOutputFormat:

会基于不同的schema去创建不同的factory,然后创建不同的committer。

默认的这种实现,实际上是有问题的,因为无法保障原子性,比如一旦作业失败,那么会出现部分文件copy成功,而部分文件copy失败,因为hdfs的rename很短,所以这个问题的概率会变小,但是理论上会存在。

如果是对象存储,则这个问题会变大,所以应该考虑使用自己的committer去包装,解决一致性的问题,确保copy是个原子动作。

附录-MR作业基础知识

创建Mapper类

/**
* 4个泛型中,前两个指定mapper输入数据的类型,KEYIN是输入key的类型,VALUEIN是输入的value的类型
* map和reduce的数据输入输出都是以key-value对的形式封装的
* 默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

//mapreduce框架每读一行数据,就调用一次该方法
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value
// key是这一行数据的起始偏移量, value是这一行的文本内容

String line = value.toString();

String words[] = StringUtils.split(line, ' ');

//遍历单词数组为key、value形式
for (String word : words) {

context.write(new Text(word), new LongWritable(1));

}
}
}

创建Reduce类

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

//框架的map处理完成之后,将所有kv对缓存起来后,进行分组,然后传递一个组<key, value[]>,调用一次reduce方法

@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}

创建Job

/**
* 用来测试一个特定的作业
* 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
* 还可以指定该作业要处理的数据所在的路径
* 还可以指定作业输出的结果放到哪个路径
*/
public class WCRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//设置整个job所用的哪些类在哪个jar包
job.setJarByClass(WCRunner.class);

//本作业job使用的的mapper和reducer的类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);

//指定reduce的输出数据kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//指定mapper的输出数据kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

//指定原始数据存放在哪里
FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/mr/input/"));

//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/mr/output/"));

//将job提交给集群运行
job.waitForCompletion(true);
}
}

接下来打包上传执行。


扫码手机观看或分享: