  /**
   * Computes pruned list of candidate base-files' names based on provided list of [[queryFilters]]
   * conditions, by leveraging Metadata Table's Column Statistics index (hereon referred as ColStats for brevity)
   * bearing "min", "max", "num_nulls" statistics for all columns.
   *
   * NOTE: This method has to return complete set of candidate files, since only provided candidates will
   *       ultimately be scanned as part of query execution. Hence, this method has to maintain the
   *       invariant of conservatively including every base-file's name that is NOT referenced in the index.
   *
   * @param queryFilters list of original data filters passed down from querying engine
   * @return list of pruned (data-skipped) candidate base-files' names
   */
  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
    // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
    //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
    //          - Expressions on top-level column's fields (ie, for ex, filters like "struct.field > 0", since
    //            CSI only contains stats for top-level columns, in this case for "struct")
    //          - Any expression not directly referencing a top-level column (for ex, sub-queries, since there's
    //            nothing CSI in particular could be applied for)
    lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
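    // For illustration only (hypothetical predicate, not part of this method's logic): for a filter
    // like "price > 100 AND address.zip = 94301", only the top-level column "price" is usable for
    // data skipping, so queryReferencedColumns would be expected to contain just "price", while the
    // nested-field reference on "address" cannot be served by CSI-based pruning.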
    if (!isMetadataTableEnabled || !isDataSkippingEnabled || !columnStatsIndex.isIndexAvailable) {
      validateConfig()
      Option.empty
    } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
      Option.empty
    } else {
      // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
      //       it's most often preferential to fetch Column Stats Index w/in the same process (usually driver),
      //       w/o resorting to on-cluster execution.
      //       For that we use a simple heuristic to determine whether we should read and process CSI in-memory or
      //       on-cluster: total number of rows of the expected projected portion of the index has to be below the
      //       threshold (of 100k records)
      val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
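      // Illustrative only, assuming the projected index portion yields roughly one row per base-file:
      // a table with ~20k base files stays well under the 100k-record threshold and CSI would be
      // fetched and filtered in the driver's memory, whereas a table with millions of base files
      // would fall back to on-cluster (Spark) execution.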
      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
        val indexSchema = transposedColStatsDF.schema
        val indexFilter =
          queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
            .reduce(And)
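        // Illustrative only (the actual rewrite is done by translateIntoColumnStatsIndexFilterExpr):
        // an equality predicate such as "price = 100" is conceptually translated into a range-overlap
        // check over the transposed ColStats columns, e.g. something like
        // "price_minValue <= 100 AND price_maxValue >= 100", so that only files whose [min, max]
        // range could possibly contain matching rows survive the filter.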
        val allIndexedFileNames =
          transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
            .collect()
            .map(_.getString(0))
            .toSet
        val prunedCandidateFileNames =
          transposedColStatsDF.where(new Column(indexFilter))
            .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
            .collect()
            .map(_.getString(0))
            .toSet
        // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
        //       base-file: since it's bound to clustering, which could occur asynchronously
        //       at arbitrary point in time, and is not likely to be touching all of the base files.
        //
        //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
        //       files and all outstanding base-files, and make sure that all base files not
        //       represented w/in the index are included in the output of this method
        val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
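        // Illustrative only: if the table currently holds base-files {f1, f2, f3} but ColStats only
        // indexes {f1, f2}, then notIndexedFileNames = {f3}; f3 has to be retained among the
        // candidates regardless of the filter outcome, preserving the conservative invariant stated
        // in this method's doc.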