用Flink实时构建倒排索引与全文检索
对于搜索引擎,大家不会感到陌生,我们每天都在用。
我们在百度、谷歌上搜索我们想要的信息。比如,在输入框里输入关键字查询后,会返回很多和关键字相关的内容。
或者在电商网站输入想要购买的商品名称后,就立即能查到我们想要购买的商品信息。
但是大家有没有思考过,为什么网站能快速检索到我们想要看到的信息?这里其实用到了倒排索引技术。
简单的介绍一下倒排索引
举个例子,我们小时候背诵过的古诗,当我们看到一首诗的题目时,可以很快速的背诵诗的内容。但是如果我们看到一句诗时,却很难快速说出诗的题目。
或者我们看到诗的上半句,一般会很轻松的背诵出诗的下半句。但是根据诗的下半句,很难快速想到诗的上半句。
这是因为,我们大脑存储的诗词,是通过正排索引的方式组织起来的,类似于关系型数据库一样,通过id很快能查到详细内容。但是要通过内容反查id,就不是那么容易了。
再比如,我们的电脑里有很多文件,我们能搜索到一个文件里有什么词,但是我们统计某个词在哪些文件里出现过,以及出现的次数,就不是那么容易了。
下面内容,用Flink实时构建倒排索引,实现一个全文检索的功能。
需求:有大量文本文件,需要构建索引。输入某个关键字,输出关键字在哪些文件里出现过,以及在文件里出现的次数。
思路:批量读取磁盘上的文件内容,将文件内容发送给kafka,Flink从kafka消费数据,将数据内容分词,记录每次词出现的词频和所在的文件名,然后通过Flink sql实时统计每个单词所在的文件,和在每个文件中出现的次数,写入到下游存储。
关键代码如下:
1、收集文件内容并发送给kafka
这是个很简单的程序,通过递归读取目录下的全部文件,将文件信息发送给kafka。
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
String line = null;
try {
producer = new KafkaProducer<String, String>(properties);
// 遍历目录下的所有文本文件
String dirPath = "/Users/liuli/code/HikariCP/src/main/java/com/zaxxer/hikari/";
List<String> fileList = filePathList(dirPath);
for (String filePath : fileList) {
File file = new File(filePath);
InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream(file), "UTF-8");
BufferedReader br = new BufferedReader(inputStreamReader);
while ((line = br.readLine()) != null) {
// 将数据封装后发送给kafka
WordInput input = new WordInput(line, file.getName());
producer.send(new ProducerRecord<String, String>("word", JSON.toJSONString(input)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
2、Flink消费kafka的数据,并对原始数据做ETL转化。
flatMap算子中的操作是,将上报的数据特殊中的特殊字符过滤并封装成Word类发送给下游。
方便起见,这里采用关键字+文件名的hashCode作为一行数据的唯一id,后续根据这个id实时更新倒排索引。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Word> dataStream = env
.addSource(new FlinkKafkaConsumer011<String>("word", new SimpleStringSchema(), kafkaProp()))
.flatMap(new FlatMapFunction<String, Word>() {
public void flatMap(String value, Collector<Word> out) throws Exception {
// 对原始数据做ETL转化
WordInput input = JSON.parseObject(value, WordInput.class);
String line = input.getLine();
String fileName = input.getFileName();
line = line.replace(",", " ")
.replace(".", " ")
.replace(";", " ")
.replace(":", " ")
.replace("\"", " ")
.replace(")", " ")
.replace("(", " ")
.replace("{", " ")
.replace("}", " ");
String[] words = line.split(" ");
for (String aWord : words) {
if (aWord.length() < 5) {
continue;
}
Long id = Long.parseLong((aWord + fileName).hashCode() + "");
if (id < 0) {
id = -id;
}
Word word = new Word(id, aWord, fileName);
out.collect(word);
}
}
});
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
tabEnv.createTemporaryView("wordSource", dataStream);
// 构建倒排索引并写入到下游存储
buildIndexAndSink(tabEnv);
env.execute("Flink Streaming Java API Skeleton");
}
public class Word {
// id
private Long id;
// 词
private String word;
// 词所在的文件名
private String filePath;
}
3、最关键的一步,构建倒排索引,将收集的数据封装成Word发送到下游算子后,下游算子通过Flink sql实时统计每个单词出现的次数以及所在的文件,并将数据实时更新到MySQL中,代码如下:
private static void buildIndexAndSink(StreamTableEnvironment tabEnv) {
String sql = "select id,word,filePath,count(*) as cnt from wordSource group by id,word,filePath";
Table table = tabEnv.sqlQuery(sql);
JDBCOptions options = JDBCOptions.builder()
.setDBUrl("jdbc:mysql://xxx/fileIndex?useUnicode=true&characterEncoding=UTF-8")
.setDriverName("com.mysql.jdbc.Driver")
.setUsername("username")
.setPassword("password")
.setTableName("a_word_cnt")
.build();
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.BIGINT())
.field("word", DataTypes.STRING())
.field("filePath", DataTypes.STRING())
.field("cnt", DataTypes.BIGINT())
.build();
JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
.setOptions(options)
.setTableSchema(schema)
.build();
tabEnv.registerTableSink("outputSink", sink);
tabEnv.insertInto("outputSink", table);
}
通过上面代码,我们可以将目录下的所有文件,构建倒排索引。然后将索引信息写入MySQL,查到的效果如下,可以看到,某个词在哪些文件里出现过,以及出现的次数。
拓展:
我们是不是可以做个简易的搜索引擎呢?原理与上面的案例类似,需要把文件名替换成url。