redis连接池源码分析

  张一帆   2021年05月15日


  我们在开发的时候经常会用到redis。java中首选的redis客户端是jedis。jedis封装了redis的所有功能,并且提供了更多额外好用的功能。

  你在开发时,可以通过jedis连接redis。jedis支持直连模式和连接池模式。

  • 直连模式:
 public static void main(String[] arg){
        //1个实例
        Jedis jedis = new Jedis("127.0.0.1",6379,100);
        jedis.incr("threadSafe");
        jedis.close();
    }

直连

  你不应该将一个redis实例共享给不同线程,时用时消的用法会造成产生很多的socket和tcp连接,即浪费时间又耗费资源,也提高了服务不可用的风险。如果你只是在本机环境或者使用者相对少的环境中使用,这种模式比较适宜。但是,如果你需要大量使用redis时,应该采用连接池模式。

  • 连接池模式:
    public static void main(String[] arg){
        JedisPool pool = new JedisPool(new JedisPoolConfig(),"localhost");
        try (Jedis jedis = pool.getResource()) {
            jedis.set("foo", "bar");
            String foobar = jedis.get("foo");
            jedis.zadd("sose", 0, "car"); jedis.zadd("sose", 0, "bike"); 
            Set<String> sose = jedis.zrange("sose", 0, -1);
        }
        pool.close();
    }

