Hudi cdc源码分析

原理部分请参考这篇博客（原文链接缺失）。

开启 CDC 时，HoodieMergeHandleFactory 会创建 HoodieMergeHandleWithChangeLog；若表要求记录有序，则创建 HoodieSortedMergeHandleWithChangeLog。开启 CDC 需要将表配置参数 “hoodie.table.cdc.enabled” 设置为 true。

/**
 * Chooses and instantiates the merge handle that writes {@code recordItr} into file group
 * {@code fileId} of {@code partitionPath}.
 *
 * <p>Selection order:
 * <ol>
 *   <li>Table requires sorted records: a sorted merge handle, using the change-log variant
 *       when CDC is enabled in the table config.</li>
 *   <li>Operation does not change records and duplicate inserts are allowed: a
 *       {@code HoodieConcatHandle}. The CDC flag is not consulted on this path.</li>
 *   <li>Otherwise: a plain merge handle, again using the change-log variant when CDC is
 *       enabled in the table config.</li>
 * </ol>
 *
 * @param operationType        the write operation being performed
 * @param writeConfig          writer configuration
 * @param instantTime          instant of the commit being written
 * @param table                target Hudi table
 * @param recordItr            incoming records to merge
 * @param partitionPath        partition containing the file group
 * @param fileId               id of the file group being merged
 * @param taskContextSupplier  supplier of engine task context
 * @param keyGeneratorOpt      optional key generator
 * @return the merge handle for this write
 */
public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
    WriteOperationType operationType,
    HoodieWriteConfig writeConfig,
    String instantTime,
    HoodieTable<T, I, K, O> table,
    Iterator<HoodieRecord<T>> recordItr,
    String partitionPath,
    String fileId,
    TaskContextSupplier taskContextSupplier,
    Option<BaseKeyGenerator> keyGeneratorOpt) {
  if (table.requireSortedRecords()) {
    boolean changeLogOnSorted = table.getMetaClient().getTableConfig().isCDCEnabled();
    if (changeLogOnSorted) {
      return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr,
          partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
    }
    return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, recordItr,
        partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
  }

  // Pure inserts with duplicates allowed are appended rather than merged.
  boolean appendOnly = !WriteOperationType.isChangingRecords(operationType)
      && writeConfig.allowDuplicateInserts();
  if (appendOnly) {
    return new HoodieConcatHandle<>(writeConfig, instantTime, table, recordItr,
        partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
  }

  boolean changeLogOnMerge = table.getMetaClient().getTableConfig().isCDCEnabled();
  if (changeLogOnMerge) {
    return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr,
        partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
  }
  return new HoodieMergeHandle<>(writeConfig, instantTime, table, recordItr,
      partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
}

HoodieMergeHandleWithChangeLog 在初始化时会创建一个 HoodieCDCLogger 对象，并在写入 update 和 insert 数据时同步写入变更数据。
update 时，以 oldRecord 的数据作为变更前镜像，若新记录是删除操作，则变更后镜像为空；insert 时，oldRecord 传入 null，若新记录是删除操作，则不写入变更数据。

/**
 * Writes an updated record through the parent merge handle and, when the parent reports that
 * the record was written, appends a matching entry to the CDC log.
 *
 * <p>The CDC before image is {@code oldRecord}'s data. The after image is the Avro form of a copy
 * of the combined record, or empty when {@code newRecord}'s operation is a delete.
 *
 * @param newRecord          incoming record
 * @param oldRecord          record currently stored in the base file
 * @param combinedRecordOpt  result of combining old and new (may be empty)
 * @param writerSchema       schema used to convert the combined record to Avro
 * @return the parent handle's result; a CDC entry is logged only when this is {@code true}
 * @throws IOException if the parent write fails
 */
protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combinedRecordOpt, Schema writerSchema)
    throws IOException {
  // The combined record is copied before the parent write; the copy is what gets logged.
  // TODO [HUDI-5019] Remove these unnecessary newInstance invocations
  Option<HoodieRecord> combinedCopy = combinedRecordOpt.map(HoodieRecord::newInstance);
  final boolean written = super.writeUpdateRecord(newRecord, oldRecord, combinedRecordOpt, writerSchema);
  if (!written) {
    return false;
  }

  boolean deleted = HoodieOperation.isDelete(newRecord.getOperation());
  Option<IndexedRecord> afterImage = combinedCopy.flatMap(rec ->
      toAvroRecord(rec, writerSchema, config.getPayloadConfig().getProps()));
  Option<IndexedRecord> loggedAfterImage;
  if (deleted) {
    // A delete has no after image; the logger records it as a DELETE.
    loggedAfterImage = Option.empty();
  } else {
    loggedAfterImage = afterImage;
  }
  GenericRecord beforeImage = (GenericRecord) oldRecord.getData();
  cdcLogger.put(newRecord, beforeImage, loggedAfterImage);
  return true;
}

/**
 * Writes a newly inserted record through the parent merge handle, then logs it to the CDC log
 * with no before image ({@code null}).
 *
 * <p>Records whose operation is a delete are written by the parent but not logged here.
 *
 * @param newRecord record being inserted
 * @throws IOException if the parent write or the Avro conversion fails
 */
protected void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
  final Schema targetSchema;
  if (useWriterSchemaForCompaction) {
    targetSchema = writeSchemaWithMetaFields;
  } else {
    targetSchema = writeSchema;
  }
  // A copy is taken before the parent write; the copy is what gets converted and logged.
  // TODO Remove these unnecessary newInstance invocations
  HoodieRecord<T> insertCopy = newRecord.newInstance();
  super.writeInsertRecord(newRecord);
  if (HoodieOperation.isDelete(newRecord.getOperation())) {
    return;
  }
  cdcLogger.put(newRecord, null,
      insertCopy.toIndexedRecord(targetSchema, config.getPayloadConfig().getProps())
          .map(HoodieAvroIndexedRecord::getData));
}

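HoodieCDCLogger 根据 oldRecord 和 newRecord 是否存在来判断这条数据的操作类型：newRecord 为空时为 DELETE；newRecord 存在且 oldRecord 为 null 时为 INSERT；两者都存在时为 UPDATE。随后由 transformer 根据操作类型、recordKey 以及变更前后的 record 数据构造 CDC 记录，并以 recordKey 为键缓存到 cdcData 中。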

/**
 * Builds one CDC record for {@code hoodieRecord}'s key and buffers it in memory.
 *
 * <p>The CDC operation is derived from which images are present:
 * <ul>
 *   <li>{@code newRecord} empty: DELETE, with {@code oldRecord} as before image;</li>
 *   <li>{@code newRecord} present and {@code oldRecord == null}: INSERT;</li>
 *   <li>both present: UPDATE.</li>
 * </ul>
 * {@code flushIfNeeded(false)} is called before the record is buffered. When the buffer is
 * empty, the size of this record is used as {@code averageCDCRecordSize}.
 *
 * @param hoodieRecord source record; supplies the record key
 * @param oldRecord    before image, or {@code null} for inserts
 * @param newRecord    after image, or empty for deletes
 */
public void put(HoodieRecord hoodieRecord,
                GenericRecord oldRecord,
                Option<IndexedRecord> newRecord) {
  final String recordKey = hoodieRecord.getRecordKey();
  final GenericData.Record cdcRecord;
  if (!newRecord.isPresent()) {
    // DELETE cdc record
    cdcRecord = transformer.transform(HoodieCDCOperation.DELETE, recordKey,
        oldRecord, null);
  } else if (oldRecord == null) {
    // INSERT cdc record
    cdcRecord = transformer.transform(HoodieCDCOperation.INSERT, recordKey,
        null, (GenericRecord) newRecord.get());
  } else {
    // UPDATE cdc record
    cdcRecord = transformer.transform(HoodieCDCOperation.UPDATE, recordKey,
        oldRecord, (GenericRecord) newRecord.get());
  }

  flushIfNeeded(false);
  HoodieAvroPayload cdcPayload = new HoodieAvroPayload(Option.of(cdcRecord));
  boolean bufferEmpty = cdcData.isEmpty();
  if (bufferEmpty) {
    // Size estimate is taken from the record added while the in-memory buffer is empty.
    averageCDCRecordSize = sizeEstimator.sizeEstimate(cdcPayload);
  }
  cdcData.put(recordKey, cdcPayload);
  numOfCDCRecordsInMemory.incrementAndGet();
}