From 0ceb53fdabc49fdbd04b3e2911fc0199697dda5e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B0=B9=E8=88=9F?= <13007110208@163.com>
Date: Fri, 25 Jul 2025 17:04:18 +0800
Subject: [PATCH] 1
---
pom.xml | 211 ++++++++++++++++++
src/main/java/flink/sql/KafkaToMySQL.java | 68 ++++++
src/main/java/flink/sql/MySQLToKafka.java | 80 +++++++
src/main/java/flink/sql/MySQLToKafka整库.java | 58 +++++
src/main/java/flink/sql/fxsql.java | 66 ++++++
src/main/resources/log4j.properties | 30 +++
6 files changed, 513 insertions(+)
create mode 100644 pom.xml
create mode 100644 src/main/java/flink/sql/KafkaToMySQL.java
create mode 100644 src/main/java/flink/sql/MySQLToKafka.java
create mode 100644 src/main/java/flink/sql/MySQLToKafka整库.java
create mode 100644 src/main/java/flink/sql/fxsql.java
create mode 100644 src/main/resources/log4j.properties
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..74cc9bb
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,211 @@
+
+
+ 4.0.0
+ org.example
+ flinkcdc
+ 1.0
+
+ UTF-8
+ UTF-8
+ 1.8
+ 1.19.1
+ 2.12
+ 3.0.1
+ 3.2.0-1.19
+ 1.7.25
+ 1.18.8
+ 1.2.83
+
+
+
+
+
+ org.apache.flink
+ flink-java
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+
+ com.ververica
+ flink-connector-mysql-cdc
+ ${mysql-cdc.version}
+
+
+
+ org.apache.flink
+ flink-connector-jdbc
+ ${flink-connector.version}
+
+
+
+ org.apache.flink
+ flink-connector-kafka
+ ${flink-connector.version}
+
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.33
+
+
+
+
+ org.slf4j
+ slf4j-simple
+ ${slf4j.version}
+ compile
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+ ${java.version}
+ ${java.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.0.0
+
+
+
+ package
+
+ shade
+
+
+
+
+
+ org.apache.flink:force-shading
+ com.google.code.findbugs:jsr305
+ org.slf4j:*
+ log4j:*
+
+
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+ src.main.java.Main
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.eclipse.m2e
+ lifecycle-mapping
+ 1.0.0
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ [3.0.0,)
+
+ shade
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ [3.1,)
+
+ testCompile
+ compile
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/flink/sql/KafkaToMySQL.java b/src/main/java/flink/sql/KafkaToMySQL.java
new file mode 100644
index 0000000..659268b
--- /dev/null
+++ b/src/main/java/flink/sql/KafkaToMySQL.java
@@ -0,0 +1,68 @@
+package flink.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class KafkaToMySQL {
+ public static void main(String[] args) throws Exception {
+ // 设置流环境并启用 Checkpoint
+ StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ streamEnv.enableCheckpointing(5000); // 每 5 秒做一次 checkpoint
+
+ // 创建 TableEnvironment
+ TableEnvironment env = StreamTableEnvironment.create(streamEnv);
+
+ // Kafka Source 表定义(Debezium JSON 格式)
+ String createKafkaSource = "CREATE TABLE kafka_source (\n" +
+ " menu_id INT NOT NULL,\n" +
+ " parent_menu_id INT,\n" +
+ " menu_type TINYINT,\n" +
+ " menu_title STRING,\n" +
+ " menu_name STRING,\n" +
+ " menu_path STRING,\n" +
+ " menu_icon STRING,\n" +
+ " menu_server_uri STRING,\n" +
+ " sort_index INT,\n" +
+ " create_time INT,\n" +
+ " update_time INT\n" +
+ ") WITH (\n" +
+ " 'connector' = 'kafka',\n" +
+ " 'topic' = 'kafka_admin_menu',\n" +
+ " 'properties.bootstrap.servers' = 'localhost:9093',\n" +
+ " 'properties.group.id' = 'flink_kafka_to_mysql',\n" +
+ " 'properties.auto.offset.reset' = 'earliest',\n" +
+ " 'format' = 'debezium-json'\n" +
+ ");";
+
+ // MySQL Sink 表定义
+ String createMySQLSink = "CREATE TABLE mysql_sink (\n" +
+ " menu_id INT NOT NULL,\n" +
+ " parent_menu_id INT,\n" +
+ " menu_type TINYINT,\n" +
+ " menu_title STRING,\n" +
+ " menu_name STRING,\n" +
+ " menu_path STRING,\n" +
+ " menu_icon STRING,\n" +
+ " menu_server_uri STRING,\n" +
+ " sort_index INT,\n" +
+ " create_time INT,\n" +
+ " update_time INT,\n" +
+ " PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+ ") WITH (\n" +
+ " 'connector' = 'jdbc',\n" +
+ " 'url' = 'jdbc:mysql://localhost:3336/demo',\n" +
+ " 'table-name' = 'admin_menu1',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = 'mysql57'\n" +
+ ");";
+
+ // 插入语句:从 Kafka 读取并写入 MySQL
+ String insertSQL = "INSERT INTO mysql_sink SELECT * FROM kafka_source";
+
+ // 执行 SQL
+ env.executeSql(createKafkaSource);
+ env.executeSql(createMySQLSink);
+ env.executeSql(insertSQL);
+ }
+}
diff --git a/src/main/java/flink/sql/MySQLToKafka.java b/src/main/java/flink/sql/MySQLToKafka.java
new file mode 100644
index 0000000..daf722c
--- /dev/null
+++ b/src/main/java/flink/sql/MySQLToKafka.java
@@ -0,0 +1,80 @@
+package flink.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/**
+ * @author yinz
+ * @title: fxsql
+ * @projectName flink
+ * @description: TODO
+ * @date 2022/9/1515:53
+ */
+public class MySQLToKafka {
+ public static void main(String[] args) throws Exception {
+ // 使用 StreamExecutionEnvironment 来控制 Checkpoint
+ StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ streamEnv.enableCheckpointing(5000); // 每 5 秒做一次 checkpoint
+
+ // 创建 TableEnvironment
+ TableEnvironment env = StreamTableEnvironment.create(streamEnv);
+
+
+ String sql = "CREATE TABLE admin_menu(\n" +
+ " menu_id int NOT NULL ,\n" +
+ " parent_menu_id int,\n" +
+ " menu_type tinyint,\n" +
+ " menu_title STRING,\n" +
+ " menu_name STRING,\n" +
+ " menu_path STRING,\n" +
+ " menu_icon STRING,\n" +
+ " menu_server_uri STRING,\n" +
+ " sort_index int,\n" +
+ " create_time int,\n" +
+ " update_time int,\n" +
+ " PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+ ") with (\n" +
+ " 'connector' = 'mysql-cdc',\n" +
+ " 'scan.startup.mode' = 'initial',\n" +
+ " 'hostname' = 'localhost',\n" +
+ " 'port' = '3336',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = 'mysql57',\n" +
+ " 'database-name' = 'demo',\n" +
+ " 'debezium.database.server.name' = '123456',\n" +
+ " 'table-name' = 'admin_menu')";
+
+
+// flink写入到kafka
+ String sql1 = "CREATE TABLE admin_menu_kafka(\n" +
+ " menu_id INT NOT NULL,\n" +
+ " parent_menu_id INT,\n" +
+ " menu_type TINYINT,\n" +
+ " menu_title STRING,\n" +
+ " menu_name STRING,\n" +
+ " menu_path STRING,\n" +
+ " menu_icon STRING,\n" +
+ " menu_server_uri STRING,\n" +
+ " sort_index INT,\n" +
+ " create_time INT,\n" +
+ " update_time INT\n" +
+ ") WITH (\n" +
+ " 'connector' = 'kafka',\n" +
+ " 'topic' = 'kafka_admin_menu2',\n" +
+ " 'properties.bootstrap.servers' = '10.23.0.209:9093',\n" +
+ " 'properties.group.id' = 'testGroup',\n" +
+ " 'format' = 'debezium-json'\n" +
+ ");\n";
+
+ String sql2 = "INSERT INTO admin_menu_kafka\n" +
+ "select\n" +
+ " *\n" +
+ "from admin_menu";
+ env.executeSql(sql);
+ env.executeSql(sql1);
+ env.executeSql(sql2);
+
+
+ }
+}
diff --git a/src/main/java/flink/sql/MySQLToKafka整库.java b/src/main/java/flink/sql/MySQLToKafka整库.java
new file mode 100644
index 0000000..e2b01c2
--- /dev/null
+++ b/src/main/java/flink/sql/MySQLToKafka整库.java
@@ -0,0 +1,58 @@
+package flink.sql;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+public class MySQLToKafka整库 {
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(50000); // 每50秒做一次checkpoint
+ env.setParallelism(1); // 根据实际情况调整并行度
+
+ // 创建 MySQL CDC Source
+ MySqlSource mySqlSource = MySqlSource.builder()
+ .hostname("yin520.cn")
+ .port(3306)
+ .databaseList("test") // 监听 demo 数据库
+ .tableList("test.*") // 监听 demo 数据库下的所有表
+ .username("root")
+ .password("yin520.cn")
+ .startupOptions(StartupOptions.initial())
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .build();
+
+ // 从 source 读取数据
+ DataStream sourceStream = env.fromSource(
+ mySqlSource,
+ WatermarkStrategy.noWatermarks(),
+ "MySQL CDC Source"
+ );
+
+ // 配置 Kafka Producer
+ Properties kafkaProps = new Properties();
+ kafkaProps.setProperty("bootstrap.servers", "10.23.0.209:9093");
+ kafkaProps.setProperty("max.request.size", "5242880"); // 5MB
+ kafkaProps.setProperty("batch.size", "5242880");// 5MB
+
+
+ FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(
+ "mysql_cdc_demo_all1", // 默认 topic
+ new SimpleStringSchema(), // 序列化 schema
+ kafkaProps // Kafka 配置
+ );
+
+ // 发送数据到 Kafka
+ sourceStream.addSink(kafkaProducer).name("Kafka Sink");
+
+ // 执行任务
+ env.execute("MySQL Database Sync to Kafka");
+ }
+}
diff --git a/src/main/java/flink/sql/fxsql.java b/src/main/java/flink/sql/fxsql.java
new file mode 100644
index 0000000..56587ac
--- /dev/null
+++ b/src/main/java/flink/sql/fxsql.java
@@ -0,0 +1,66 @@
+package flink.sql;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+/**
+ * @author yinz
+ * @title: fxsql
+ * @projectName flink
+ * @description: TODO
+ * @date 2022/9/1515:53
+ */
+public class fxsql {
+ public static void main(String[] args) throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+ TableEnvironment env = TableEnvironment.create(settings);
+ String sql="CREATE TABLE admin_menu(\n" +
+ " menu_id int NOT NULL ,\n" +
+ " parent_menu_id int,\n" +
+ " menu_type tinyint,\n" +
+ " menu_title STRING,\n" +
+ " menu_name STRING,\n" +
+ " menu_path STRING,\n" +
+ " menu_icon STRING,\n" +
+ " menu_server_uri STRING,\n" +
+ " sort_index int,\n" +
+ " create_time int,\n" +
+ " update_time int,\n" +
+ " PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+ ") with (\n" +
+ " 'connector' = 'mysql-cdc',\n" +
+ " 'scan.startup.mode' = 'initial',\n" +
+ " 'hostname' = 'yin520.cn',\n" +
+ " 'port' = '3306',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = 'yin520.cn',\n" +
+ " 'database-name' = 'test',\n" +
+ " 'table-name' = 'admin_menu')";
+ String sql1="CREATE TABLE admin_menu_a(\n" +
+ " menu_id int NOT NULL ,\n" +
+ " parent_menu_id int,\n" +
+ " menu_type tinyint,\n" +
+ " menu_title STRING,\n" +
+ " menu_name STRING,\n" +
+ " menu_path STRING,\n" +
+ " menu_icon STRING,\n" +
+ " menu_server_uri STRING,\n" +
+ " sort_index int,\n" +
+ " create_time int,\n" +
+ " update_time int,\n" +
+ " PRIMARY KEY(menu_id) NOT ENFORCED\n" +
+ ") with (\n" +
+ " 'connector' = 'jdbc',\n" +
+ " 'url' = 'jdbc:mysql://yin520.cn:3306/yintest1',\n" +
+ " 'username' = 'root',\n" +
+ " 'password' = 'yin520.cn',\n" +
+ " 'table-name' = 'admin_menu')";
+ String sql2="INSERT INTO admin_menu_a\n" +
+ "select\n" +
+ " *\n" +
+ "from admin_menu";
+ env.executeSql( sql);
+ env.executeSql(sql1);
+ env.executeSql(sql2);
+ }
+}
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..0aa059b
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=DEBUG
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
+log4j.logger.io.debezium=DEBUG
+log4j.logger.com.ververica.cdc=DEBUG
\ No newline at end of file