200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > java 分块上传_Java 文件分块上传客户端和服务器端源代码

java 分块上传_Java 文件分块上传客户端和服务器端源代码

时间:2024-06-12 09:35:26

相关推荐

java 分块上传_Java 文件分块上传客户端和服务器端源代码

本博客介绍如何进行文件的分块上传。本文侧重介绍客户端,服务器端请参考博客《Java 文件分块上传服务器端源代码》。建议读者朋友在阅读本文代码前先了解一下 MIME 协议。

所谓分块上传并非把大文件进行物理分块,然后挨个上传,而是依次读取大文件的一部分文件流进行上传。分块,倒不如说分流比较切实。本文通过一个项目中的示例,说明使用 Apache 的 HttpComponents/HttpClient 对大文件进行分块上传的过程。示例使用的版本是

HttpComponents Client 4.2.1。

本文仅以一小 demo 功能性地解释 HttpComponents/HttpClient 分块上传,没有考虑 I/O 关闭、多线程等资源因素,读者可以根据自己的项目酌情处理。

本文核心思想及流程:以 100 MB 大小为例,大于 100 MB 的进行分块上传,否则整块上传。对于大于 100 MB 的文件,又以 100 MB 为单位进行分割,保证每次以不大于 100 MB 的大小进行上传。比如 304 MB 的一个文件会分为 100 MB、100

MB、100 MB、4 MB 等四块依次上传。第一次读取 0 字节开始的 100 MB 个字节,上传;第二次读取第 100 MB 字节开始的 100 MB 个字节,上传;第三次读取第 200 MB 字节开始的 100 MB 个字节,上传;第四次读取最后剩下的 4 MB 个字节进行上传。

自定义的 ContentBody 源码如下,其中定义了流的读取和输出:

packagecom.mon.util.block;

importjava.io.File;

importjava.io.IOException;

importjava.io.OutputStream;

importjava.io.RandomAccessFile;

importorg.apache.http.entity.mime.content.AbstractContentBody;

importcom.defonds.rtupload.GlobalConstant;

publicclassBlockStreamBodyextendsAbstractContentBody{

//给MultipartEntity看的2个参数

privatelongblockSize=0;//本次分块上传的大小

privateStringfileName=null;//上传文件名

//writeTo需要的3个参数

privateintblockNumber=0,blockIndex=0;//blockNumber分块数;blockIndex当前第几块

privateFiletargetFile=null;//要上传的文件

privateBlockStreamBody(StringmimeType){

super(mimeType);

//TODOAuto-generatedconstructorstub

}

/**

*自定义的ContentBody构造子

*@paramblockNumber分块数

*@paramblockIndex当前第几块

*@paramtargetFile要上传的文件

*/

publicBlockStreamBody(intblockNumber,intblockIndex,FiletargetFile){

this("application/octet-stream");

this.blockNumber=blockNumber;//blockNumber初始化

this.blockIndex=blockIndex;//blockIndex初始化

this.targetFile=targetFile;//targetFile初始化

this.fileName=targetFile.getName();//fileName初始化

//blockSize初始化

if(blockIndex

this.blockSize=GlobalConstant.CLOUD_API_LOGON_SIZE;

}else{//最后一块

this.blockSize=targetFile.length()-GlobalConstant.CLOUD_API_LOGON_SIZE*(blockNumber-1);

}

}

@Override

publicvoidwriteTo(OutputStreamout)throwsIOException{

byteb[]=newbyte[1024];//暂存容器

RandomAccessFileraf=newRandomAccessFile(targetFile,"r");//负责读取数据

if(blockIndex==1){//第一块

intn=0;

longreadLength=0;//记录已读字节数

while(readLength<=blockSize-1024){//大部分字节在这里读取

n=raf.read(b,0,1024);

readLength+=1024;

out.write(b,0,n);

}

if(readLength<=blockSize){//余下的不足1024个字节在这里读取

n=raf.read(b,0,(int)(blockSize-readLength));

out.write(b,0,n);

}

}elseif(blockIndex

raf.seek(GlobalConstant.CLOUD_API_LOGON_SIZE*(blockIndex-1));//跳过前[块数*固定大小]个字节

intn=0;

longreadLength=0;//记录已读字节数

while(readLength<=blockSize-1024){//大部分字节在这里读取

n=raf.read(b,0,1024);

readLength+=1024;

out.write(b,0,n);

}

if(readLength<=blockSize){//余下的不足1024个字节在这里读取

n=raf.read(b,0,(int)(blockSize-readLength));

out.write(b,0,n);

}

}else{//最后一块

raf.seek(GlobalConstant.CLOUD_API_LOGON_SIZE*(blockIndex-1));//跳过前[块数*固定大小]个字节

intn=0;

while((n=raf.read(b,0,1024))!=-1){

out.write(b,0,n);

}

}

//TODO最后不要忘掉关闭out/raf

}

@Override

publicStringgetCharset(){

//TODOAuto-generatedmethodstub

returnnull;

}

@Override

publicStringgetTransferEncoding(){

//TODOAuto-generatedmethodstub

return"binary";

}

@Override

publicStringgetFilename(){

//TODOAuto-generatedmethodstub

returnfileName;

}

@Override

publiclonggetContentLength(){

//TODOAuto-generatedmethodstub

returnblockSize;

}

}

