200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 大数据Hadoop之HDFS和MapReduce_02_01

大数据Hadoop之HDFS和MapReduce_02_01

时间:2022-01-17 10:35:27

相关推荐

大数据Hadoop之HDFS和MapReduce_02_01

Hadoop主要由HDFS和MapReduce 引擎两部分组成。最底部是HDFS,它存储hadoop集群中所有存储节点上的文件。HDFS 的上一层是MapReduce 引擎,该引擎由JobTrackers 和TaskTrackers组成。

分布式文件系统的理解:

随着数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。

--是一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。

--通透性。让实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般。

--容错。即使系统中有某些节点脱机,整体来说系统仍然可以持续运作而不会有数据损失。

--分布式文件管理系统很多,hdfs只是其中一种。适用于一次写入多次读取的情况,不支持并发写情况,小文件不合适

HDFS基本概念

分布式存储系统

提供了高可靠性、高扩展性和高吞吐率的数据存储服务

存储模型:字节

--文件线性切割成块(Block):偏移量 offset (byte)

--Block分散存储在集群节点中。

--单一文件Block大小一致,文件与文件可以不一致

--Block可以设置副本数,副本分散在不同节点中

--副本数不要超过节点数量

--文件上传可以设置Block大小和副本数(hive-site.xml配置文件中配置)

--已上传的文件Block副本数可以调整,block大小不能改变。

--只支持一次写入多次读取,同一时刻只有一个写入者(也就是说不能并发的写)

--可以append追加数据.

注意:hadoop1.0的默认数据块大小是64M、hadoop2.0x默认数据块的大小是128M。

架构模型:

--文件元数据MetaData,文件数据

元数据

数据本身

--(主)NameNode节点保存文件元数据:单节点 posix

--(从)DataNode节点保存文件Block数据:多节点

--DataNode与NameNode保持心跳,提交Block列表

--HdfsClient与NameNode交互元数据信息

--HdfsClient与DataNode交互文件Block数据

数据块

HDFS默认的最基本的存储单位是64M(128M)的数据块,这个数据块可以理解和一般的文件里面的分块是一样的。

元数据节点(主)和数据节点(从)

NameNode(主NN)

NameNode主要功能:

--接受客户端的读写服务

--与dataNode保持心跳,收集DataNode汇报的Block列表信息

--NameNode保存metadata信息包括

--文件大小,时间

--Block列表:Block偏移量),位置信息

--Block每副本位置(由DataNode上报)

元数据节点(namenode)用来管理文件系统的命名空间,它将所有的文件和文件夹的元数据保存在一个文件系统树中。

元数据信息包括:文件的大小、副本的数量位置、各个块的数量位置等信息。当客户端得到这些元数据信息后,如果想下载a.log它会按照就近原则

寻找blk1、blk2如果某个块损坏它会继续按照就近原则寻找。(note:Block每副本位置通过心跳机制由DataNode上报给namenode)

NameNode持久化

--NameNode的metadate信息在启动后会加载到内存

--metadata存储到磁盘文件名为”fsimage”

--Block的位置信息不会保存到fsimage

--edits记录对metadata的操作日志。。。包括文件的创建和删除等操作。

总结:NameNode相当于仓库管理员,而元数据相当于仓库的账本(各种具体的信息)。

DataNode(从DN)

--本地磁盘目录存储数据(Block),文件形式

--同时存储Block的元数据信息文件

--启动DN时会向NN汇报block信息

--通过向NN发送心跳保持与其联系(3秒一次),如果NN 10分钟没有收到DN的心跳,则认为其已经lost,并copy其上的block到其它DN

数据节点(datanode)就是用来存储数据文件的。

从元数据节点(secondarynamenode)不是我们所想象的元数据节点的备用节点,其实它主要的功能是周期性将元数据节点的命名空间镜像文件和修改日志合并,以防日志文件过大。

SecondaryNameNode(协助NameNode进行数据的管理)

从NameNode上下载元数据信息(fsimage,edits),然后把二者合并,生成新的fsimage,在本地保存,并将其推送到NameNode,替换旧的fsimage.

secondary namenode的工作流程:

secondary通知namenode切换edits文件

secondary从namenode获得fsimage和edits(通过http get)

secondary将fsimage载入内存,然后开始合并edits

secondary将新的fsimage发回给namenode

namenode用新的fsimage替换旧的fsimage

HDFS优点:

--高容错性

数据自动保存多个副本

副本丢失后,自动恢复

--适合批处理

移动计算而非数据

数据位置暴露给计算框架(Block偏移量)

--适合大数据处理

GB 、TB 、甚至PB 级数据

百万规模以上的文件数量

10K+ 节点

--可构建在廉价机器上

通过多副本提高可靠性

提供了容错和恢复机制

HDFS缺点:

--低延迟数据访问

比如毫秒级

低延迟与高吞吐率

--小文件存取占据内存

占用NameNode 大量内存

寻道时间超过读取时间

--不能并发写入(多用户写入)、文件随机修改

一个文件只能有一个写者

仅支持append(只能在文件的末尾进行追加,不能再任意的位置修改)

HDFS工作原理

写操作:

有一个文件FileA,100M大小。Client将FileA写入到HDFS上。

HDFS按默认配置。

HDFS分布在三个机架上Rack1,Rack2,Rack3。

a.Client将FileA按64M分块。分成两块,block1和Block2;

b.Client向nameNode发送写数据请求,如图蓝色虚线①------>。

c.NameNode节点与dataNode保持心跳,记录block信息。并返回可用的DataNode,如粉色虚线②--------->。

Block1: host2,host1,host3

