您现在的位置 >> Hadoop教程 >> Hadoop实战 >> 专题  
 

用Hadoop实现KMeans算法

【作者:Hadoop实战专家】【关键词:聚类 文件 Mapper 输入文件 】 【点击:100746次】【2013-09-1】
Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。这里的问题是:如果确定要把质心放在文件中,那Mapper就需要从2个文件中读取数据--质心文件和样本数据文件。 为样本数据建立一个类Sample.java。  

相关热门搜索:

大数据标签:hadoop mapreduce mahout bigdata

问题导读:
什么是质心文件?
mapreduce的从哪里读取质心文件?
Driver驱动程序如何比较两次的质心是否相同?

在我们阅读的时候,我们首先知道什么是KMeans:

K-means算法是最为经典的基于划分的聚类方法,是十大经典数据挖掘算法之一。K-means算法的基本思想是:以空间中k个点为中心进行聚类,对最靠近他们的对象归类。通过迭代的方法,逐次更新各聚类中心的值,直至得到最好的聚类结果。

虽然已经发展到了hadoop2.4,但是对于一些算法只要明白其中的含义,是和语言无关的,无论是使用Java、C++、python等,
本文以Hadoop1.0.3为例。

从理论上来讲用MapReduce技术实现KMeans算法是很Natural的想法:在Mapper中逐个计算样本点离哪个中心最近,然后Emit(样本点所属的簇编号,样本点);在Reducer中属于同一个质心的样本点在一个链表中,方便我们计算新的中心,然后Emit(质心编号,质心)。但是技术上的事并没有理论层面那么简单。

Mapper和Reducer都要用到K个中心(我习惯称之为质心),Mapper要读这些质心,Reducer要写这些质心。另外Mapper还要读存储样本点的数据文件。我先后尝试以下3种方法,只有第3种是可行的,如果你不想被我误导,请直接跳过前两种。

一、用一个共享变量在存储K个质心

由于K很小,所以我们认为用一个Vector来存储K个质心是没有问题的。以下代码是错误的:

