当前位置:首页>开发>正文

hadoop如何实现分布计算 如何在hadoop上构建分布式数据库

2023-06-25 20:32:23 互联网 未知 开发

 hadoop如何实现分布计算 如何在hadoop上构建分布式数据库

hadoop如何实现分布计算

hadoop已经实现分布式计算,他本身就有hdfs分布式文件系统,通过master、slave实现分布式计算,计算框架目前包括离线框架MapReduce、实时计算框架storm、离线实时框架spark,分布式存储框架hdfs、分布式消息队列kafka,分布式日志采集工具flume,分布式数据hbase等等。

如何在hadoop上构建分布式数据库

基于hadoop的分布式数据库有hbase。安装hbase除了要安装hadoop外,还要安装Zookeeper。
分布式hbase安装和分布式hadoop安装方法差不多,hbase要有master和regionserver,regionserver相当于slave,你可以在maser上面安装好hbase,然后把它拷贝到其它slave服务器,再修改一些配置

hadoop怎么使用算法

实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:


SumStep运行之后结果如下:


SortStep运行之后结果为上图根据结余从大到小排序。

代码如下:
[java] view plain copy
public class InfoBean implements WritableComparable{  

private String account  

private double income  

private double expenses  

private double surplus  

public void set(String account, double income, double expenses){  
this.account = account  
this.income = income  
this.expenses = expenses  
this.surplus = income - expenses  
}  

@Override  
public String toString() {  
return this.income   " "   this.expenses   " "   this.surplus  
}  

/** 
* serialize 
*/  
public void write(DataOutput out) throws IOException {  
out.writeUTF(account)  
out.writeDouble(income)  
out.writeDouble(expenses)  
out.writeDouble(surplus)  
}  

/** 
* deserialize 
*/  
public void readFields(DataInput in) throws IOException {  
this.account = in.readUTF()  
this.income = in.readDouble()  
this.expenses = in.readDouble()  
this.surplus = in.readDouble()  
}  
public int compareTo(InfoBean o) {  
if(this.income == o.getIncome()){  
return this.expenses > o.getExpenses() ? 1 : -1   
} else {  
return this.income > o.getIncome() ? -1 : 1  
}  
}  

public String getAccount() {  
return account  
}  

public void setAccount(String account) {  
this.account = account  
}  

public double getIncome() {  
return income  
}  

public void setIncome(double income) {  
this.income = income  
}  

public double getExpenses() {  
return expenses  
}  

public void setExpenses(double expenses) {  
this.expenses = expenses  
}  

public double getSurplus() {  
return surplus  
}  

public void setSurplus(double surplus) {  
this.surplus = surplus  
}  
}  
[java] view plain copy
public class SumStep {  

public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration()  
Job job = Job.getInstance(conf)  

job.setJarByClass(SumStep.class)  

job.setMapperClass(SumMapper.class)  
job.setMapOutputKeyClass(Text.class)  
job.setMapOutputValueClass(InfoBean.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))  

job.setReducerClass(SumReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(InfoBean.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.waitForCompletion(true)  
}  

public static class SumMapper extends Mapper{  

private InfoBean bean = new InfoBean()  
private Text k = new Text()  
@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
// split   
String line = value.toString()  
String[] fields = line.split(" ")  
// get useful field  
String account = fields[0]  
double income = Double.parseDouble(fields[1])  
double expenses = Double.parseDouble(fields[2])  
k.set(account)  
bean.set(account, income, expenses)  
context.write(k, bean)  
}  
}  

public static class SumReducer extends Reducer{  

private InfoBean bean = new InfoBean()  
@Override  
protected void reduce(Text key, Iterable v2s, Context context)  
throws IOException, InterruptedException {  

double in_sum = 0  
double out_sum = 0  
for(InfoBean bean : v2s){  
in_sum  = bean.getIncome()  
out_sum  = bean.getExpenses()  
}  
bean.set("", in_sum, out_sum)  
context.write(key, bean)  
}  

}  
}  

此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。