Block2: host7,host8,host4

d.client向DataNode发送block1;发送过程是以流式写入。

流式写入过程,

1>将64M的block1按64k的package划分;

2>然后将第一个package发送给host2;

3>host2接收完后,将第一个package发送给host1,同时client想host2发送第二个package;

4>host1接收完第一个package后,发送给host3,同时接收host2发来的第二个package。

5>以此类推,如图红线实线所示,直到将block1发送完毕。

6>host2,host1,host3向NameNode,host2向Client发送通知,说“消息发送完了”。如图粉红颜色实线所示。

7>client收到host2发来的消息后,向namenode发送消息,说我写完了。这样就真完成了。如图黄色粗实线

8>发送完block1后,再向host7,host8,host4发送block2,如图蓝色实线所示。

9>发送完block2后,host7,host8,host4向NameNode,host7向Client发送通知,如图浅绿色实线所示。

10>client向NameNode发送消息,说我写完了,如图黄色粗实线。。。这样就完毕了。

分析,通过写过程,我们可以了解到:

①写1T文件,我们需要3T的存储,3T的网络流量贷款。

②在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去。读取时,要读其他节点去。

③挂掉一个节点,没关系,还有其他节点可以备份;甚至,挂掉某一个机架,也没关系;其他机架上,也有备份。

读操作:

读操作就简单一些了,如图所示,client要从datanode上,读取FileA。而FileA由block1和block2组成。

那么,读操作流程为:

a.client向namenode发送读请求。

b.namenode查看Metadata信息,返回fileA的block的位置。

block1:host2,host1,host3 block2:host7,host8,host4

c.block的位置是有先后顺序的,先读block1,再读block2。而且block1去host2上读取;然后block2,去host7上读取;

上面例子中,client位于机架外,那么如果client位于机架内某个DataNode上,例如,client是host6。那么读取的时候,遵循的规律是:

优选读取本机架上的数据。

HDFS单节点安装

step1:配置jdk和hadoop的环境变量vi/etc/profile 添加 export JAVA_HOME=/usr/local/jdk1.8.0 export HADOOP_HOME=/usr/local/hadoop-2.6.5

PATH=.:$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATHstep2:在hdfs文件中配置jdk

hadoop-env.sh文件

执行cd /hadoop/etc/hadoop 命令进入配置文件目录

执行vi hadoop-env.sh命令修改其配置文件

export JAVA_HOME=/usr/local/jdk1.8.0

也可以把mapred-env.sh和yarn-env.sh一同修改。

step3:core-site.xml

vi core-site.xml

在configuration中添加

<property>

<name>fs.default.name</name>

<value>hdfs://node01:9000</value>#指定namenode的节点位置

</property>

<property>

<name>hadoop.tmp.dir</name>

<value>/usr/local/hadoop/full/</value> #指定存储数据的文件目录

</property>

step4:hdfs-site.xml

vim hdfs-site.xml

<property>

<name>dfs.replication</name>

<value>3</value> #指定副本的数量

</property>

<property>

<name>dfs.namenode.secondary.http-address</name>

<value>node02:50090</value>#指定secondary namenode的位置 独立与namenode节点的位置

</property>

step5:slaves

vi slaves

添加 node02

node03

node04

step6:对hadoop进行格式化(配置hadoophome)

执行命令hadoopnamenode -format

step7:启动hadoop(实际上启动的java进程)

切换到sbin目录下

先启动HDFS

sbin/start-dfs.sh

再启动YARN

sbin/start-yarn.sh

step8:验证是否启动成功

jps命令验证(5个进程hadoop版本2)

27408 NameNode

27643 SecondaryNameNode

28066 NodeManager

27803 ResourceManager

27512 DataNode

http://192.168.2.100:50070(HDFS管理界面)http://hadoop:50070/dfshealth.jsp

此界面能够显示表示HDFS正常的存活。创建HDFS目录用来存储文件上传文件到HDFS中注意:打开Block0文件发现Block 0中的文件被切割了(按照指定的字节切割的 即使里面存在着中文,也会把中文进行切割)这个时候也许你会考虑在maperReduce进行数据计算时会不会因为某个单词或者汉字被切割的问题导致计算出现误差。这个不用担心,MaperReduce计算时底层从第二个Block块以后不读取第一行数据,它是由上一个计算去读取执行。这样就可以能够读取一个完整的数据

Hadoop的HA机制

这种情形:当一个NameNode当掉了,整个集群就无法运行了。

Hadoop2.0后将NameNode进行了一个抽象,它把这个NameNode抽象为了一个NameService

一个NameService下面有两个NameNode,这时候就得需要有个东西来协调,否则两个NameNode都是active的状态或者为standby状态(等待)这时候Zookeeper的作用就是协调NameNode状态,他能确保整个NameService下只有一个活跃的NameNode。

FailoverController(ZKFC)进程的作用:监控NameNode的状态 把NameNode的状态及时的汇报给Zookeeper,以便进行实时的修改。

注意:

在hadoop1.x版本以及以前HDFS只有一个NameNode,对应一个在线的应用只有一个NameNode如果宕掉了就没法使用了,这是一个极大的弊端。

在Hadoop2.0版本后,为了高可靠性,我们使用Zookeeper,可以有多个NameNode,这样当一个NameNode宕掉之后可以马上有一个实时的切换,来顶替宕掉的那个NameNode。

说明:

在hadoop2.0中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态分别位于两台单独的机器中。Active NameNode对外提供服务,而Standby NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。

hadoop2.0官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。这里我们使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成功。通常配置奇数个JournalNode

这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby NameNode为standby状态。

HA集群搭建

