MyBatis 源码阅读之数据库连接

MyBatis 源码阅读之数据库连接

MyBatis 的配置文件所有配置会被 org.apache.ibatis.builder.xml.XMLConfigBuilder 类读取,
我们可以通过此类来了解各个配置是如何运作的。
而 MyBatis 的映射文件配置会被 org.apache.ibatis.builder.xml.XMLMapperBuilder 类读取。
我们可以通过此类来了解映射文件的配置时如何被解析的。

本文探讨 事务管理器数据源 相关代码

配置

environment

以下是 mybatis 配置文件中 environments 节点的一般配置。

<!-- mybatis-config.xml -->
<environments default="development">
    <environment id="development">
        <transactionManager type="JDBC">
            <property name="..." value="..."/>
        </transactionManager>
        <dataSource type="POOLED">
            <property name="driver" value="${driver}"/>
            <property name="url" value="${url}"/>
            <property name="username" value="${username}"/>
            <property name="password" value="${password}"/>
        </dataSource>
    </environment>
</environments>

environments 节点的加载也不算复杂,它只会加载 id 为 development 属性值的 environment 节点。
它的加载代码在 XMLConfigBuilder 类的 environmentsElement() 方法中,代码不多,逻辑也简单,此处不多讲。

TransactionManager

接下来我们看看 environment 节点下的子节点。transactionManager 节点的 type 值默认提供有 JDBCMANAGED ,dataSource 节点的 type 值默认提供有 JNDIPOOLEDUNPOOLED
它们对应的类都可以在 Configuration 类的构造器中找到,当然下面我们也一个一个来分析。

现在我们大概了解了配置,然后来分析这些配置与 MyBatis 类的关系。

TransactionFactory

transactionManager 节点对应 TransactionFactory 接口,使用了 抽象工厂模式 。MyBatis 给我们提供了两个实现类:ManagedTransactionFactoryJdbcTransactionFactory ,它们分别对应者 type 属性值为 MANAGED 和 JDBC 。

TransactionFactory 有三个方法,我们需要注意的方法只有 newTransaction() ,它用来创建一个事务对象。

void setProperties(Properties props);

Transaction newTransaction(Connection conn);

Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit);

其中 JdbcTransactionFactory 创建的事务对象是 JdbcTransaction 的实例,该实例是对 JDBC 事务的简单封装,实例中 ConnectionDataSource 对象正是事务所在的 连接数据源
TransactionIsolationLevel 代表当前事务的隔离等级,它是一个枚举类,简单明了无需多言。而 autoCommit 表示是否开启了自动提交,开启了,则没有事务的提交和回滚等操作的意义了。

ManagedTransactionFactory 创建的事务对象是 ManagedTransaction 的实例,它本身并不控制事务,即 commitrollback 都是不做任何操作,而是交由 JavaEE 容器来控制事务,以方便集成。

DataSourceFactory

DataSourceFactory 是获取数据源的接口,也使用了 抽象工厂模式 ,代码如下,方法极为简单:

public interface DataSourceFactory {

    /**
     * 可传入一些属性配置
     */
    void setProperties(Properties props);

    DataSource getDataSource();
}

MyBatis 默认支持三种数据源,分别是 UNPOOLEDPOOLEDJNDI 。对应三个工厂类:
UnpooledDataSourceFactoryPooledDataSourceFactoryJNDIDataSourceFactory

其中 JNDIDataSourceFactory 是使用 JNDI 来获取数据源。我们很少使用,并且代码不是非常复杂,此处不讨论。我们先来看看 UnpooledDataSourceFactory

public class UnpooledDataSourceFactory implements DataSourceFactory {

    private static final String DRIVER_PROPERTY_PREFIX = "driver.";
    private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

    protected DataSource dataSource;

    public UnpooledDataSourceFactory() {
        this.dataSource = new UnpooledDataSource();
    }

