资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 软件开发 > 后端开发 > Java

Flink CDC整库同步(多表异构同步)

Java 更新时间: 发布时间: 计算机考试归档 最新发布

Flink CDC整库同步(多表异构同步)

前言

flinkcdc单表同步比较简单,按照官方案例基本都能成功,多表异构同步、整库同步这块一直想尝试一下,社区说使用API可以做到,但是一直没能白嫖到可行方案(代码),然后自己动手尝试了下,咳咳,无奈技术太菜,java各种语法都搞的不是太明白,时间跨度蛮久,中间遇到了不少问题,中途偶然间在群里看到了很久很久以前群友发的一份同步方案,可惜缺少了反序列化的过程,借鉴过来改巴改巴(也改了好几个星期,太菜了),勉强是能跑了,分享出来,能帮到大家一点也就很好了。

方案思路

这个方案的整体思路我先说一下(大佬的思路,我借鉴的),首先我们先使用mysqlcatalog获取到各个表的信息(列名、列类型之类的),然后创建相应的sink table,然后flinkcdc的DataStream是提供了整库获取数据的能力的,所以我们就采用DataStream的方式拿到数据,然后在自定义反序列化里形成的输出,得到DataStream<,然后根据tableName将这个流拆分(过滤),就相当于一个tablename对应一个自己的DataStream,然后将每个流转为一个sourcetable,然后insert into sinktable select * from sourcetable,然后…gameover。

走起:

flink版本:1.15.2(1.15以下版本貌似还没有mysqlcatalog,如果要使用低版本,代码需要调整一下)
flink cdc版本:2.3.0

不巴拉了,直接上代码,场景是mysql -> mysql,sink端如果是其他数据库理论上应该是一样,source表需要有主键,这是flinkcdc底层约定好的,没有会报错。