在自定义的 HttpComponents/HttpClient 工具类 HttpClient4Util 里进行分块上传的封装:

publicstaticStringrestPost(StringserverURL,FiletargetFile,MapmediaInfoMap){

Stringcontent="";

try{

DefaultHttpClienthttpClient=newDefaultHttpClient();

HttpPostpost=newHttpPost(serverURL+"?");

httpClient.getParams().setParameter("http.socket.timeout",60*60*1000);

MultipartEntitympEntity=newMultipartEntity();

Listkeys=newArrayList(mediaInfoMap.keySet());

Collections.sort(keys,String.CASE_INSENSITIVE_ORDER);

for(Iteratoriterator=keys.iterator();iterator.hasNext();){

Stringkey=iterator.next();

if(StringUtils.isNotBlank(mediaInfoMap.get(key))){

mpEntity.addPart(key,newStringBody(mediaInfoMap.get(key)));

}

}

if(targetFile!=null&&targetFile.exists()){

ContentBodycontentBody=newFileBody(targetFile);

mpEntity.addPart("file",contentBody);

}

post.setEntity(mpEntity);

HttpResponseresponse=httpClient.execute(post);

content=EntityUtils.toString(response.getEntity());

httpClient.getConnectionManager().shutdown();

}catch(Exceptione){

e.printStackTrace();

}

System.out.println("=====RequestUrl==========================\n"

+getRequestUrlStrRest(serverURL,mediaInfoMap).replaceAll("&fmt=json",""));

System.out.println("=====content==========================\n"+content);

returncontent.trim();

}

其中 "file" 是分块上传服务器对分块文件参数定义的名字。细心的读者会发现,整块文件上传直接使用 Apache 官方的InputStreamBody,而分块才使用自定义的BlockStreamBody。

最后调用 HttpClient4Util 进行上传:

publicstaticMapuploadToDrive(

Mapparams,Stringdomain){

FiletargetFile=newFile(params.get("filePath"));

longtargetFileSize=targetFile.length();

intmBlockNumber=0;

if(targetFileSize

mBlockNumber=1;

}else{

mBlockNumber=(int)(targetFileSize/GlobalConstant.CLOUD_API_LOGON_SIZE);

longsomeExtra=targetFileSize

%GlobalConstant.CLOUD_API_LOGON_SIZE;

if(someExtra>0){

mBlockNumber++;

}

}

params.put("blockNumber",Integer.toString(mBlockNumber));

if(domain!=null){

LOG.debug("Drive---domain="+domain);

LOG.debug("drive---url="+"http://"+domain+"/sync"

+GlobalConstant.CLOUD_API_PRE_UPLOAD_PATH);

}else{

LOG.debug("Drive---domain=null");

}

StringresponseBodyStr=HttpClient4Util.getRest("http://"+domain

+"/sync"+GlobalConstant.CLOUD_API_PRE_UPLOAD_PATH,params);

ObjectMappermapper=newObjectMapper();

DrivePreInforesult;

try{

result=mapper.readValue(responseBodyStr,ArcDrivePreInfo.class);

}catch(IOExceptione){

LOG.error("Drive.preUploadToArcDriveerror.",e);

thrownewRtuploadException(GlobalConstant.ERROR_CODE_13001);//TODO

}

//JSONObjectjsonObject=JSONObject.fromObject(responseBodyStr);

if(Integer.valueOf(result.getRc())==0){

intuuid=result.getUuid();

StringupsServerUrl=result.getUploadServerUrl().replace("https",

"http");

if(uuid!=-1){

upsServerUrl=upsServerUrl

+GlobalConstant.CLOUD_API_UPLOAD_PATH;

params.put("uuid",String.valueOf(uuid));

for(inti=1;i<=mBlockNumber;i++){

params.put("blockIndex",""+i);

HttpClient4Util.restPostBlock(upsServerUrl,targetFile,

params);//

}

}

}else{

thrownewRtuploadException(GlobalConstant.ERROR_CODE_13001);//TODO

}

returnnull;

}

