数据连接扩展
数据源说明
Innospots 支持自定义不同类型的数据源扩展,开发人员可通过扩展数据源以支持平台对不同类型数据源的访问和数据操作。 通过不同数据源的开发扩展来支持多种类型数据库、消息队列、KV存储等不同类型的数据操作以及数据访问。
- Innospots提供统一的数据操作接口,向系统提供统一的数据操作接入,面向不同的数据源通过数据扩展支撑多种数据库类型数据操作和平台访问接入。
- 数据源扩展在扩展工程中定义配置数据源的配置项,配置数据源在管理平台端的配置表单。
- Innospots的数据扩展支持对数据源Schema的解析扩展,支持通用数据操作的接口实现,SQL语句数据操作接口实现,面向消息队列数据操作接口实现。
数据源相关功能模块
公共基础代码结构
Innospots基础数据包在io.innospots.base.data包下,包含数据操作接口,消息队列接口,数据schema定义,以及数据管理。
数据扩展的主要接口和功能类
- IDataConnectionMinder,数据源连接接口,所有数据源都必须实现此接口,主要方法包括:
- 初始化数据源:
void initialize(ISchemaRegistryReader dataSchemaReader, DatabaseConfig databaseConfig);
- 获取数据源配置:
DatabaseConfig databaseConfig();
- 打开连接:
void open();
- 测试连接:
boolean test(Map<String, Object> configs);
- 关闭连接:
void close();
- 获取数据源对象:
DataSource datasource();
- 获取数据源标识:
String dbType();
- 获取数据操作对象:
default IDataOperator dataOperator()
- 获取SQL操作对象:
default ISqlOperator sqlOperator()
- 获取消费者:
default IDataReceiver dataReceiver(String topic, String group, String offset, String format, long pollTimeout, int pollSize)
- 获取生产者:
default IDataSender dataSender()
- 取样数据接口:
Object fetchSample(DatabaseConfig databaseConfig, String tableName);
- schema定义
- 获取注册表列表简易结构:
List<SchemaCatalog> schemaCatalogs();
- 获取注册表列表:
List<SchemaRegistry> schemaRegistries(boolean includeField);
- 获取注册表详情:
SchemaRegistry schemaRegistry(String tableName);
SchemaRegistry schemaRegistry(Integer registryId);
- 获取注册表字段列表:
List<SchemaField> schemaRegistryFields(String tableName);
- 获取注册表列表简易结构:
- 初始化数据源:
- IDataOperator, 数据源CRUD操作接口
- ISqlOperator,数据源SQL操作接口
- IDataReceiver,消费者接口
- IDataSender,生产者接口
- ISchemaDatasourceReader,数据源读取接口
- ISchemaRegistryReader,注册表读取接口
- DataConnectionMinderManager,数据源连接管理类
- SchemaDatasourceConfig,数据源配置定义model类,保存数据源配置定义信息
数据源管理模块
innospot-libra-datasource
,模块中定义了对数据源的配置加载,数据源的管理,字段管理功能,作为管理平台的数据源管理模块使用。
数据源表单配置结构
数据源表单配置文件放在classpath:/META-SOURCE/
目录下,配置文件将被SchemaDataSourceConfigLoader,配置文件为JSON结构格式如下:
{
"sourceName": "MYSQL",
"icon": "/static/svg/Mysql.svg",
"order": 200,
"dbType": "mysql",
"sourceType": "JDBC",
"enabled": true,
"defaults": {
"classDriver": "com.mysql.cj.jdbc.Driver",
"maximumPoolSize": "10",
"minimumIdle": "1",
"maxLifetime": "60000",
"idleTimeout": "30000",
"connectionTimeout": "30000",
"validationTimeout": "3000",
"connectionTestQuery": "select 1"
},
"elements": [
{
"label": "名称",
"name": "name",
"placeholder": "请输入数据源名称",
"required": true,
"labelGrid": 4,
"tips": "中文或英文有效字符,小于16个字符",
"type": "INPUT",
"expression": "",
"gridSize": 8
},
{
"label": "连接方式",
"name": "connect_type",
"placeholder": "连接方式",
"required": true,
"labelGrid": 4,
"tips": "连接方式",
"type": "RADIO",
"expression": "",
"gridSize": 8,
"options": [
{
"name": "JDBC",
"value": "JDBC"
},
{
"name": "CDC",
"value": "CDC"
}
]
},
{
"label": "用户名",
"name": "user_name",
"placeholder": "访问数据库用户名",
"required": true,
"labelGrid": 4,
"tips": "访问数据库用户名",
"type": "INPUT",
"expression": "",
"gridSize": 8,
"options": []
},
{
"label": "密码",
"name": "db_password",
"placeholder": "数据库密码",
"required": true,
"labelGrid": 4,
"tips": "数据库密码",
"type": "PASSWORD",
"expression": "",
"gridSize": 8,
"options": []
},
{
"label": "JdbcUrl",
"name": "jdbc_url",
"placeholder": "数据库连接JDBC地址",
"required": true,
"labelGrid": 4,
"tips": "数据库连接JDBC地址",
"type": "INPUT",
"expression": "",
"gridSize": 20,
"options": []
},
{
"label": "描述",
"name": "description",
"placeholder": "介绍描述",
"required": false,
"labelGrid": 4,
"tips": "介绍描述,不超过50个字符",
"type": "TEXTAREA",
"expression": "",
"gridSize": 20,
"options": []
}
]
}
主要配置项说明:
- sourceName,数据源名称
- icon,数据源图标,svg文件
- order,(TODO,代码中未使用)
- dbType,唯一标识
- enabled,是否启用
- defaults,默认配置项
- elements,页面配置元素定义
数据扩展工程
数据源扩展工程使用maven工程结构,使用innospot-extension-connector
作为父类工程模块
maven 的pom文件示例:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>innospot-extension-connector</artifactId>
<groupId>live.innospot</groupId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>innospot-connector-mysql</artifactId>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
</project>
数据源扩展示例工程参见连接,示例工程
扩展实现
1. meta_config.json定义
在resources/META-SOURCE/
目录下定义数据源表单配置文件。
2. SPI描述文件定义和SPI接口实现
在resources/META-INF/services
目录下创建 io.innospots.base.data.minder.IDataConnectionMinder
文件,
在文件中增加IDataConnectionMinder的接口实现类:io.innospots.data.mysql.minder.MysqlDataConnectionMinder
IDataConnectionMinder接口实现
package io.innospots.data.mysql.minder;
import io.innospots.base.data.minder.jdbc.JdbcDataConnectionMinder;
import io.innospots.base.data.operator.IDataOperator;
import io.innospots.base.data.operator.ISqlOperator;
import io.innospots.data.mysql.operator.MysqlDataOperator;
import io.innospots.data.mysql.operator.MysqlSqlOperator;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* @author Raydian
* @date 2021/1/31
*/
@Slf4j
public class MysqlDataConnectionMinder extends JdbcDataConnectionMinder {
@Override
public void open() {
if (dataSource != null) {
return;
}
Map<String, Object> config = this.databaseConfig.getConfig();
config.put(DRIVER_CLASS_NAME, "com.mysql.cj.jdbc.Driver");
this.dataSource = buildDataSource(databaseConfig);
}
@Override
public boolean test(Map<String, Object> configs) {
// TODO 待改造calcite
configs.put(DRIVER_CLASS_NAME, "com.mysql.cj.jdbc.Driver");
return super.test(configs);
}
@Override
public IDataOperator dataOperator() {
if (this.dataOperator == null) {
this.dataOperator = new MysqlDataOperator(dataSource);
}
return dataOperator;
}
@Override
public ISqlOperator sqlOperator() {
if (this.sqlOperator == null) {
this.sqlOperator = new MysqlSqlOperator(dataSource);
}
return sqlOperator;
}
@Override
public String dbType() {
return "mysql";
}
}
3. 接口实现
对于数据操作实现以下两个接口
实现IDataOperator接口
package io.innospots.base.data.operator;
import io.innospots.base.condition.Factor;
import io.innospots.base.data.model.DataBody;
import io.innospots.base.model.PageBody;
import io.innospots.base.model.response.InnospotResponse;
import io.innospots.base.utils.StringConverter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 数据的增删改查操作
*
* @author Smars
* @date 2021/5/3
*/
public interface IDataOperator {
/**
* 查询列表
*
* @param tableName
* @param condition
* @param page
* @param size
* @return
*/
InnospotResponse<PageBody<Map<String, Object>>> selectForList(String tableName, List<Factor> condition, int page, int size);
InnospotResponse<PageBody<Map<String, Object>>> selectForList(SelectClause selectClause);
/**
* 查询最新的数据
*
* @param tableName 表或者数据集名称
* @param upTimeField 时间字段
* @param size 返回数据量
* @return
*/
InnospotResponse<PageBody<Map<String, Object>>> selectLatest(String tableName, String upTimeField, int size);
/**
* 查询单条
*
* @param tableName
* @param condition
* @return
*/
InnospotResponse<DataBody<Map<String, Object>>> selectForObject(String tableName, List<Factor> condition);
InnospotResponse<DataBody<Map<String, Object>>> selectForObject(String tableName, String key, String value);
InnospotResponse<DataBody<Map<String, Object>>> selectForObject(SelectClause selectClause);
/**
* 插入数据
*
* @param tableName
* @param data
* @return
*/
InnospotResponse<Integer> insert(String tableName, Map<String, Object> data);
InnospotResponse<Integer> insertBatch(String tableName, List<Map<String, Object>> data);
/**
*
* @param tableName
* @param keyColumn
* @param data
* @return
*/
InnospotResponse<Integer> upsert(String tableName,String keyColumn ,Map<String, Object> data);
InnospotResponse<Integer> upsertBatch(String tableName,String keyColumn ,List<Map<String, Object>> data);
InnospotResponse<Integer> update(String tableName, UpdateItem item);
InnospotResponse<Integer> updateForBatch(String tableName, List<UpdateItem> items);
InnospotResponse<Integer> delete(String tableName, List<Factor> condition);
InnospotResponse<Integer> deleteBatch(
String tableName,
List<Factor> condition);
InnospotResponse<DataBody> execute(String tableName, Map<String, Object> body);
default Map<String, Object> mapKeyToCamel(Map<String, Object> data) {
Map<String, Object> resultMap = null;
if (data != null) {
resultMap = new HashMap<>();
for (String key : data.keySet()) {
resultMap.put(StringConverter.underscoreToCamel(key), data.get(key));
}
}
return resultMap;
}
default List<Map<String, Object>> mapKeyToCamel(List<Map<String, Object>> dataList) {
List<Map<String, Object>> list = null;
if (dataList != null) {
list = new ArrayList<>();
for (Map<String, Object> data : dataList) {
list.add(mapKeyToCamel(data));
}
}
return list;
}
}
对于支持SQL执行语句的数据源实现ISqlOperator接口
package io.innospots.base.data.operator;
import io.innospots.base.data.model.DataBody;
import io.innospots.base.model.PageBody;
import io.innospots.base.model.response.InnospotResponse;
import java.util.List;
import java.util.Map;
/**
* 以sql语句的方式执行数据操作, 查询语句均放在post方法体中
*
* @author Smars
* @date 2021/5/3
*/
public interface ISqlOperator {
InnospotResponse<DataBody<Map<String,Object>>> selectForObject(String sql);
InnospotResponse<PageBody> selectForList(String sql);
InnospotResponse<PageBody> selectForList(String sql, int page, int size);
InnospotResponse<Integer> executeForSql(String sql);
InnospotResponse<Integer> executeForSqlBatch(List<String> sql);
}
对于消息队列实现以下两个接口
IDataReceiver,消息队列数据接收实现
package io.innospots.base.data.operator;
import io.innospots.base.data.model.DataBody;
import io.innospots.base.model.PageBody;
import io.innospots.base.model.response.InnospotResponse;
import java.util.Map;
/**
* @author Smars
* @date 2021/5/10
*/
public interface IDataReceiver {
void openSubscribe(String group);
void openSubscribe();
void openSubscribe(String topic, String group, Long pollTimeOut);
void assign(String topic, String group, Long pollTimeOut, Long seekOffset);
InnospotResponse<DataBody<Map<String, Object>>> receive();
InnospotResponse<PageBody<Map<String, Object>>> receive(int size);
InnospotResponse<DataBody<Map<String, Object>>> receiveLastData();
void close();
boolean hasCache();
String key();
}
IDataSender,消息队列数据发送实现
package io.innospots.base.data.operator;
import java.util.List;
import java.util.Map;
/**
* @author Smars
* @date 2021/5/10
*/
public interface IDataSender {
void openSender();
Map<String,Object> send(String tableName, Map<String,Object> body);
Map<String,Object> send(String tableName,List<Map<String,Object>> bodies);
void close();
}
4. 打包发布
使用mvn clean package
进行打包发布
或者使用mvn deploy
将发布包推送至maven仓库中发布使用
5. 扩展安装
-
在Libra的Server工程中引入数据扩展依赖包
-
在管理平台端中查看安装扩展的数据源
数据源扩展实现
当前已支持的数据源扩展实现模块
- mysql
- redis
- kafka
- http