1. class MyJob extends Tool{

2.   static Vector centers=new Vector(K);

3.   static class MyMapper extends Mapper{

4.     //read centers

5.   } 

6.   static class MyMapper extends Reducer{

7.     //update centers

8.   }

9.   void run(){

10.     until ( convergence ){

11.       map();

12.       reduce();

13.     }

14. }

复制代码

发生这种错误是因为对hadoop执行流程不清楚,对数据流不清楚。简单地说Mapper和Reducer作为MyJob的内部静态类,它们应该是独立的--它们不应该与MyJob有任何交互,因为Mapper和Reducer分别在Task Tracker的不同JVM中运行,而MyJob以及MyJob的内部其他类都在客户端上运行,自然不能在不同的JVM中共享一个变量。

详细的流程是这样的:

首先在客户端上,JVM加载MyJob时先初始化静态变量,执行static块。然后提交作业到Job Tracker。

在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer线程获得了MyJob类静态变量的初始拷贝(这份拷贝是指MyJob执行完静态块之后静态变量的模样)。

在Task Tracker上,Mapper和Reducer分别地读写MyJob的静态变量的本地拷贝,但是并不影响原始的MyJob中的静态变量的值。

二、用分布式缓存文件存储K个质心
既然不能通过共享外部类变量的方式,那我们通过文件在map和reduce之间传递数据总可以吧,Mapper从文件中读取质心,Reducer把更新后的质心再写入这个文件。这里的问题是:如果确定要把质心放在文件中,那Mapper就需要从2个文件中读取数据--质心文件和样本数据文件。虽然有MutipleInputs可以指定map()的输入文件有多个,并可以为每个输入文件分别指定解析方式,但是MutipleInputs不能保证每条记录从不同文件中传给map()的顺序。在我们的KMeans中,我们希望质心文件全部被读入后再逐条读入样本数据。

于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之间共享数据。DistributedCacheFile是缓存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式读取它。于是我又有了一个错误的思路:

1. class MyMaper{

2.     Vector centers=new Vector(K);

3.     void setup(){

4.         //读取cacheFile,给centers赋值

5.     }

6.     void map(){

7.         //计算样本离哪个质心最近

8.     }

9. }

10. class MyReducer{

11.     Vector centers=new Vector(K);

12.     void reduce(){

13.         //更新centers

14.     }

15.     void cleanup(){

16.         //把centers写回cacheFile

17.     }

18. }

复制代码

错因:DistributedCacheFile是只读的,在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这是单向的复制,是不能写回的。试想在分布式环境下,如果不同的mapper和reducer可以把缓存文件写回的话,那岂不又需要一套复杂的文件共享机制,严重地影响hadoop执行效率。

三、用分布式缓存文件存储样本数据
其实DistributedCache还有一个特点,它更适合于“大文件”(各节点内存容不下)缓存在本地。仅存储了K个质心的文件显然是小文件,与之相比样本数据文件才是大文件。

此时我们需要2个质心文件:一个存放上一次的质心prevCenterFile,一个存放reducer更新后的质心currCenterFile。Mapper从prevCenterFile中读取质心,Reducer把更新后有质心写入currCenterFile。在Driver中读入prevCenterFile和currCenterFile,比较前后两次的质心是否相同(或足够地接近),如果相同则停止迭代,否则就用currCenterFile覆盖prevCenterFile(使用fs.rename),进入下一次的迭代。

这时候Mapper就是这样的:

1. class MyMaper{

2.     Vector centers=new Vector(K);

3.     void map(){

4.         //逐条读取质心,给centers赋值

5.     }

6.     void cleanup(){

7.         //逐行读取cacheFile,计算每个样本点离哪个质心最近

8.         //然后Emit(样本点所属的簇编号,样本点)

9.     }

10. }

复制代码

源代码
试验数据是在Mahout项目中作为example提供的,600个样本点,每个样本是一个60维的浮点向量。  synthetic_control.data.zip (118.04 KB, 下载次数: 2)

2014-6-28 11:58 上传
点击文件名

为样本数据建立一个类Sample.java。

1. package kmeans;

2.

3. import java.io.DataInput;

4. import java.io.DataOutput;

5. import java.io.IOException;

6.

7. import org.apache.commons.logging.Log;

8. import org.apache.commons.logging.LogFactory;

9. import org.apache.hadoop.io.Writable;

10.

11. public class Sample implements Writable{

12.     private static final Log log=LogFactory.getLog(Sample.class);

13.     public static final int DIMENTION=60;

14.     public double arr[];

15.

16.     public Sample(){

17.         arr=new double[DIMENTION];

18.     }

19.

20.     public static double getEulerDist(Sample vec1,Sample vec2){

21.         if(!(vec1.arr.length==DIMENTION && vec2.arr.length==DIMENTION)){

22.             log.error("vector's dimention is not "+DIMENTION);

23.             System.exit(1);

24.         }

25.         double dist=0.0;

26.         for(int i=0;iTHRESHOLD){

66.                 stop=false;

67.                 break;

68.             }

69.         }

70.         //如果还要进行下一次迭代,就用当前质心替代上一次的质心

71.         if(stop==false){

72.             fs.delete(pervCenterFile,true);

73.             if(fs.rename(currentCenterFile, pervCenterFile)==false){

74.                 log.error("质心文件替换失败");

75.                 System.exit(1);

76.             }

77.         }

78.         return stop;

79.     }

80.

81.     public static class ClusterMapper extends Mapper {

82.         Vector centers = new Vector();

83.         @Override

84.         //清空centers

85.         public void setup(Context context){

86.             for (int i = 0; i < K; i++) {

87.                 centers.add(new Sample());

88.             }

89.         }

90.         @Override

91.         //从输入文件读入centers

92.         public void map(LongWritable key, Text value, Context context)

93.                 throws IOException, InterruptedException {

94.             String []str=value.toString().split("\\s+");

95.             if(str.length!=Sample.DIMENTION+1){

96.                 log.error("读入centers时维度不对");

97.                 System.exit(1);

98.             }

99.             int index=Integer.parseInt(str[0]);

100.             for(int i=1;i {

135.         int prev=-1;

136.         Sample center=new Sample();;

137.         int count=0;

138.         @Override

139.         //更新每个质心(除最后一个)

140.         public void reduce(IntWritable key,Iterable values,Context context) throws IOException,InterruptedException{

141.             while(values.iterator().hasNext()){

142.                 Sample value=values.iterator().next();

143.                 if(key.get()!=prev){

144.                     if(prev!=-1){

145.                         for(int i=0;i

大数据系列相关文章:

最新评论
fangy0092014-09-10 12:50:47
Hadoop2.4.1源码编译_个人测试整理
爱已成往事2014-09-09 10:25:01
发表了博文 《hadoop 中的Streaming》 - //用于编写简单短小的mapreduce bin/hadoop jar contrib/streaming/hadoop-0.19.1-st http://t.cn/RPZ4zjs
清风2014-09-09 04:54:42
【大数据最核心的价值是什么】大数据最核心的价值就是在于对于海量数据进行存储和分析.大数据的“廉价、迅速、优化”这三方面的综合成本是最优的.大数据并不是仅仅局限在技术的显示,Hadoop系统的技术已经在事实上获得认可了,在讨论大数据最核心的价值也不能脱离Hadoop系统的技术 http://t.cn/RvYExOR
诸葛卧龙2014-09-08 10:32:34
。。。
枫叶紫竹2014-09-08 07:01:39
路径错了
人来人往2014-09-08 10:31:35
不是,是启动服务
海东2014-09-07 06:54:56
管理组件的
阿里君澜2014-09-06 04:48:47
为什么有些公司在机器学习业务方面倾向使用 R + Hadoop 方案? #读知乎# http://t.cn/RPUiHL2
南开石彬2014-09-05 09:36:54
#诚毅通知#【学术讲座:大数据与Hadoop大数据处理生态圈】(1)时间:3月19日(周三)19:00;(2)地点:景祺楼104;(3)讲座人:厦门祥讯信息技术有限公司项目经理、课程顾问陈阿敏高级工程师;(4)内容:1、什么是大数据时代2、大数据的特征、价值3、处理和分析技术生态圈4、大数据提供的工作机会
家有栋别墅2014-09-04 12:54:56
http://t.cn/RPqbI1w
 
  • Hadoop生态系统资料推荐