连接池

  为了避免由单一redis实例引发的未知的服务不可用的风险,我们应该采用这种方案。这种方案背后的原理就是连接池(common-pool2)。当服务到达时服务会从连接池中borrow一个jedis连接,当用完或者发生错误时,连接会归还到连接池中。如果没有borrow到连接,那么服务就会报错且关闭。

  我的上一篇池化技术已经提到过连接池的概念,所以这篇文章是我想记录并分享我所学习到的jedis连接池的知识。通过上一段代码我们看到当jedisPool实例化以后,我们从pool中通过getResource来获取一个jedis实例。那么我们的代码研究就从这里开始。

    //jedis-3.6.0.jar/redis/clients/jedis/jedisPool.class
    public Jedis getResource() {
        //调用父集getResource
        Jedis jedis = (Jedis)super.getResource();
        //将当前实例暴露出dataSource,供后续redis操作使用
        jedis.setDataSource(this);
        return jedis;
    }
    //jedis-3.6.0.jar/redis/clients/jedis/client/util/Pool.class
    //genericObjectPool是ObjectPool的实现,ObjectPool是common.pool2中关于池化对象的接口。其中定义了几个标准的对象方法,这些方法就是管理池的核心方法。
    protected GenericObjectPool<T> internalPool;
    ……
    public T getResource() {
        try {
            //通过调用common.pool2中的borrowObject,完成对jedis实例的借取。
            return this.internalPool.borrowObject();
            //如果出现了异常了,按照异常的分类进行处理。
        } catch (NoSuchElementException var2) {
            if (null == var2.getCause()) {
                throw new JedisExhaustedPoolException("Could not get a resource since the pool is exhausted", var2);
            } else {
                throw new JedisException("Could not get a resource from the pool", var2);
            }
        } catch (Exception var3) {
            throw new JedisConnectionException("Could not get a resource from the pool", var3);
        }
    }

  在common-pool2中,对象池的核心接口叫做ObjectPool,他定义了对象池的实现的行为。

  1. addObject方法:往池中添加一个对象。池子里的所有对象都是通过这个方法进来的。
  2. borrowObject方法:从池中借走到一个对象。借走不等于删除。对象一直都属于池子,只是状态的变化。
  3. returnObject方法:把对象归还给对象池。归还不等于添加。对象一直都属于池子,只是状态的变化。
  4. invalidateObject:销毁一个对象。这个方法才会将对象从池子中删除,当然这其中最重要的就是释放对象本身持有的各种资源。
  5. getNumIdle:返回对象池中有多少对象是空闲的,也就是能够被借走的对象的数量。
  6. getNumActive:返回对象池中有对象对象是活跃的,也就是已经被借走的,在使用中的对象的数量。
  7. clear:清理对象池。注意是清理不是清空,改方法要求的是,清理所有空闲对象,释放相关资源。
  8. close:关闭对象池。这个方法可以达到清空的效果,清理所有对象以及相关资源。

  在common-pool2中,objectPool的核心实现类就是GenericObjectPool。

    //commons-pool2.jar/org/apache/commons/pool2/impl/GenericObjectedPool
    @Override
    public T borrowObject() throws Exception {
        //getMaxWaitMillis()是BaseGenericObjectPool中设定的volatile类型的值,代表最长等待时间(毫秒),配置文件中的"maxWait"
        return borrowObject(getMaxWaitMillis());
    }

  接下来我们来分析下borrowObject方法,刚才说过borrowObject是实现了ObjectPool。那么先看一下这个接口中对borrowObject的描述。

    /**
     * Obtains an instance from this pool.
     * <p>
     * Instances returned from this method will have been either newly created
     * with {@link PooledObjectFactory#makeObject} or will be a previously
     * idle object and have been activated with
     * {@link PooledObjectFactory#activateObject} and then validated with
     * {@link PooledObjectFactory#validateObject}.
     * </p>
     * <p>
     * By contract, clients <strong>must</strong> return the borrowed instance
     * using {@link #returnObject}, {@link #invalidateObject}, or a related
     * method as defined in an implementation or sub-interface.
     * </p>
     * <p>
     * The behavior of this method when the pool has been exhausted
     * is not strictly specified (although it may be specified by
     * implementations).
     * </p>
     *
     * @return an instance from this pool.
     *
     * @throws IllegalStateException
     *              after {@link #close close} has been called on this pool.
     * @throws Exception
     *              when {@link PooledObjectFactory#makeObject} throws an
     *              exception.
     * @throws NoSuchElementException
     *              when the pool is exhausted and cannot or will not return
     *              another instance.
     */
     /*
     这个method返回实例,这个实例将是一个被makeObject()创建的对象,或者是一个之前就是idle并且经过activateObject()激活过的并且经过validatedObject()验证过的对象。
     根据约定,客户端必须调用过returenObject(),invalidateObject()或者一个在子类中实现了归还逻辑的方法后归还。
     这个方法在池子耗尽时的表现没有指明具体如何处理(尽管他有可能被他的实现制定过。)
     */
    T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;

  接着我们再来看看这个方法的具体实现。

    //commons-pool2.jar/org/apache/commons/pool2/impl/GenericObjectPool.class
    /**
     * Equivalent to <code>{@link #borrowObject(long)
     * borrowObject}({@link #getMaxWaitMillis()})</code>.
     * <p>
     * {@inheritDoc}
     * </p>
     */
     //通过获取配置中MaxWait配置,当做传输调用重载的方法。
    @Override
    public T borrowObject() throws Exception {
        return borrowObject(getMaxWaitMillis());
    }
    
    /**
     * Borrows an object from the pool using the specific waiting time which only
     * applies if {@link #getBlockWhenExhausted()} is true.
     * <p>
     * If there is one or more idle instance available in the pool, then an
     * idle instance will be selected based on the value of {@link #getLifo()},
     * activated and returned. If activation fails, or {@link #getTestOnBorrow()
     * testOnBorrow} is set to {@code true} and validation fails, the
     * instance is destroyed and the next available instance is examined. This
     * continues until either a valid instance is returned or there are no more
     * idle instances available.
     * </p>
     * <p>
     * If there are no idle instances available in the pool, behavior depends on
     * the {@link #getMaxTotal() maxTotal}, (if applicable)
     * {@link #getBlockWhenExhausted()} and the value passed in to the
     * {@code borrowMaxWaitMillis} parameter. If the number of instances
     * checked out from the pool is less than {@code maxTotal,} a new
     * instance is created, activated and (if applicable) validated and returned
     * to the caller. If validation fails, a {@code NoSuchElementException}
     * is thrown.
     * </p>
     * <p>
     * If the pool is exhausted (no available idle instances and no capacity to
     * create new ones), this method will either block (if
     * {@link #getBlockWhenExhausted()} is true) or throw a
     * {@code NoSuchElementException} (if
     * {@link #getBlockWhenExhausted()} is false). The length of time that this
     * method will block when {@link #getBlockWhenExhausted()} is true is
     * determined by the value passed in to the {@code borrowMaxWaitMillis}
     * parameter.
     * </p>
     * <p>
     * When the pool is exhausted, multiple calling threads may be
     * simultaneously blocked waiting for instances to become available. A
     * "fairness" algorithm has been implemented to ensure that threads receive
     * available instances in request arrival order.
     * </p>
     *
     * @param borrowMaxWaitMillis The time to wait in milliseconds for an object
     *                            to become available
     *
     * @return object instance from the pool
     *
     * @throws NoSuchElementException if an instance cannot be returned
     *
     * @throws Exception if an object instance cannot be returned due to an
     *                   error
     */
     /*
     当getBlockWhenExhausted()返回true时,需要提供等待时间从对象池中借出对象,
     如果有1个或多个空闲实例的话,就通过getLifo()方法,也就是先进先出的策略选择由哪个空闲实例承接工作。如果激活失败了,或者testOnBorrow设置为true并且检测失败了,这个实例就会销毁,接下来由下一个可用实例进行承接。这个流程一直持续,直到没有可用的实例可供使用或者没有空闲实例可供使用。
     如果在对象池中没有空闲实例了,接下来的走向取决于maxTotal,getBlockWhenExhausted()和传进来的等待时间参数决定。如果当前对象池中的实力数量小于maxtotal,一个新的实例将被创建,激活并且检测最终返回给调用者。如果检测失败,一个NoSuchElementException异常将被抛出。
     如果对象池已经占满了(也就是说没有可用的空闲实例并且没有容量可以被创建),这个方法要不堵塞(如果getBlockWhenExhausted设置为true的情况下才会堵塞)要不就抛出一个NoSuchElementException(相反getBlockWhenExhausted为false的情况)异常。具体阻塞的时间跟传入的borrowMaxWaitMillis时间大小有关。(还记得刚才说的maxWait的设置么?)
     当对象池已经耗尽,多个调用线程将被同时堵塞,他们一起等待可用的jedis实例接客。接客的规则采用公平算法。其实就是先到先被接~哈哈
     */
    public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        //这一步就是检查一下对象池的状态,如果之前已经关闭了,就会抛出IllegalStateException异常。
        assertOpen();
        //移除已经被抛弃的实例
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            /*
            被抛弃的实例不为null
            borrow时去除被抛弃的实例
            空闲线程实例<2
            当前活动的实例数量大于最大实例数量-3
            */
            removeAbandoned(ac);
        }
        //定义一个PooledObject类型的对象,T是真正的实例对象。如jedis,db等。
        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        /*
        获取一个本地的拷贝,这个拷贝是目前的配置。这样就能够保证整体的一致性。
        这个getBlockWhenExhausted()最终是获取的一个volatile类型的值,这个值是从配置中获取的,代表当线程池满时,是否对新来的任务给予堵塞。但是,我们得注意volatile这个类型了,而且设计者在这个地方单独获取出来这个值,肯定是为了什么。接着往下看吧。
        */
        final boolean blockWhenExhausted = getBlockWhenExhausted();
        //是否为新建,请区别于created。为什么仅在这里定义了变量而没有初始化呢?
        boolean create;
        //当前的毫秒,用来就算
        final long waitTime = System.currentTimeMillis();
        //循环开始,第一次p=null开始循环
        while (p == null) {
            //初始化create=false,代表目前这个线程不是新建的。
            create = false;
            /*
            private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
            idleObjects是一个阻塞有序队列,队列的类型是刚才的封装类型,封装的是实际的资源。从名字上可以看出,这里叫做空闲对象组
            可以看到,从空闲对象组里拿出第一个元素。pollFirst()是LinkedBlockingDeque.java中的方法。就是通过操作节点来把第一个节点删除并返回。
            */
            p = idleObjects.pollFirst();
            //如果p为空代表弹出的是null,说明没有空闲。那就创建一个新的对象。
            if (p == null) {
                p = create();
                //p可能会创建失败,创建失败的时候 create=false。 创建成功create=true
                if (p != null) {
                    create = true;
                }
            }
            //如果设置了对象池耗尽后堵塞等待的标示
            if (blockWhenExhausted) {
                if (p == null) {
                    //p没有创建成功
                    if (borrowMaxWaitMillis < 0) {
                        //如果没有设置为-1,只有这种情况会符合小于0.所以就直接从空闲对象组中获取第一个有效的空闲对象。这个方法有锁的实现,所以这块可能会一直堵塞着直到有一个空闲对象可以使用。
                        p = idleObjects.takeFirst();
                    } else {
                    //如果设置等待时间了,就在这段时间里获取。超过了就返回null
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                //到这里如果还是空,就说明没等待到空闲的资源了。所以就要报异常了。
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
            } else {
                //如果没有设置blockWhenExhausted标示且p是null,那真是获取不到资源了,因为池子已经被占满了。
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
            }
            /*
            到这里其实p应该不是null了,说明肯定是有资源了。有可能是获得了空闲资源或是创建了一个新的资源。这个时候就要让系统给他分配内存了。跟进allocate()代码中,发现这是一个synchronized方法,保证了线程安全。(这块会有线程安全问题。为什么呢?)方法体判断了当前池状态(空闲状态或者测试中状态)然后更改状态位,置lastBorrowTime,lastUseTime,borrowedCount++等,然后返回boolean。
            我们可以推算出,当前的池资源状态可能会被另一个线程改写。有一种情况就是,空闲的连接还需要进行test和validate才能够正确分配。但是如果在test或validate阶段失败了,那线程就不能够被分配资源了,因为前后状态不一致了。所以需要allocate的同步操作来保证当时的空闲资源不被改写。
            */
            if (!p.allocate()) {
                //没有成功分配资源,当然p就应该置为null,等待重新进行遍历。
                p = null;
            }
            //到这里,分配成功的p应该进行校验阶段了。没有成功分配的p目前还是null,所以跳过下面的判断。
            if (p != null) {
                //从这开始p是有资源了
                try {
                    /*
                    private final PooledObjectFactory<T> factory;
                    通过池工厂(PooledObjectFactory.java)对返回来的实例重新初始化。因为PooledObjectFactory是个接口。所以,我以Jedis为T进行解析(JedisFactory.java),别的实际类型会有不同的行为,但是基本差不多。
        这个方法里就是获取了BinaryJedis,判断当前的库是不是库0.如果不是就就简单做个select操作。如果是的话什么都不做。
                    这块的设计有一个小心思,就是作者用这种代码结构来给开发者留出了很大的实现空间。
                    */
                    factory.activateObject(p);
                } catch (final Exception e) {
                    //激活异常
                    try {
                        //摧毁对象,释放资源
                        destroy(p, DestroyMode.NORMAL);
                    } catch (final Exception e1) {
                        // 摧毁的程序都报异常了,我可管不了了。采取鸵鸟算法。
                    }
                    //help GC
                    p = null;
                    //如果是新建的资源,就说明分配成功,但是激活失败。抛个错吧。
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        //用什么错误原因呢?就用上面这个吧
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                /*
                如果分配成功了, 激活也成功了,并且开启了TestOnBorrow,逻辑进来再让我蹂躏下吧。如果失败了跳过这个逻辑吧。
                testOnBorrow的目的是不论资源是否是从池子里借的,都要在返回钱进行一次验证。如果失败了资源会被清除出池子并且毁掉。然后再向池对象尝试借一次。
                */
                if (p != null && getTestOnBorrow()) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                    /*
                    我依然以JedisFactory.java分析吧。
                    依然是首先获取一个BinaryJedis资源。判断连接的Port和设置的port是否相等。判断连接的host和设置额host是否相等。相等就返回true。否则就返回false。
                    */
                        validate = factory.validateObject(p);
                    } catch (final Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    //如果检查没有通过依然是摧毁。如果是新建的就报错。和上面激活的流程一样。
                    if (!validate) {
                        try {
                            destroy(p, DestroyMode.NORMAL);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (final Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }
        //恭喜获得了一个有效的p资源。然后统计一下数据吧,比如消耗时间。
        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
        //返回吧。记住返回的是一个PooledObject对象。真正的资源需要通过getObject()获取。
        return p.getObject();
    }

  至此,看懂了jedis是如何从连接池中拿到资源的了。这里面经常有一些线程安全的判断,这也是难点所在,因为池化技术最重要的功能就是对线程资源的管理与利用。所以,代码中用了很多sychronize方法。不光这些,作者还要考虑到资源的释放和销毁还有扩展。

  最后看到了getObject()方法。为什么作者不直接暴露资源呢?为什么还要包一层呢?接下来我来分析一下吧。PooledObject是一个接口,我们先看看接口的描述是什么。详情看我的之前一篇文章common-pool对象池学习

/**
 * Defines the wrapper that is used to track the additional information, such as
 * state, for the pooled objects.
 * <p>
 * Implementations of this class are required to be thread-safe.
 * </p>
 *
 * @param <T> the type of object in the pool
 *
 * @since 2.0
 */
 /*
 定义一个封装结构,这个封装结构使用来记录额外信息的。比如池资源的状态等。
 实现这个接口的class文件必须是线程安全的。(为什么呢?)
 
 */
public interface PooledObject<T> extends Comparable<PooledObject<T>> {}

参考资料: