commit 0ceb53fdabc49fdbd04b3e2911fc0199697dda5e
Author: 尹舟 <13007110208@163.com>
Date:   Fri Jul 25 17:04:18 2025 +0800

    1

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..74cc9bb
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.example</groupId>
+    <artifactId>flinkcdc</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <java.version>1.8</java.version>
+        <flink.version>1.19.1</flink.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <mysql-cdc.version>3.0.1</mysql-cdc.version>
+        <flink-connector.version>3.2.0-1.19</flink-connector.version>
+        <slf4j.version>1.7.25</slf4j.version>
+        <lombok.version>1.18.8</lombok.version>
+        <fastjson.version>1.2.83</fastjson.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${mysql-cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>${flink-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.33</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.version}</source>
+                    <target>${java.version}</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>src.main.java.Main</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-shade-plugin</artifactId>
+                                        <versionRange>[3.0.0,)</versionRange>
+                                        <goals>
+                                            <goal>shade</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>org.apache.maven.plugins</groupId>
+                                        <artifactId>maven-compiler-plugin</artifactId>
+                                        <versionRange>[3.1,)</versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                            <goal>compile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/java/flink/sql/KafkaToMySQL.java b/src/main/java/flink/sql/KafkaToMySQL.java
new file mode 100644
index 0000000..659268b
--- /dev/null
+++ b/src/main/java/flink/sql/KafkaToMySQL.java
@@ -0,0 +1,68 @@
+package flink.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+public class KafkaToMySQL {
+    public static void main(String[] args) throws Exception {
+        // Set up the streaming environment and enable checkpointing
+        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        streamEnv.enableCheckpointing(5000); // checkpoint every 5 seconds
+
+        // Create the TableEnvironment
+        TableEnvironment env = StreamTableEnvironment.create(streamEnv);
+
+        // Kafka source table (Debezium JSON format)
+        String createKafkaSource = "CREATE TABLE kafka_source (\n" +
+                "  menu_id INT NOT NULL,\n" +
+                "  parent_menu_id INT,\n" +
+                "  menu_type TINYINT,\n" +
+                "  menu_title STRING,\n" +
+                "  menu_name STRING,\n" +
+                "  menu_path STRING,\n" +
+                "  menu_icon STRING,\n" +
+                "  menu_server_uri STRING,\n" +
+                "  sort_index INT,\n" +
+                "  create_time INT,\n" +
+                "  update_time INT\n" +
+                ") WITH (\n" +
+                "  'connector' = 'kafka',\n" +
+                "  'topic' = 'kafka_admin_menu',\n" +
+                "  'properties.bootstrap.servers' = 'localhost:9093',\n" +
+                "  'properties.group.id' = 'flink_kafka_to_mysql',\n" +
+                "  'properties.auto.offset.reset' = 'earliest',\n" +
+                "  'format' = 'debezium-json'\n" +
+                ");";
+
+        // MySQL sink table
+        String createMySQLSink = "CREATE TABLE mysql_sink (\n" +
+                "  menu_id INT NOT NULL,\n" +
+                "  parent_menu_id INT,\n" +
+                "  menu_type TINYINT,\n" +
+                "  menu_title STRING,\n" +
+                "  menu_name STRING,\n" +
+                "  menu_path STRING,\n" +
+                "  menu_icon STRING,\n" +
+                "  menu_server_uri STRING,\n" +
+                "  sort_index INT,\n" +
+                "  create_time INT,\n" +
+                "  update_time INT,\n" +
+                "  PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+                ") WITH (\n" +
+                "  'connector' = 'jdbc',\n" +
+                "  'url' = 'jdbc:mysql://localhost:3336/demo',\n" +
+                "  'table-name' = 'admin_menu1',\n" +
+                "  'username' = 'root',\n" +
+                "  'password' = 'mysql57'\n" +
+                ");";
+
+        // Read from Kafka and write to MySQL
+        String insertSQL = "INSERT INTO mysql_sink SELECT * FROM kafka_source";
+
+        // Execute the SQL statements
+        env.executeSql(createKafkaSource);
+        env.executeSql(createMySQLSink);
+        env.executeSql(insertSQL).await(); // block so the local job keeps running
+    }
+}
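A note on the build and the checkpoint settings. The shade plugin's mainClass, src.main.java.Main, is a file path rather than a fully qualified class name, so the entry class has to be chosen at submission time (e.g. flink run -c flink.sql.KafkaToMySQL target/flinkcdc-1.0.jar). Also, every job in this commit calls enableCheckpointing(interval) and leaves all other checkpoint knobs at their defaults. A minimal sketch of a fuller configuration using standard Flink 1.19 APIs; the class name CheckpointSetup and the concrete values are assumptions for illustration, not part of this commit:

package flink.sql;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup { // hypothetical helper, not in this commit
    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 5 s with exactly-once semantics (the default mode)
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setMinPauseBetweenCheckpoints(2000); // give the job room to make progress
        cfg.setCheckpointTimeout(60000);         // give up on a slow checkpoint after 60 s
        // keep the last checkpoint on cancel so the job can be restored from it
        cfg.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }
}

Retaining checkpoints matters for CDC jobs in particular: restoring from the retained checkpoint lets the mysql-cdc source resume from its saved binlog offset instead of re-running the initial snapshot.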
+ " parent_menu_id INT,\n" + + " menu_type TINYINT,\n" + + " menu_title STRING,\n" + + " menu_name STRING,\n" + + " menu_path STRING,\n" + + " menu_icon STRING,\n" + + " menu_server_uri STRING,\n" + + " sort_index INT,\n" + + " create_time INT,\n" + + " update_time INT,\n" + + " PRIMARY KEY(menu_id) NOT ENFORCED\n" + + ") WITH (\n" + + " 'connector' = 'jdbc',\n" + + " 'url' = 'jdbc:mysql://localhost:3336/demo',\n" + + " 'table-name' = 'admin_menu1',\n" + + " 'username' = 'root',\n" + + " 'password' = 'mysql57'\n" + + ");"; + + // 插入语句:从 Kafka 读取并写入 MySQL + String insertSQL = "INSERT INTO mysql_sink SELECT * FROM kafka_source"; + + // 执行 SQL + env.executeSql(createKafkaSource); + env.executeSql(createMySQLSink); + env.executeSql(insertSQL); + } +} diff --git a/src/main/java/flink/sql/MySQLToKafka.java b/src/main/java/flink/sql/MySQLToKafka.java new file mode 100644 index 0000000..daf722c --- /dev/null +++ b/src/main/java/flink/sql/MySQLToKafka.java @@ -0,0 +1,80 @@ +package flink.sql; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +/** + * @author yinz + * @title: fxsql + * @projectName flink + * @description: TODO + * @date 2022/9/1515:53 + */ +public class MySQLToKafka { + public static void main(String[] args) throws Exception { + // 使用 StreamExecutionEnvironment 来控制 Checkpoint + StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + streamEnv.enableCheckpointing(5000); // 每 5 秒做一次 checkpoint + + // 创建 TableEnvironment + TableEnvironment env = StreamTableEnvironment.create(streamEnv); + + + String sql = "CREATE TABLE admin_menu(\n" + + " menu_id int NOT NULL ,\n" + + " parent_menu_id int,\n" + + " menu_type tinyint,\n" + + " menu_title STRING,\n" + + " menu_name STRING,\n" + + " menu_path STRING,\n" + + " menu_icon STRING,\n" + + " menu_server_uri STRING,\n" + + " sort_index int,\n" + + " create_time int,\n" + + " update_time int,\n" + + " PRIMARY KEY(menu_id) NOT ENFORCED\n" + + ") with (\n" + + " 'connector' = 'mysql-cdc',\n" + + " 'scan.startup.mode' = 'initial',\n" + + " 'hostname' = 'localhost',\n" + + " 'port' = '3336',\n" + + " 'username' = 'root',\n" + + " 'password' = 'mysql57',\n" + + " 'database-name' = 'demo',\n" + + " 'debezium.database.server.name' = '123456',\n" + + " 'table-name' = 'admin_menu')"; + + +// flink写入到kafka + String sql1 = "CREATE TABLE admin_menu_kafka(\n" + + " menu_id INT NOT NULL,\n" + + " parent_menu_id INT,\n" + + " menu_type TINYINT,\n" + + " menu_title STRING,\n" + + " menu_name STRING,\n" + + " menu_path STRING,\n" + + " menu_icon STRING,\n" + + " menu_server_uri STRING,\n" + + " sort_index INT,\n" + + " create_time INT,\n" + + " update_time INT\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = 'kafka_admin_menu2',\n" + + " 'properties.bootstrap.servers' = '10.23.0.209:9093',\n" + + " 'properties.group.id' = 'testGroup',\n" + + " 'format' = 'debezium-json'\n" + + ");\n"; + + String sql2 = "INSERT INTO admin_menu_kafka\n" + + "select\n" + + " *\n" + + "from admin_menu"; + env.executeSql(sql); + env.executeSql(sql1); + env.executeSql(sql2); + + + } +} diff --git a/src/main/java/flink/sql/MySQLToKafka整库.java b/src/main/java/flink/sql/MySQLToKafka整库.java new file mode 100644 index 0000000..e2b01c2 --- /dev/null +++ b/src/main/java/flink/sql/MySQLToKafka整库.java @@ -0,0 +1,58 @@ +package flink.sql; + +import 
diff --git a/src/main/java/flink/sql/MySQLToKafka整库.java b/src/main/java/flink/sql/MySQLToKafka整库.java
new file mode 100644
index 0000000..e2b01c2
--- /dev/null
+++ b/src/main/java/flink/sql/MySQLToKafka整库.java
@@ -0,0 +1,58 @@
+package flink.sql;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+import java.util.Properties;
+
+public class MySQLToKafka整库 {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(50000); // checkpoint every 50 seconds
+        env.setParallelism(1); // adjust parallelism as needed
+
+        // Build the MySQL CDC source
+        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+                .hostname("yin520.cn")
+                .port(3306)
+                .databaseList("test") // capture the test database
+                .tableList("test.*")  // capture all tables in the test database
+                .username("root")
+                .password("yin520.cn")
+                .startupOptions(StartupOptions.initial())
+                .deserializer(new JsonDebeziumDeserializationSchema())
+                .build();
+
+        // Read from the source
+        DataStream<String> sourceStream = env.fromSource(
+                mySqlSource,
+                WatermarkStrategy.noWatermarks(),
+                "MySQL CDC Source"
+        );
+
+        // Configure the Kafka producer
+        Properties kafkaProps = new Properties();
+        kafkaProps.setProperty("bootstrap.servers", "10.23.0.209:9093");
+        kafkaProps.setProperty("max.request.size", "5242880"); // 5 MB
+        kafkaProps.setProperty("batch.size", "5242880");       // 5 MB
+
+        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
+                "mysql_cdc_demo_all1",    // default topic
+                new SimpleStringSchema(), // serialization schema
+                kafkaProps                // Kafka configuration
+        );
+
+        // Send the change stream to Kafka
+        sourceStream.addSink(kafkaProducer).name("Kafka Sink");
+
+        // Run the job
+        env.execute("MySQL Database Sync to Kafka");
+    }
+}
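MySQLToKafka整库 still uses FlinkKafkaProducer, which has been deprecated since Flink 1.14 in favor of KafkaSink and may no longer be present in the externalized flink-connector-kafka build this pom declares (3.2.0-1.19). A sketch of the KafkaSink equivalent, intended to replace the producer block inside main above; topic, bootstrap address, and kafkaProps are copied from the file, while the AT_LEAST_ONCE guarantee is an assumption:

// additional imports:
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// inside main, instead of new FlinkKafkaProducer<>(...) and addSink(...):
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("10.23.0.209:9093")
        .setKafkaProducerConfig(kafkaProps) // max.request.size / batch.size from above
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("mysql_cdc_demo_all1")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // assumption: at-least-once; EXACTLY_ONCE requires Kafka transactions
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

sourceStream.sinkTo(kafkaSink).name("Kafka Sink");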
diff --git a/src/main/java/flink/sql/fxsql.java b/src/main/java/flink/sql/fxsql.java
new file mode 100644
index 0000000..56587ac
--- /dev/null
+++ b/src/main/java/flink/sql/fxsql.java
@@ -0,0 +1,66 @@
+package flink.sql;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * @author yinz
+ * @title: fxsql
+ * @projectName flink
+ * @description: MySQL CDC source to JDBC sink (MySQL-to-MySQL sync)
+ * @date 2022/9/15 15:53
+ */
+public class fxsql {
+    public static void main(String[] args) throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+        TableEnvironment env = TableEnvironment.create(settings);
+        // enable checkpointing: the mysql-cdc source commits binlog offsets on
+        // checkpoints, and the JDBC sink flushes on them
+        env.getConfig().set("execution.checkpointing.interval", "5 s");
+
+        String sql = "CREATE TABLE admin_menu(\n" +
+                "  menu_id INT NOT NULL,\n" +
+                "  parent_menu_id INT,\n" +
+                "  menu_type TINYINT,\n" +
+                "  menu_title STRING,\n" +
+                "  menu_name STRING,\n" +
+                "  menu_path STRING,\n" +
+                "  menu_icon STRING,\n" +
+                "  menu_server_uri STRING,\n" +
+                "  sort_index INT,\n" +
+                "  create_time INT,\n" +
+                "  update_time INT,\n" +
+                "  PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+                ") WITH (\n" +
+                "  'connector' = 'mysql-cdc',\n" +
+                "  'scan.startup.mode' = 'initial',\n" +
+                "  'hostname' = 'yin520.cn',\n" +
+                "  'port' = '3306',\n" +
+                "  'username' = 'root',\n" +
+                "  'password' = 'yin520.cn',\n" +
+                "  'database-name' = 'test',\n" +
+                "  'table-name' = 'admin_menu')";
+        String sql1 = "CREATE TABLE admin_menu_a(\n" +
+                "  menu_id INT NOT NULL,\n" +
+                "  parent_menu_id INT,\n" +
+                "  menu_type TINYINT,\n" +
+                "  menu_title STRING,\n" +
+                "  menu_name STRING,\n" +
+                "  menu_path STRING,\n" +
+                "  menu_icon STRING,\n" +
+                "  menu_server_uri STRING,\n" +
+                "  sort_index INT,\n" +
+                "  create_time INT,\n" +
+                "  update_time INT,\n" +
+                "  PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+                ") WITH (\n" +
+                "  'connector' = 'jdbc',\n" +
+                "  'url' = 'jdbc:mysql://yin520.cn:3306/yintest1',\n" +
+                "  'username' = 'root',\n" +
+                "  'password' = 'yin520.cn',\n" +
+                "  'table-name' = 'admin_menu')";
+        String sql2 = "INSERT INTO admin_menu_a\n" +
+                "SELECT * FROM admin_menu";
+        env.executeSql(sql);
+        env.executeSql(sql1);
+        env.executeSql(sql2).await(); // block so the local job keeps running
+    }
+}
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..0aa059b
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger level: DEBUG here for troubleshooting;
+# set to OFF to keep build logs quiet
+rootLogger.level = DEBUG
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
+
+# Verbose logging for the CDC stack, in Log4j 2 properties syntax
+logger.debezium.name = io.debezium
+logger.debezium.level = DEBUG
+logger.cdc.name = com.ververica.cdc
+logger.cdc.level = DEBUG
\ No newline at end of file
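After any of these jobs runs, a quick way to confirm what actually reached Kafka is a plain consumer that prints the debezium-json payloads. A minimal sketch against the kafka-clients API that flink-connector-kafka already pulls in transitively; the class name TopicPeek and the group id are made up, while the topic and bootstrap address are the ones used above:

package flink.sql;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicPeek { // hypothetical verification tool, not in this commit
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.23.0.209:9093");
        props.setProperty("group.id", "topic-peek"); // made-up group id
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("kafka_admin_menu2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each value is one debezium-json change event
                    System.out.println(record.value());
                }
            }
        }
    }
}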