The complete development process of a Hadoop program
Note: the Hadoop versions used throughout (on every node and in every tool involved) must match.
1. After Hadoop has been configured on the namenode, copy the Hadoop directory to each datanode with scp. For convenience it is best to use the same path on every machine, for example /opt/hadoop-0.20.2.
2. In the conf directory, change the default localhost in the masters file to the namenode's hostname or IP address, and in the slaves file change localhost to the datanodes' hostnames or IPs (a sketch of both steps follows this list).
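A minimal sketch of these two steps, assuming a user named grid and two hypothetical datanodes named datanode1 and datanode2:

echo namenode > /opt/hadoop-0.20.2/conf/masters
printf "datanode1\ndatanode2\n" > /opt/hadoop-0.20.2/conf/slaves
scp -r /opt/hadoop-0.20.2 grid@datanode1:/opt/
scp -r /opt/hadoop-0.20.2 grid@datanode2:/opt/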
Configuring the Hadoop plugin for Eclipse
hadoop-0.20.2-eclipse-plugin.jar is the Hadoop plugin for Eclipse.
It makes HDFS operations visual; without it you would have to type a great many commands in the terminal, each one starting with bin/hadoop dfs, like the examples below.
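For reference, these are the kinds of commands the plugin saves you from typing (the paths here are only illustrative):

bin/hadoop dfs -ls /user/grid
bin/hadoop dfs -put scores.txt /user/grid/input2
bin/hadoop dfs -cat /user/grid/output2/part-r-00000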
The configuration process, briefly:
Eclipse and the hadoop-eclipse-plugin are extremely sensitive to version; the two must match exactly.
1. Download hadoop-0.20.2-eclipse-plugin.jar.
2. Put the jar into Eclipse's plugin directory, usually the plugins directory, and restart Eclipse. If the versions are correct, a DFS Locations item should now appear in Eclipse's Project Explorer; if it does not, the versions are most likely mismatched.
3. Configure the Hadoop installation directory: Eclipse --> Window menu --> Preferences --> Hadoop Map/Reduce, and on the right enter or browse to your Hadoop directory.
4. Show the Map/Reduce Locations view: Eclipse --> Window menu --> Open Perspective --> Other, choose the blue elephant icon labeled Map/Reduce; a yellow elephant view named Map/Reduce Locations appears at the bottom.
5. Configure a Hadoop Location: right-click inside Map/Reduce Locations, choose New Hadoop Location, and a configuration dialog appears. The location name can be anything. In the Map/Reduce Master box, set host to the JobTracker's IP or hostname for a distributed cluster (do not keep the default localhost) and set the port to 9001. In the DFS Master box, leave "Use M/R Master host" checked and set the port to 9000, matching the hdfs:// URI used in the code below. The user name can usually be left at its default.
With that, the Eclipse Hadoop plugin is configured.
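The two ports are not arbitrary: they have to agree with what the cluster's own configuration files declare. A minimal sketch, assuming the namenode and JobTracker both run on 192.168.1.8 as in the code below:

conf/core-site.xml:
  <property>
    <name>fs.default.name</name>
    <value>hdfs://192.168.1.8:9000</value>
  </property>

conf/mapred-site.xml:
  <property>
    <name>mapred.job.tracker</name>
    <value>192.168.1.8:9001</value>
  </property>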
Writing the program
1. Open Eclipse and create a new Java project. Right-click the project, Properties, Java Build Path, Libraries, Add External JARs, navigate to the Hadoop directory, and add the jar packages found in its root directory.
2. Create a new class, Score_process.java, and enter the following code:
package pkg1;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Score_process extends Configured implements Tool {

    // Inner class Map: emits a (name, score) pair for each record of the input
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("key: " + key);   // debug print of the byte-offset key
            // Convert the plain-text input record to a String
            String line = value.toString();
            // Split the input data by line
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
            // Process each line separately
            while (tokenizerArticle.hasMoreTokens()) {
                // Split each line on whitespace into a name and a score
                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                String nameString = tokenizerLine.nextToken();
                String scoreString = tokenizerLine.nextToken();
                Text name = new Text(nameString);
                int scoreInt = Integer.parseInt(scoreString);
                // Emit the name and the score
                context.write(name, new IntWritable(scoreInt));
            }
        }
    }

    // Inner class Reduce: averages all scores seen for each name
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next().get();
                count++;
            }
            int average = sum / count;
            // Emit the name and its integer average score
            context.write(key, new IntWritable(average));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        // configuration.set("mapred", "Score_Process.jar");

        // Prepare the environment: delete an existing output2 directory,
        // so that the output path does not exist when the job starts
        final String uri = "hdfs://192.168.1.8:9000/";
        FileSystem fs = FileSystem.get(URI.create(uri), configuration);
        final String path = "/user/grid/output2";
        boolean exists = fs.exists(new Path(path));
        if (exists) {
            fs.delete(new Path(path), true);
        }

        Job job = new Job(configuration);
        job.setJobName("Score_process");
        job.setJarByClass(Score_process.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        // Note: an averaging reducer is not a valid combiner (an average of
        // partial averages is not the overall average), so no combiner is set.
        // job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // System.out.println(new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Score_process(), args);
        System.exit(ret);
    }
}
The Map class processes the text file named by the input argument; its output is handed to the reduce phase, and the key statement is context.write(name, new IntWritable(scoreInt)). The Reduce class consumes the map output and, after reducing, writes the result to the output directory; the key statement is context.write(key, new IntWritable(average)).
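To make the data flow concrete, here is a made-up input file and the output it would produce (each input line is a name and a score separated by whitespace; names and numbers are purely illustrative):

input2/scores.txt:
zhang 85
zhang 90
wang 70
wang 80
wang 90

output2/part-r-00000 after the job (integer average per name, key and value separated by a tab):
wang	80
zhang	87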
In the run method, the only things that need to change from one program to the next are the name passed to setJobName and the class passed to setJarByClass; everything else stays the same.
The main method stays the same.
Compiling
In a terminal, type:
javac -classpath /opt/hadoop-0.20.2/hadoop-0.20.2-core.jar -d ~/allTest/ScoreProcessFinal/class ~/workspace-indigo/test5/src/pkg1/Score_process.java
If no errors are reported, the compilation succeeded.
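One small caveat: javac's -d option does not create the target directory for you, so create it first (the path simply follows the example above):

mkdir -p ~/allTest/ScoreProcessFinal/class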
Packaging
jar -cvf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar -C ~/allTest/ScoreProcessFinal/class .
The contents of the jar can be checked with:
jar -vtf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar
Execution
The job can be run in two ways: inside Eclipse, or from a terminal. In either case the input directory must already exist on HDFS; the commands below show one way to create it.
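A minimal sketch of preparing the input, assuming a local file scores.txt in the name/score format shown earlier (the file name is hypothetical):

/opt/hadoop-0.20.2/bin/hadoop dfs -mkdir /user/grid/input2
/opt/hadoop-0.20.2/bin/hadoop dfs -put scores.txt /user/grid/input2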
Running inside Eclipse
Configure the run arguments: Run Configurations, Arguments, Program arguments.
In the text box, enter: hdfs://host-thinkpad:9000/user/grid/input2 hdfs://host-thinkpad:9000/user/grid/output2
These are the input and output directories, separated by a single space.
Running from a terminal
/opt/hadoop-0.20.2/bin/hadoop jar ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar pkg1.Score_process input2 output2
That is the complete skeleton of Hadoop program development.
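When the job finishes, the result can be inspected straight from the terminal (the part file name assumes a single reducer):

/opt/hadoop-0.20.2/bin/hadoop dfs -cat output2/part-r-00000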