时间: 2020-08-30|44次围观|0 条评论

  相对来说,Observable还是比较简单,因为Observable不存在背压问题,换句话说,Observable不解决背压问题。而背压问题是肯定存在的,所以在RxJava2里面加入了Flowable来专门背压问题。本文重点在于5种背压策略,至于整个工作流程,跟Observable差不多。
  本文参考资料:

  1.除非特殊说明,源码来自:2.2.0版本
  2.RxJava从源码到应用 移动端开发效率秒提速
  3.RxJava2 ( Flowable) 相关的实现与使用

1.基本元素

  还是按照Observable的格式,我们先来跟Flowable相关的基本元素有哪些。这里还是举一个简单的案例:

    Flowable.create(new FlowableOnSubscribe<String>() {      @Override      public void subscribe(FlowableEmitter<String> emitter) throws Exception {      }    }, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {      @Override      public void onSubscribe(Subscription s) {      }      @Override      public void onNext(String s) {      }      @Override      public void onError(Throwable t) {      }      @Override      public void onComplete() {      }    });

  从上面的案例中,我们可以非常容易的提取出基本元素:Flowable, FlowableOnSubscribe,FlowableEmitter, BackpressureStrategy, Subscriber。从这几个基本元素,我们可以看出来,Flowable的基本元素与Observable相差不多,只是换了一个名字而已。不过这其中多了一个元素:BackpressureStrategy即背压策略。我们来看看:

public enum BackpressureStrategy {    MISSING,    ERROR,    BUFFER,    DROP,    LATEST}

  我们可以看出来,一共有5种背压策略供我们使用。这也是本文分析的重点。

2.概述

  如果直接介绍背压策略相关的类,有些老哥可能会觉得有点懵逼。这里先简单的对背压策略做一个解释,介绍是怎么实现不同的背压策略。
  Flowable通过一系列的调用,最终会调用到FlowableCreatesubscribeActual方法

    public void subscribeActual(Subscriber<? super T> t) {        BaseEmitter<T> emitter;        switch (backpressure) {        case MISSING: {            emitter = new MissingEmitter<T>(t);            break;        }        case ERROR: {            emitter = new ErrorAsyncEmitter<T>(t);            break;        }        case DROP: {            emitter = new DropAsyncEmitter<T>(t);            break;        }        case LATEST: {            emitter = new LatestAsyncEmitter<T>(t);            break;        }        default: {            emitter = new BufferAsyncEmitter<T>(t, bufferSize());            break;        }        }        t.onSubscribe(emitter);        try {            source.subscribe(emitter);        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            emitter.onError(ex);        }    }

  在subscribeActual方法里面,通过不同的背压模式初始化不同的Emitter,从而达到不同的背压策略。所以,如果我们想要理解5种背压策略,这5个Emitter则是我们分析对象。

3.BaseEmitter和NoOverflowBaseAsyncEmitter

  在正式分析5种Emitter之前,我们先将两个基本的Emitter-BaseEmitterNoOverflowBaseAsyncEmitter看一下。