其中 params 这个 Map 里封装的是服务器分块上传所需要的一些参数,而上传块数也在这里进行确定。

本文中的示例经本人测试能够上传大文件成功,诸如 *.mp4 的文件上传成功没有出现任何问题。如果读者朋友测试时遇到问题无法上传成功,请在博客后跟帖留言,大家共同交流下。本文示例肯定还存在很多不足之处,如果读者朋友发现还请留言指出,笔者先行谢过了。

本博客将介绍如何进行文件的分块上传。如果读者还想了解文件的“分块”下载相关内容可以去参考博客《Java

服务器端支持断点续传的源代码【支持快车、迅雷】》。

本文侧重介绍服务器端,客户端端请参考本篇博客的姊妹篇《Java

文件分块上传客户端源代码》,关于分块上传的思想及其流程,已在该博客中进行了详细说明,这里不再赘述。

直接上代码。接收客户端 HTTP 分块上传请求的 Spring MVC 控制器源代码如下:

@Controller

publicclassUploadControllerextendsBaseController{

privatestaticfinalLoglog=LogFactory.getLog(UploadController.class);

privateUploadServiceuploadService;

privateAuthServiceauthService;

/**

*大文件分成小文件块上传,一次传递一块,最后一块上传成功后,将合并所有已经上传的块,保存到FileServer

*上相应的位置,并返回已经成功上传的文件的详细属性.当最后一块上传完毕,返回上传成功的信息。此时用getFileList查询该文件,

*该文件的uploadStatus为2。client请自行处理该状态下文件如何显示。(forUPSServer)

*

*/

@RequestMapping("/core/v1/file/upload")

@ResponseBody

publicObjectupload(HttpServletResponseresponse,

@RequestParam(value="client_id",required=false)Stringappkey,

@RequestParam(value="sig",required=false)Stringappsig,

@RequestParam(value="token",required=false)Stringtoken,

@RequestParam(value="uuid",required=false)Stringuuid,

@RequestParam(value="block",required=false)StringblockIndex,

@RequestParam(value="file",required=false)MultipartFilemultipartFile,

@RequestParamMapparameters){

checkEmpty(appkey,BaseException.ERROR_CODE_16002);

checkEmpty(token,BaseException.ERROR_CODE_16007);

checkEmpty(uuid,BaseException.ERROR_CODE_20016);

checkEmpty(blockIndex,BaseException.ERROR_CODE_20006);

checkEmpty(appsig,BaseException.ERROR_CODE_10010);

if(multipartFile==null){

thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在

}

LonguuidL=parseLong(uuid,BaseException.ERROR_CODE_20016);

IntegerblockIndexI=parseInt(blockIndex,BaseException.ERROR_CODE_20006);

MapappMap=getAuthService().validateSigature(parameters);

AccessTokenaccessToken=CasUtil.checkAccessToken(token,appMap);

Longuid=accessToken.getUid();

StringbucketUrl=accessToken.getBucketUrl();

//从上传目录拷贝文件到工作目录

StringfileAbsulutePath=null;

try{

fileAbsulutePath=this.copyFile(multipartFile.getInputStream(),multipartFile.getOriginalFilename());

}catch(IOExceptionioe){

log.error(ioe.getMessage(),ioe);

thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在

}

FileuploadedFile=newFile(Global.UPLOAD_TEMP_DIR+fileAbsulutePath);

checkEmptyFile(uploadedFile);//file非空验证

Objectrs=uploadService.upload(uuidL,blockIndexI,uid,uploadedFile,bucketUrl);

setHttpStatusOk(response);

returnrs;

}

//TODO查看下这里是否有问题

//上传文件非空验证

privatevoidcheckEmptyFile(Filefile){

if(file==null||file.getAbsolutePath()==null){

thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在

}

}

/**

*写文件到本地文件夹

*

*@throwsIOException

*返回生成的文件名

*/

privateStringcopyFile(InputStreaminputStream,StringfileName){

OutputStreamoutputStream=null;

StringtempFileName=null;

intpointPosition=fileName.lastIndexOf(".");

if(pointPosition<0){//myvedio

tempFileName=UUID.randomUUID().toString();//94d1d2e0-9aad-4dd8-a0f6-494b0099ff26

}else{//myvedio.flv

tempFileName=UUID.randomUUID()+fileName.substring(pointPosition);//94d1d2e0-9aad-4dd8-a0f6-494b0099ff26.flv

}

try{

outputStream=newFileOutputStream(Global.UPLOAD_TEMP_DIR+tempFileName);

intreadBytes=0;

byte[]buffer=newbyte[10000];

while((readBytes=inputStream.read(buffer,0,10000))!=-1){

outputStream.write(buffer,0,readBytes);

}

returntempFileName;

}catch(IOExceptionioe){

//log.error(ioe.getMessage(),ioe);

thrownewBaseException(BaseException.ERROR_CODE_20020);//上传文件不存在

}finally{

if(outputStream!=null){

try{

outputStream.close();

}catch(IOExceptione){

}

}

if(inputStream!=null){

try{

inputStream.close();

}catch(IOExceptione){

}

}

}

}

/**

*测试此服务是否可用

*

*@paramresponse

*@return

*@authorzwq7978

*/

@RequestMapping("/core/v1/file/testServer")

@ResponseBody

publicObjecttestServer(HttpServletResponseresponse){

setHttpStatusOk(response);

returnGlobal.SUCCESS_RESPONSE;

}

publicUploadServicegetUploadService(){

returnuploadService;

}

publicvoidsetUploadService(UploadServiceuploadService){

this.uploadService=uploadService;

}

publicvoidsetAuthService(AuthServiceauthService){

this.authService=authService;

}

publicAuthServicegetAuthService(){

returnauthService;

}

}