    @Override
    public void setProperties(Properties properties) {
        Properties driverProperties = new Properties();
        // MetaObject 用于解析实例对象的元信息,如字段的信息、方法的信息
        MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
        // 解析所有配置的键值对key-value,发现非预期的属性立即抛异常,以便及时发现
        for (Object key : properties.keySet()) {
            String propertyName = (String) key;
            if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
                // 添加驱动的配置属性
                String value = properties.getProperty(propertyName);
                driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
            } else if (metaDataSource.hasSetter(propertyName)) {
                // 为数据源添加配置属性
                String value = (String) properties.get(propertyName);
                Object convertedValue = convertValue(metaDataSource, propertyName, value);
                metaDataSource.setValue(propertyName, convertedValue);
            } else {
                throw new DataSourceException("Unknown DataSource property: " + propertyName);
            }
        }
        if (driverProperties.size() > 0) {
            metaDataSource.setValue("driverProperties", driverProperties);
        }
    }

    @Override
    public DataSource getDataSource() {
        return dataSource;
    }

    /**
     * 将 String 类型的值转为目标对象字段的类型的值
     */
    private Object convertValue(MetaObject metaDataSource, String propertyName, String value) {
        Object convertedValue = value;
        Class<?> targetType = metaDataSource.getSetterType(propertyName);
        if (targetType == Integer.class || targetType == int.class) {
            convertedValue = Integer.valueOf(value);
        } else if (targetType == Long.class || targetType == long.class) {
            convertedValue = Long.valueOf(value);
        } else if (targetType == Boolean.class || targetType == boolean.class) {
            convertedValue = Boolean.valueOf(value);
        }
        return convertedValue;
    }
}

虽然代码看起来复杂,实际上非常简单,在创建工厂实例时创建它对应的 UnpooledDataSource 数据源。
setProperties() 方法用于给数据源添加部分属性配置,convertValue() 方式时一个私有方法,就是处理 当 DataSource 的属性为整型或布尔类型时提供对字符串类型的转换功能而已。

最后我们看看 PooledDataSourceFactory ,这个类非常简单,仅仅是继承了 UnpooledDataSourceFactory ,然后构造方法替换数据源为 PooledDataSource

public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }
}

虽然它的代码极少,实际上都在 PooledDataSource 类中。

DataSource

看完了工厂类,我们来看看 MyBatis 提供的两种数据源类: UnpooledDataSourcePooledDataSource

UnpooledDataSource

UnpooledDataSource 看名字就知道是没有池化的特征,相对也简单点,以下代码省略一些不重要的方法

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class UnpooledDataSource implements DataSource {

    private ClassLoader driverClassLoader;
    private Properties driverProperties;
    private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

    private String driver;
    private String url;
    private String username;
    private String password;

    private Boolean autoCommit;

    // 事务隔离级别
    private Integer defaultTransactionIsolationLevel;

    static {
        // 遍历所有可用驱动
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            registeredDrivers.put(driver.getClass().getName(), driver);
        }
    }

    // ......

    private Connection doGetConnection(Properties properties) throws SQLException {
        // 每次获取连接都会检测驱动
        initializeDriver();
        Connection connection = DriverManager.getConnection(url, properties);
        configureConnection(connection);
        return connection;
    }

    /**
     * 初始化驱动,这是一个 同步 方法
     */
    private synchronized void initializeDriver() throws SQLException {
        // 如果不包含驱动,则准备添加驱动
        if (!registeredDrivers.containsKey(driver)) {
            Class<?> driverType;
            try {
                // 加载驱动
                if (driverClassLoader != null) {
                    driverType = Class.forName(driver, true, driverClassLoader);
                } else {
                    driverType = Resources.classForName(driver);
                }
                Driver driverInstance = (Driver)driverType.newInstance();
                // 注册驱动代理到 DriverManager
                DriverManager.registerDriver(new DriverProxy(driverInstance));
                // 缓存驱动
                registeredDrivers.put(driver, driverInstance);
            } catch (Exception e) {
                throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
            }
        }
    }

    private void configureConnection(Connection conn) throws SQLException {
        // 设置是否自动提交事务
        if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
            conn.setAutoCommit(autoCommit);
        }
        // 设置 事务隔离级别
        if (defaultTransactionIsolationLevel != null) {
            conn.setTransactionIsolation(defaultTransactionIsolationLevel);
        }
    }

    private static class DriverProxy implements Driver {
        private Driver driver;

        DriverProxy(Driver d) {
            this.driver = d;
        }

        /**
         * Driver 仅在 JDK7 中定义了本方法,用于返回本驱动的所有日志记录器的父记录器
         * 个人也不是十分明确它的用法,毕竟很少会关注驱动的日志
         */
        public Logger getParentLogger() {
            return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
        }

        // 其他方法均为调用 driver 对应的方法,此处省略
    }
}

这里 DriverProxy 仅被注册到 DriverManager 中,这是一个代理操作,但源码上并没有什么特别的处理代码,我也不懂官方为什么在这里加代理,有谁明白的可以留言相互讨论。这里的其他方法也不是非常复杂,我都已经标有注释,应该都可以看懂,不再细说。

以上便是 UnpooledDataSource 的初始化驱动和获取连接关键代码。

PooledDataSource