(1).BaseEmitter

  我们先来看看BaseEmitter类。这里就不贴出整个类的代码了,而是分析几个比较重要的方法。
  先来看看BaseEmitter的类结构:

    abstract static class BaseEmitter<T>    extends AtomicLong    implements FlowableEmitter<T>, Subscription {}

  跟CreateEmitter差不多,都实现了相似的接口,代表不同的职责。这里需要注意的是BaseEmitter继承的是AtomicLong类,而CreateEmitter继承的是AtomicReference类。
  为什么BaseEmitter会继承AtomicLong类呢?因为,背压问题会涉及到不平衡,为了解决这种问题,通常会设置一个缓冲队列,缓冲队列不可能无限大吧?应该是有限制的,所以这里继承于AtomicLong
  BaseEmitter本身是一个抽象类,它对EmitteronError方法和onComlete方法进行了实现。这里就不进行展开了,跟Observable差不多,也是一种AOP模式。
  这里需要讲解的是request方法,我们先来看看request方法:

        @Override        public final void request(long n) {            if (SubscriptionHelper.validate(n)) {                BackpressureHelper.add(this, n);                onRequested();            }        }

  我们知道FlowableSubscriber相较于ObservableObserveronSubscribe方法的参数多一个方法--request方法,request方法就是用来请求被观察者,我能处理多少个数据。如果被观察者发送的数据多于了request的请求数据量,此时就出现了背压问题。
  当我们onSubscribe方法里面调用Subscriptionrequest方法时,就调用了BaseEmitterrequest方法,因为这里的 Subscription对象就是BaseEmitter对象,至于为什么,大佬们请看FlowableCreatesubscribeActual方法。
  所以,BaseEmitterrequest方法是用来设置请求处理最大的数量。我们来看看是怎么设置,这个就得看看BackpressureHelperadd方法了:

public static long add(AtomicLong requested, long n) {        for (;;) {            long r = requested.get();            if (r == Long.MAX_VALUE) {                return Long.MAX_VALUE;            }            long u = addCap(r, n);            if (requested.compareAndSet(r, u)) {                return r;            }        }    }

  这个方法非常的简单,就是更新AtomicLong的值。这里就不多说了。
  既然设置了处理的阈值,我们就得看看当超过这个阈值,不同策略采取的不同方式来处理这个问题。

(2). NoOverflowBaseAsyncEmitter

    abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {        private static final long serialVersionUID = 4127754106204442833L;        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {            super(actual);        }        @Override        public final void onNext(T t) {            if (isCancelled()) {                return;            }            if (t == null) {                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));                return;            }            if (get() != 0) {                actual.onNext(t);                BackpressureHelper.produced(this, 1);            } else {                onOverflow();            }        }        abstract void onOverflow();    }

  我们可以看到NoOverflowBaseAsyncEmitter继承于BaseEmitter,同时还实现了onNext方法。从这里,我们可以看出AtomicLong的作用了,当调用一次EmitteronNext方法时,先判断当前值是否为0,如果不为0,那么调用SubscriberonNext方法,同时使用BackpressureHelper使当前的值减一;如果当前的值已经为0了,会调用onOverflow方法,此时根据不同的策略,onOverflow方法实现就不尽相同。
  对BaseEmitterNoOverflowBaseAsyncEmitter分析的差不多了,此时我们正式分析5种策略对应的Emitter

4. MissingEmitter

  我们先来看看MissingEmitter的源码:

    static final class MissingEmitter<T> extends BaseEmitter<T> {        private static final long serialVersionUID = 3776720187248809713L;        MissingEmitter(Subscriber<? super T> actual) {            super(actual);        }        @Override        public void onNext(T t) {            if (isCancelled()) {                return;            }            if (t != null) {                actual.onNext(t);            } else {                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));                return;            }            for (;;) {                long r = get();                if (r == 0L || compareAndSet(r, r - 1)) {                    return;                }            }        }    }

   MissingEmitter的源码比较简单,而且调用SubscriberonNext没有什么限制,只是最终阈值减到了0,就不会再变了。
  从BackpressureStrategy中对MISSING的注释,我们得到,MISSING本身没有处理背压问题。

public enum BackpressureStrategy {    /**     * OnNext events are written without any buffering or dropping.     * Downstream has to deal with any overflow.     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.     */    MISSING,}

5. ErrorAsyncEmitter

  继续来看看ErrorAsyncEmitter

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {        private static final long serialVersionUID = 338953216916120960L;        ErrorAsyncEmitter(Subscriber<? super T> actual) {            super(actual);        }        @Override        void onOverflow() {            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));        }    }

  哈哈,是不是更加的简单?之前在看NoOverflowBaseAsyncEmitter源码的时候,我们发现当调用onNext的次数用完了,会调用onOverflow方法。所以ErrorAsyncEmitter直接在onOverflow方法里面抛出了一个MissingBackpressureException异常。