比如要上传的文件是 test450k.mp4。对照《Java

文件分块上传客户端源代码》中分块上传服务器对分块文件参数定义的名字"file",upload 方法里使用的是 MultipartFile 接收该对象。对于每次的 HTTP 请求,使用 copyFile 方法将文件流输出到服务器本地的一个临时文件夹里,比如作者的是 D:/defonds/syncPath/uploadTemp,该文件下会有

50127019-b63b-4a54-8f53-14efd1e58ada.mp4 临时文件生成用于保存上传文件流。

分块依次上传。当所有块都上传完毕之后,将这些临时文件都转移到服务器指定目录中,比如作者的这个目录是 D:/defonds/syncPath/file,在该文件夹下会有/1/temp_dir_5_1 目录生成,而 uploadTemp 的临时文件则被挨个转移到这个文件夹下,生成形如 5.part0001 的文件。以下是文件转移的源代码:

/**

*把所有块从临时文件目录移到指定本地目录或S2/S3

*

*@parampreUpload

*/

privatevoidmoveBlockFiles(BlockPreuploadFileInfopreUpload){

@SuppressWarnings("unchecked")

String[]s3BlockUrl=newString[preUpload.getBlockNumber()];

String[]localBlockUrl=newString[preUpload.getBlockNumber()];//本地的块文件路径以便以后删除

Listblocks=(List)getBaseDao().queryForList(

"upload.getBlockUploadFileByUuid",preUpload.getUuid());

StringtempDirName=SyncUtil.getTempDirName(preUpload.getUuid(),preUpload.getUid());

StringparentPath=Global.UPLOAD_ABSOLUTE_PAHT_+Global.PATH_SEPARATIVE_SIGN

+String.valueOf(preUpload.getUid());

StringdirPath=parentPath+Global.PATH_SEPARATIVE_SIGN+tempDirName;

newFile(dirPath).mkdirs();//创建存放块文件的文件夹(本地)

intj=0;

for(BlockUploadInfoinfo:blocks){

try{

StringstrBlockIndex=createStrBlockIndex(info.getBlockIndex());

StringsuffixPath=preUpload.getUuid()+".part"+strBlockIndex;

StringtempFilePath=info.getTempFile();

FiletempFile=newFile(tempFilePath);

FiletmpFile=newFile(dirPath+suffixPath);

if(tmpFile.exists()){

FileUtils.deleteQuietly(tmpFile);

}

FileUtils.moveFile(tempFile,tmpFile);

localBlockUrl[j]=dirPath+suffixPath;

j++;

info.setStatus(Global.MOVED_TO_NEWDIR);

getBaseDao().update("upload.updateBlockUpload",info);

if(log.isInfoEnabled())

log.info(preUpload.getUuid()+""+info.getBuId()+"moveBlockFiles");

}catch(IOExceptione){

log.error(e.getMessage(),e);

thrownewBaseException("filenotfound");

}

}

preUpload.setLocalBlockUrl(localBlockUrl);

preUpload.setDirPath(dirPath);

preUpload.setStatus(Global.MOVED_TO_NEWDIR);

getBaseDao().update("upload.updatePreUploadInfo",preUpload);

}