在weekend08;weekend09;weekend10上配置hadoop

在主机名weekend08;weekend09;weekend10上配置hadoop修改6个配置文件

#hadoop2.0的配置文件全部在$HADOOP_HOME/etc/hadoop下

1、修改hadoo-env.sh

export JAVA_HOME=/usr/local/jdk1.8.0

2、修改core-site.xml

<!--指定hdfs的nameservice为ns1 -->

<property>

<name>fs.defaultFS</name>

<value>hdfs://ns1</value>

</property>

<!--指定hadoop临时目录-->

<property>

<name>hadoop.tmp.dir</name>

<value>/usr/local/hadoop-2.6.5/ha</value>

</property>

<!--指定zookeeper地址-->

<property>

<name>ha.zookeeper.quorum</name>

<value>weekend11:2181,weekend12:2181,weekend13:2181</value>

</property>

</configuration>

3、修改hdfs-site.xml

<configuration>

<!--指定副本数量-->

<property>

<name>dfs.replication</name>

<value>3</value>

</property>

<!--指定hdfs的nameservice为ns1,需要和core-site.xml中的保持一致-->

<property>

<name>dfs.nameservices</name>

<value>ns1</value>

</property>

<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->

<property>

<name>dfs.ha.namenodes.ns1</name>

<value>nn1,nn2</value>

</property>

<!-- nn1的RPC通信地址-->

<property>

<name>dfs.namenode.rpc-address.ns1.nn1</name>

<value>weekend08:9000</value>

</property>

<!-- nn1的http通信地址-->

<property>

<name>dfs.namenode.http-address.ns1.nn1</name>

<value>weekend08:50070</value>

</property>

<!-- nn2的RPC通信地址-->

<property>

<name>dfs.namenode.rpc-address.ns1.nn2</name>

<value>weekend09:9000</value>

</property>

<!-- nn2的http通信地址-->

<property>

<name>dfs.namenode.http-address.ns1.nn2</name>

<value>weekend09:50070</value>

</property>

<!--指定NameNode的元数据在JournalNode上的存放位置-->

<property>

<name>dfs.namenode.shared.edits.dir</name>

<value>qjournal://weekend11:8485;weekend12:8485;weekend13:8485/ns1

</value>

</property>

<!--指定JournalNode在本地磁盘存放数据的位置-->

<property>

<name>dfs.journalnode.edits.dir</name>

<value>/usr/local/hadoop-2.6.5/journal</value>

</property>

<!--开启NameNode失败自动切换-->

<property>

<name>dfs.ha.automatic-failover.enabled</name>

<value>true</value>

</property>

<!--配置失败自动切换实现方式-->

<property>

<name>dfs.client.failover.proxy.provider.ns1</name>

<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>

</property>

<!--配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->

<property>

<name>dfs.ha.fencing.methods</name>

<value>

sshfence

shell(/bin/true)

</value>

</property>

<!--使用sshfence隔离机制时需要ssh免登陆-->

<property>

<name>dfs.ha.fencing.ssh.private-key-files</name>

<value>/root/.ssh/id_rsa</value>

</property>

</configuration>

在节点weekend08上执行如下命令复制到其他节点

scp ./core-site.xml hdfs-site.xml weekend09:/usr/local/hadoop-2.6.5/etc/hadoop

scp ./core-site.xml hdfs-site.xml weekend10:/usr/local/hadoop-2.6.5/etc/hadoop

scp ./core-site.xml hdfs-site.xml weekend11:/usr/local/hadoop-2.6.5/etc/hadoop scp ./core-site.xml hdfs-site.xml weekend12:/usr/local/hadoop-2.6.5/etc/hadoop

scp ./core-site.xml hdfs-site.xml weekend13:/usr/local/hadoop-2.6.5/etc/hadoop

4、修改mapred-site.xml

<configuration>

<!--指定mr框架为yarn方式-->

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

</configuration>

5、修改yarn-site.xml

<configuration>

<!--指定resourcemanager地址-->

<property>

<name>yarn.resourcemanager.hostname</name>

<value>weekend10</value>

</property>

<!--指定zk集群地址-->

<property>

<name>yarn.resourcemanager.zk-address</name>

<value>weekend11:2181,weekend12:2181,weekend13:2181</value>

</property>

<!--指定nodemanager启动时加载server的方式为shuffle server -->

<property>

<name>yarn.nodemanager.aux-services</name>

<value>mapreduce_shuffle</value>

</property>

</configuration>

注意:这个文件也可以给MapperReducer进行HA的配置;这里就不在进行配置了。

6、修改slaves(slaves是指定子节点的位置,因为要在weekend10上启动HDFS、在weekend10启动yarn,所以weekend08上的slaves文件指定的是datanode的位置,weekend10上的slaves文件指定的是nodemanager的位置)

weekend11 192.168.2.111

weekend12 192.168.2.112

weekend13 192.168.2.113

7、配置免密码登陆

#首先要配置weekend08到weekend09、weekend10、weekend11、weekend12、weekend13的免密码登陆

#在weekend08上生产一对钥匙

ssh-keygen -t rsa

#将公钥拷贝到其他节点(免登陆的节点上),包括自己。

ssh-copy-id -i ~/.ssh/id_rsa.pub remote-host

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend08

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend09

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend10

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend11

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend12

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend13

#配置weekend10到weekend11、weekend12、weekend13的免密码登陆

#在weekend10上生产一对钥匙

ssh-keygen -t rsa

#将公钥拷贝到其他节点

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend10

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend11

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend12

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend13

#注意:两个namenode之间要配置ssh免密码登陆,别忘了配置weekend09到weekend08的免登陆

