-- 单表同步 DROP TABLE IF EXISTS cdc_dinky_flink_document; CREATE TABLE IF NOT EXISTS cdc_dinky_flink_document ( `id` INT NOT NULL COMMENT 'id', `category` STRING COMMENT 'document category', `type` STRING COMMENT 'document type', `subtype` STRING COMMENT 'document subtype', `name` STRING COMMENT 'document name', `description` STRING, `fill_value` STRING COMMENT 'fill value', `version` STRING COMMENT 'document version such as:(flink1.12,flink1.13,flink1.14,flink1.15,flink1.16,flink1.17,flink1.18)', `like_num` INT COMMENT 'like number', `enabled` BOOLEAN NOT NULL COMMENT 'is enable', `create_time` TIMESTAMP COMMENT 'create time', `update_time` TIMESTAMP COMMENT 'update time', `creator` INT COMMENT 'creator user id', `updater` INT COMMENT 'updater user id', PRIMARY KEY (`menu_id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'mysql57', 'port' = '3306', 'username' = 'root', 'password' = 'mysql57', 'server-time-zone' = 'Asia/Shanghai', 'scan.incremental.snapshot.enabled' = 'true', 'debezium.snapshot.mode' = 'initial', 'database-name' = 'dinky', 'table-name' = 'dinky_flink_document' ); CREATE TABLE IF NOT EXISTS mysql_dinky_flink_document ( `id` INT NOT NULL COMMENT 'id', `category` STRING COMMENT 'document category', `type` STRING COMMENT 'document type', `subtype` STRING COMMENT 'document subtype', `name` STRING COMMENT 'document name', `description` STRING, `fill_value` STRING COMMENT 'fill value', `version` STRING COMMENT 'document version such as:(flink1.12,flink1.13,flink1.14,flink1.15,flink1.16,flink1.17,flink1.18)', `like_num` INT COMMENT 'like number', `enabled` BOOLEAN NOT NULL COMMENT 'is enable', `create_time` TIMESTAMP COMMENT 'create time', `update_time` TIMESTAMP COMMENT 'update time', `creator` INT COMMENT 'creator user id', `updater` INT COMMENT 'updater user id', PRIMARY KEY (`menu_id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc' ,'url' = 'jdbc:mysql://mysql57:3306/demo' ,'username' = 'root' ,'password' = 'mysql57' ,'table-name' = 'dinky_flink_document' ); INSERT INTO mysql_dinky_flink_document SELECT * FROM cdc_dinky_flink_document ; -- 整库同步 SET 'execution.checkpointing.interval' = '10s'; EXECUTE CDCSOURCE cdc_mysql WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'mysql57', 'port' = '3306', 'username' = 'root', 'password' = 'mysql57', 'checkpoint' = '3000', 'scan.startup.mode' = 'initial', 'parallelism' = '1', 'server-time-zone' = 'Asia/Shanghai', 'table-name' = 'demo\.admin_menu,demo\.admin_role', 'sink.connector' = 'jdbc', 'sink.url' = 'jdbc:mysql://mysql57:3306/to_demo?characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai', 'sink.username' = 'root', 'sink.password' = 'mysql57', 'sink.sink.db' = 'to_demo', 'sink.table.prefix' = 'ods_', 'sink.table.lower' = 'true', 'sink.table-name' = '#{tableName}', 'sink.driver' = 'com.mysql.jdbc.Driver', 'sink.sink.buffer-flush.interval' = '2s', 'sink.sink.buffer-flush.max-rows' = '100', 'sink.sink.max-retries' = '5', 'sink.auto.create' = 'true' );