privateStringcreateStrBlockIndex(intblockIndex){

StringstrBlockIndex;

if(blockIndex<10){

strBlockIndex="000"+blockIndex;

}elseif(10<=blockIndex&&blockIndex<100){

strBlockIndex="00"+blockIndex;

}elseif(100<=blockIndex&&blockIndex<1000){

strBlockIndex="0"+blockIndex;

}else{

strBlockIndex=""+blockIndex;

}

returnstrBlockIndex;

}

最后是文件的组装源代码:

/**

*组装文件

*

*/

privatevoidassembleFileWithBlock(BlockPreuploadFileInfopreUpload){

StringdirPath=preUpload.getDirPath();

//开始在指定目录组装文件

StringuploadedUrl=null;

String[]separatedFiles;

String[][]separatedFilesAndSize;

intfileNum=0;

Filefile=newFile(dirPath);

separatedFiles=file.list();

separatedFilesAndSize=newString[separatedFiles.length][2];

Arrays.sort(separatedFiles);

fileNum=separatedFiles.length;

for(inti=0;i

separatedFilesAndSize[i][0]=separatedFiles[i];

StringfileName=dirPath+separatedFiles[i];

FiletmpFile=newFile(fileName);

longfileSize=tmpFile.length();

separatedFilesAndSize[i][1]=String.valueOf(fileSize);

}

RandomAccessFilefileReader=null;

RandomAccessFilefileWrite=null;

longalreadyWrite=0;

intlen=0;

byte[]buf=newbyte[1024];

try{

uploadedUrl=Global.UPLOAD_ABSOLUTE_PAHT_+Global.PATH_SEPARATIVE_SIGN+preUpload.getUid()+Global.PATH_SEPARATIVE_SIGN+preUpload.getUuid();

fileWrite=newRandomAccessFile(uploadedUrl,"rw");

for(inti=0;i

fileWrite.seek(alreadyWrite);

//读取

fileReader=newRandomAccessFile((dirPath+separatedFilesAndSize[i][0]),"r");

//写入

while((len=fileReader.read(buf))!=-1){

fileWrite.write(buf,0,len);

}

fileReader.close();

alreadyWrite+=Long.parseLong(separatedFilesAndSize[i][1]);

}

fileWrite.close();

preUpload.setStatus(Global.ASSEMBLED);

preUpload.setServerPath(uploadedUrl);

getBaseDao().update("upload.updatePreUploadInfo",preUpload);

if(Global.BLOCK_UPLOAD_TO!=Global.BLOCK_UPLOAD_TO_LOCAL)

{

//组装完毕没有问题删除掉S2/S3上的block

String[]path=preUpload.getS3BlockUrl();

for(Stringstring:path){

try{

if(Global.BLOCK_UPLOAD_TO==Global.BLOCK_UPLOAD_TO_S2)

{

S2Util.deleteFile(preUpload.getBucketUrl(),string);

}else

{

S3Util.deleteFile(preUpload.getBucketUrl(),string);

}

}catch(Exceptione){

log.error(e.getMessage(),e);

}

}

}

if(log.isInfoEnabled())

log.info(preUpload.getUuid()+"assembleFileWithBlock");

}catch(IOExceptione){

log.error(e.getMessage(),e);

try{

if(fileReader!=null){

fileReader.close();

}

if(fileWrite!=null){

fileWrite.close();

}

}catch(IOExceptionex){

log.error(e.getMessage(),e);

}

}

}

BlockPreuploadFileInfo 是我们自定义的业务文件处理 bean。

OK,分块上传的服务器、客户端源代码及其工作流程至此已全部介绍完毕,以上源代码全部是经过项目实践过的,大部分现在仍运行于一些项目之中。有兴趣的朋友可以自己动手,将以上代码自行改造,看看能否运行成功。如果遇到问题可以在本博客下跟帖留言,大家一起讨论讨论。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。