6. DropAsyncEmitter

  继续来看看DropAsyncEmitter

    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {        private static final long serialVersionUID = 8360058422307496563L;        DropAsyncEmitter(Subscriber<? super T> actual) {            super(actual);        }        @Override        void onOverflow() {            // nothing to do        }    }

  DropAsyncEmitter表示的意思是,丢弃多余的信息,从其源码看得出来,onOverflow方法根本没有做什么

7. LatestAsyncEmitter

  LatestAsyncEmitter表达的意思是获取最近的数据。表达的意思是非常的简单,但是我们在看LatestAsyncEmitter的源码,发现确实有点恼火。特别是那个drain方法,为什么需要那样写?接下来我们将分析一下drain方法。
  不过在分析drain方法之前,我们首先有一个概念,那就是操作系统里面生产者和消费者模式。在DropAsyncEmitter中,包括接下来要讲的BufferAsyncEmitter都是使用这种模式,其中都是使用缓冲池来存放生产者的生成的数据,然后通知消费者者来消费数据。
  所以,在这里,我们将DropAsyncEmitteronNext方法当做生产者生成数据的过程,生成完成之后会存放在一个AtomicReference类型的变量里面(因为LatestAsyncEmitter缓冲池只存放一个数据,所以没必要开一个队列。)。
  生成完毕之后,会调用drain方法来通知消费者来消费我们的数据,具体的消费就是SubscriberonNext方法。所以,在这里,我们可以将drain方法单纯的当成一个消费数据方法。

(1).成员变量

  在正式分析之前,我们还是看一下LatestAsyncEmitter里面那些比较重要的成员变量吧。

变量名 类型 含义
queue AtomicReference 用来存储最近一次发射的数据,也就是说,如果消费者上次没有消费掉上次生成的数据,这次生成,会覆盖上一次未消费的数据
wip AtomicInteger 此时此刻,有且只有一个线程来消费。用来控制线程安全的,具体的实现,我们可以在drain方法里面可以看到

(2).onNext方法

  按序就班的来,drain方法再难也不怕?。我们先来看看onNext方法,看看是怎么实现的。

        @Override        public void onNext(T t) {            if (done || isCancelled()) {                return;            }            if (t == null) {                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));                return;            }            queue.set(t);            drain();        }

  这个方法比较简单,不过跟其他Emitter不一样的地方,没有在Emitter的onNext方法里面调用SubscriberonNext方法。在这个方法方法里面主要是做了两件事,一个是往queue里面塞数据,这里需要注意的是,如果queue之前有数据还未消费的话,就是说调用queueget方法返回的不为null,这里的set操作会覆盖上一次的数据。
  set成功的时候,我们可以理解为生产者已经生成完毕,然后需要通知消费者来消费我们的数据,怎么通知呢?当然是调用drain方法。

