您现在的位置 >> Hadoop教程 >> Hadoop实战 >> hadoop专题  
 

hadoop简单实现文本数据全局排序

【作者:Hadoop实战专家】【关键词:可以看到 hadoop 数据 中间结果 数据量 】 【点击:49667次】【2013-08-0】
(HDFS? 需要改动的只有一处,设置JAVA_HOME。 关于配置文件的详细信息可以在http://hadoop.apache.org/common/docs/stable/cluster_setup.html中找到。input;hadoop利用shuffle操作将这N+1份数据自动排序,reduce操作只需要接收中间结果后直接输出到文件即可。  

相关热门搜索:

大数据标签:hadoop hdfs mapreduce mahout bigdata

之前在实现一些机器学习算法时,跑数据量、feature很多的数据集往往要耗费很多时间,尤其是处理大量文本数据时候,单机跑算法的时间让我无法容忍,理论上如果合理的将大数据量分布式并行计算框架(例如hadoop)应用到这些算法上能有效提高算法执行速度(当然,要求算法本身可以全部或部分进行并行化处理),下一步想要学习Mahout(http://mahout.apache.org/),它的目标是:build scalable machine learning libraries,它是基于hadoop的,所以在此之前需要学习一下Hadoop,先从hadoop搭建开始吧。

1、硬件、软件准备

手头上有三台配置一样的电脑,就不去装虚拟机了,配置如下:

CPU:Intel(R) Pentium(R) Dual CPU E2200 @ 2.20GHz

Memory:2001MiB

Network:NetLink BCM5786 Gigabit Ethernet

三台电脑装有相同的操作系统——Ubuntu 11.04

2、安装过程

任选一台机器作为master,其他机器作为slaves,所有机器拥有相同的用户、相同的环境变量配置、相同的hadoop目录结构、相同的Java目录结构。

(1)、更改host文件

master机器:在终端执行:sudo gedit /etc/hosts,添加以下信息:

172.22.9.209 namenode-m

172.22.9.185 datanode-1

172.22.9.220 datanode-2

slaves机器:处理方式类似。

(2)、安装SSH

1)、为所有机器安装ssh:在终端运行:sudo apt-get install ssh,查看/leozhang目录下是否有.ssh文件夹(需要View->Show Hidden Files才能看见隐藏文件),如果没有,在终端运行:sudo mkdir .ssh;

2)、在终端运行:

cd .ssh

#生成公钥、私钥密钥对

ssh-keygen #一直回车

#将公钥内容复制到authorized_keys文件
cp id_rsa.pub authorized_keys

#设定authorized_keys文件属性为-rw-r–r–,即文件属主拥有读写权限,与文件属主同组的用户拥有读权限,其他人拥有读权限。

chmod 644 authorized_keys

#将公钥拷贝到slaves

scp authorized_keys datanode-1:/home/leozhang/.ssh #这里也可以是scp authorized_keys leozhang@datanode-1:/home/leozhang/.ssh

scp authorized_keys datanode-2:/home/leozhang/.ssh #同上

最后测试设置是否成功,如:ssh datanode-1,如果不用输入密码就能登录,说明设置成功。

(3)、下载并配置jdk

1)、从http://www.oracle.com/technetwork/java/javase/downloads/java-se-jdk-7-download-432154.html下载jdk-7-linux-i586.tar.gz,解压后得到文件夹:jdk1.7.0,(例如下载并解压到了:/home/leozhang/Downloads);

2)、在所有机器上做如下操作:在/usr建立文件夹java:在终端执行:sudo mkdir /usr/java,并将jdk1.7.0拷贝到java文件夹:进入/home/leozhang/Downloads目录,在终端执行sudo mv jdk1.7.0 /usr/java;

3)、在终端执行:sudo gedit /etc/profile,在文件末尾添加:

JAVA_HOME=”/usr/java/jdk1.7.0″

export JAVA_HOME

PATH=$JAVA_HOME/bin:$PATH

export PATH

CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$CLASSPATH

export CLASSPATH

4)、在终端执行:

cd /usr/java

scp -r java1.7.0 leozhang@datanode-1:/usr/java

scp -r java1.7.0 leozhang@datanode-2:/usr/java

(4)、下载并配置hadoop

hadoop包含三个部分:

Hadoop Common: The common utilities that support the other Hadoop subprojects.

Hadoop Distributed File System (HDFS?): A distributed file system that provides high-throughput access to application data.

Hadoop MapReduce: A software framework for distributed processing of large data sets on compute clusters.

1)、从http://labs.renren.com/apache-mirror//hadoop/core/hadoop-0.20.204.0/下载hadoop-0.20.204.0.tar.gz,解压到home/leozhang中并重命名为hadoop;