接下来我们来看最后一个类 PooledDataSource ,它也是直接实现 DataSource ,不过因为拥有池化的特性,它的代码复杂不少,当然效率比 UnpooledDataSource 会高出不少。

PooledDataSource 通过两个辅助类 PoolStatePooledConnection 来完成池化功能。
PoolState 是记录连接池运行时的状态,定义了两个 PooledConnection 集合用于记录空闲连接和活跃连接。
PooledConnection 内部定义了两个 Connection 分别表示一个真实连接和代理连接,还有一些其他字段用于记录一个连接的运行时状态。

先来详细了解一下 PooledConnection

/**
 * 此处使用默认的访问权限
 * 实现了 InvocationHandler
 */
class PooledConnection implements InvocationHandler {

    private static final String CLOSE = "close";
    private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

    /** hashCode() 方法返回 */
    private final int hashCode;

    private final Connection realConnection;

    private final Connection proxyConnection;

    // 省略 checkoutTimestamp、createdTimestamp、lastUsedTimestamp
    private boolean valid;

    /*
     * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in
     *
     * @param connection - the connection that is to be presented as a pooled connection
     * @param dataSource - the dataSource that the connection is from
     */
    public PooledConnection(Connection connection, PooledDataSource dataSource) {
        this.hashCode = connection.hashCode();
        this.realConnection = connection;
        this.dataSource = dataSource;
        this.createdTimestamp = System.currentTimeMillis();
        this.lastUsedTimestamp = System.currentTimeMillis();
        this.valid = true;
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
    }

    /*
     * 设置连接状态为不正常,不可使用
     */
    public void invalidate() {
        valid = false;
    }

    /*
     * 查看连接是否可用
     *
     * @return 如果可用则返回 true
     */
    public boolean isValid() {
        return valid && realConnection != null && dataSource.pingConnection(this);
    }

    /**
     * 自动上一次使用后经过的时间
     */
    public long getTimeElapsedSinceLastUse() {
        return System.currentTimeMillis() - lastUsedTimestamp;
    }

    /**
     * 存活时间
     */
    public long getAge() {
        return System.currentTimeMillis() - createdTimestamp;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
            // 对于 close() 方法,将连接放回池中
            dataSource.pushConnection(this);
            return null;
        } else {
            try {
                if (!Object.class.equals(method.getDeclaringClass())) {
                    checkConnection();
                }
                return method.invoke(realConnection, args);
            } catch (Throwable t) {
                throw ExceptionUtil.unwrapThrowable(t);
            }
        }
    }

    private void checkConnection() throws SQLException {
        if (!valid) {
            throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
        }
    }
}

本类实现了 InvocationHandler 接口,这个接口是用于 JDK 动态代理的,在这个类的构造器中 proxyConnection 就是创建了此代理对象。
来看看 invoke() 方法,它拦截了 close() 方法,不再关闭连接,而是将其继续放入池中,然后其他已实现的方法则是每次调用都需要检测连接是否合法。

PoolState 类,这个类实际上没什么可说的,都是一些统计字段,没有复杂逻辑,不讨论; 需要注意该类是针对一个 PooledDataSource 对象统计的
也就是说 PoolState 的统计字段是关于整个数据源的,而一个 PooledConnection 则是针对单个连接的。

最后我们回过头来看 PooledDataSource 类,数据源的操作就只有两个,获取连接,释放连接,先来看看获取连接

public class PooledDataSource implements DataSource {

    private final UnpooledDataSource dataSource;

    @Override
    public Connection getConnection() throws SQLException {
        return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return popConnection(username, password).getProxyConnection();
    }