[java] view plain copy
public class SortStep {  

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
Configuration conf = new Configuration()  
Job job = Job.getInstance(conf)  

job.setJarByClass(SortStep.class)  

job.setMapperClass(SortMapper.class)  
job.setMapOutputKeyClass(InfoBean.class)  
job.setMapOutputValueClass(NullWritable.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))  

job.setReducerClass(SortReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(InfoBean.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.waitForCompletion(true)  

}  

public static class SortMapper extends Mapper{  

private InfoBean bean = new InfoBean()  

@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
String line = value.toString()  
String[] fields = line.split(" ")  
String account = fields[0]  
double income = Double.parseDouble(fields[1])  
double expenses = Double.parseDouble(fields[2])  
bean.set(account, income, expenses)  
context.write(bean, NullWritable.get())  
}  

}  
public static class SortReducer extends Reducer{  

private Text k = new Text()  
@Override  
protected void reduce(InfoBean bean, Iterable v2s, Context context)  
throws IOException, InterruptedException {  
String account = bean.getAccount()  
k.set(account)  
context.write(k, bean)  
}  

}  
}  

实例二、倒排索引,过程如下:
[plain] view plain copy
Map阶段  
<0,"hello tom">  
....  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  
context.write("hello->a.txt",1)  

context.write("hello->b.txt",1)  
context.write("hello->b.txt",1)  
context.write("hello->b.txt",1)  
--------------------------------------------------------  
combiner阶段  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  

<"hello->b.txt",1>  
<"hello->b.txt",1>  
<"hello->b.txt",1>  

context.write("hello","a.txt->5")  
context.write("hello","b.txt->3")  
--------------------------------------------------------  
Reducer阶段  
<"hello",{"a.txt->5","b.txt->3"}>  
context.write("hello","a.txt->5 b.txt->3")  
-------------------------------------------------------  
hello   "a.txt->5 b.txt->3"  
tom     "a.txt->2 b.txt->1"  
kitty   "a.txt->1"  
.......  
代码如下:
[java] view plain copy
public class InverseIndex {  

public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration()  

Job job = Job.getInstance(conf)  
//设置jar  
job.setJarByClass(InverseIndex.class)  

//设置Mapper相关的属性  
job.setMapperClass(IndexMapper.class)  
job.setMapOutputKeyClass(Text.class)  
job.setMapOutputValueClass(Text.class)  
FileInputFormat.setInputPaths(job, new Path(args[0]))//words.txt  

//设置Reducer相关属性  
job.setReducerClass(IndexReducer.class)  
job.setOutputKeyClass(Text.class)  
job.setOutputValueClass(Text.class)  
FileOutputFormat.setOutputPath(job, new Path(args[1]))  

job.setCombinerClass(IndexCombiner.class)  

//提交任务  
job.waitForCompletion(true)  
}  
public static class IndexMapper extends Mapper{  

private Text k = new Text()  
private Text v = new Text()  
@Override  
protected void map(LongWritable key, Text value,  
Mapper.Context context)  
throws IOException, InterruptedException {  
String line = value.toString()  
String[] fields = line.split(" ")  
FileSplit inputSplit = (FileSplit) context.getInputSplit()  
Path path = inputSplit.getPath()  
String name = path.getName()  
for(String f : fields){  
k.set(f   "->"   name)  
v.set("1")  
context.write(k, v)  
}  
}  

}  
public static class IndexCombiner extends Reducer{  

private Text k = new Text()  
private Text v = new Text()  
@Override  
protected void reduce(Text key, Iterable values,  
Reducer.Context context)  
throws IOException, InterruptedException {  
String[] fields = key.toString().split("->")  
long sum = 0  
for(Text t : values){  
sum  = Long.parseLong(t.toString())  
}  
k.set(fields[0])  
v.set(fields[1]   "->"   sum)  
context.write(k, v)  
}  

}  
public static class IndexReducer extends Reducer{  

private Text v = new Text()  
@Override  
protected void reduce(Text key, Iterable values,