动态数据源

Jingxc大约 8 分钟java后端java后端Mysql

动态数据源

在工作中,我们可能会遇到下面这些情况,如果只是在配置文件中配置数据源就有可能太繁琐,或者无法满足要求

  1. 需要配置不止一个数据源,数据源的链接信息都不相同
  2. 需要在服务运行过程中,动态地添加、删除、切换所使用的数据源
  3. 同一服务中不同的接口服务需要调用不同的数据源信息

1.创建当前线程工具类

通过当前线程工具类,保留当前线程数据源信息,防止数据源切换时导致数据错误

package com.game.server.source;

import lombok.extern.log4j.Log4j;

/**
 * Holds the identifier of the data source selected for the current thread.
 *
 * <p>The identifier is kept in a {@link ThreadLocal}, so a switch made on one thread is not
 * visible to any other thread. Callers running on pooled threads should call
 * {@link #removeDataSource()} when they are done, otherwise the selection stays attached to the
 * thread after the current unit of work ends.
 */
@Log4j
public class DataSourceContextHolder {

    /**
     * Per-thread data source identifier; {@code null} when nothing has been selected.
     */
    private static final ThreadLocal<String> CONTEXTHOLDER = new ThreadLocal<>();

    /**
     * Selects the data source to use on the current thread.
     *
     * @param datasourceId identifier of the data source to switch to
     */
    public static void setDataSource(String datasourceId) {
        CONTEXTHOLDER.set(datasourceId);
        // The Log4j 1.x logger generated by @Log4j does not expand "{}" placeholders, so the
        // identifier is appended by plain concatenation.
        log.info("已切换到数据源:" + datasourceId);
    }

    /**
     * Returns the identifier selected for the current thread.
     *
     * @return the selected identifier, or {@code null} if none has been set
     */
    public static String getDataSource() {
        return CONTEXTHOLDER.get();
    }

    /**
     * Clears the selection made for the current thread.
     */
    public static void removeDataSource() {
        CONTEXTHOLDER.remove();
        log.info("已切换到主数据源");
    }

}

2.创建动态数据源

通过继承Spring提供的AbstractRoutingDataSource抽象类并重写determineCurrentLookupKey方法,我们可以实现自己的数据源选择逻辑,从而实现数据源的动态切换。

package com.game.server.source;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.druid.stat.DruidDataSourceStatManager;
import com.game.server.bean.DataSourceBean;

import lombok.extern.log4j.Log4j;

