I’m observing behavior that seems strange to me when I use an Observable constructed in the following way.
I’m using Observable.using to create an observable based on some resource. In Observable.using, I pass a function as sourceSupplier, which returns an observable created using Observable.create.
Then I apply the takeUntil operator to the observable created by Observable.using, passing it another observable, stopSubject, which in this example is a BehaviorSubject.
class Resource {
    boolean isClosed = false;
}

BehaviorSubject<Object> stopSubject = BehaviorSubject.create();
stopSubject.onNext(1);

Observable
    .using(
        Resource::new,
        (resource) -> Observable.create(emitter -> {
            ...
        }),
        (resource) -> {
            resource.isClosed = true;
        }
    )
    .takeUntil(stopSubject)
I expect that when the callback passed to Observable.create is invoked, the resource created by Observable.using has not yet been disposed, and the emitter passed to the callback is not yet disposed either. I verify this in the tests of the attached TakeUntilTest class.
TakeUntilTest.java.zip
These expectations hold true if I emit a value into stopSubject after subscribing to the observable under test (see the test otherObservableEmitsValueAfterSubscription).
However, if I emit a value into stopSubject before subscribing to the observable under test, the following happens:
The callback passed to Observable.create is invoked, but by that time the resource has already been disposed, and the emitter passed to the callback is already in the disposed state. This is verified in the test otherObservableHasValueInAdvance.
I looked into the RxJava implementation, and I believe the reason for this behavior is as follows:
The ObservableTakeUntil class, in its subscribeActual method, first subscribes to the other observable and only afterwards subscribes to the source observable:
@Override
public void subscribeActual(Observer<? super T> child) {
    TakeUntilMainObserver<T, U> parent = new TakeUntilMainObserver<>(child);
    child.onSubscribe(parent);
    other.subscribe(parent.otherObserver);
    source.subscribe(parent);
}
In my test, stopSubject acts as the other observable, and since it already has a value in advance, subscribing to it immediately triggers the TakeUntilMainObserver.otherComplete method, which writes a special DISPOSED value into the TakeUntilMainObserver.upstream field (using the helper method DisposableHelper.dispose):
void otherComplete() {
    DisposableHelper.dispose(upstream);
    HalfSerializer.onComplete(downstream, this, error);
}
The subsequent subscription to the source observable in ObservableTakeUntil.subscribeActual calls subscribeActual on ObservableCreate, which first calls onSubscribe on the passed observer, and only then invokes the source callback:
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
The call to observer.onSubscribe inside ObservableCreate ends up calling TakeUntilMainObserver.onSubscribe, which tries to set the passed disposable into the upstream field using the helper method DisposableHelper.setOnce:
@Override
public void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(upstream, d);
}
But since the upstream field already contains the special DISPOSED value, the assignment doesn’t happen, and instead DisposableHelper.setOnce calls dispose on the passed disposable. In this case, the disposable is an instance of ObservableUsing$UsingObserver, which proceeds to dispose the resource:
@Override
public void dispose() {
    if (eager) {
        disposeResource();
        upstream.dispose();
        upstream = DisposableHelper.DISPOSED;
    } else {
        upstream.dispose();
        upstream = DisposableHelper.DISPOSED;
        disposeResource();
    }
}
Therefore, when control returns to ObservableCreate.subscribeActual after the onSubscribe call and the source callback is finally invoked, the resource has already been disposed, and the emitter passed to the callback (CreateEmitter) is already in the disposed state.
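To make the ordering easier to follow, here is a small model of the sequence in plain Java, without RxJava on the classpath. It is only a sketch of my reading of the code quoted above: UpstreamSlot, ResourceDisposable, and simulate are names I made up for illustration, the Disposable interface is a stand-in rather than the RxJava one, and setOnce omits the protocol-violation reporting that the real helper performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TakeUntilOrderingModel {

    /** Minimal stand-in for RxJava's Disposable (not the real interface). */
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    /** Sentinel playing the role of DisposableHelper.DISPOSED. */
    static final Disposable DISPOSED = new Disposable() {
        @Override public void dispose() { }
        @Override public boolean isDisposed() { return true; }
    };

    /** Hypothetical stand-in for the resource-owning disposable (UsingObserver in the report). */
    static final class ResourceDisposable implements Disposable {
        boolean closed;
        @Override public void dispose() { closed = true; }
        @Override public boolean isDisposed() { return closed; }
    }

    /** Simplified model of the upstream field of TakeUntilMainObserver. */
    static final class UpstreamSlot {
        final AtomicReference<Disposable> ref = new AtomicReference<>();

        /** Models the effect of DisposableHelper.dispose: swap in the sentinel. */
        void dispose() {
            Disposable old = ref.getAndSet(DISPOSED);
            if (old != null && old != DISPOSED) {
                old.dispose();
            }
        }

        /** Models the effect of DisposableHelper.setOnce as described above. */
        boolean setOnce(Disposable d) {
            if (ref.compareAndSet(null, d)) {
                return true;
            }
            d.dispose(); // slot is not empty (here: DISPOSED), so the newcomer is disposed
            return false;
        }
    }

    /** Replays the subscription order and records what the callback would observe. */
    public static List<String> simulate(boolean otherHasValueInAdvance) {
        List<String> events = new ArrayList<>();
        UpstreamSlot upstream = new UpstreamSlot();
        ResourceDisposable resource = new ResourceDisposable();

        // Step 1: takeUntil subscribes to `other` first; a pre-filled subject signals at once.
        if (otherHasValueInAdvance) {
            upstream.dispose();
            events.add("other signalled: upstream = DISPOSED");
        }
        // Step 2: subscribing to the source reaches onSubscribe, which calls setOnce.
        upstream.setOnce(resource);
        // Step 3: only now is the create callback invoked.
        events.add("callback invoked, resource closed = " + resource.closed);
        return events;
    }

    public static void main(String[] args) {
        simulate(false).forEach(System.out::println);
        simulate(true).forEach(System.out::println);
    }
}
```

With otherHasValueInAdvance set to false the model reports the resource as still open when the callback runs; with true it reports it as already closed, which matches what the two tests observe.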
I believe the current behavior is incorrect, and in the test otherObservableHasValueInAdvance, one of the following should happen:
• Either the callback passed to Observable.create should not be invoked at all,
• Or the callback should be invoked before the resource is disposed and the emitter is transitioned to the disposed state.
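To illustrate the first option, here is a rough sketch in plain Java of what I have in mind. It is a model, not a patch against RxJava: Emitter and subscribeGuarded are hypothetical names, and the only point is the isDisposed check placed between the onSubscribe step and the callback invocation.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class GuardedCreateModel {

    /** Minimal stand-in for CreateEmitter: it only tracks the disposed flag. */
    static final class Emitter {
        private final AtomicBoolean disposed = new AtomicBoolean();
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    /**
     * Models subscribeActual of a create-style source.
     *
     * @param onSubscribe stand-in for observer.onSubscribe(parent); it may dispose
     *                    the emitter synchronously, as takeUntil does in the report
     * @param callback    stand-in for the user callback passed to Observable.create
     * @return true if the callback was invoked
     */
    public static boolean subscribeGuarded(Consumer<Emitter> onSubscribe,
                                           Consumer<Emitter> callback) {
        Emitter parent = new Emitter();
        onSubscribe.accept(parent);
        if (parent.isDisposed()) {
            return false; // hypothetical guard: skip the callback when already disposed
        }
        callback.accept(parent);
        return true;
    }

    public static void main(String[] args) {
        boolean invokedLive = subscribeGuarded(e -> { }, e -> System.out.println("callback runs"));
        boolean invokedDisposed = subscribeGuarded(Emitter::dispose, e -> System.out.println("not reached"));
        System.out.println(invokedLive + " " + invokedDisposed);
    }
}
```

Whether a guard like this is acceptable inside the library is a separate question, since it changes when the callback runs for every downstream that disposes during onSubscribe. A similar check on emitter.isDisposed() at the top of the user callback would be a user-side alternative.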