FlinkCDC全增量读取源码分析

[未完待续]
快照读取的逻辑:

SHOW MASTER STATUS 获取 lw,插入队列
读取该分片内的记录,插入队列
SHOW MASTER STATUS 获取 hw,插入队列
判断 lw 与 hw 之间是否有增量变更
如果没有变更,队列中插入 BINLOG_END 记录
否则读取 [lw, hw] 之间的 binlog 并插入队列,最后一条记录为 BINLOG_END
MySqlSnapshotSplitReadTask#doExecute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected SnapshotResult doExecute(
ChangeEventSourceContext context,
OffsetContext previousOffset,
SnapshotContext snapshotContext,
SnapshottingTask snapshottingTask)
throws Exception {
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
ctx.offset = offsetContext;

final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
snapshotSplit);
((SnapshotSplitChangeEventSourceContext) (context)).setLowWatermark(lowWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW);

LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());

final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
return SnapshotResult.completed(ctx.offset);
}