在weekend09上生产一对钥匙

ssh-keygen -t rsa

ssh-copy-id -i ~/.ssh/id_rsa.pubweekend08

这个时候就可以使用 ssh weekend09进行免登陆了。

在weekend11,weekend12,weekend13配置hadoop和Zookeeper

1、在weekend11节点上解压安装zookeeper-2.6.5 进入zoo_sample.cfg 并重命名zoo.cfg

vizoo.cfg

dataDir=/usr/local/zookeeper-3.4.6/zk #修改数据目录

#添加如下内容

server.1=weekend11:2888:3888

server.2=weekend12:2888:3888

server.3=weekend13:2888:3888

2、在数据目录/usr/local/zookeeper-3.4.6/zk下创建一个myid的文件并输入server的id号

在 weekend11节点下执行 echo 1 > myid

复制weekend11节点中的zookeeper-2.6.5到weekend12、weekend13

scp -r ./zookeeper-3.4.6 weekend13:/usr/local

修改 myid中的文件分别为 2 、 3

3、添加zk环境变量

export JAVA_HOME=/usr/local/jdk1.8.0

export HADOOP_HOME=/usr/local/hadoop-2.6.5

export ZK_HOME=/usr/local/zookeeper-3.4.6

PATH=.:$HADOOP_HOME/bin:$JAVA_HOME/bin:$ZK_HOME/bin:$PATH

复制到其他节点

scp /etc/profile weekend12:/etc/

scp /etc/profile weekend13:/etc/

4、###注意:严格按照下面的步骤(未格式化nomenode时的启动步骤)

启动zookeeper集群(分别在weekend11、weekend12、weekend13上启动zk)

cd /usr/local/zookeeper-3.4.5/bin/

./zkServer.sh start

#查看状态:一个leader,两个follower

./zkServer.sh status

5、启动journalnode(在weekend11,weekend12,weekend13,上启动所有journalnode,注意:是调用的hadoop-daemons.sh这个脚本,注意是复数s的那个脚本)

cd /usr/local/hadoop-2.2.0

sbin/hadoop-daemons.sh start journalnode

#运行jps命令检验,weekend11,weekend12,weekend13上多了JournalNode进程

6、格式化HDFS

#在weenkend08上执行命令:(前提是配置hadoophome并且使其生效)

hdfs namenode -format

#格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里我配置的是/usr/local/hadoop-2.6.5/ha,然后将/usr/local/hadoop-2.6.5/ha拷贝到weekend09的/usr/local/hadoop-2.6.5下。

scp -r ha/ weekend09:/usr/local/hadoop-2.6.5/

这样weekend08和weekend09中的ha数据完全一样

7、格式化ZK(在weekend08上执行即可)

hdfs zkfc -formatZK

这时会在zookeeper的根目录下出现hadoop-ha目录

8、启动HDFS(在weekend08上执行)

sbin/start-dfs.sh

这时会在weekend11,weekend12,weekend13,出现datanode进程

到目前为止各节点的进程情况如下

杀死weekend08的namenode进程 kill -9 2369 发现此时的weekend09 的namenode依然处于Active状态。

示例:

创建hdfs的根目录

hdfs dfs -mkdir -p /user/root

上传Linux目录下的文件到hdfs中

hdfs dfs -D dfs.blocksize=1048576 -put ./test.txt

查看ResourceManager的界面