    /**
     * 获取一个连接
     */
    private PooledConnection popConnection(String username, String password) throws SQLException {
        boolean countedWait = false;
        PooledConnection conn = null;
        long t = System.currentTimeMillis();
        int localBadConnectionCount = 0;

        // conn == null 也可能是没有获得连接,被通知后再次走流程
        while (conn == null) {
            synchronized (state) {
                // 是否存在空闲连接
                if (!state.idleConnections.isEmpty()) {
                    // 池里存在空闲连接
                    conn = state.idleConnections.remove(0);
                } else {
                    // 池里不存在空闲连接
                    if (state.activeConnections.size() < poolMaximumActiveConnections) {
                        // 池里的激活连接数小于最大数,创建一个新的
                        conn = new PooledConnection(dataSource.getConnection(), this);
                    } else {
                        // 最坏的情况,无法获取连接

                        // 检测最早使用的连接是否超时
                        PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                        long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                        if (longestCheckoutTime > poolMaximumCheckoutTime) {
                            // 使用超时连接,对超时连接的操作进行回滚
                            state.claimedOverdueConnectionCount++;
                            state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                            state.accumulatedCheckoutTime += longestCheckoutTime;
                            state.activeConnections.remove(oldestActiveConnection);
                            if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                                try {
                                    oldestActiveConnection.getRealConnection().rollback();
                                } catch (SQLException e) {
                                    /*
                                     * Just log a message for debug and continue to execute the following statement
                                     * like nothing happened. Wrap the bad connection with a new PooledConnection,
                                     * this will help to not interrupt current executing thread and give current
                                     * thread a chance to join the next competition for another valid/good database
                                     * connection. At the end of this loop, bad {@link @conn} will be set as null.
                                     */
                                    log.debug("Bad connection. Could not roll back");
                                }
                            }
                            conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                            conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
                            conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
                            oldestActiveConnection.invalidate();
                        } else {
                            // 等待可用连接
                            try {
                                if (!countedWait) {
                                    state.hadToWaitCount++;
                                    countedWait = true;
                                }
                                long wt = System.currentTimeMillis();
                                state.wait(poolTimeToWait);
                                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                            } catch (InterruptedException e) {
                                break;
                            }
                        }
                    }
                }
                // 已获取连接
                if (conn != null) {
                    // 检测连接是否可用
                    if (conn.isValid()) {
                        // 对之前的操作回滚
                        if (!conn.getRealConnection().getAutoCommit()) {
                            conn.getRealConnection().rollback();
                        }
                        conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                        conn.setCheckoutTimestamp(System.currentTimeMillis());
                        conn.setLastUsedTimestamp(System.currentTimeMillis());
                        // 激活连接池数+1
                        state.activeConnections.add(conn);
                        state.requestCount++;
                        state.accumulatedRequestTime += System.currentTimeMillis() - t;
                    } else {
                        // 连接坏掉了,超过一定阈值则抛异常提醒
                        state.badConnectionCount++;
                        localBadConnectionCount++;
                        conn = null;
                        if (localBadConnectionCount > (poolMaximumIdleConnections
                                + poolMaximumLocalBadConnectionTolerance)) {
                            // 省略日志
                            throw new SQLException(
                                    "PooledDataSource: Could not get a good connection to the database.");
                        }
                    }
                }
            }

        }

        if (conn == null) {
            // 省略日志
            throw new SQLException(
                    "PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }

        return conn;
    }
}

上面的代码都已经加了注释,总体流程不算复杂:

  1. while => 连接为空

    1. 能否直接从池里拿连接 => 可以则获取连接并返回
    2. 不能,查看池里的连接是否没满 => 没满则创建一个连接并返回
    3. 满了,查看池里最早的连接是否超时 => 超时则强制该连接回滚,然后获取该连接并返回
    4. 未超时,等待连接可用
  2. 检测连接是否可用

释放连接操作,更为简单,判断更少

protected void pushConnection(PooledConnection conn) throws SQLException {
    // 同步操作
    synchronized (state) {
        // 从活动池中移除连接
        state.activeConnections.remove(conn);
        if (conn.isValid()) {
            // 不超过空闲连接数 并且连接是同一类型的连接
            if (state.idleConnections.size() < poolMaximumIdleConnections
                    && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
                state.accumulatedCheckoutTime += conn.getCheckoutTime();
                if (!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                // 废弃原先的对象
                PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
                state.idleConnections.add(newConn);
                newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
                newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
                // 该对象已经不能用于连接了
                conn.invalidate();
                if (log.isDebugEnabled()) {
                    log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
                }
                state.notifyAll();
            } else {
                state.accumulatedCheckoutTime += conn.getCheckoutTime();
                if (!conn.getRealConnection().getAutoCommit()) {
                    conn.getRealConnection().rollback();
                }
                // 关闭连接
                conn.getRealConnection().close();
                if (log.isDebugEnabled()) {
                    log.debug("Closed connection " + conn.getRealHashCode() + ".");
                }
                conn.invalidate();
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("A bad connection (" + conn.getRealHashCode()
                        + ") attempted to return to the pool, discarding connection.");
            }
            state.badConnectionCount++;
        }
    }
}

部分码注释已添加,这里就说一下总体流程:

  1. 从活动池中移除连接
  2. 如果该连接可用

    1. 连接池未满,则连接放回池中
    2. 满了,回滚,关闭连接

总体流程大概就是这样

以下还有两个方法代码较多,但逻辑都很简单,稍微说明一下:

  • pingConnection() 执行一条 SQL 检测连接是否可用。
  • forceCloseAll() 回滚并关闭激活连接池和空闲连接池中的连接

相关推荐