(3).drain方法

  现在我们可以正式分析drain方法了。先来看看drain方法时怎么控制只有一个线程来消费我们的数据。

            if (wip.getAndIncrement() != 0) {                return;            }

  每调用一次drain方法,wip都会自增,如果不为0的话,表示当前有线程在。有人可能有疑问了,那我在一个线程调用两次onNext方法,第二次在这里不就是被return了吗?关于这个疑问,我们下面这段代码。

                missed = wip.addAndGet(-missed);                if (missed == 0) {                    break;                }

  在SubscriberonNext方法调用完成之后,都会调用执行这段代码。用来表示当前线程执行drain方法完毕,但是又有人说,如果missed不为0的话,会怎么样?
  关于这个疑问,首先我们得知道在什么情况会导致missed不为0。在多个线程同时调用drain方法,missed不为0,因为每个线程在drain方法时,wip都会调用getAndIncrement来自增一下。最后,获取 drain方法执行权的那个线程,在执行完毕之后,调用addAndGet方法来表示自己的释放了锁。但是同时发现有其他线程在它执行的时候,试图来调用这个方法,为了避免其他队这个造成的影响,这段代码一个for死循环里面做的,用来消除其他线程自增的值。

        void drain() {            if (wip.getAndIncrement() != 0) {                return;            }            for (;;) {                //省略众多代码······                missed = wip.addAndGet(-missed);                if (missed == 0) {                    break;                }            }        }

  整个的执行流程就是这样的,现在看上去不是晦涩了。
  看清楚了控制线程的逻辑,我们现在来看看正式消费的这一块的逻辑,相对来说,就比较容易了。

                long r = get();                long e = 0L;                while (e != r) {                    if (isCancelled()) {                        q.lazySet(null);                        return;                    }                    boolean d = done;                    T o = q.getAndSet(null);                    boolean empty = o == null;                    if (d && empty) {                        Throwable ex = error;                        if (ex != null) {                            error(ex);                        } else {                            complete();                        }                        return;                    }                    if (empty) {                        break;                    }                    a.onNext(o);                    e++;                }

  对于这段代码,我们梳理一下逻辑,首先考虑一下,什么情况下会执行这段代码?我们发现drain方法,在onNextonCompleteonRequested都被调用过,我们来具体的分析一下。
  onNext方法就不用说了,onNext方法里面生成了一个数据,此时queue里面存储肯定不为空,所以只要消费数量还没有到阈值,肯定会执行,因为这里做了一个e != r的判断。所以所有的消费前提都是,消费的数量还没有达到规定的阈值,这个是毋庸置疑的。
  onComplete方法在哪一种情况会执行这段代码呢?在达到消费前提的情况下,如果此时queue存储的值没有被消费掉,此时会被消费。
  onRequested方法就不用说了,既然调用了request方法,消费提前肯定是达到的(当然除非调用的是0?),也会消费上一次未消费数据。
  不过从这里面,我们得需要注意一点,就是如果之前抛了异常,会在延迟抛出。

                    if (d && empty) {                        Throwable ex = error;                        if (ex != null) {                            error(ex);                        } else {                            complete();                        }                        return;                    }

  至于这段代码,就比较好理解了。

                if (e == r) {                    if (isCancelled()) {                        q.lazySet(null);                        return;                    }                    boolean d = done;                    boolean empty = q.get() == null;                    if (d && empty) {                        Throwable ex = error;                        if (ex != null) {                            error(ex);                        } else {                            complete();                        }                        return;                    }                }

  至于这段代码,就比较好理解了。当申请消费的个数已经被消费完毕了,此时调用onNextonComplete任意一个方法都会走到这个来。

8. BufferAsyncEmitter

  BufferAsyncEmitterLatestAsyncEmitter非常的像,就是从缓冲池从AtomicInteger换成了SpscLinkedArrayQueue,毕竟LatestAsyncEmitter只缓存最近的一个值,而BufferAsyncEmitter则是缓存了所有的数据,所以BufferAsyncEmitter得用一个队列来存储所有的数据。
  说到这个队列,那可不得了,简单的看一下SpscLinkedArrayQueue的代码,不得不佩服,大佬写的太牛逼了。本来想在本文来介绍SpscLinkedArrayQueue队列的原理,但是SpscLinkedArrayQueue比较复杂,这样写下去,篇幅比较大,所以还是打算单独的写一篇文章来介绍这个类。

9.总结

  Flowable的基本工作原理,其实跟Observable非常的像,所以感觉没什么需要总结。不过,我们还是有必要去了解一下Flowable里面的5种策略,也就是本文介绍的5种Emitter

文章转载于:https://www.jianshu.com/p/4fa9e5dff7c7

原著是一个有趣的人,若有侵权,请通知删除

本博客所有文章如无特别注明均为原创。
复制或转载请以超链接形式注明转自起风了,原文地址《RxJava 源码分析系列(二) – Flowable的5种背压策略
   

还没有人抢沙发呢~