2)、在终端执行:sudo gedit /etc/profile,在文件末尾添加:

HADOOP_HOME=/home/leozhang/hadoop

export HADOOP_HOME

export HADOOP=$HADOOP_HOME/bin

export PATH=$HADOOP:$PATH

3)、hadoop配置文件

在hadoop文件夹中有一个conf文件夹,里面是hadoop所需的配置文件,主要关注的有以下几个:

●hadoop-env.sh

需要改动的只有一处,设置JAVA_HOME。

# The java implementation to use. Required.
export JAVA_HOME=/usr/java/jdk1.7.0

●core-site.xml

fs.default.name指出NameNode所在的地址,NameNode要跑在master机器上。


< property>
< name>fs.default.name
< value>hdfs://172.22.9.209:9000
< /property>


< name>hadoop.logfile.size
< value>10000000
< description>The max size of each log file
< /property>


< name>hadoop.logfile.count
< value>10
< description>The max number of log files
< /property>



●hdfs-site.xml

dfs.replication默认是3,如果DataNode个数小于3会报错。


< property>
< name>dfs.replication
< value>2
< /property>
< /configuration>

●mapred-site.xml

mapred.job.tracker指出jobtracker所在地址,其它项不去配置则都为默认值。


< property>
< name>mapred.job.tracker
< value>172.22.9.209:9001
< /property>
< /configuration>

关于配置文件的详细信息可以在http://hadoop.apache.org/common/docs/stable/cluster_setup.html中找到。

●masters

172.22.9.209

●slaves

172.22.9.185
172.22.9.220

4)、在终端执行:

cd /home/leozhang

scp -r hadoop leozhang@datanode-1:/home/leozhang

scp -r hadoop leozhang@datanode-2:/home/leozhang
5)、在终端执行:source /etc/profile,如果不管用就注销然后重新登录。

3、数据全局排序

(1)、工具准备

需要下载eclipse,地址是http://www.eclipse.org/downloads/,也可以在终端运行sudo apt-get install eclipse,可以装个mapreduce的插件,方便在单机调试代码,那个插件在下载的hadoop的目录里,如:/home/leozhang/hadoop/contrib/eclipse-plugin/hadoop-eclipse-plugin-0.20.204.0.jar,把它拷贝到eclipse安装目录的plugins文件夹中即可。

(2)、启动hadoop

第一次使用需要初始化NameNode,在master机器的终端上执行:hadoop namenode -format;

在master机器的终端上执行:start-all.sh,可以用jps来查看本机的java进程,在master上启动了3个进程:JobTracker、SecondaryNameNode、NameNode,而slaves机器上有2个进程:TaskTracker、DataNode;需要停止进程,只要在master机器的终端上执行:stop-all.sh。

在http://localhost:50070/可以看到NameNode的详细信息,如:

在http://localhost:50030可以看到作业的详细信息,如:

(3)、关于mapreduce

mapreduce很适合数据之间相关性较低且数据量庞大的情况,map操作将原始数据经过特定操作打散后输出,作为中间结果,hadoop通过shuffle操作对中间结果排序,之后,reduce操作接收中间结果并进行汇总操作,最后将结果输出到文件中,从这里也可以看到在hadoop中,hdfs是mapreduce的基石。可以用下面这幅图描述map和reduce的过程:

有人用这么一句话解释mapreduce:

We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

Now we get together and add our individual counts. That’s reduce.
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

(4)、数据准备

将待排序文本上传到hdfs上并放在input文件夹中,在终端执行:hadoop dfs –mkdir input;

假设数据文件data.txt放在本地磁盘的/home/leozhang/testdata中,在终端执行:cd /home/leozhang/testdata;hadoop dfs –put data input/

(5)、排序思路

借鉴快速排序的思路:假设为升序排序,那么每完成一次partition,pivot左边所有元素的值都小于等于pivot,而pivot右边的所有元素的值都大于等于pivot,如果现在有N个pivot,那么数据就被map成了N+1个区间,让reducer个数等于N+1,将不同区间的数据发送到相应区间的reducer;hadoop利用shuffle操作将这N+1份数据自动排序,reduce操作只需要接收中间结果后直接输出到文件即可。

由此归纳出用hadoop对大量数据排序的步骤:

1)、对待排序数据进行抽样;

2)、对抽样数据进行排序,产生pivot(例如得到的pivot为:3,9,11);

3)、Map对输入的每条数据计算其处于哪两个pivot之间,之后将数据发给相应的reduce(例如区间划分为:<3、[3,9)、>=9,分别对应reducer0、reducer1、reducer2);

4)、Reduce将获得数据直接输出。

(6)、简单实现