@Log4j
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {

    /**
     * 存储我们注册的数据源
     */
    private volatile Map<Object, Object> customDataSources;

    @Override
    protected Object determineCurrentLookupKey() {
        String datasourceId = DataSourceContextHolder.getDataSource();
        if (!StringUtils.isEmpty(datasourceId)) {
            Map<Object, Object> map = this.customDataSources;
            if (map.containsKey(datasourceId)) {
                log.info("当前数据源是:" + datasourceId);
            } else {
                log.info("不存在数据源:" + datasourceId);
                return null;
            }
        } else {
            log.info("当前是默认数据源");
        }
        return datasourceId;
    }

    @Override
    public void setTargetDataSources(Map<Object, Object> param) {

        super.setTargetDataSources(param);
        this.customDataSources = param;
    }

    /**
     * @Description: 检查数据源是否已经创建
     * @param dataSource
     */
    public void checkCreateDataSource(DataSourceBean dataSource) {
        String datasourceId = dataSource.getDataSourceName();
        Map<Object, Object> map = this.customDataSources;
        if (map.containsKey(datasourceId)) {
            // 这里检查一下之前创建的数据源,现在是否连接正常
            DruidDataSource druidDataSource = (DruidDataSource) map.get(datasourceId);
            boolean flag = true;
            DruidPooledConnection connection = null;
            try {
                connection = druidDataSource.getConnection();
            } catch (SQLException throwables) {
                // 抛异常了说明连接失效吗,则删除现有连接
                log.error(throwables.getMessage());
                flag = false;
                delDatasources(datasourceId);
                //
            } finally {
                // 如果连接正常记得关闭
                if (null != connection) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        log.error(e.getMessage());
                    }
                }
            }
            if (!flag) {
                createDataSource(dataSource);
            }
        } else {
            createDataSource(dataSource);
        }
    }

    /**
     * @Description: 创建数据源
     * @param dataSource
     */
    public void createDataSource(DataSourceBean dataSource) {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection connection = DriverManager.getConnection(dataSource.getUrl(), dataSource.getUsername(),
                    dataSource.getPassword());
            if (connection == null) {
                log.error("数据源配置有错误,DataSource:" + dataSource);
            } else {
                connection.close();
            }

            DruidDataSource druidDataSource = new DruidDataSource();
            druidDataSource.setName(dataSource.getDataSourceName());
            druidDataSource.setDriverClassName("com.mysql.jdbc.Driver");
            druidDataSource.setUrl(dataSource.getUrl());
            druidDataSource.setUsername(dataSource.getUsername());
            druidDataSource.setPassword(dataSource.getPassword());
            druidDataSource.setMaxActive(3000);
            druidDataSource.setMinIdle(10);
            // 获取连接最大等待时间,单位毫秒
            druidDataSource.setMaxWait(60000);
            String validationQuery = "select 1 from dual";
            // 申请连接时执行validationQuery检测连接是否有效,防止取到的连接不可用
            druidDataSource.setTestOnBorrow(true);
            druidDataSource.setValidationQuery(validationQuery);
            druidDataSource.init();
            this.customDataSources.put(dataSource.getDataSourceName(), druidDataSource);
            // 将map赋值给父类的TargetDataSources
            setTargetDataSources(this.customDataSources);
            // 将TargetDataSources中的连接信息放入resolvedDataSources管理
            super.afterPropertiesSet();

        } catch (Exception e) {
            log.error("数据源创建失败", e);
        }
    }

    /**
     * @Description: 删除数据源
     * @param datasourceId
     */
    private void delDatasources(String datasourceId) {
        Map<Object, Object> map = this.customDataSources;
        Set<DruidDataSource> druidDataSourceInstances = DruidDataSourceStatManager.getDruidDataSourceInstances();
        for (DruidDataSource dataSource : druidDataSourceInstances) {
            if (datasourceId.equals(dataSource.getName())) {
                map.remove(datasourceId);
                // 从实例中移除当前dataSource
                DruidDataSourceStatManager.removeDataSource(dataSource);
                // 将map赋值给父类的TargetDataSources
                setTargetDataSources(map);
                // 将TargetDataSources中的连接信息放入resolvedDataSources管理
                super.afterPropertiesSet();
            }
        }
    }

}

3.创建动态数据源配置类

跟配置静态多数据源一样,需要手动配置下面的三个 Bean,在配置时先初始化一个默认的数据源,在我们无法找到数据源相关的数据信息时就会从默认数据源中找

提示

基础配置只需要手动配置3个bean,要是初始化数据源的时候使用的是sharding-jdbc那么可能需要配置其他参数

3.1基础数据源配置

package com.game.server.source;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

/**
 * Spring configuration that wires a Druid pool as the main data source, wraps it in a
 * {@link DynamicRoutingDataSource}, and builds the MyBatis-Plus session factory and the
 * transaction manager on top of the routing data source.
 */
@Configuration
public class DruidDBConfig {

    @Value("${spring.datasource.url}")
    private String dbUrl;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;
    // Connection-pool settings
    @Value("${spring.datasource.druid.initialSize}")
    private int initialSize;
    @Value("${spring.datasource.druid.minIdle}")
    private int minIdle;
    @Value("${spring.datasource.druid.max-active}")
    private int maxActive;
    @Value("${spring.datasource.druid.maxWait}")
    private int maxWait;

