尹舟 2025-07-25 17:04:18 +08:00
commit 0ceb53fdab
6 changed files with 513 additions and 0 deletions

pom.xml Normal file

@@ -0,0 +1,211 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flinkcdc</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<encoding>UTF-8</encoding>
<java.version>1.8</java.version>
<flink.version>1.19.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<mysql-cdc.version>3.0.1</mysql-cdc.version>
<flink-connector.version>3.2.0-1.19</flink-connector.version>
<slf4j.version>1.7.25</slf4j.version>
<lombok.version>1.18.8</lombok.version>
<fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencies>
<!-- Core APIs for writing Flink batch/streaming programs -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Streaming runtime for real-time processing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Submitting and managing Flink jobs -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table/SQL planner (needed to run Table programs locally) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Bridge between the Java DataStream API and the Table API / SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Connector base classes -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL CDC source for real-time synchronization -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${mysql-cdc.version}</version>
</dependency>
<!-- JDBC connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink-connector.version}</version>
</dependency>
<!-- Kafka Source/Sink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink-connector.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<!-- Packaging configuration -->
<build>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Build a fat jar with maven-shade-plugin and set the mainClass -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run the shade goal during the package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Exclude dependencies that do not need to be packaged into the jar -->
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<!-- Filter resource files -->
<filters>
<filter>
<!-- Do not copy signature files from META-INF -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- Set the mainClass in the manifest -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Replace with your own main class; this must be a fully qualified class name, not a source path -->
<mainClass>flink.sql.MySQLToKafka</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!-- Plugin management (optional) -->
<pluginManagement>
<plugins>
<!-- Suppress some m2e warnings in Eclipse -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

src/main/java/flink/sql/KafkaToMySQL.java Normal file

@@ -0,0 +1,68 @@
package flink.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.TableEnvironment;
public class KafkaToMySQL {
public static void main(String[] args) throws Exception {
// Set up the streaming environment and enable checkpointing
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.enableCheckpointing(5000); // checkpoint every 5 seconds
// Create the TableEnvironment
TableEnvironment env = StreamTableEnvironment.create(streamEnv);
// Kafka source table definition (Debezium JSON format)
String createKafkaSource = "CREATE TABLE kafka_source (\n" +
" menu_id INT NOT NULL,\n" +
" parent_menu_id INT,\n" +
" menu_type TINYINT,\n" +
" menu_title STRING,\n" +
" menu_name STRING,\n" +
" menu_path STRING,\n" +
" menu_icon STRING,\n" +
" menu_server_uri STRING,\n" +
" sort_index INT,\n" +
" create_time INT,\n" +
" update_time INT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'kafka_admin_menu',\n" +
" 'properties.bootstrap.servers' = 'localhost:9093',\n" +
" 'properties.group.id' = 'flink_kafka_to_mysql',\n" +
" 'properties.auto.offset.reset' = 'earliest',\n" +
" 'format' = 'debezium-json'\n" +
");";
// MySQL Sink 表定义
String createMySQLSink = "CREATE TABLE mysql_sink (\n" +
" menu_id INT NOT NULL,\n" +
" parent_menu_id INT,\n" +
" menu_type TINYINT,\n" +
" menu_title STRING,\n" +
" menu_name STRING,\n" +
" menu_path STRING,\n" +
" menu_icon STRING,\n" +
" menu_server_uri STRING,\n" +
" sort_index INT,\n" +
" create_time INT,\n" +
" update_time INT,\n" +
" PRIMARY KEY(menu_id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3336/demo',\n" +
" 'table-name' = 'admin_menu1',\n" +
" 'username' = 'root',\n" +
" 'password' = 'mysql57'\n" +
");";
// 插入语句 Kafka 读取并写入 MySQL
String insertSQL = "INSERT INTO mysql_sink SELECT * FROM kafka_source";
// 执行 SQL
env.executeSql(createKafkaSource);
env.executeSql(createMySQLSink);
env.executeSql(insertSQL);
}
}
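A note on running KafkaToMySQL locally: executeSql on an INSERT statement submits the Flink job asynchronously and returns immediately. A minimal sketch, reusing the env and insertSQL variables from above, that blocks until the job terminates:

import org.apache.flink.table.api.TableResult;

// Submit the INSERT job and block the main thread until it finishes.
// A streaming job normally runs until cancelled, so await() mainly
// matters for bounded test runs and for surfacing submission errors.
TableResult result = env.executeSql(insertSQL);
result.await();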

src/main/java/flink/sql/MySQLToKafka.java Normal file

@@ -0,0 +1,80 @@
package flink.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @author yinz
 * @title: MySQLToKafka
 * @projectName flink
 * @description: Sync one MySQL table to Kafka (Debezium JSON) via Flink SQL
 * @date 2022/9/15 15:53
 */
public class MySQLToKafka {
public static void main(String[] args) throws Exception {
// Use StreamExecutionEnvironment so we can control checkpointing
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.enableCheckpointing(5000); // checkpoint every 5 seconds
// Create the TableEnvironment
TableEnvironment env = StreamTableEnvironment.create(streamEnv);
String sql = "CREATE TABLE admin_menu(\n" +
" menu_id int NOT NULL ,\n" +
" parent_menu_id int,\n" +
" menu_type tinyint,\n" +
" menu_title STRING,\n" +
" menu_name STRING,\n" +
" menu_path STRING,\n" +
" menu_icon STRING,\n" +
" menu_server_uri STRING,\n" +
" sort_index int,\n" +
" create_time int,\n" +
" update_time int,\n" +
" PRIMARY KEY(menu_id) NOT ENFORCED\n" +
") with (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'scan.startup.mode' = 'initial',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3336',\n" +
" 'username' = 'root',\n" +
" 'password' = 'mysql57',\n" +
" 'database-name' = 'demo',\n" +
" 'debezium.database.server.name' = '123456',\n" +
" 'table-name' = 'admin_menu')";
// Write from Flink to Kafka
String sql1 = "CREATE TABLE admin_menu_kafka(\n" +
" menu_id INT NOT NULL,\n" +
" parent_menu_id INT,\n" +
" menu_type TINYINT,\n" +
" menu_title STRING,\n" +
" menu_name STRING,\n" +
" menu_path STRING,\n" +
" menu_icon STRING,\n" +
" menu_server_uri STRING,\n" +
" sort_index INT,\n" +
" create_time INT,\n" +
" update_time INT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'kafka_admin_menu2',\n" +
" 'properties.bootstrap.servers' = '10.23.0.209:9093',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'format' = 'debezium-json'\n" +
");\n";
String sql2 = "INSERT INTO admin_menu_kafka\n" +
"select\n" +
" *\n" +
"from admin_menu";
env.executeSql(sql);
env.executeSql(sql1);
env.executeSql(sql2);
}
}
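The 5-second checkpoint interval is the only fault-tolerance setting this job touches. A sketch of a slightly more defensive checkpoint configuration; the extra values are illustrative assumptions, not taken from the original code:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Checkpoint every 5 seconds with exactly-once semantics
streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig cfg = streamEnv.getCheckpointConfig();
cfg.setMinPauseBetweenCheckpoints(2000); // breathing room between checkpoints
cfg.setCheckpointTimeout(60000); // give up on a checkpoint after 60 s
cfg.setTolerableCheckpointFailureNumber(3); // tolerate a few failed checkpoints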

src/main/java/flink/sql/MySQLToKafka整库.java Normal file

@@ -0,0 +1,58 @@
package flink.sql;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
public class MySQLToKafka整库 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(50000); // checkpoint every 50 seconds
env.setParallelism(1); // adjust parallelism to the actual workload
// Create the MySQL CDC source
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yin520.cn")
.port(3306)
.databaseList("test") // 监听 demo 数据库
.tableList("test.*") // 监听 demo 数据库下的所有表
.username("root")
.password("yin520.cn")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// Read records from the source
DataStream<String> sourceStream = env.fromSource(
mySqlSource,
WatermarkStrategy.noWatermarks(),
"MySQL CDC Source"
);
// Configure the Kafka producer
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "10.23.0.209:9093");
kafkaProps.setProperty("max.request.size", "5242880"); // 5 MB
kafkaProps.setProperty("batch.size", "5242880"); // 5 MB
// FlinkKafkaProducer is the legacy sink API, deprecated in favor of KafkaSink
// (a sketch of the replacement follows this class)
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"mysql_cdc_demo_all1", // default topic
new SimpleStringSchema(), // serialization schema
kafkaProps // Kafka configuration
);
// Send the change stream to Kafka
sourceStream.addSink(kafkaProducer).name("Kafka Sink");
// Run the job
env.execute("MySQL Database Sync to Kafka");
}
}
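FlinkKafkaProducer has been deprecated for several releases in favor of KafkaSink, and newer connector releases drop it entirely. A sketch of the equivalent sink on the unified Sink API, reusing the kafkaProps, topic, and sourceStream from above (assuming the pinned flink-connector-kafka version ships KafkaSink, which 3.2.0-1.19 does):

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("10.23.0.209:9093")
    .setKafkaProducerConfig(kafkaProps) // keeps the 5 MB request/batch sizes
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("mysql_cdc_demo_all1")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
// sinkTo replaces addSink for sinks built on the unified Sink API
sourceStream.sinkTo(kafkaSink).name("Kafka Sink");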

src/main/java/flink/sql/fxsql.java Normal file

@@ -0,0 +1,66 @@
package flink.sql;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
/**
 * @author yinz
 * @title: fxsql
 * @projectName flink
 * @description: Sync one MySQL table into another MySQL database via Flink CDC and JDBC
 * @date 2022/9/15 15:53
 */
public class fxsql {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment env = TableEnvironment.create(settings);
String sql="CREATE TABLE admin_menu(\n" +
" menu_id int NOT NULL ,\n" +
" parent_menu_id int,\n" +
" menu_type tinyint,\n" +
" menu_title STRING,\n" +
" menu_name STRING,\n" +
" menu_path STRING,\n" +
" menu_icon STRING,\n" +
" menu_server_uri STRING,\n" +
" sort_index int,\n" +
" create_time int,\n" +
" update_time int,\n" +
" PRIMARY KEY(menu_id) NOT ENFORCED\n" +
") with (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'scan.startup.mode' = 'initial',\n" +
" 'hostname' = 'yin520.cn',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = 'yin520.cn',\n" +
" 'database-name' = 'test',\n" +
" 'table-name' = 'admin_menu')";
String sql1="CREATE TABLE admin_menu_a(\n" +
" menu_id int NOT NULL ,\n" +
" parent_menu_id int,\n" +
" menu_type tinyint,\n" +
" menu_title STRING,\n" +
" menu_name STRING,\n" +
" menu_path STRING,\n" +
" menu_icon STRING,\n" +
" menu_server_uri STRING,\n" +
" sort_index int,\n" +
" create_time int,\n" +
" update_time int,\n" +
" PRIMARY KEY(menu_id) NOT ENFORCED\n" +
") with (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://yin520.cn:3306/yintest1',\n" +
" 'username' = 'root',\n" +
" 'password' = 'yin520.cn',\n" +
" 'table-name' = 'admin_menu')";
String sql2="INSERT INTO admin_menu_a\n" +
"select\n" +
" *\n" +
"from admin_menu";
env.executeSql( sql);
env.executeSql(sql1);
env.executeSql(sql2);
}
}
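fxsql issues three separate executeSql calls, and each INSERT submitted this way becomes its own Flink job. When one program feeds several sink tables, the idiomatic grouping is a StatementSet, which compiles all the INSERTs into a single job. A minimal sketch; admin_menu_b is a hypothetical second sink table, not part of this commit:

import org.apache.flink.table.api.StatementSet;

// Group multiple INSERTs so they run as one Flink job
StatementSet stmtSet = env.createStatementSet();
stmtSet.addInsertSql("INSERT INTO admin_menu_a SELECT * FROM admin_menu");
stmtSet.addInsertSql("INSERT INTO admin_menu_b SELECT * FROM admin_menu"); // hypothetical
stmtSet.execute();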

log4j2-test.properties Normal file

@@ -0,0 +1,30 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Root logger level is DEBUG here for local debugging; set it to OFF to keep
# build logs quiet
rootLogger.level = DEBUG
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
# Per-logger levels for the CDC stack (log4j2 property syntax)
logger.debezium.name = io.debezium
logger.debezium.level = DEBUG
logger.cdc.name = com.ververica.cdc
logger.cdc.level = DEBUG