数据抽样由:RandomSelectMapper和RandomSelectReducer完成,数据划分由ReducerPatition完成,排序输出由SortMapper和SortReducer完成,执行顺序为:RandomSelectMapper –> RandomSelectReducer –> SortMapper –> SortReducer。

这个实现方式总觉得不给力,尤其是数据划分那块儿,不知道大家会怎么做,指导一下我吧,呵呵。代码可以从这里得到。

1)、pivot的选取采用随机的方式:

   1: package MRTEST.Sort;   2:   3: import java.io.IOException;   4: import java.util.Random;   5: import java.util.StringTokenizer;   6:   7: import org.apache.hadoop.io.Text;   8: import org.apache.hadoop.mapreduce.Mapper;   9:  10: public class RandomSelectMapper  11:         extends Mapper{  12:     private static int currentSize = 0;  13:     private Random random = new Random();  14:  15:     public void map(Object key, Text value, Context context)  16:         throws IOException, InterruptedException{  17:         StringTokenizer itr = new StringTokenizer(value.toString());  18:         while(itr.hasMoreTokens()){  19:             currentSize++;  20:             Random ran = new Random();  21:             if(random.nextInt(currentSize) == ran.nextInt(1)){  22:                   Text v = new Text(itr.nextToken());  23:                     context.write(v, v);  24:             }  25:             else{  26:                 itr.nextToken();  27:             }  28:         }  29:     }  30:  31: }

pivot的排序由hadoop完成:

   1: package MRTEST.Sort;   2:   3: import java.io.IOException;   4:   5: import org.apache.hadoop.io.Text;   6: import org.apache.hadoop.mapreduce.Reducer;   7:   8: public class RandomSelectReducer   9:         extends Reducer{  10:  11:     public void reduce(Text key, Iterable values, Context context)  12:         throws IOException, InterruptedException{  13:  14:         for (Text data : values) {  15:             context.write(null,data);  16:             break;  17:         }  18:     }  19: }

2)、SortMapper直接读取数据:

   1: package MRTEST.Sort;   2:   3: import java.io.IOException;   4: import java.util.StringTokenizer;   5:   6: import org.apache.hadoop.io.Text;   7: import org.apache.hadoop.mapreduce.Mapper;   8:   9: public class SortMapper  10:         extends Mapper {  11:  12:     public void map(Object key, Text values,  13:             Context context) throws IOException,InterruptedException {  14:           StringTokenizer itr = new StringTokenizer(values.toString());  15:           while (itr.hasMoreTokens()) {  16:               Text v = new Text(itr.nextToken());  17:                 context.write(v, v);  18:         }  19:     }  20:  21: }

向相应的Reducer分发数据:

   1: package MRTEST.Sort;   2:   3: import org.apache.hadoop.io.Text;   4: import org.apache.hadoop.mapreduce.Partitioner;   5:   6: public class ReducerPartition   7:         extends Partitioner{   8:   9:     public int getPartition(Text key, Text value ,int numPartitions){  10:         return HadoopUtil.getReducerId(value, numPartitions);  11:     }  12: }

最后由SortReducer输出结果:

   1: package MRTEST.Sort;   2:   3: import java.io.IOException;   4:   5:   6: import org.apache.hadoop.io.Text;   7: import org.apache.hadoop.mapreduce.Reducer;   8:   9: public class SortReducer  10:         extends Reducer {  11:  12:     public void reduce(Text key, Iterable values,  13:             Context context) throws IOException, InterruptedException {  14:  15:         for (Text data : values) {  16:             context.write(key,data);  17:         }  18:     }  19: }

