总结了三种方法:HDFS 自带的复制、按字节复制、按行复制(Java IO 里还有按字符复制,暂且不提)。
HDFS 自带的方法不知道为什么在有些场合不能用,而且每次能下载的文件个数还不一定,所以就考虑自己按照 Java IO 的方式来复制,于是有了第 2、3 种方法。
有时间要好好研究一下 IO,比如针对某些特殊文件,复制后的文件大小会和原文件不一样。这里先把代码记录下来:
// void downloadFromHdfs(String hdfsSrc , String localDst)// String hdfsDst = "hdfs://54.0.88.53:8020/user/flume/SyslogNetwork/";// String localDir = "D://flume//";//下载单个文件public static boolean downloadFromHdfs(String hdfsSrc, String localDst) {Configuration conf = new Configuration();Path dst = new Path(hdfsSrc);try {Path Src = new Path(hdfsSrc);String Filename = Src.getName().toString();String local = localDst + Filename;Path Dst = new Path(local);FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);FSDataInputStream in = fs.open(Src);OutputStream output = new FileOutputStream(new File(local));IOUtils.copyBytes(in, output, 4096, true);System.out.print(" download successed.");} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();System.out.print(" download failed.");return false;}return true;}//下载目录下所有文件,方法1: IOUtils.copyBytes或者copyToLocalpublic static boolean downFromHdfsDir(String hdfsSrc, String localDst)throws IOException {Configuration conf = new Configuration();Path dstpath = new Path(hdfsSrc);int i = 1;FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);try {String subPath = "";FileStatus[] fList = fs.listStatus(dstpath);for (FileStatus f : fList) {if (null != f) {subPath = new StringBuffer().append(f.getPath().getParent()).append("/").append(f.getPath().getName()).toString();if (f.isDir()) {downFromHdfsDir(subPath, localDst);} else {System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/Path dst = new Path(subPath);i++;FSDataInputStream in = null;OutputStream output = null;try {Path Src = new Path(subPath);String Filename = Src.getName().toString();String local = localDst + Filename;Path Dst = new Path(local);FileSystem hdfs = FileSystem.get(URI.create(subPath), conf);in = hdfs.open(Src);output = new FileOutputStream(new File(local));// true-是否关闭数据流,如果是false则在finally里关闭// IOUtils.copyBytes(in, output, 4096, false); IOUtils.copyBytes(in, output, conf); output.flush();System.out.print(" download successed.");} 
catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();System.out.print(" download failed.");} finally {IOUtils.closeStream(in);IOUtils.closeStream(output);}}}}} catch (Exception e) {} finally {System.out.println("the number of files is :" + i);}return true;}//下载目录下所有文件,方法2: 按字节复制public static boolean downFromHdfsDir2(String hdfsSrc, String localDst)throws IOException {Configuration conf = new Configuration();Path dstpath = new Path(hdfsSrc);int i = 1;FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);try {String subPath = "";FileStatus[] fList = fs.listStatus(dstpath);for (FileStatus f : fList) {if (null != f) {subPath = new StringBuffer().append(f.getPath().getParent()).append("/").append(f.getPath().getName()).toString();if (f.isDir()) {downFromHdfsDir(subPath, localDst);} else {System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/Path dst = new Path(subPath);i++;try {Path Src = new Path(subPath);String Filename = Src.getName().toString();String local = localDst + Filename;Path Dst = new Path(local);FileSystem localFS = FileSystem.getLocal(conf);FileSystem hdfs = FileSystem.get(URI.create(subPath), conf);FSDataInputStream in = hdfs.open(Src);FSDataOutputStream output = localFS.create(Dst);byte[] buf = new byte[1024];int readbytes = 0;while ((readbytes = in.read(buf)) > 0) {output.write(buf, 0, readbytes);}in.close();output.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();System.out.print(" download failed.");} finally {}}}}} catch (Exception e) {} finally {System.out.println("the number of files is :" + i);}return true;}//下载目录下所有文件,方法2: 按行复制public static boolean downFromHdfsDir3(String hdfsSrc, String localDst)throws IOException {Configuration conf = new Configuration();Path dstpath = new Path(hdfsSrc);int i = 1;FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);try {String subPath = "";FileStatus[] fList = fs.listStatus(dstpath);for (FileStatus f : fList) {if (null != f) 
{subPath = new StringBuffer().append(f.getPath().getParent()).append("/").append(f.getPath().getName()).toString();if (f.isDir()) {downFromHdfsDir(subPath, localDst);} else {System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/Path dst = new Path(subPath);i++;try {Path Src = new Path(subPath);String Filename = Src.getName().toString();String local = localDst + Filename;Path Dst = new Path(local);FileSystem localFS = FileSystem.getLocal(conf);FileSystem hdfs = FileSystem.get(URI.create(subPath), conf);FSDataInputStream in = hdfs.open(Src);BufferedReader read = new BufferedReader(new InputStreamReader(in));BufferedWriter output=new BufferedWriter(new FileWriter(local));String line = null;while ((line = read.readLine()) != null) {output.append(line);output.newLine();output.flush();}in.close();read.close();output.close();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();System.out.print(" download failed.");} finally {}}}}} catch (Exception e) {} finally {System.out.println("the number of files is :" + i);}return true;}
一次读取整个文件
OutputStream:(一次读入整个文件) 字节private static String readHdfsFile2(FileSystem fs, Path path, String charset) throws IOException { FSDataInputStream hdfsInStream = fs.open(path); ByteArrayOutputStream bos = new ByteArrayOutputStream(); byte[] ioBuffer = new byte[1024]; int readLen = hdfsInStream.read(ioBuffer); while (-1 != readLen) { bos.write(ioBuffer, 0, readLen); readLen = hdfsInStream.read(ioBuffer); } hdfsInStream.close(); return new String(bos.toByteArray(), charset);}
// Alternative: size the buffer from the file length and fill it with a single positioned read.
// Src, in and fs come from the surrounding code (the HDFS path, its open FSDataInputStream
// and the FileSystem that holds it).
FileStatus status = fs.getFileStatus(Src);
long fileLen = status.getLen();
if (fileLen > Integer.MAX_VALUE) {
    // A Java array cannot hold more than Integer.MAX_VALUE elements.
    throw new IOException("file too large to read into memory: " + fileLen + " bytes");
}
byte[] buffer = new byte[(int) fileLen];
try {
    in.readFully(0, buffer);
} finally {
    in.close();
    // NOTE(review): fs may be a shared instance; confirm nothing else still needs it.
    fs.close();
}
// Print the contents rather than the array's identity string.
// Assumes the file is UTF-8 text - adjust the charset if it is not.
System.out.println(new String(buffer, "UTF-8"));
还没有人抢沙发呢~