    /**
     * Builds the main Druid data source from the {@code spring.datasource.*} properties.
     *
     * @return the main data source
     * @throws SQLException declared for callers; kept for interface compatibility
     */
    @Bean
    @Primary
    @Qualifier("mainDataSource")
    public DataSource dataSource() throws SQLException {
        DruidDataSource datasource = new DruidDataSource();
        // Basic connection settings
        datasource.setUrl(this.dbUrl);
        datasource.setUsername(username);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        // Connection-pool settings
        datasource.setInitialSize(initialSize);
        datasource.setMinIdle(minIdle);
        datasource.setMaxActive(maxActive);
        datasource.setMaxWait(maxWait);
        // Prepared-statement caching (PSCache) helps databases with server-side cursors such as
        // Oracle a lot; for MySQL it is recommended to keep it off. The per-connection cache size
        // is deliberately NOT set: per the Druid documentation, setting it to a value greater
        // than zero switches poolPreparedStatements back on.
        datasource.setPoolPreparedStatements(false);
        // Run the validation query when a connection is borrowed, so that a broken connection is
        // not handed out.
        datasource.setTestOnBorrow(true);
        // Also validate on borrow when the connection has been idle longer than
        // timeBetweenEvictionRunsMillis.
        datasource.setTestWhileIdle(true);
        // SQL used to validate a connection
        datasource.setValidationQuery("select 1 from dual");
        // Interval between checks for idle connections that should be closed, in milliseconds
        datasource.setTimeBetweenEvictionRunsMillis(60000);
        // Minimum time a connection stays in the pool before it may be evicted, in milliseconds
        // (180000 = 3 minutes)
        datasource.setMinEvictableIdleTimeMillis(180000);
        datasource.setKeepAlive(true);
        return datasource;

    }

    /**
     * Builds the routing data source with the main data source as both the default target and
     * the target registered under the key {@code "mainDataSource"}.
     *
     * @return the routing data source
     * @throws SQLException if the main data source cannot be built
     */
    @Bean(name = "dynamicDataSource")
    @Qualifier("dynamicDataSource")
    public DynamicRoutingDataSource dynamicDataSource() throws SQLException {
        DynamicRoutingDataSource dynamicDataSource = new DynamicRoutingDataSource();
        // Default target
        dynamicDataSource.setDefaultTargetDataSource(dataSource());
        Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
        // Additional targets, keyed by identifier
        targetDataSources.put("mainDataSource", dataSource());
        dynamicDataSource.setTargetDataSources(targetDataSources);
        return dynamicDataSource;
    }