package com.cityos;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.typeutils.RowTypeInfo;import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Schema;import org.apache.flink.table.api.StatementSet;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.catalog.DefaultCatalogTable;import org.apache.flink.table.catalog.ObjectPath;import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;import org.apache.flink.table.types.DataType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.types.Row;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;import java.util.Map;public class FlinkCdcMultiSyncJdbc {    private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);    public static void main(String[] args) throws Exception {       // source端连接信息        String userName = "root";        String passWord = "18772247265Ldy@";        String host = "localhost";        String db = "flinktest1";       // 如果是整库,tableList = ".*"        String tableList = "lidy.nlp_category,lidy.nlp_classify_man_made3";        int port = 33306;       // sink连接信息模板        String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";        String sink_username = "root";        String sink_password = "18772247265Ldy@";        String connectorWithBody =                " with (n" +                        " 'connector' = 'jdbc',n" +                        " 'url' = '${sink_url}',n" +                        " 'username' = '${sink_username}',n" +                        " 'password' = '${sink_password}',n" +                        " 'table-name' = '${tableName}'n" +                        ")";        connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)                .replace("${sink_username}", sink_username)                .replace("${sink_password}", sink_password);        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(3000);        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);        // 注册同步的库对应的catalog        MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord, String.format("jdbc:mysql://%s:%d", host, port));        List tables = new ArrayList<>();       // 如果整库同步,则从catalog里取所有表,否则从指定表中取表名        if (".*".equals(tableList)) {            tables = mysqlCatalog.listTables(db);        } else {            String[] tableArray = tableList.split(",");            for (String table : tableArray) {                tables.add(table.split(".")[1]);            }        }       // 创建表名和对应RowTypeInfo映射的Map        Map tableTypeInformationMap = Maps.newConcurrentMap();        Map tableDataTypesMap = Maps.newConcurrentMap();        Map tableRowTypeMap = Maps.newConcurrentMap();        for (String table : tables) {            // 获取mysql catalog中注册的表            ObjectPath objectPath = new ObjectPath(db, table);            DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);            // 获取表的Schema            Schema schema = catalogBaseTable.getUnresolvedSchema();            // 获取表中字段名列表            String[] fieldNames = new String[schema.getColumns().size()];            // 获取DataType            DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];            // 获取表字段类型            TypeInformation[] fieldTypes = new TypeInformation[schema.getColumns().size()];            // 获取表的主键            List primaryKeys = schema.getPrimaryKey().get().getColumnNames();            for (int i = 0; i < schema.getColumns().size(); i++) {                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);                fieldNames[i] = column.getName();                fieldDataTypes[i] = (DataType) column.getDataType();                fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();            }            RowType rowType = RowType.of(logicalTypes, fieldNames);            tableRowTypeMap.put(table, rowType);            // 组装sink表ddl sql            StringBuilder stmt = new StringBuilder();            String tableName = table;            String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);            stmt.append("create table ").append(jdbcSinkTableName).append("(n");            for (int i = 0; i < fieldNames.length; i++) {                String column = fieldNames[i];                String fieldDataType = fieldDataTypes[i].toString();                stmt.append("t").append(column).append(" ").append(fieldDataType).append(",n");            }            stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCEDn)", StringUtils.join(primaryKeys, ",")));            String formatJdbcSinkWithBody = connectorWithBody                    .replace("${tableName}", jdbcSinkTableName);            String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;            // 创建sink表            log.info("createSinkTableDdl: {}", createSinkTableDdl);            tEnv.executeSql(createSinkTableDdl);            tableDataTypesMap.put(tableName, fieldDataTypes);            tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));        }       // 监控mysql binlog        MySqlSource mySqlSource = MySqlSource.>builder()                .hostname(host)                .port(port)                .databaseList(db)                .tableList(tableList)                .username(userName)                .password(passWord)                .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))                .startupOptions(StartupOptions.initial())                .build();        SingleOutputStreamOperator> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();        StatementSet statementSet = tEnv.createStatementSet();        // dataStream转Table,创建临时视图,插入sink表        for (Map.Entry entry : tableTypeInformationMap.entrySet()) {            String tableName = entry.getKey();            RowTypeInfo rowTypeInfo = entry.getValue();            SingleOutputStreamOperator mapStream = dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo);            Table table = tEnv.fromChangelogStream(mapStream);            String temporaryViewName = String.format("t_%s", tableName);            tEnv.createTemporaryView(temporaryViewName, table);            String sinkTableName = String.format("jdbc_sink_%s", tableName);            String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);            log.info("add insertSql for {},sql: {}", tableName, insertSql);            statementSet.addInsertSql(insertSql);        }        statementSet.execute();    }}

对应的反序列化代码

package com.cityos;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;import com.ververica.cdc.debezium.utils.TemporalConversions;import io.debezium.data.Envelope;import io.debezium.data.SpecialValueDecimal;import io.debezium.data.VariableScaleDecimal;import io.debezium.time.*;import org.apache.flink.api.common.typeinfo.TypeHint;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;import org.apache.flink.table.data.DecimalData;import org.apache.flink.table.data.RowData;import org.apache.flink.table.data.StringData;import org.apache.flink.table.data.TimestampData;import org.apache.flink.table.types.logical.DecimalType;import org.apache.flink.table.types.logical.LogicalType;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.types.Row;import org.apache.flink.types.RowKind;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Decimal;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import java.math.BigDecimal;import java.nio.ByteBuffer;import java.time.Instant;import java.time.LocalDateTime;import java.time.ZoneId;import java.util.Map;public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema {        private final Map tableRowTypeMap;    private Map physicalConverterMap = Maps.newConcurrentMap();    CustomDebeziumDeserializer(Map tableRowTypeMap) {        this.tableRowTypeMap = tableRowTypeMap;        for (String tablename : this.tableRowTypeMap.keySet()) {            RowType rowType = this.tableRowTypeMap.get(tablename);            DeserializationRuntimeConverter physicalConverter =createNotNullConverter(rowType);            this.physicalConverterMap.put(tablename,physicalConverter);        }    }    @Override    public void deserialize(SourceRecord record, Collector out) throws Exception {        Envelope.Operation op = Envelope.operationFor(record);        Struct value = (Struct) record.value();        Schema valueSchema = record.valueSchema();        Struct source = value.getStruct("source");        String tablename = source.get("table").toString();        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {            Row insert = extractAfterRow(value, valueSchema, physicalConverter);            insert.setKind(RowKind.INSERT);            out.collect(Tuple2.of(tablename,insert));        } else if (op == Envelope.Operation.DELETe) {            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);            delete.setKind(RowKind.DELETE);            out.collect(Tuple2.of(tablename,delete));        } else {            Row before = extractBeforeRow(value, valueSchema, physicalConverter);            before.setKind(RowKind.UPDATE_BEFORE);            out.collect(Tuple2.of(tablename,before));            Row after = extractAfterRow(value, valueSchema, physicalConverter);            after.setKind(RowKind.UPDATE_AFTER);            out.collect(Tuple2.of(tablename,after));        }    }    private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();        Struct after = value.getStruct(Envelope.FieldName.AFTER);        return (Row) physicalConverter.convert(after, afterSchema);    }    private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();        Struct before = value.getStruct(Envelope.FieldName.BEFORE);        return (Row) physicalConverter.convert(before, beforeSchema);    }    @Override    public TypeInformation> getProducedType() {        return TypeInformation.of(new TypeHint>() {        });    }    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {        switch (type.getTypeRoot()) {            case NULL:                return new DeserializationRuntimeConverter() {                    private static final long serialVersionUID = 1L;                    @Override                    public Object convert(Object dbzObj, Schema schema) {                        return null;                    }                };            case BOOLEAN:                return convertToBoolean();            case TINYINT:                return new DeserializationRuntimeConverter() {                    private static final long serialVersionUID = 1L;                    @Override                    public Object convert(Object dbzObj, Schema schema) {                        return Byte.parseByte(dbzObj.toString());                    }                };            case SMALLINT:                return new DeserializationRuntimeConverter() {                    private static final long serialVersionUID = 1L;                    @Override                    public Object convert(Object dbzObj, Schema schema) {                        return Short.parseShort(dbzObj.toString());                    }                };            case INTEGER:            case INTERVAL_YEAR_MONTH:                return convertToInt();            case BIGINT:            case INTERVAL_DAY_TIME:                return convertToLong();            case DATE:                return convertToDate();            case TIME_WITHOUT_TIME_ZONE:                return convertToTime();            case TIMESTAMP_WITHOUT_TIME_ZONE:                return convertToTimestamp(ZoneId.of("UTC"));            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));            case FLOAT:                return convertToFloat();            case DOUBLE:                return convertToDouble();            case CHAR:            case VARCHAR:                return convertToString();            case BINARY:            case VARBINARY:                return convertToBinary();            case DECIMAL:                return createDecimalConverter((DecimalType) type);            case ROW:                return createRowConverter(                        (RowType) type);            case ARRAY:            case MAP:            case MULTISET:            case RAW:            default:                throw new UnsupportedOperationException("Unsupported type: " + type);        }    }    private static DeserializationRuntimeConverter convertToBoolean() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Boolean) {                    return dbzObj;                } else if (dbzObj instanceof Byte) {                    return (byte) dbzObj == 1;                } else if (dbzObj instanceof Short) {                    return (short) dbzObj == 1;                } else {                    return Boolean.parseBoolean(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToInt() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Integer) {                    return dbzObj;                } else if (dbzObj instanceof Long) {                    return ((Long) dbzObj).intValue();                } else {                    return Integer.parseInt(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToLong() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Integer) {                    return ((Integer) dbzObj).longValue();                } else if (dbzObj instanceof Long) {                    return dbzObj;                } else {                    return Long.parseLong(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {        final int precision = decimalType.getPrecision();        final int scale = decimalType.getScale();        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                BigDecimal bigDecimal;                if (dbzObj instanceof byte[]) {                    // decimal.handling.mode=precise                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);                } else if (dbzObj instanceof String) {                    // decimal.handling.mode=string                    bigDecimal = new BigDecimal((String) dbzObj);                } else if (dbzObj instanceof Double) {                    // decimal.handling.mode=double                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);                } else {                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {                        SpecialValueDecimal decimal =                                VariableScaleDecimal.toLogical((Struct) dbzObj);                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);                    } else {                        // fallback to string                        bigDecimal = new BigDecimal(dbzObj.toString());                    }                }                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);            }        };    }    private static DeserializationRuntimeConverter convertToDouble() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Float) {                    return ((Float) dbzObj).doubleValue();                } else if (dbzObj instanceof Double) {                    return dbzObj;                } else {                    return Double.parseDouble(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToFloat() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Float) {                    return dbzObj;                } else if (dbzObj instanceof Double) {                    return ((Double) dbzObj).floatValue();                } else {                    return Float.parseFloat(dbzObj.toString());                }            }        };    }    private static DeserializationRuntimeConverter convertToDate() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();            }        };    }    private static DeserializationRuntimeConverter convertToTime() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Long) {                    switch (schema.name()) {                        case MicroTime.SCHEMA_NAME:                            return (int) ((long) dbzObj / 1000);                        case NanoTime.SCHEMA_NAME:                            return (int) ((long) dbzObj / 1000_000);                    }                } else if (dbzObj instanceof Integer) {                    return dbzObj;                }                // get number of milliseconds of the day                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;            }        };    }    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof Long) {                    switch (schema.name()) {                        case Timestamp.SCHEMA_NAME:                            return TimestampData.fromEpochMillis((Long) dbzObj);                        case MicroTimestamp.SCHEMA_NAME:                            long micro = (long) dbzObj;                            return TimestampData.fromEpochMillis(                                    micro / 1000, (int) (micro % 1000 * 1000));                        case NanoTimestamp.SCHEMA_NAME:                            long nano = (long) dbzObj;                            return TimestampData.fromEpochMillis(                                    nano / 1000_000, (int) (nano % 1000_000));                    }                }                LocalDateTime localDateTime =                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);                return TimestampData.fromLocalDateTime(localDateTime);            }        };    }    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(            ZoneId serverTimeZone) {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof String) {                    String str = (String) dbzObj;                    // TIMESTAMP_LTZ type is encoded in string type                    Instant instant = Instant.parse(str);                    return TimestampData.fromLocalDateTime(                            LocalDateTime.ofInstant(instant, serverTimeZone));                }                throw new IllegalArgumentException(                        "Unable to convert to TimestampData from unexpected value '"                                + dbzObj                                + "' of type "                                + dbzObj.getClass().getName());            }        };    }    private static DeserializationRuntimeConverter convertToString() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                return StringData.fromString(dbzObj.toString());            }        };    }    private static DeserializationRuntimeConverter convertToBinary() {        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) {                if (dbzObj instanceof byte[]) {                    return dbzObj;                } else if (dbzObj instanceof ByteBuffer) {                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;                    byte[] bytes = new byte[byteBuffer.remaining()];                    byteBuffer.get(bytes);                    return bytes;                } else {                    throw new UnsupportedOperationException(                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());                }            }        };    }    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {        final DeserializationRuntimeConverter[] fieldConverters =                rowType.getFields().stream()                        .map(RowType.RowField::getType)                        .map(                                logicType ->                                        createNotNullConverter( logicType))                        .toArray(DeserializationRuntimeConverter[]::new);        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);        return new DeserializationRuntimeConverter() {            private static final long serialVersionUID = 1L;            @Override            public Object convert(Object dbzObj, Schema schema) throws Exception {                Struct struct = (Struct) dbzObj;                int arity = fieldNames.length;                Row row = new Row(arity);                for (int i = 0; i < arity; i++) {                    String fieldName = fieldNames[i];                    Field field = schema.field(fieldName);                    if (field == null) {                        row.setField(i, null);                    } else {                        Object fieldValue = struct.getWithoutDefault(fieldName);                        Schema fieldSchema = schema.field(fieldName).schema();                        Object convertedField =                                convertField(fieldConverters[i], fieldValue, fieldSchema);                        row.setField(i, convertedField);                    }                }                return row;            }        };    }    private static Object convertField(            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)            throws Exception {        if (fieldValue == null) {            return null;        } else {            return fieldConverter.convert(fieldValue, fieldSchema);        }    }}

再贴上我的pom.xml

    4.0.0    com.cityos    flink_1_15    1.0-SNAPSHOT            1.8        UTF-8        UTF-8        2.3.7.RELEASE        1.15.2        2.12                                    scala-tools.org            Scala-Tools Maven2 Repository            http://scala-tools.org/repo-releases                            spring            https://maven.aliyun.com/repository/spring                            cloudera            https://repository.cloudera.com/artifactory/cloudera-repos/                                    org.apache.flink            flink-scala_${scala.binary.version}            ${flink.version}                            org.apache.flink            flink-streaming-scala_${scala.binary.version}            ${flink.version}                            org.apache.flink            flink-table-planner_${scala.binary.version}            ${flink.version}                                        org.apache.flink            flink-table-api-scala-bridge_${scala.binary.version}            ${flink.version}                            org.apache.flink            flink-table-common            ${flink.version}                                        org.apache.flink            flink-clients            ${flink.version}                                    org.apache.flink            flink-connector-kafka            ${flink.version}                                    org.apache.flink            flink-connector-jdbc            ${flink.version}                                    com.ververica            flink-connector-mysql-cdc            2.3.0                                    mysql            mysql-connector-java            8.0.29                            org.apache.flink            flink-json            ${flink.version}                            org.apache.flink            flink-csv            ${flink.version}                                    org.slf4j            slf4j-log4j12            1.7.21            compile                                    log4j            log4j            1.2.17                                                    org.springframework.boot                spring-boot-dependencies                ${spring-boot.version}                pom                import                                                                org.apache.maven.plugins                maven-compiler-plugin                3.8.1                                    1.8                    1.8                    UTF-8                                                        org.springframework.boot                spring-boot-maven-plugin                2.3.7.RELEASE                                    com.cityos.Flink1142Application                                                                            repackage                                                    repackage                                                                                    

有兴趣的看看,没兴趣的或者感觉不屑的划过就好,莫喷我,代码写的确实是丑。

转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/1097474.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【Flink CDC整库同步(多表异构同步)】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2