Hudi Query Acceleration

Indexes

Hudi supports several index types:

HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET.
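
The index type is selected per table through the hoodie.index.type write option. The following is a minimal Spark DataSource write sketch (not from the original post); the table name, field names, and base path are illustrative placeholders.

import org.apache.spark.sql.{DataFrame, SaveMode}

// Writes df as a Hudi table using the given index type, e.g. "BLOOM", "GLOBAL_BLOOM" or "BUCKET".
// All table/field names and the base path below are placeholders.
def writeWithIndex(df: DataFrame, indexType: String): Unit = {
  df.write.format("hudi")
    .option("hoodie.table.name", "demo_tbl")
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.partitionpath.field", "dt")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.index.type", indexType)
    .mode(SaveMode.Append)
    .save("/tmp/hudi/demo_tbl")
}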

However, the following index types are not recommended:

  • INMEMORY index

Matching is done against index data held in memory; essentially unusable in practice.

  • SIMPLE index

Based on the partitions touched by the incoming records, it obtains the list of affected partition files, reads the partition_key and record_key columns directly from those parquet files, and performs a left outer join against the new data (matching on the HoodieKey) to produce the index tags; performance is poor (see the sketch after this list).

  • HBASE index

Requires introducing an extra HBase service, which business teams will rarely allow.

  • GLOBAL_SIMPLE index

Compared with the SIMPLE index, GLOBAL_SIMPLE reads the full existing dataset rather than just the affected partitions and matches on the record key alone, so its performance is even worse.
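
For reference, the SIMPLE-style tagging described above boils down to joining the incoming batch against the keys read from the existing base files. The following is a conceptual Spark sketch of that idea, not Hudi's actual implementation; all column names are illustrative.

import org.apache.spark.sql.DataFrame

// incoming:     partition_path, record_key, <payload columns>
// existingKeys: partition_path, record_key, file_id  (read from the affected base files)
// Rows that come back with a non-null file_id are updates; a null file_id means insert.
def tagLocations(incoming: DataFrame, existingKeys: DataFrame): DataFrame = {
  incoming.join(existingKeys, Seq("partition_path", "record_key"), "left_outer")
}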

BLOOM

Writing the index information

org.apache.hudi.avro.HoodieBloomFilterWriteSupport#finalizeMetadata.

public Map<String, String> finalizeMetadata() {
  HashMap<String, String> extraMetadata = new HashMap<>();

  extraMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
  if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
    extraMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
  }

  if (minRecordKey != null && maxRecordKey != null) {
    extraMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
    extraMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
  }

  return extraMetadata;
}

As the code shows, the serialized bloom filter and the MIN/MAX values of the record key are written out, and they end up in the footer of the parquet file.

Reading the index information

org.apache.hudi.common.util.BaseFileUtils#readBloomFilterFromMetadata.

public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) {
  Map<String, String> footerVals =
      readFooter(configuration, false, filePath,
          HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
          HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
          HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
  String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
  if (null == footerVal) {
    // We use old style key "com.uber.hoodie.bloomfilter"
    footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
  }
  BloomFilter toReturn = null;
  if (footerVal != null) {
    if (footerVals.containsKey(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
      toReturn = BloomFilterFactory.fromString(footerVal,
          footerVals.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
    } else {
      toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
    }
  }
  return toReturn;
}

The information is read back from the footer to rebuild the bloom filter.
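
As a rough illustration of how this is consumed during index lookup (a sketch under assumptions, not code from the post): the bloom filter recovered from a base file's footer can be probed with an incoming record key, and a negative answer lets that file be skipped. ParquetUtils is assumed to expose the readBloomFilterFromMetadata method shown above; the path and key are placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.util.ParquetUtils

val conf = new Configuration()
// Placeholder base file path
val baseFile = new Path("/tmp/hudi/demo_tbl/dt=2023-01-01/some-base-file.parquet")
// readBloomFilterFromMetadata is the method shown above (ParquetUtils extends BaseFileUtils)
val bloom = new ParquetUtils().readBloomFilterFromMetadata(conf, baseFile)

val candidateKey = "id_12345" // placeholder record key
if (!bloom.mightContain(candidateKey)) {
  // Definitely not in this file: the file can be skipped for this key
} else {
  // Possibly in this file (false positives happen): the file must be read to verify
}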

Conclusion

The metadata is still written into the parquet footer, much like Iceberg; only the bloom filter algorithm may differ.

GLOBAL_BLOOM

A global index enforces key uniqueness across all partitions of the table, i.e. it guarantees that exactly one record exists in the table for a given record key.
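
A small configuration sketch (option names quoted from memory, not from the post; verify against the Hudi version in use): switching to the global variant is mostly a matter of the index type, plus deciding what happens when a record's partition path changes.

// Extra write options for GLOBAL_BLOOM
val globalBloomOpts = Map(
  "hoodie.index.type" -> "GLOBAL_BLOOM",
  // If an incoming record carries a new partition path, delete the old copy and insert into the new partition
  "hoodie.bloom.index.update.partition.path" -> "true"
)
// Usage: df.write.format("hudi").options(globalBloomOpts) ... (as in the earlier write sketch)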

Differences from BLOOM

When loading files, it loads the data files of every partition of the table.
org.apache.hudi.index.bloom.HoodieGlobalBloomIndex#loadColumnRangesFromFiles

@Override
List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> partitions, final HoodieEngineContext context,
                                                                 final HoodieTable hoodieTable) {
  HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
  List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
  return super.loadColumnRangesFromFiles(allPartitionPaths, context, hoodieTable);
}

org.apache.hudi.index.bloom.HoodieBloomIndex#loadColumnRangesFromFiles.
It loads only the specified partitions.

List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
    List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
  // Obtain the latest data files from all the partitions.
  List<Pair<String, Pair<String, HoodieBaseFile>>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
      .map(pair -> Pair.of(pair.getKey(), Pair.of(pair.getValue().getFileId(), pair.getValue())))
      .collect(toList());

  context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName());
  return context.map(partitionPathFileIDList, pf -> {
    try {
      HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, Pair.of(pf.getKey(), pf.getValue().getKey()));
      String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(pf.getValue().getValue());
      return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue().getKey(), minMaxKeys[0], minMaxKeys[1]));
    } catch (MetadataNotFoundException me) {
      LOG.warn("Unable to find range metadata in file :" + pf);
      return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue().getKey()));
    }
  }, Math.max(partitionPathFileIDList.size(), 1));
}

Conclusion

Seen this way, GLOBAL_BLOOM does not deliver better performance than BLOOM.

BUCKET

Recommended for scenarios with larger data volumes; it avoids the false-positive problem of bloom filters.
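
A hedged configuration sketch (option names quoted from memory; verify against your Hudi version): the bucket index hashes the record key into a fixed number of buckets per partition, so the lookup becomes a pure hash computation with no false positives.

// Write options for the BUCKET index; the bucket count is illustrative and, for the
// SIMPLE bucket engine, cannot be changed after the table is created.
val bucketIndexOpts = Map(
  "hoodie.index.type"               -> "BUCKET",
  "hoodie.index.bucket.engine"      -> "SIMPLE", // or "CONSISTENT_HASHING" in newer versions
  "hoodie.bucket.index.num.buckets" -> "64"
)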

MetadataTable

The MetadataTable is an MOR table that holds metadata about the data table: its files, column statistics, and bloom filters.
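
A hedged sketch of the write-side options that build the metadata table and its index partitions (option names quoted from memory; availability depends on the Hudi version):

// Enables the metadata table plus the column-stats and bloom-filter index partitions.
val metadataWriteOpts = Map(
  "hoodie.metadata.enable"                    -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.index.bloom.filter.enable" -> "true"
)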

Index tagging on write, by engine:

engine   column_stat_idx   bloom_filter_idx   bucket_idx   flink_state      Simple           Hbase_idx
Spark    Y                 Y                  Y            N (Flink only)   Y                Y
Flink    N                 N                  Y            Y                N (Spark only)   N

MetadataTable index partition construction, by engine:

engine   file_idx   column_stat_idx   bloom_filter_idx
Spark    Y          Y                 Y
Flink    Y          Y                 Y

Data skipping on read, by engine:

engine   column_stat_idx   bloom_filter_idx   bucket_idx
Spark    Y                 N                  N
Flink    Y                 N                  N

How the MetadataTable is used on the read path.
org.apache.hudi.HoodieFileIndex#lookupCandidateFilesInMetadataTable

/**
 * Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
 * conditions, by leveraging Metadata Table's Column Statistics index (hereon referred as ColStats for brevity)
 * bearing "min", "max", "num_nulls" statistics for all columns.
 *
 * NOTE: This method has to return complete set of candidate files, since only provided candidates will
 *       ultimately be scanned as part of query execution. Hence, this method has to maintain the
 *       invariant of conservatively including every base-file's name, that is NOT referenced in its index.
 *
 * @param queryFilters list of original data filters passed down from querying engine
 * @return list of pruned (data-skipped) candidate base-files' names
 */
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
  // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
  //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
  //       - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
  //         CSI only contains stats for top-level columns, in this case for "struct")
  //       - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
  //         nothing CSI in particular could be applied for)
  lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)

  if (!isMetadataTableEnabled || !isDataSkippingEnabled || !columnStatsIndex.isIndexAvailable) {
    validateConfig()
    Option.empty
  } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
    Option.empty
  } else {
    // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
    //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
    //       w/o resorting to on-cluster execution.
    //       For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
    //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
    //       threshold (of 100k records)
    val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)

    columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
      val indexSchema = transposedColStatsDF.schema
      val indexFilter =
        queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
          .reduce(And)

      val allIndexedFileNames =
        transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
          .collect()
          .map(_.getString(0))
          .toSet

      val prunedCandidateFileNames =
        transposedColStatsDF.where(new Column(indexFilter))
          .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
          .collect()
          .map(_.getString(0))
          .toSet

      // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
      //       base-file: since it's bound to clustering, which could occur asynchronously
      //       at arbitrary point in time, and is not likely to be touching all of the base files.
      //
      //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
      //       files and all outstanding base-files, and make sure that all base files not
      //       represented w/in the index are included in the output of this method
      val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)

      Some(prunedCandidateFileNames ++ notIndexedFileNames)
    }
  }
}

Conclusion

During data skipping, only the column_stat_idx (column statistics) information is used.
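
On the read side, both switches checked by lookupCandidateFilesInMetadataTable (isMetadataTableEnabled and isDataSkippingEnabled) map to query options. A hedged sketch, assuming an existing SparkSession named spark; option names are quoted from memory and the path and filter are placeholders:

// Data skipping only prunes files when the filter references indexed top-level columns.
val pruned = spark.read.format("hudi")
  .option("hoodie.metadata.enable", "true")
  .option("hoodie.enable.data.skipping", "true")
  .load("/tmp/hudi/demo_tbl")
  .where("id = 'id_12345'")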

Other Optimizations

TimelineService

Avoids reading data files from HDFS when building the timeline.
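
As a hedged note (option name quoted from memory, not from the post): the embedded timeline server is controlled by a write option and is enabled by default in recent Hudi releases.

// Keeps the embedded timeline server on, so executors fetch the file-system view
// from the driver instead of listing and reading files on HDFS themselves.
val timelineOpts = Map(
  "hoodie.embed.timeline.server" -> "true"
)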