    /**
     * Builds the session factory on top of the routing data source.
     *
     * <p>This uses the MyBatis-Plus factory bean; with plain MyBatis the equivalent class is
     * {@code SqlSessionFactoryBean}.
     *
     * @return the session factory
     * @throws Exception if the mapper files cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dynamicDataSource());

        // Location of the mapper XML files
        sqlSessionFactoryBean
                .setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/*.xml"));

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * Creates the transaction manager on top of the routing data source.
     *
     * @author YZY
     * @date 2021/6/1 9:14
     * @param dataSource the routing data source
     * @return the transaction manager bound to {@code dataSource}
     */
    @Bean
    public DataSourceTransactionManager transactionManager(DynamicRoutingDataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

如果使用sharding-jdbc作为初始化数据源,则需要做相应的修改

3.2sharding-jdbc数据源配置

package com.game.server.source;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepositoryConfiguration;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

/**
 * Spring configuration that uses a ShardingSphere data source (backed by one Druid pool) as the
 * main data source, wraps it in a {@link DynamicRoutingDataSource}, and builds the MyBatis-Plus
 * session factory and the transaction manager on top of the routing data source.
 */
@Configuration
public class DruidDBConfig {

    @Value("${spring.datasource.url}")
    private String dbUrl;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;
    // Connection-pool settings
    @Value("${spring.datasource.druid.initialSize}")
    private int initialSize;
    @Value("${spring.datasource.druid.minIdle}")
    private int minIdle;
    @Value("${spring.datasource.druid.max-active}")
    private int maxActive;
    @Value("${spring.datasource.druid.maxWait}")
    private int maxWait;

    /**
     * Builds the routing data source with the ShardingSphere data source as both the default
     * target and the target registered under the key {@code "mainDataSource"}.
     *
     * @return the routing data source
     * @throws SQLException if the ShardingSphere data source cannot be created
     */
    @Bean(name = "dynamicDataSource")
    @Qualifier("dynamicDataSource")
    public DynamicRoutingDataSource dynamicDataSource() throws SQLException {
        DynamicRoutingDataSource dynamicDataSource = new DynamicRoutingDataSource();
        // Default target
        dynamicDataSource.setDefaultTargetDataSource(shardingSphereDataSource());
        Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
        // Additional targets, keyed by identifier
        targetDataSources.put("mainDataSource", shardingSphereDataSource());
        dynamicDataSource.setTargetDataSources(targetDataSources);
        return dynamicDataSource;
    }

    /**
     * Builds the session factory on top of the routing data source.
     *
     * <p>This uses the MyBatis-Plus factory bean; with plain MyBatis the equivalent class is
     * {@code SqlSessionFactoryBean}.
     *
     * @return the session factory
     * @throws Exception if the mapper files cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dynamicDataSource());

        // Location of the mapper XML files
        sqlSessionFactoryBean
                .setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/*.xml"));

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * Creates the transaction manager on top of the routing data source.
     *
     * @param dataSource the routing data source
     * @return the transaction manager bound to {@code dataSource}
     */
    @Bean
    public DataSourceTransactionManager transactionManager(DynamicRoutingDataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * Creates the ShardingSphere data source from the physical data sources and the sharding
     * rules defined in this class.
     *
     * @return the ShardingSphere data source
     * @throws SQLException if ShardingSphere fails to create the data source
     */
    @Bean
    @Primary
    @Qualifier("mainDataSource")
    public DataSource shardingSphereDataSource() throws SQLException {
        // Physical data sources
        Map<String, DataSource> dataSourceMap = createDataSources();
        // Sharding rules
        Collection<RuleConfiguration> ruleConfigs = Arrays.asList(createShardingRuleConfiguration());
        // Additional properties (none set)
        Properties props = new Properties();
        DataSource createDataSource = ShardingSphereDataSourceFactory.createDataSource(dataSourceMap, ruleConfigs,
                props);
        return createDataSource;
    }

    /**
     * Builds the sharding rules for the login and logout tables.
     *
     * @return the rule configuration holding both table rules and their inline algorithms
     */
    private ShardingRuleConfiguration createShardingRuleConfiguration() {
        ShardingRuleConfiguration result = new ShardingRuleConfiguration();

        // Sharded tables
        result.getTables().add(getLoginTableRuleConfiguration());
        result.getTables().add(getLogoutTableRuleConfiguration());

        // Inline algorithms that map the "month" column value to the physical table name
        Properties props = new Properties();
        props.setProperty("algorithm-expression", "jj_oversea_aurora_login_${month}");
        result.getShardingAlgorithms().put("table-inline-login",
                new ShardingSphereAlgorithmConfiguration("INLINE", props));

        Properties logoutProps = new Properties();
        logoutProps.setProperty("algorithm-expression", "jj_oversea_aurora_logout_${month}");
        result.getShardingAlgorithms().put("table-inline-logout",
                new ShardingSphereAlgorithmConfiguration("INLINE", logoutProps));
        return result;
    }

    /**
     * Builds the table rule for the login table: twelve physical tables on data source
     * {@code m0}, sharded on the {@code month} column.
     *
     * @return the login table rule
     */
    private ShardingTableRuleConfiguration getLoginTableRuleConfiguration() {
        ShardingTableRuleConfiguration result = new ShardingTableRuleConfiguration("jj_oversea_aurora_login",
                "m0.jj_oversea_aurora_login_${1..12}");
        result.setTableShardingStrategy(new StandardShardingStrategyConfiguration("month", "table-inline-login"));
        return result;
    }

    /**
     * Builds the table rule for the logout table: twelve physical tables on data source
     * {@code m0}, sharded on the {@code month} column.
     *
     * @return the logout table rule
     */
    private ShardingTableRuleConfiguration getLogoutTableRuleConfiguration() {
        ShardingTableRuleConfiguration result = new ShardingTableRuleConfiguration("jj_oversea_aurora_logout",
                "m0.jj_oversea_aurora_logout_${1..12}");
        result.setTableShardingStrategy(new StandardShardingStrategyConfiguration("month", "table-inline-logout"));
        return result;
    }

    /**
     * Builds the physical data sources used by ShardingSphere: a single Druid pool registered
     * under the name {@code m0}.
     *
     * @return the physical data sources keyed by name
     */
    private Map<String, DataSource> createDataSources() {

        Map<String, DataSource> dataSourceMap = new HashMap<>();
        // First (and only) physical data source
        DruidDataSource datasource = new DruidDataSource();

        // Basic connection settings
        datasource.setUrl(dbUrl);
        datasource.setUsername(username);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        // Connection-pool settings
        datasource.setInitialSize(initialSize);
        datasource.setMinIdle(minIdle);
        datasource.setMaxActive(maxActive);
        datasource.setMaxWait(maxWait);
        // Prepared-statement caching (PSCache) helps databases with server-side cursors such as
        // Oracle a lot; for MySQL it is recommended to keep it off. The per-connection cache size
        // is deliberately NOT set: per the Druid documentation, setting it to a value greater
        // than zero switches poolPreparedStatements back on.
        datasource.setPoolPreparedStatements(false);
        // Run the validation query when a connection is borrowed, so that a broken connection is
        // not handed out.
        datasource.setTestOnBorrow(true);
        // Also validate on borrow when the connection has been idle longer than
        // timeBetweenEvictionRunsMillis.
        datasource.setTestWhileIdle(true);
        // SQL used to validate a connection
        datasource.setValidationQuery("select 1 from dual");
        // Interval between checks for idle connections that should be closed, in milliseconds
        datasource.setTimeBetweenEvictionRunsMillis(60000);
        // Minimum time a connection stays in the pool before it may be evicted, in milliseconds
        // (180000 = 3 minutes)
        datasource.setMinEvictableIdleTimeMillis(180000);
        datasource.setKeepAlive(true);
        dataSourceMap.put("m0", datasource);

        return dataSourceMap;
    }

    /**
     * Builds a standalone mode configuration. Currently not used by
     * {@link #shardingSphereDataSource()}.
     *
     * @return the standalone mode configuration
     */
    @SuppressWarnings("unused")
    private ModeConfiguration createModeConfiguration() {
        return new ModeConfiguration("Standalone",
                new StandalonePersistRepositoryConfiguration("JDBC", new Properties()), true);
    }

}

与基础数据源相比多了sharding-jdbc的配置,需要参考官方文档,通过代码进行数据源配置;如果只是为了分库分表,可以参考数据库中间件

这样基础配置已经完成,接下来在代码中使用就很方便

动态数据源使用


这个是结合数据库使用的主要逻辑代码


// Mapper for the data source definitions kept in the database (storage is up to you)
@Autowired
private DataSourceBeanMapper dataSourceBeanMapper;
// The dynamic routing data source
@Autowired
private DynamicRoutingDataSource dynamicRoutingDataSource;

/**
 * Switches the current thread to the data source with the given identifier.
 *
 * <p>The thread is first reset to the default data source, then the stored data source
 * definitions are loaded and searched for a matching name. On a match the data source is checked
 * (and created if necessary) before the thread is switched to it.
 *
 * @param datasourceId identifier of the data source to switch to
 * @return {@code true} if a definition with that name was found, {@code false} otherwise
 */
@Override
public boolean changeDS(String datasourceId) {
    // Back to the default data source, so the definitions below are read from it.
    DataSourceContextHolder.removeDataSource();

    // Load every stored definition.
    List<DataSourceBean> configured = dataSourceBeanMapper.selectList(null);
    if (CollectionUtils.isEmpty(configured)) {
        return false;
    }

    for (DataSourceBean candidate : configured) {
        String candidateName = candidate.getDataSourceName();
        if (!candidateName.equals(datasourceId)) {
            continue;
        }
        System.out.println("已找到数据源,datasourceId是:" + candidateName);
        // Reuse the registered data source if it still works, otherwise create it.
        dynamicRoutingDataSource.checkCreateDataSource(candidate);
        // Switch the current thread.
        DataSourceContextHolder.setDataSource(candidateName);
        return true;
    }
    return false;
}

在使用的时候我们直接通过下面代码即可


private static final String DATA_SOURCE_NAME = "DATA_SOURCE_NAME";


List<BaseConfig> baseConfigs;
try {
    // Switch the current thread to the target data source.
    dataSourceServiceImpl.changeDS(DATA_SOURCE_NAME);
    // Run the query (replace with your own SQL).
    baseConfigs = baseConfigMapper.selectList(null);
} finally {
    // Always switch back to the main data source, even when the query throws; otherwise the
    // selection stays attached to this thread and affects whatever runs on it next.
    DataSourceContextHolder.removeDataSource();
}

这样我们就可以在不同的方法中使用不同的线程数据源了,我们甚至可以将数据源标志通过接口参数传递,完成真正的动态数据源

上次编辑于:
贡献者: Jingxc