Lu Chunli's work notes: who says programmers can't have a touch of style?

Accessing HDFS via the Hadoop shell and the Java API

The cluster itself was already set up in the earlier note "Hadoop 2.6 Cluster Setup"; this note walks through some basic HDFS operations.

1. Accessing HDFS from the shell

HDFS is designed for processing huge data sets, which in practice means storing large numbers of files. HDFS splits those files into blocks and stores the blocks on different DataNodes, and it provides a shell interface that hides the details of block storage; all Hadoop operations are driven by the bin/hadoop script.

Running the hadoop command without arguments prints a description of every subcommand; the one for HDFS file operations is hadoop fs (the script's other subcommands are not covered here).

[hadoop@nnode ~]$ hadoop fs
Usage: hadoop fs [generic options]
        [-appendToFile <localsrc> ... <dst>]
        [-cat [-ignoreCrc] <src> ...]
        [-checksum <src> ...]
        [-chgrp [-R] GROUP PATH...]
        [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
        [-chown [-R] [OWNER][:[GROUP]] PATH...]
        [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
        [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-count [-q] [-h] <path> ...]
        [-cp [-f] [-p | -p[topax]] <src> ... <dst>]
        [-createSnapshot <snapshotDir> [<snapshotName>]]
        [-deleteSnapshot <snapshotDir> <snapshotName>]
        [-df [-h] [<path> ...]]
        [-du [-s] [-h] <path> ...]
        [-expunge]
        [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
        [-getfacl [-R] <path>]
        [-getfattr [-R] {-n name | -d} [-e en] <path>]
        [-getmerge [-nl] <src> <localdst>]
        [-help [cmd ...]]
        [-ls [-d] [-h] [-R] [<path> ...]]
        [-mkdir [-p] <path> ...]
        [-moveFromLocal <localsrc> ... <dst>]
        [-moveToLocal <src> <localdst>]
        [-mv <src> ... <dst>]
        [-put [-f] [-p] [-l] <localsrc> ... <dst>]
        [-renameSnapshot <snapshotDir> <oldName> <newName>]
        [-rm [-f] [-r|-R] [-skipTrash] <src> ...]
        [-rmdir [--ignore-fail-on-non-empty] <dir> ...]
        [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
        [-setfattr {-n name [-v value] | -x name} <path>]
        [-setrep [-R] [-w] <rep> <path> ...]
        [-stat [format] <path> ...]
        [-tail [-f] <file>]
        [-test -[defsz] <path>]
        [-text [-ignoreCrc] <src> ...]
        [-touchz <path> ...]
        [-usage [cmd ...]]

Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.

The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
Hadoop 2.6 flags hadoop fs as "Deprecated, use hdfs dfs instead." (I have not worked with pre-2.6 releases, so I did not trace which version introduced the deprecation; hadoop fs still works.)

[hadoop@nnode ~]$ hdfs dfs
(The usage output is identical to that of hadoop fs above.)
For example:

[hadoop@nnode ~]$ hdfs dfs -ls -R /user/hadoop
-rw-r--r--   2 hadoop hadoop      2297 2015-06-29 14:44 /user/hadoop/0913152700.txt.gz
-rw-r--r--   2 hadoop hadoop       211 2015-06-29 14:45 /user/hadoop/0913160307.txt.gz
-rw-r--r--   2 hadoop hadoop  93046447 2015-07-18 18:01 /user/hadoop/apache-hive-1.2.0-bin.tar.gz
-rw-r--r--   2 hadoop hadoop   4139112 2015-06-28 22:54 /user/hadoop/httpInterceptor_192.168.1.101_1_0913160307.txt
-rw-r--r--   2 hadoop hadoop       240 2015-05-30 20:54 /user/hadoop/lucl.gz
-rw-r--r--   2 hadoop hadoop        63 2015-05-27 23:55 /user/hadoop/lucl.txt
-rw-r--r--   2 hadoop hadoop   9994248 2015-06-29 14:12 /user/hadoop/scalog.txt
-rw-r--r--   2 hadoop hadoop   2664495 2015-06-28 20:54 /user/hadoop/scalog.txt.gz
-rw-r--r--   3 hadoop hadoop  28026803 2015-06-24 21:16 /user/hadoop/test.txt.gz
-rw-r--r--   2 hadoop hadoop     28551 2015-05-27 23:54 /user/hadoop/zookeeper.out
[hadoop@nnode ~]$
# "." here is the current directory; since I am operating as the hadoop user, it is equivalent to /user/hadoop
# HDFS gives each user a default home directory /user/{hadoop-user}, but you can also create your own directories under / with mkdir
[hadoop@nnode ~]$ hdfs dfs -ls -R .
-rw-r--r--   2 hadoop hadoop      2297 2015-06-29 14:44 0913152700.txt.gz
-rw-r--r--   2 hadoop hadoop       211 2015-06-29 14:45 0913160307.txt.gz
-rw-r--r--   2 hadoop hadoop  93046447 2015-07-18 18:01 apache-hive-1.2.0-bin.tar.gz
-rw-r--r--   2 hadoop hadoop   4139112 2015-06-28 22:54 httpInterceptor_192.168.1.101_1_0913160307.txt
-rw-r--r--   2 hadoop hadoop       240 2015-05-30 20:54 lucl.gz
-rw-r--r--   2 hadoop hadoop        63 2015-05-27 23:55 lucl.txt
-rw-r--r--   2 hadoop hadoop   9994248 2015-06-29 14:12 scalog.txt
-rw-r--r--   2 hadoop hadoop   2664495 2015-06-28 20:54 scalog.txt.gz
-rw-r--r--   3 hadoop hadoop  28026803 2015-06-24 21:16 test.txt.gz
-rw-r--r--   2 hadoop hadoop     28551 2015-05-27 23:54 zookeeper.out
[hadoop@nnode ~]$
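The home-directory rule for relative paths can be sketched with plain JDK URI resolution. This is only an illustration of the rule, not the actual HDFS client code; the namenode address is the one used elsewhere in this note:

```java
import java.net.URI;

public class HomeDirResolve {
    // Resolve a relative HDFS name against the user's home directory,
    // mirroring how the shell treats "." and bare file names.
    static String qualify(String nameNode, String user, String name) {
        URI home = URI.create(nameNode + "/user/" + user + "/");
        return home.resolve(name).toString();
    }

    public static void main(String[] args) {
        // A bare name like "lucl.txt" is qualified against /user/hadoop
        System.out.println(qualify("hdfs://nnode:8020", "hadoop", "lucl.txt"));
    }
}
```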
If you are unsure about the details of an hdfs command, check the built-in help:

[hadoop@nnode ~]$ hdfs dfs -help ls
-ls [-d] [-h] [-R] [<path> ...] :
  List the contents that match the specified file pattern. If path is not
  specified, the contents of /user/<currentUser> will be listed. Directory entries
  are of the form:
        permissions - userId groupId sizeOfDirectory(in bytes)
        modificationDate(yyyy-MM-dd HH:mm) directoryName
  and file entries are of the form:
        permissions numberOfReplicas userId groupId sizeOfFile(in bytes)
        modificationDate(yyyy-MM-dd HH:mm) fileName
  -d  Directories are listed as plain files.
  -h  Formats the sizes of files in a human-readable fashion rather than a number of bytes.
  -R  Recursively list the contents of directories.
[hadoop@nnode ~]$
2. Accessing HDFS via the Java API

Hadoop stores data on the DataNodes, while the NameNode records where each piece of data lives. Communication between Hadoop components is built on RPC: the NameNode is the RPC server (dfs.namenode.rpc-address gives the RPC host name and port), and the FileSystem class that Hadoop provides is the abstract implementation of the RPC client.
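Conceptually, FileSystem.get(URI, conf) picks the concrete client class by the URI scheme (Hadoop resolves it via configuration keys of the form fs.<scheme>.impl). Below is a plain-Java sketch of that dispatch, with real Hadoop 2.x class names hard-coded purely for illustration; it is not how the real lookup is implemented:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class SchemeDispatch {
    // Toy scheme -> implementation table; Hadoop reads the same mapping
    // from configuration keys of the form fs.<scheme>.impl.
    static final Map<String, String> IMPLS = new HashMap<>();
    static {
        IMPLS.put("hdfs", "org.apache.hadoop.hdfs.DistributedFileSystem");
        IMPLS.put("file", "org.apache.hadoop.fs.LocalFileSystem");
        IMPLS.put("ftp",  "org.apache.hadoop.fs.ftp.FTPFileSystem");
    }

    // Choose the client implementation from the URI scheme,
    // as FileSystem.get(URI, conf) does.
    static String implFor(String uri) {
        String scheme = URI.create(uri).getScheme();
        return IMPLS.getOrDefault(scheme, "<unknown scheme: " + scheme + ">");
    }

    public static void main(String[] args) {
        System.out.println(implFor("hdfs://nnode:8020/user/hadoop"));
    }
}
```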
a.) Reading HDFS data via java.net.URL

For a Java program to recognize Hadoop's hdfs:// URLs, an extra handler factory has to be registered through URL.setURLStreamHandlerFactory(...). Each JVM may call this method at most once, so it is usually invoked from a static block.

package com.invic.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

/**
 * @author lucl
 * Access a specific file on HDFS via the java.net.URL API.
 */
public class MyHdfsOfJavaApi {
    static {
        /*
         * To make Java recognize hdfs:// URLs, an extra URLStreamHandlerFactory must be registered.
         * The JVM allows this call only once; if some other code has already set a factory,
         * this program will be unable to read from Hadoop.
         */
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws IOException {
        String path = "hdfs://nnode:8020/user/hadoop/lucl.txt";
        InputStream in = new URL(path).openStream();
        OutputStream ou = System.out;
        int buffer = 4096;
        boolean close = false;
        IOUtils.copyBytes(in, ou, buffer, close);
        IOUtils.closeStream(in);
    }
}
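The once-per-JVM restriction comes from java.net.URL itself and can be demonstrated without any Hadoop classes; the trivial lambda factory below is only a stand-in for FsUrlStreamHandlerFactory:

```java
import java.net.URL;
import java.net.URLStreamHandlerFactory;

public class FactoryOnce {
    // Returns true if the second registration attempt was rejected by the JVM.
    static boolean secondCallRejected() {
        URLStreamHandlerFactory f = protocol -> null; // null falls back to the built-in handlers
        URL.setURLStreamHandlerFactory(f);            // first call: accepted
        try {
            URL.setURLStreamHandlerFactory(f);        // second call: throws java.lang.Error
            return false;
        } catch (Error expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second call rejected: " + secondCallRejected());
    }
}
```

This is why the registration sits in a static block: it must run exactly once, before any hdfs:// URL is opened.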
b.) Accessing HDFS via Hadoop's FileSystem

Hadoop has an abstract notion of a file system, of which HDFS is only one implementation. The Java abstract class org.apache.hadoop.fs.FileSystem defines the file-system interface in Hadoop:

java.lang.Object
  org.apache.hadoop.conf.Configured
    org.apache.hadoop.fs.FileSystem
      |-- org.apache.hadoop.fs.FilterFileSystem
      |     |-- org.apache.hadoop.fs.ChecksumFileSystem
      |     |     |-- org.apache.hadoop.fs.LocalFileSystem
      |-- org.apache.hadoop.fs.ftp.FTPFileSystem
      |-- org.apache.hadoop.fs.s3native.NativeS3FileSystem
      |-- org.apache.hadoop.fs.RawLocalFileSystem
      |-- org.apache.hadoop.fs.viewfs.ViewFileSystem

package com.invic.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
 * @author lucl
 * Implemented with the FileSystem API:
 *   FileSystem.get(Configuration)            - reads core-site.xml from the classpath; defaults to the local file system
 *   FileSystem.get(URI, Configuration)       - the URI determines which file system to use
 *   FileSystem.get(URI, Configuration, user) - access the file system as the given user, which matters for security
 */
public class MyHdfsOfFS {
    private static String HOST = "hdfs://nnode";
    private static String PORT = "8020";
    private static String NAMENODE = HOST + ":" + PORT;
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String path = NAMENODE + "/user/";
        /*
         * Since this targets HDFS's /user directory, relative paths will by default
         * be resolved against the user's home directory on HDFS.
         */
        String user = "hadoop";
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(path), conf, user);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (null == fs) {
            return;
        }
        // Recursively create a directory.
        boolean mkdirs = fs.mkdirs(new Path("invic/test/mvtech"));
        if (mkdirs) {
            System.out.println("Dir 'invic/test/mvtech' create success.");
        }

        // Check whether the directory exists.
        boolean exists = fs.exists(new Path("invic/test/mvtech"));
        if (exists) {
            System.out.println("Dir 'invic/test/mvtech' exists.");
        }

        /*
         * FSDataInputStream supports random access.
         * Under Eclipse on Windows, "lucl.txt" would by default resolve to
         * /user/Administrator/lucl.txt; since get(...) above was given an explicit
         * user, it resolves to /user/<that user>/lucl.txt instead.
         */
        FSDataInputStream in = fs.open(new Path("lucl.txt"));
        OutputStream os = System.out;
        int buffSize = 4098;
        boolean close = false;
        IOUtils.copyBytes(in, os, buffSize, close);

        System.out.println("\r\nSeek back to the beginning and read the file again......");
        in.seek(0);
        IOUtils.copyBytes(in, os, buffSize, close);
        IOUtils.closeStream(in);
        // Create a file.
        FSDataOutputStream create = fs.create(new Path("sample.txt"));
        create.write("This is my first sample file.".getBytes());
        create.flush();
        create.close();

        // Copy a local file to HDFS.
        fs.copyFromLocalFile(new Path("F:\\Mvtech\\ftpfile\\cg-.csv"),
                new Path("cg-.csv"));

        // Append to a file.
        FSDataOutputStream append = fs.append(new Path("sample.txt"));
        append.writeChars("\r\n");
        append.writeChars("New day, new World.");
        append.writeChars("\r\n");
        IOUtils.closeStream(append);

        // Using a Progressable callback.
        FSDataOutputStream progress = fs.create(new Path("progress.txt"),
                new Progressable() {
                    @Override
                    public void progress() {
                        System.out.println("write is in progress......");
                    }
                });

        // Write keyboard input to HDFS until "quit" is typed.
        Scanner sc = new Scanner(System.in);
        System.out.print("Please type your enter: ");
        String name = sc.nextLine();
        while (!"quit".equals(name)) {
            if (null == name || "".equals(name.trim())) {
                // Skip blank input, but read the next line first to avoid looping forever.
                System.out.print("Please type your enter: ");
                name = sc.nextLine();
                continue;
            }
            progress.writeChars(name);
            System.out.print("Please type your enter: ");
            name = sc.nextLine();
        }
        // Recursively list files.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(path), true);
        while (it.hasNext()) {
            LocatedFileStatus loc = it.next();
            System.out.println(loc.getPath().getName() + "|" + loc.getLen() + "|"
                    + loc.getOwner());
        }

        /*
         * File/directory metadata: length, block size, replication,
         * modification time, owner, and permissions.
         */
        FileStatus status = fs.getFileStatus(new Path("lucl.txt"));
        System.out.println(status.getPath().getName() + "|"
                + status.getPath().getParent().getName() + "|" + status.getBlockSize() + "|"
                + status.getReplication() + "|" + status.getOwner());

        /*
         * listStatus lists the files in a directory; if the argument is a file,
         * it returns a FileStatus array of length 1.
         */
        fs.listStatus(new Path(path));
        fs.listStatus(new Path(path), new PathFilter() {
            @Override
            public boolean accept(Path tmpPath) {
                String tmpName = tmpPath.getName();
                if (tmpName.endsWith(".txt")) {
                    return true;
                }
                return false;
            }
        });

        // An array of paths may also be passed; the results are merged into one array.
        // fs.listStatus(Path[] files);
        FileStatus[] mergeStatus = fs.listStatus(new Path[] { new Path("lucl.txt"),
                new Path("progress.txt"), new Path("sample.txt") });
        Path[] listPaths = FileUtil.stat2Paths(mergeStatus);
        for (Path p : listPaths) {
            System.out.println(p);
        }
        // File pattern matching.
        FileStatus[] patternStatus = fs.globStatus(new Path("*.txt"));
        for (FileStatus stat : patternStatus) {
            System.out.println(stat.getPath());
        }

        // Delete data.
        boolean recursive = true;
        fs.delete(new Path("demo.txt"), recursive);

        fs.close();
    }
}
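The globStatus call above accepts shell-style glob patterns. Their matching semantics can be previewed with the JDK's own glob matcher; this is only an analogy for the pattern syntax, not Hadoop's implementation:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobDemo {
    // Check a file name against a shell-style glob, similar in spirit to
    // what fs.globStatus(new Path("*.txt")) matches against.
    static boolean matches(String pattern, String name) {
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
        return m.matches(Paths.get(name));
    }

    public static void main(String[] args) {
        System.out.println(matches("*.txt", "lucl.txt"));                      // matches
        System.out.println(matches("*.txt", "lucl.gz"));                       // does not match
        System.out.println(matches("09131[45]*.txt.gz", "0913152700.txt.gz")); // character class + wildcard
    }
}
```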
c.) Accessing the HDFS cluster

package com.invic.hdfs;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;

/**
 * @author lucl
 * Access HDFS by connecting to the Hadoop (HA) cluster.
 */
public class MyClusterHdfs {
    public static void main(String[] args) throws IOException {
        System.setProperty("hadoop.home.dir", "E:\\Hadoop\\hadoop-2.6.0\\hadoop-2.6.0\\");
        Logger logger = Logger.getLogger(MyClusterHdfs.class);

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://cluster");
        conf.set("dfs.nameservices", "cluster");
        conf.set("dfs.ha.namenodes.cluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.cluster.nn1", "nnode:8020");
        conf.set("dfs.namenode.rpc-address.cluster.nn2", "dnode1:8020");
        conf.set("dfs.client.failover.proxy.provider.cluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        FileSystem fs = FileSystem.get(conf);
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/"), true);
        while (it.hasNext()) {
            LocatedFileStatus loc = it.next();
            logger.info(loc.getPath().getName() + "|" + loc.getLen() + "|" + loc.getOwner());
        }

        /* for (int i = 0; i < ...; i++) {   (upper bound lost in the original)
            String str = "the sequence is " + i;
            logger.info(str);
        } */

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
Note: System.setProperty("hadoop.home.dir", "E:\\Hadoop\\hadoop-2.6.0\\hadoop-2.6.0\\");
# Set the Hadoop home path in the first line of main; otherwise, on Windows, you may see errors such as:
15/07/19 22:05:54 DEBUG util.Shell: Failed to detect a valid hadoop home directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
        at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:302)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:327)
        at org.apache.hadoop.util.GenericOptionsParser.preProcessForWindows(GenericOptionsParser.java:438)
        at org.apache.hadoop.util.GenericOptionsParser.parseGeneralOptions(GenericOptionsParser.java:484)
        at org.apache.hadoop.util.GenericOptionsParser.<init>(GenericOptionsParser.java:170)
        at org.apache.hadoop.util.GenericOptionsParser.<init>(GenericOptionsParser.java:153)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:64)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.invic.mapreduce.wordcount.WordCounterTool.main(WordCounterTool.java:29)
15/07/19 22:05:54 ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
        at org.apache.hadoop.util.GenericOptionsParser.preProcessForWindows(GenericOptionsParser.java:438)
        at org.apache.hadoop.util.GenericOptionsParser.parseGeneralOptions(GenericOptionsParser.java:484)
        at org.apache.hadoop.util.GenericOptionsParser.<init>(GenericOptionsParser.java:170)
        at org.apache.hadoop.util.GenericOptionsParser.<init>(GenericOptionsParser.java:153)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:64)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.invic.mapreduce.wordcount.WordCounterTool.main(WordCounterTool.java:29)