当前位置:首页>开发>正文

hadoop的reduce是怎么排序的 请简述hadoop怎样实现二级排序

2023-04-29 06:44:16 互联网 未知 开发

 hadoop的reduce是怎么排序的 请简述hadoop怎样实现二级排序

hadoop的reduce是怎么排序的

include int main(){ int arr1[20], arr2[20], sum[20] = {0} int count = 0, a, b, i, temp scanf("%d %d", &a, &b) while (a != 0 || b != 0) { arr1[count] = a % 10 arr2[count] = b % 10 a /= 10 b /= 10 count } for (i = 0 i < count i ) { temp = arr1[i] arr2[i] sum[i] = temp % 10 sum[i 1] = temp / 10 } if (sum[count]) printf("%d", sum[count]) else printf("%d", sum[count - 1]) return 0}把两个数拆开,每个位相加,处理进位。因为这是加法,所以结果的长度和两数中最长的那个相同,或者比它大1,if判断一下哪个是最高位就行了。sum存储的是两数和的每一位,它的所有元素的初始值都被置为0.

请简述hadoop怎样实现二级排序

我不是高手,但我可以告诉你我怎么学习。①选择一个Hadoop的版本,然后阅读文档了解Hadoop:Whats Hadoop, Why Hadoop exists;②安装Hadoop,三种方式都试下;③在Hadoop文档里面有Hadoop Command的资料,I.hdfs command,II.job command,尽量试...

hadoop怎么使用算法

实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:


SumStep运行之后结果如下:


SortStep运行之后结果为上图根据结余从大到小排序。

代码如下:
[java] view plain copy
public class InfoBean implements WritableComparable{  

private String account  

private double income  

private double expenses  

private double surplus  

public void set(String account, double income, double expenses){  
this.account = account  
this.income = income  
this.expenses = expenses  
this.surplus = income - expenses  
}  

@Override  
public String toString() {  
return this.income   " "   this.expenses   " "   this.surplus  
}  

/** 
* serialize 
*/  
public void write(DataOutput out) throws IOException {  
out.writeUTF(account)  
out.writeDouble(income)  
out.writeDouble(expenses)  
out.writeDouble(surplus)  
}  

/** 
* deserialize 
*/  
public void readFields(DataInput in) throws IOException {  
this.account = in.readUTF()  
this.income = in.readDouble()  
this.expenses = in.readDouble()  
this.surplus = in.readDouble()  
}  
public int compareTo(InfoBean o) {  
if(this.income == o.getIncome()){  
return this.expenses > o.getExpenses() ? 1 : -1   
} else {  
return this.income > o.getIncome() ? -1 : 1  
}  
}  

public String getAccount() {  
return account  
}  

public void setAccount(String account) {  
this.account = account  
}  

public double getIncome() {  
return income  
}  

public void setIncome(double income) {  
this.income = income  
}  

public double getExpenses() {  
return expenses  
}  

public void setExpenses(double expenses) {  
this.expenses = expenses  
}  

public double getSurplus() {  
return surplus  
}  

public void setSurplus(double surplus) {  
this.surplus = surplus  
}  
}  
[java] view plain copy
public class SumStep {  

public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration()  
Job job = Job.getInstance(conf)  

job.setJarByClass(SumStep.class)  

job.setMapperClass(SumMapper.class)  
job.setMapOutputKeyClass(Text.class)  
job.setMapOutputValueClass(InfoBean.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))  

job.setReducerClass(SumReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(InfoBean.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.waitForCompletion(true)  
}  

public static class SumMapper extends Mapper{  

private InfoBean bean = new InfoBean()  
private Text k = new Text()  
@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
// split   
String line = value.toString()  
String[] fields = line.split(" ")  
// get useful field  
String account = fields[0]  
double income = Double.parseDouble(fields[1])  
double expenses = Double.parseDouble(fields[2])  
k.set(account)  
bean.set(account, income, expenses)  
context.write(k, bean)  
}  
}  

public static class SumReducer extends Reducer{  

private InfoBean bean = new InfoBean()  
@Override  
protected void reduce(Text key, Iterable v2s, Context context)  
throws IOException, InterruptedException {  

double in_sum = 0  
double out_sum = 0  
for(InfoBean bean : v2s){  
in_sum  = bean.getIncome()  
out_sum  = bean.getExpenses()  
}  
bean.set("", in_sum, out_sum)  
context.write(key, bean)  
}  

}  
}  

此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。

[java] view plain copy
public class SortStep {  

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
Configuration conf = new Configuration()  
Job job = Job.getInstance(conf)  

job.setJarByClass(SortStep.class)  

job.setMapperClass(SortMapper.class)  
job.setMapOutputKeyClass(InfoBean.class)  
job.setMapOutputValueClass(NullWritable.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))  

