[To be continued]
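The snapshot read logic for a split:
Run SHOW MASTER STATUS to get the low watermark (lw) and insert it into the queue
Read the records that belong to this split and insert them into the queue
Run SHOW MASTER STATUS again to get the high watermark (hw) and insert it into the queue
Check whether any incremental changes occurred between lw and hw
If there were no changes, insert a BINLOG_END record into the queue
Otherwise, read the binlog in the range [lw, hw] and insert those events into the queue, with BINLOG_END as the last record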
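The steps above can be sketched as a small, self-contained model. This is only an illustration under simplifying assumptions, not the connector's code: binlog offsets are plain `long` values, queue entries are strings, and the names `SnapshotSplitSketch`, `readSplit`, and `BinlogRangeReader` are hypothetical. "Changes between lw and hw" is approximated as `hw > lw`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the snapshot read for one split.
final class SnapshotSplitSketch {

    /** Stand-in for reading the binlog events in the inclusive range [lw, hw]. */
    interface BinlogRangeReader {
        List<String> read(long lw, long hw);
    }

    static List<String> readSplit(
            LongSupplier currentBinlogOffset, // stand-in for SHOW MASTER STATUS
            List<String> splitRows,           // rows selected for this split
            BinlogRangeReader binlogReader) {
        List<String> queue = new ArrayList<>();

        // 1. Low watermark: binlog position before the split is read.
        long lw = currentBinlogOffset.getAsLong();
        queue.add("LOW_WATERMARK@" + lw);

        // 2. Snapshot records of the split.
        queue.addAll(splitRows);

        // 3. High watermark: binlog position after the split is read.
        long hw = currentBinlogOffset.getAsLong();
        queue.add("HIGH_WATERMARK@" + hw);

        // 4. Only read the binlog if the position advanced (assumed criterion).
        if (hw > lw) {
            queue.addAll(binlogReader.read(lw, hw));
        }

        // BINLOG_END is always the last record, with or without changes.
        queue.add("BINLOG_END");
        return queue;
    }

    public static void main(String[] args) {
        long[] offsets = {100L, 105L}; // first call yields lw, second yields hw
        int[] call = {0};
        List<String> queue = readSplit(
                () -> offsets[call[0]++],
                Arrays.asList("row(id=1)", "row(id=2)"),
                (lw, hw) -> Arrays.asList("update(id=2)@103"));
        queue.forEach(System.out::println);
    }
}
```

In this model the consumer of the queue can rely on two invariants: the snapshot rows always sit between the two watermark entries, and BINLOG_END always terminates the split's output.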
MySqlSnapshotSplitReadTask#doExecute
```java
protected SnapshotResult doExecute(
        ChangeEventSourceContext context,
        OffsetContext previousOffset,
        SnapshotContext snapshotContext,
        SnapshottingTask snapshottingTask)
        throws Exception {
    final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
            (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
    ctx.offset = offsetContext;

    // Step 1: take the current binlog position as the low watermark and emit it.
    final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 1 - Determining low watermark {} for split {}",
            lowWatermark,
            snapshotSplit);
    ((SnapshotSplitChangeEventSourceContext) (context)).setLowWatermark(lowWatermark);
    dispatcher.dispatchWatermarkEvent(
            offsetContext.getPartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW);

    // Step 2: read the rows of this split and emit them as data events.
    LOG.info("Snapshot step 2 - Snapshotting data");
    createDataEvents(ctx, snapshotSplit.getTableId());

    // Step 3: take the binlog position again as the high watermark and emit it.
    final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 3 - Determining high watermark {} for split {}",
            highWatermark,
            snapshotSplit);
    // The high watermark must be stored here, not the low watermark.
    ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark);
    dispatcher.dispatchWatermarkEvent(
            offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
    return SnapshotResult.completed(ctx.offset);
}
```