vlambda博客
学习文章列表

Java大文件读取解析工具,支持G级别文件读取

最近经常在做一些数据库导出来的大文件结果csv文件进行读取解析,按照特定的业务逻辑进行处理数据,文件都是几G 的大文件,常规的一些工具打开文件特别慢甚至无法打开,更别谈后续的一些特定规则的解析和处理,本文就Java读取大文件的方法 按字节读取,并封装了比较利于拓展的接口和案例,供参考

import org.apache.commons.lang.StringUtils;import org.apache.http.util.CharArrayBuffer;
import java.io.BufferedWriter;import java.io.FileInputStream;import java.io.IOException;import java.nio.MappedByteBuffer;import java.nio.channels.FileChannel;import java.nio.file.Files;import java.nio.file.OpenOption;import java.nio.file.Paths;import java.nio.file.StandardOpenOption;import java.util.Arrays;import java.util.List;import java.util.regex.Matcher;import java.util.regex.Pattern;
/** * 超大文件读取处理工具类,理论上可以支持无限大的文件字节流式读取 * 读取流程: * 核心使用NIO 的 FileChannel 和 MappedByteBuffer 类 * 将大文件的字节数按照Integer.MAX_VALUE-1 的大小分成若干块 * 然后使用 FileChannel.map 方法生成每一块的 MappedByteBuffer * 使用 MappedByteBuffer 按字节读取,遇到\n 换行,获取一行数据,执行处理类处理方法 */public class HugeFileProcessUtil {
//字节数进度数字长度,用于退格刷新进度 static int numLen = 0;
//每一块最大读取字节数,因为FileChannel.map 一次最大只能映射 Integer.MAX_VALUE个字节月2G左右,所以过大的文件需要分多次读取 static Long MAX_BLOCK_SIZE = (long) (Integer.MAX_VALUE - 1);
static Integer bufferCapacity = 1024;//每行字节数缓存大小,建议能够容纳最长的一行字节数
//使用指定的行处理工具实现来处理指定文件路径的文件 //bufferCapacity 每行cache字节数大小,建议设置确认能存储最长的一行的字节数,默认 1024 public static void process(String filePath, LineProcessHandler handler) throws IOException { //归零全局变量 numLen = 0;
FileInputStream audioExtraInfoFileInputStream = new FileInputStream(filePath); System.out.println("开始读取文件 " + filePath + " 数据"); FileChannel fcin = audioExtraInfoFileInputStream.getChannel(); long size = fcin.size(); CharArrayBuffer charArrayBuffer = new CharArrayBuffer(bufferCapacity); System.out.println("文件总字节数:" + size); long readReadTotalSize = 0; //计算需要分多少块 Double blockNum = Math.ceil(Double.valueOf(size) / MAX_BLOCK_SIZE); for (int b = 0; b < blockNum.intValue(); b++) { long startPosition = b * MAX_BLOCK_SIZE; long readSize = (b + 1) * MAX_BLOCK_SIZE < size ? MAX_BLOCK_SIZE : size - b * MAX_BLOCK_SIZE; System.out.println(String.format("开始读取字节范围: %s - %s, 预计读取字节数 %s", startPosition + 1L, startPosition + readSize, readSize)); long actualReadBytes = readMap(fcin, startPosition, readSize, charArrayBuffer, handler); System.out.println("实际读取字节数 " + actualReadBytes); readReadTotalSize += actualReadBytes; }
//触发行数据处理工具实现类close方法 handler.close(); System.out.println("已完成文件" + filePath + "读取 \n 实际读取总字节数:" + readReadTotalSize); }
//按字节和偏移量读取指定 size大小的字节 public static long readMap(FileChannel fcin, long position, long size, CharArrayBuffer charArrayBuffer, LineProcessHandler handler) throws IOException { System.out.print(String.format("读取进度:%s/", size)); //归零退格数值用于展示效果 numLen = 0; MappedByteBuffer buf = fcin.map(FileChannel.MapMode.READ_ONLY, position, size); long i = 0; while (i < size) { //按字节读取每个字节 char c = (char) buf.get(); //如果遇到\n 则一行结束,将charArrayBuffer汇总的一行字符串交给处理工具类实现处理 if (c == '\n') { String l = charArrayBuffer.toString(); handler.handle(l); //清空行数据buffer charArrayBuffer.clear(); } else { //如果不是换行 则继续将字节加入行数据buffer charArrayBuffer.append(c); } i++; //打印字节数读取进度 printSchedule(i, size); } //换行 System.out.println(""); return i; }
/** * 进度条总长度 */
public static void printSchedule(long curr, long size) { if (curr == 1 || curr % 10000 == 0 || curr == size) { CharArrayBuffer charArrayBuffer = new CharArrayBuffer(20); for (int i = 0; i < numLen; i++) { charArrayBuffer.append('\b'); } System.out.print(charArrayBuffer.toCharArray()); String processNum = String.valueOf(curr); System.out.print(processNum); numLen = processNum.length(); }
}
/** * 行处理工具接口声明 */ interface LineProcessHandler { String handle(String line); default void close() throws IOException {} }
//多处理器串行处理 static class MultipleHandlerProcessFactory implements LineProcessHandler{
List<LineProcessHandler> handlerList;
public MultipleHandlerProcessFactory(LineProcessHandler ... handlerList) { this.handlerList = Arrays.asList(handlerList.clone()); }
@Override public String handle(String line) { String res = new String(line.getBytes()); for (LineProcessHandler handler:handlerList){ res = handler.handle(res); } return res; } }
//抽象类写入其他文件类型的处理工具,可被实现重写解析逻辑 static abstract class WriteToAnotherFileHandler implements LineProcessHandler { String filePath;
BufferedWriter bufferedWriter;
//默认使用自动创建新文件的打开模式 public WriteToAnotherFileHandler(String filePath) throws IOException { this.filePath = filePath; bufferedWriter = Files.newBufferedWriter(Paths.get(filePath), StandardOpenOption.CREATE_NEW); }
//定制文件打开模式 public WriteToAnotherFileHandler(String filePath, OpenOption openOption) throws IOException { this.filePath = filePath; bufferedWriter = Files.newBufferedWriter(Paths.get(filePath), openOption); }
//自定义每行数据的解析判断逻辑 返回判断之后的字符串 abstract String analysis(String line);
@Override public String handle(String line) { String analysisResult = analysis(line); if (StringUtils.isNotBlank(analysisResult)) { try { bufferedWriter.write(analysisResult.concat("\r\n")); bufferedWriter.flush(); return analysisResult; } catch (IOException e) { e.printStackTrace(); return null; } } return null; }
@Override public void close() throws IOException { bufferedWriter.close(); } }
//正则提取大文件中每一行的数据内容 static class RegExpAnalysisHandler implements LineProcessHandler{ Pattern pattern;
public RegExpAnalysisHandler(String regExp) { pattern = Pattern.compile(regExp); }
@Override public String handle(String line) { Matcher matcher = pattern.matcher(line); if (matcher.find()){ return matcher.group(); } return null; } }

//案例,大文件为逗号隔开的csv文件,只需要提取第一列,写入到其他文件中 static class GetOneColumnHandler extends WriteToAnotherFileHandler{
public GetOneColumnHandler(String filePath) throws IOException { super(filePath); }
@Override String analysis(String line) { if (StringUtils.isNotBlank(line)){ return line.split(",")[0]; } return null; } }
static class PrintLineHandler implements LineProcessHandler{
@Override public String handle(String line) { if (StringUtils.isNotBlank(line)){ System.out.println(line); } return line; } }

public static void main(String[] args) throws IOException { //定制处理每一行的数据,写入另外一个文件, 例如 提取csv文件每行的第一列数据 写入另外一个文件 HugeFileProcessUtil.process("D:\\beijindfs_2.txt",new GetOneColumnHandler("D:\\a.txt"));
//多handler串行处理,例如先查询正则,然后打印匹配的行 MultipleHandlerProcessFactory multipleHandlerProcessFactory = new MultipleHandlerProcessFactory(new RegExpAnalysisHandler(".*\\.lyb"),new PrintLineHandler()); HugeFileProcessUtil.process("D:\\beijindfs_2.txt",multipleHandlerProcessFactory); }
}