job.setReducerClass(SortReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(InfoBean.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.waitForCompletion(true)  

}  

public static class SortMapper extends Mapper{  

private InfoBean bean = new InfoBean()  

@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
String line = value.toString()  
String[] fields = line.split(" ")  
String account = fields[0]  
double income = Double.parseDouble(fields[1])  
double expenses = Double.parseDouble(fields[2])  
bean.set(account, income, expenses)  
context.write(bean, NullWritable.get())  
}  

}  
public static class SortReducer extends Reducer{  

private Text k = new Text()  
@Override  
protected void reduce(InfoBean bean, Iterable v2s, Context context)  
throws IOException, InterruptedException {  
String account = bean.getAccount()  
k.set(account)  
context.write(k, bean)  
}  

}  
}  

实例二、倒排索引,过程如下:
[plain] view plain copy
Map阶段  
<0,"hello tom">  
....  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  

context.write("hello->b.txt",1)  
context.write("hello->b.txt",1)  
context.write("hello->b.txt",1)  
--------------------------------------------------------  
combiner阶段  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  

<"hello->b.txt",1>  
<"hello->b.txt",1>  
<"hello->b.txt",1>  

context.write("hello","a.txt->5")  
context.write("hello","b.txt->3")  
--------------------------------------------------------  
Reducer阶段  
<"hello",{"a.txt->5","b.txt->3"}>  
context.write("hello","a.txt->5 b.txt->3")  
-------------------------------------------------------  
hello   "a.txt->5 b.txt->3"  
tom     "a.txt->2 b.txt->1"  
kitty   "a.txt->1"  
.......  
代码如下:
[java] view plain copy
public class InverseIndex {  

public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration()  

Job job = Job.getInstance(conf)  
//设置jar  
job.setJarByClass(InverseIndex.class)  

//设置Mapper相关的属性  
job.setMapperClass(IndexMapper.class)  
job.setMapOutputKeyClass(Text.class)  
job.setMapOutputValueClass(Text.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))//words.txt  

//设置Reducer相关属性  
job.setReducerClass(IndexReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(Text.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.setCombinerClass(IndexCombiner.class)  

//提交任务  
job.waitForCompletion(true)  
}  
public static class IndexMapper extends Mapper{  

private Text k = new Text()  
private Text v = new Text()  
@Override  
protected void map(LongWritable key, Text value,  
Mapper.Context context)  
throws IOException, InterruptedException {  
String line = value.toString()  
String[] fields = line.split(" ")  
FileSplit inputSplit = (FileSplit) context.getInputSplit()  
Path path = inputSplit.getPath()  
String name = path.getName()  
for(String f : fields){  
k.set(f   "->"   name)  
v.set("1")  
context.write(k, v)  
}  
}  

}  
public static class IndexCombiner extends Reducer{  

private Text k = new Text()  
private Text v = new Text()  
@Override  
protected void reduce(Text key, Iterable values,  
Reducer.Context context)  
throws IOException, InterruptedException {  
String[] fields = key.toString().split("->")  
long sum = 0  
for(Text t : values){  
sum  = Long.parseLong(t.toString())  
}  
k.set(fields[0])  
v.set(fields[1]   "->"   sum)  
context.write(k, v)  
}  

}  
public static class IndexReducer extends Reducer{  

private Text v = new Text()  
@Override  
protected void reduce(Text key, Iterable values,  

hadoop 1.x 中Reduce的输出结果排序?

[postbg]bg4.png[/postbg]你可以使用Map/Reduce的GroupingComparator来进行排序,具体可以参考Map/Reduce的GroupingComparator

hadoop二次排序什么时候用

实现简要步骤为
1. 构造(用户标识,时间)作为key, 时间和其他信息(比如访问页面)作为value,然后进入map流程
2. 在缺省的reduce的,传入参数为 单个key和value的集合,这会导致相同的用户标识和相同的时间被分在同一组,比如用户标识为11111的 1点00一个reduce, 用户标识为11111的 1点01另外一组,这不符合要求.所以需要更改缺省分组,需要由原来的按(用户标识,时间)改成按(用户标识)分组就行了。这样reduce是传入参数变为
户标识为11111 的value集合为(1点00 访问页面page1, 1点01 访问页面page2, 1点05 访问页面page3),然后在reduce方法里写自己的统计逻辑就行了。
3. 当然1和2步之间,有2个重要细节要处理:确定key的排序规则和确定分区规则(分区规则保证map后分配数据到reduce按照用户标识来散列,而不是按缺省的用户标识 时间来散列)

最新文章