3)、作业的组织由SortDriver完成:

   1: package MRTEST.Sort;   2:   3: import java.io.IOException;   4:   5: import org.apache.hadoop.conf.Configuration;   6: import org.apache.hadoop.fs.Path;   7: import org.apache.hadoop.io.Text;   8: import org.apache.hadoop.mapreduce.Job;   9: import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  10: import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  11: import org.apache.hadoop.util.GenericOptionsParser;  12:  13:  14: public class SortDriver {  15:  16:     public static void runPivotSelect(Configuration conf,  17:                                       Path input,  18:                                       Path output) throws IOException, ClassNotFoundException, InterruptedException{  19:  20:         Job job = new Job(conf, "get pivot");  21:         job.setJarByClass(SortDriver.class);  22:         job.setMapperClass(RandomSelectMapper.class);  23:         job.setReducerClass(RandomSelectReducer.class);  24:         job.setOutputKeyClass(Text.class);  25:         job.setOutputValueClass(Text.class);  26:         FileInputFormat.addInputPath(job, input);  27:         FileOutputFormat.setOutputPath(job, output);  28:         if(!job.waitForCompletion(true)){  29:             System.exit(2);  30:         }  31:     }  32:  33:     public static void runSort(Configuration conf,  34:                                Path input,  35:                                Path partition,  36:                                Path output) throws IOException, ClassNotFoundException, InterruptedException{  37:         Job job = new Job(conf, "sort");  38:         job.setJarByClass(SortDriver.class);  39:         job.setMapperClass(SortMapper.class);  40:         job.setCombinerClass(SortReducer.class);  41:         job.setPartitionerClass(ReducerPartition.class);  42:         job.setReducerClass(SortReducer.class);  43:         job.setOutputKeyClass(Text.class);  44:         job.setOutputValueClass(Text.class);  45:         HadoopUtil.readPartition(conf, new Path(partition.toString() + "\\part-r-00000"));  46:         job.setNumReduceTasks(HadoopUtil.pivots.size());  47:         FileInputFormat.addInputPath(job, input);  48:         FileOutputFormat.setOutputPath(job, output);  49:  50:         System.exit(job.waitForCompletion(true) ? 0 : 1);  51:     }  52:  53:     public static void main(String[] args) throws Exception {  54:         Configuration conf = new Configuration();  55:         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  56:         if (otherArgs.length != 3) {  57:           System.err.println("Usage: sort   ");  58:           System.exit(2);  59:         }  60:  61:         Path input = new Path(otherArgs[0]);  62:         Path partition = new Path(otherArgs[1]);  63:         Path output = new Path(otherArgs[2]);  64:  65:         HadoopUtil.delete(conf, partition);  66:         HadoopUtil.delete(conf, output);  67:  68:         SortDriver.runPivotSelect(conf,input,partition);  69:         SortDriver.runSort(conf,input, partition, output);  70:     }  71: }

(7)、打包并测试

在master机器上,单击eclipse的File菜单中的Export,选择Java –> JAR file,单击Next,在左边树形结构中把你想打包的文件勾选,单击Next,再单击Next,在Main class里选择应用程序入口(可选项),最后点Finish,可以看到一个jar文件,例如:Sort.jar。

进入Sort.jar所在路径,在终端输入:hadoop jar Sort.jar input partition output

(8)、查看结果

在http://localhost:50030中可以跟踪所有作业的执行情况。

在hdfs上查看结果,终端输入:hadoop dfs –cat output/*,或者将hdfs上的文件抓到本地查看:hadoop dfs –get output output。

? hadoop streaming例子Hadoop源代码eclipse编译教程 ?

没有评论▼

点击这里取消回复。
发表评论

验证码 *

姓名 (必填)

电子邮件 (不会被公开) (必填)

站点

*

最新日志

* Hadoop源代码eclipse编译教程
* hadoop简单实现文本数据全局排序
* hadoop streaming例子
* hadoop安装配置
* Hadoop公平调度器
* Hadoop RPC学习
* Hadoop单节点安装
* 整合CASSANDRA与HADOOP MAPREDUCE
* hadoop文件操作
* Hadoop启动时出现Unrecognized option: -jvm 和 Could not create the Java virtual machine

*

*

友情链接

Hadoop教程 ? 2011-2012.

大数据系列hadoop相关文章:

最新评论
2014-09-09 09:20:02
可以挂靠
百毒不侵2014-09-09 03:59:34
None of yarn.resourcemanager.address.rm1 yarn.resourcemanager.address.rm2
Amy2014-09-08 02:42:13
腾讯大规模Hadoop集群实践-CSDN.NET http://t.cn/8FQoiXL
℡宁缺勿滥2014-09-08 04:43:49
国家图书馆的网上馆是不用钱的,除了论文
萧萧2014-09-07 12:20:08
死了这条心吧。hive不可能快,人家是批处理不是实时查询。
人在福田2014-09-06 07:49:58
【继拥抱Storm和Spark之后,Hortonworks添加对Kafka的支持】在整合了Spark、Storm之后,Hortonworks又将Apache Kafka纳入其Hadoop软件平台。对比Cloudera、MapR公司的类似操作,因为Hortonworks本身开源的特性,所以Hortonworks的一举一动显然更值得关注和重视。 http://t.cn/Rh7Cbw3
落叶归处2014-09-06 07:13:09
小麦,我做月报做疯了
符东铭2014-09-05 07:54:11
发表了博文《王家林Spark实战高手之路大数据Hadoop培训Spark培训》Spark实战高手之路-第一章RPo13rO王家林老师(邮箱18610086859@126.comhttp://weibo.com/p/230418686c3cfb0102uyix
简单2014-09-04 04:27:48
Cloud是新时代的Infrastructure,Hadoop是Middleware,Cloud Architect经常需要做的事情是整合、集成
小熊2014-09-03 01:23:29
Hadoop集群终于搭好了
 
  • Hadoop生态系统资料推荐