9、启动YARN(#####注意#####:是在weekend10上执行start-yarn.sh,把namenode和resourcemanager分开是因为性能问题,因为他们都要占用大量资源,所以把他们分开了,他们分开了就要分别在不同的机器上启动)

sbin/start-yarn.sh

这时候weekend10多了一个resourcesmanager进程weekend11,weekend12,weekend13出现了datanode和nodemanage进程

已经格式化nomenode,并且配置完免登陆时的启动步骤

step1:首先启动weekend11;weekend12;weekend13上的zookeeper

启动journalnode(在weekend11,weekend12,weekend13,上启动所有journalnode,注意:是调用的hadoop-daemons.sh这个脚本,注意是复数s的 那个脚本)

Step2:在weekend08上启动hdfs 迁移到sbin目录下

执行start-dfs.sh 命令

Step3:在weekend10上启动hdfs 迁移到sbin目录下

执行start-yarn.sh 命令

到此,hadoop2.6.5配置完毕,可以统计浏览器访问:

http://192.168.222.108:50070

NameNode 'weeked08:9000' (active)

http://192.168.222.109:50070

NameNode 'weekend09:9000' (standby)

验证HDFS HA

首先向hdfs上传一个文件

hadoop fs -put /etc/profile /profile

hadoop fs -ls /

然后再kill掉active的NameNode

kill -9 <pid of NN>

通过浏览器访问:http://192.168.1.202:50070

NameNode 'weekend09:9000' (active)

这个时候weekend09上的NameNode变成了active

在执行命令:

hadoop fs -ls /

-rw-r--r-- 3 root supergroup 1926 -02-06 15:36 /profile

刚才上传的文件依然存在!!!

手动启动那个挂掉的NameNode

sbin/hadoop-daemon.sh start namenode

通过浏览器访问:http://weekend08:50070

NameNode 'weekend09:9000' (standby)

验证YARN:

运行一下hadoop提供的demo中的WordCount程序:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar wordcount /profile /out

OK,大功告成!!!

HDFS的API操作

public class HDFSDemo {

FileSystem fs = null;

@Before

public void init() throws IOException, URISyntaxException, InterruptedException {

// 创建FileSystem的实现类(工具类)FileSystem是抽象类只能使用get方法获得实现类对象

fs = FileSystem.get(new URI("hdfs://weekend08:9000"), new Configuration(), "root");

}

/**

* 测试文件的上传

*

* @throws IOException

* @throws IllegalArgumentException

*/

@Test

public void testFileUpload() throws IllegalArgumentException, IOException {

FileInputStream in = new FileInputStream("c://words.txt");

OutputStream out = fs.create(new Path("/words.txt"));

IOUtils.copyBytes(in, out, 4096, true);

System.out.println("将本地文件系统上传到hdfs文件系统中!");

}

@Test

public void testFiledownload() throws IllegalArgumentException, IOException {

InputStream in = fs.open(new Path("/words.txt"));// 读取hdfs文件

OutputStream out = new FileOutputStream("c://usr.txt");

IOUtils.copyBytes(in, out, 4096, true);

System.out.println("从hdfs文件中读取文件到本地文件系统中");

}

@Test

public void testFasterDownLoad() throws IllegalArgumentException, IOException {

fs.copyToLocalFile(new Path("/words.txt"), new Path("c://words.txt"));

System.out.println("从hdfs文件中读取文件到本地文件系统中的快速方法!");

}

@Test

public void testDelete() throws IllegalArgumentException, IOException {

boolean flag = fs.delete(new Path("/testCreateDirectory"), true);

System.out.println("当设置为true的时候递归删除hdfs文件!" + flag);

}

@Test

public void testCreateDir() throws IllegalArgumentException, IOException {

Path path = new Path("/testCreateDirectory");

boolean flag = false;

if (!fs.exists(path)) {

flag = fs.mkdirs(path);

}

System.out.println("测试创建文件夹!" + flag);

}

@Test

public void getBlock() throws IllegalArgumentException, IOException {

Path input = new Path("/user/root/test.txt");

FileStatus inFile = fs.getFileStatus(input);

BlockLocation[] bls = fs.getFileBlockLocations(inFile, 0, inFile.getLen());

for (BlockLocation b : bls) {

System.out.println(b);

}

FSDataInputStream readF = fs.open(input);

byte char01 = readF.readByte();// 读取块的第一个字节

System.out.println((char) char01);

readF.seek(1048576);// 设置读取的偏移量

System.out.println((char) readF.readByte());

}

}

HDFS的shell操作

执行命令[root@hadoop sbin]# start-dfs.sh启动hdfs

1.0查看帮助

hadoop fs -help <cmd>

1.1上传

hadoop fs -put <linux上文件> <hdfs上的路径>

等同于hadoop fs -copyFromLocal 命令

如:hadoop fs -copyFromLocal /root/install.log /in.log

hadoop fs -put /root/install.log /in.log

意思就是说将linux根目录下的install.log文件复制到hdfs根目录下的in.log

1.2查看文件内容(适用于小文件)

hadoop fs -cat <hdfs上的路径>

如:hadoop fs -cat /in.log

等同于hadoop fs -text/in.log

1.3查看hdfs文件列表

hadoop fs -ls /

其中/ 代表hdfs的根目录

hadoop fs -lsr / 递归查询

hadoop fs -lsr -h /顺便显示文件的大小

1.4下载文件

hadoop fs -get <hdfs上的路径> <linux上文件>

等同于hadoop fs -copyToLocal命令

下载hdfs上的文件到本地

1.5 查看hdfs存储空间

Hadoop fs -df -h

1.6 查看文件的大小

hadoop fs -du -s -h / 查看根目录下所有文件的总大小

hadoop fs -du -h /user/root 查看每个文件的大小

1.7 创建目录

hadoop fs -mkdir /aa

1.8 删除目录

hadoop fs -rm -r /aa 删除文件 hadoop fs -rmdir /aa 删除没有数据的空目录

1.9 查看文件的数量

hadoop fs -count /

1 代表文件夹的数量 2 代表文件的数量

1.10 给文件添加权限

hadoop fs -chmod a+x /in..log

1.11HDFS查看文件的前几行-后几行-行数

随机返回指定行数的样本数据

hadoop fs -cat /user/root/words.txt | shuf -n 5

返回前几行的样本数据

hadoop fs -cat /user/root/words.txt | head -100

返回最后几行的样本数据

hadoop fs -cat /user/root/words.txt | tail -5

查看文本行数

hadoop fs -cat /user/root/words.txt |wc -l

查看文件大小(单位byte)

hadoop fs -du /user/root/words.txt

hadoop fs -count /user/root/words.txt

详细的shell操作见文档,以上是常见的命令,需要熟练的掌握。

HDFS的体系架构

客户端首先与NameNode交互,NameNode要查询元数据信息(通常情况下元数据信息在内存中和磁盘中都要进行保存,内存中的保存 是为了读写效率高,磁盘的保存是为了内存元数据的丢失)NameNode将查询到的元数据信息返回给客户端,然后客户端从DataNode中获取数据

NameNode(数据的管理服务)

接收用户的操作请求(如shell操作),是整个文件系统的管理节点。它维护着整个文件系统的文件目录树,文件/目录的元信息和每个文件对应的数据块列表。

文件包括:

fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。

edits:操作日志文件。

fstime:保存最近一次checkpoint的时间。

以上这些文件是保存在linux的文件系统中(tmp文件夹中)

NameNode的工作特点

Namenode始终在内存中保存metedata,用于处理“读请求”

到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回

Hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。Secondary namenode就是用来合并fsimage和edits文件来更新NameNode的metedata的。

Hadoop的RPC

代理接口:

public interface Bizable {

public String sayHi(String name);

public final static long versionID=10010;//此接口要给它一个versionID否则报错

}

客户端:

import java.io.IOException;

import .InetSocketAddress;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

/**

* 客户端程序

* @author Administrator

*

*/

public class RPCClient {

public static void main(String[] args) throws Throwable {

Bizable proxy = RPC.getProxy(Bizable.class, 10010, new InetSocketAddress("192.168.2.106", 9527),

new Configuration());

String retValue = proxy.sayHi("limei");

System.out.println(retValue);

RPC.stopProxy(proxy);

}

}

服务器端:

import java.io.IOException;

import org.apache.hadoop.HadoopIllegalArgumentException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

import org.apache.hadoop.ipc.Server;

public class RPCServer implements Bizable {

public String sayHi(String name) {

return "Hi~" + name;

}

public static void main(String[] args) throws Throwable, IOException {

Configuration conf=new Configuration();

Server server=new RPC.Builder(conf).setProtocol(Bizable.class).setInstance(new RPCServer()).setBindAddress("192.168.2.106").setPort(9527).build();

server.start();

}

}

将windows下的Clinet打成jar文件上传到linux中运行客户端;而服务器端的运行是在winddows下。这样能够做到多台机器的交互。

MapReduce

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。

这两个函数的形参是key、value对,表示函数的输入信息。

注意:在hadoop1.0版本 MapReduce的两个主从结构为JobTracker、TaskTracker

Hadoop2.0版本 MapReduce的两个主从结构为ResourceManager、NodeManager

ResourceManager(JobTracker)的功能是资源分配任务。它决定着哪台机器执行多少个Map和Reduce。

比如说第一台机器执行10个Mapper和10个Reduce,而第二台机器的性能不是很好;其CPU和内存占用了很多,这时可以在这台机器上执行5个Mapper和5个Reduce。

它还监控着这些任务,有时一些任务出现了问题,它还要重新的进行资源的分配,把这些任务进行转移。

Mapreduce的计算模型

要了解Mapreduce,首先要了解Mapreduce的载体是什么。

在Hadoop中用于执行Mapreduce任务的机器角色有两个,一个是ResourceManager(JobTracker),另一个是NodeManager(TaskTracker)。JobTracker用于调度工作,TaskTracker用于执行工作,一个Hadoop集群中只有一个JobTracker。

◆执行步骤:

1. map任务处理

1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3 对输出的key、value进行分区。

1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5 (可选)分组后的数据进行归约。

2.reduce任务处理

2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.3 把reduce的输出保存到文件中。

Mapreduce原理

决定Mapper的数量

HDFS中数据的存储是以块的形式存储的,数据块的切分是物理切分,而split是在Block的基础上进行的逻辑切分。每一个split对应着一个Mapper。

一个文件被切分成多少个split就有多少个Mapper。

决定Reducer的数量

如:10个key可以有1个reducer,但是这个reducer只能一次处理一个key,也就是说处理10次

10个key可以有大于10个reducer ,只不过有的reduce不进行key的处理。

10个key有10个reducer,这是最合理的分配,达到并行计算。

相同的key如何识别到指定的reducer进行计算呢?

对输出的key、value进行分区。

总结:Mapper阶段是并行读取处理的它的数量是由切片的数量决定的;Reducer阶段可以不并行,他的数量的是通过key进行规划,由人来决定。

hadoop运行原理之shuffle

hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是从Map结束到Reduce开始之间的过程。首先看下这张图,就能了解shuffle所处的位置。图中的partitions、copy phase、sort phase所代表的就是shuffle的不同阶段。

shuffle被称作MapReduce的心脏,是MapReduce的核心。

由上图看出,每个数据切片由一个Map处理,也就是说map只是处理文件的一部分。

每一个Map都有一个环形的内存缓冲区,用来存储Map的输出数据,这个内存缓冲区的默认大小是100MB,当数据达到阙值0.8,也就是80MB的时候,一个后台的程序就会把数据溢写到磁盘中。在将数据溢写到磁盘的过程中要经过复杂的过程,首先要将数据进行分区排序(按照分区号如0,1,2),分区完以后为了避免Map输出数据的内存溢出,可以将Map的输出数据分为各个小文件再进行分区,这样map的输出数据就会被分为了具有多个小文件的分区已排序过的数据。然后将各个小文件分区数据进行合并成为一个大的文件(将各个小文件中分区号相同的进行合并)。

这个时候Reducer启动了三个分别为0,1,2。0号Reducer会取得0号分区 的数据;1号Reducer会取得1号分区的数据;2号Reducer会取得2号分区的数据。

一、Map端的shuffle

(1)在map端首先接触的是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生<K2,V2>的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

(2)写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。

(3)最后将磁盘中的数据送到Reduce中,从图中可以看出Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入则来自其他节点的Map输出。

二、Reduce端的shuffle

Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。

(1)Copy阶段:Reducer通过Http方式得到输出文件的分区。

reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。

(2)Merge阶段:如果形成多个磁盘文件会进行合并

从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。

(3)Reducer的参数:最后将合并后的结果作为输入传入Reduce任务中。

最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。

MapperReduce之YARN

YARN:负责资源管理和调度

WordCount的例子

DataCount的例子

日志的格式信息

代码是实现:

Mapper阶段

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class DataCountMapper extends Mapper<LongWritable, Text, Text, DataBean> {

@Override

protected void map(LongWritable key, Text text, Context context) throws IOException, InterruptedException {

String line = text.toString();

String[] fields = line.split("\t");// 按照指制表符进行切分

String telNo = fields[1];

long upPayLoad = Long.parseLong(fields[8]);

long downPayLoad = Long.parseLong(fields[9]);

DataBean bean = new DataBean(telNo, upPayLoad, downPayLoad);

context.write(new Text(telNo), bean);

}

}

Reducer阶段

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class DataCountReducer extends Reducer<Text, DataBean, Text, DataBean> {

protected void reduce(Text text, Iterable<DataBean> beans, Context context)

throws IOException, InterruptedException {

long up_sum = 0;

long down_sum = 0;

for (DataBean bean : beans) {

up_sum = bean.getUpPayLoad();

down_sum = bean.getDownPayLoad();

}

DataBean dataBean = new DataBean("", up_sum, down_sum);

context.write(text, dataBean);

}

}

程序的主入口

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

* 书写完成后将程序打成jar包 放在linux的 /usr/local目录下面

* 这个时候文件的输入输出路径要进行自定义操作。

*/

public class DataCountMain {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Job job = new Job();

job.setJarByClass(DataCountMain.class);

job.setJobName("DataCount");

/*

* 将path设置为args0 时输入输出路径可以在运行jar的时候指定

*/

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(DataCountMapper.class);

/*

* 当k2 v2 和和k3 v3 一一对应的时候 注释的部分可以省略

*

*/

// job.setMapOutputKeyClass(Text.class);

// job.setMapOutputValueClass(DataBean.class);

job.setReducerClass(DataCountReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(DataBean.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

将要分析的日志信息提交到HDFS的根目录下 datacount.txt 然后将书写完的程序打成jar包在linux下执行命令:hadoop jar /usr/local/dataCount.jar com.hadoop.datacount.DataCountMain /datacount.txt /dataout

HDFS中输出文件的信息

Hadoop的远程Debug

JPDA 简介

Sun Microsystem 的 Java Platform Debugger Architecture (JPDA) 技术是一个多层架构,使您能够在各种环境中轻松调试 Java 应用程序。JPDA 由两个接口(分别是 JVM Tool Interface 和 JDI)、一个协议(Java Debug Wire Protocol)和两个用于合并它们的软件组件(后端和前端)组成。它的设计目的是让调试人员在任何环境中都可以进行调试。

更详细的介绍,您可以参考使用 Eclipse 远程调试 Java 应用程序

JDWP 设置

JVM本身就支持远程调试,Eclipse也支持JDWP,只需要在各模块的JVM启动时加载以下参数:

dt_socket表示使用套接字传输。

address=8000

JVM在8000端口上监听请求,这个设定为一个不冲突的端口即可。

server=y

y表示启动的JVM是被调试者。如果为n,则表示启动的JVM是调试器。

suspend=y

y表示启动的JVM会暂停等待,直到调试器连接上才继续执行。suspend=n,则JVM不会暂停等待。

需要在$HADOOP_HOME/etc/hadoop/hadoop-env.sh文件的最后添加你想debug的进程

远程调试namenode

export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"

远程调试datanode

export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=9888,server=y,suspend=y"

远程调试RM

export YARN_RESOURCEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"

远程调试NM

export YARN_NODEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"

Mapreduce编程

Partitioner编程

1、如果要定制partitioner需要继承该类,partitioner的执行是在Map输出之后,其泛型就是map的输出即:Key2,V2

2、实现其方法getPartition,返回值为分区号,方法的前两个参数为map的K2,V2;方法的第三个参数由reducer的数量决定。

3、将Partitioner加到Mapreducer主类中job.setPartitionerClass(ServiceProviderPartitioner.class);

4、打成jar包在集群中执行。

5、默认情况下的reducer的数量为1个,可以在Mepreducer中设置reducer的数量,数量的值通过运行jar 文件时传递的。job.setNumReduceTasks(Integer.parseInt(args[2]));

在dataCount例子的基础上添加Partitioner

创建一个类继承Partitionerimport java.util.HashMap;import java.util.Map;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProviderPartitioner extends Partitioner<Text,DataBean>{private static Map<String,Integer> providerMap=new HashMap<String,Integer>();static{providerMap.put("135",1); //1 代表中国移动 2 代表中国联通 3代表中国电信providerMap.put("136",1);providerMap.put("137",1);providerMap.put("138",1);providerMap.put("159",2);providerMap.put("150",2);providerMap.put("182",3);providerMap.put("183",3);}@Overridepublic int getPartition(Text text, DataBean bean, int paramInt) {String account=text.toString();String sub_acc=account.substring(0, 3);Integer code=providerMap.get(sub_acc);if(code==null){code=0;}return code;}}

在MapReducer的主类中添加打成jar包提交到linux下执行

执行结果

多个Mapreduce处理逻辑排序

业务需求:

对淘宝账户的收入进行自定的排序。默认的排序是根据账户的数据字典进行排序的,在这里我们要自定义排序即:按照收入的大小进行排序。

通过这样的排序,我们就能够分析出有价值的信息,比如:对于收入高的店会继续的合作,收入不高的店对其进行推广。

分析过程:

首先要在mapper中进行求和,然后再继续排序。在这里写一个MaperReducer无法搞定,因此要写两个maperReducer即:迭代式编程模型,第一个Mapreducer的结果作为第二个MaperReducer的输入。通过这样的迭代可以书写多个MaperReducer。

代码实现:

InfoBean类

publicclassInfoBeanimplementsWritableComparable<InfoBean>{

privateStringaccount;

privatedoubleincome;

privatedoubleexpenses;

privatedoublesurplus;

publicvoidset(String account,doubleincome,doubleexpenses){

this.account= account;

this.income= income;

this.expenses= expenses;

this.surplus= income - expenses;

}

@Override

publicString toString() {

returnthis.income+"\t"+this.expenses+"\t"+this.surplus;

}

/**

* serialize

*/

publicvoidwrite(DataOutput out)throwsIOException {

out.writeUTF(account);

out.writeDouble(income);

out.writeDouble(expenses);

out.writeDouble(surplus);

}

/**

*deserialize

*/

publicvoidreadFields(DataInput in)throwsIOException {

this.account= in.readUTF();

this.income= in.readDouble();

this.expenses= in.readDouble();

this.surplus= in.readDouble();

}

publicintcompareTo(InfoBean o) {

if(this.income== o.getIncome()){

returnthis.expenses> o.getExpenses() ? 1 : -1;

}else{

returnthis.income> o.getIncome() ? -1 : 1;

}

}

publicString getAccount() {

returnaccount;

}

publicvoidsetAccount(String account) {

this.account= account;

}

publicdoublegetIncome() {

returnincome;

}

publicvoidsetIncome(doubleincome) {

this.income= income;

}

publicdoublegetExpenses() {

returnexpenses;

}

publicvoidsetExpenses(doubleexpenses) {

this.expenses= expenses;

}

publicdoublegetSurplus() {

returnsurplus;

}

publicvoidsetSurplus(doublesurplus) {

this.surplus= surplus;

}

}

SumStep类

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumStep {

public static void main(String[] args) throws Exception {

Job job = new Job();

job.setJarByClass(SumStep.class);

job.setMapperClass(SumMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(InfoBean.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

job.setReducerClass(SumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(InfoBean.class);

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {

private InfoBean bean = new InfoBean();

private Text k = new Text();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// split

String line = value.toString();

String[] fields = line.split("\t");

// get useful field

String account = fields[0];

double income = Double.parseDouble(fields[1]);

double expenses = Double.parseDouble(fields[2]);

k.set(account);

bean.set(account, income, expenses);

context.write(k, bean);

}

}

public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {

private InfoBean bean = new InfoBean();

@Override

protected void reduce(Text key, Iterable<InfoBean> v2s, Context context)

throws IOException, InterruptedException {

double in_sum = 0;

double out_sum = 0;

for (InfoBean bean : v2s) {

in_sum += bean.getIncome();

out_sum += bean.getExpenses();

}

bean.set("", in_sum, out_sum);

context.write(key, bean);

}

}

}

SortStep类

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortStep {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Job job = new Job();

job.setJarByClass(SortStep.class);

job.setMapperClass(SortMapper.class);

job.setMapOutputKeyClass(InfoBean.class);

job.setMapOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));

job.setReducerClass(SortReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(InfoBean.class);

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);

}

public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{

private InfoBean bean = new InfoBean();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] fields = line.split("\t");

String account = fields[0];

double income = Double.parseDouble(fields[1]);

double expenses = Double.parseDouble(fields[2]);

bean.set(account, income, expenses);

context.write(bean, NullWritable.get());

}

}

public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{

private Text k = new Text();

@Override

protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)

throws IOException, InterruptedException {

String account = bean.getAccount();

k.set(account);

context.write(k, bean);

}

}

}

将上述的程序打成jar文件 运行 hadoop jar datasumsort.jar com.hadoop.datasumsort.SumStep /sumandsort /sumandsort-sumdata

继续执行hadoop jar datasumsort.jar com.hadoop.datasumsort.SortStep /sumandsort-sumdata /sumandsort-sortdata

Combiners编程

Map执行完成以后,提交作业前要执行combiner,在这里combiner也有着像reducer进行合并的功能,在WCount程序中,我们可以修改如下:

这样修改后程序的执行经历3个步骤:Map、combiner、reducer

图解:

没有combiner的情形

在这里有两台机器存储着文件,map读取数据记为1并且输出。

Reducer在接收到数据前要经过shuffle过程,经过shuffle后map输出的数据已经分好组。第三台机器运行着reducer。

Reducer接收到的数据格式为<hello ,{1,1,1,1,1}> <kitty,{1}> <tom,1,1,1,1>

有combiner的情形

两种方式的比较:

有combiner的程序适合处理大数据,减少了网络数据的传输,提高了数据的计算效率。

总结:combiner是一个特殊的Reducer,它首先将mapper的输出进行一次合并,然后将合并的结果传给Reducer。

怎么理解combiner是可插拔的?

就是说combiner添加和不添加都是可以的。如果说combiner是可插拔的,那么combiner就只应该适于Reducer的输入 Key/Value 和输出 Key/Valued 的类型是一致的,且不影响最终的结果。

Shuffle

shuffle被称作MapReduce的心脏,是MapReduce的核心。

由上图看出,每个数据切片由一个Map处理,也就是说map只是处理文件的一部分,但是这个文件的切片到底大小如何呢?后面讲解

每一个Map都有一个环形的内存缓冲区,用来存储Map的输出数据,这个内存缓冲区的默认大小是100MB,当数据达到阙值0.8,也就是80MB的时候,一个后台的程序就会把数据溢写到磁盘中。在将数据溢写到磁盘的过程中要经过复杂的过程,首先要将数据进行分区排序(按照分区号如0,1,2),分区完以后为了避免Map输出数据的内存溢出,可以将Map的输出数据分为各个小文件再进行分区,这样map的输出数据就会被分为了具有多个小文件的分区已排序过的数据。然后将各个小文件分区数据进行合并成为一个大的文件(将各个小文件中分区号相同的进行合并)。

这个时候Reducer启动了三个分别为0,1,2。0号Reducer会取得0号分区 的数据;1号Reducer会取得1号分区的数据;2号Reducer会取得2号分区的数据。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。