2020import rx .Subscriber ;
2121import rx .exceptions .CompositeException ;
2222import rx .exceptions .Exceptions ;
23+ import rx .exceptions .OnCompletedFailedException ;
2324import rx .exceptions .OnErrorFailedException ;
2425import rx .exceptions .OnErrorNotImplementedException ;
25- import rx .plugins .RxJavaPlugins ;
26+ import rx .exceptions .UnsubscribeFailedException ;
27+ import rx .internal .util .RxJavaPluginUtils ;
2628
2729/**
2830 * {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
@@ -83,11 +85,17 @@ public void onCompleted() {
8385 // we handle here instead of another method so we don't add stacks to the frame
8486 // which can prevent it from being able to handle StackOverflow
8587 Exceptions .throwIfFatal (e );
86- // handle errors if the onCompleted implementation fails, not just if the Observable fails
87- _onError ( e );
88+ RxJavaPluginUtils . handleException ( e );
89+ throw new OnCompletedFailedException ( e . getMessage (), e );
8890 } finally {
89- // auto-unsubscribe
90- unsubscribe ();
91+ try {
92+ // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
93+ // and we throw an UnsubscribeFailureException.
94+ unsubscribe ();
95+ } catch (Throwable e ) {
96+ RxJavaPluginUtils .handleException (e );
97+ throw new UnsubscribeFailedException (e .getMessage (), e );
98+ }
9199 }
92100 }
93101 }
@@ -145,11 +153,7 @@ public void onNext(T args) {
145153 * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
146154 */
147155 protected void _onError (Throwable e ) {
148- try {
149- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (e );
150- } catch (Throwable pluginException ) {
151- handlePluginException (pluginException );
152- }
156+ RxJavaPluginUtils .handleException (e );
153157 try {
154158 actual .onError (e );
155159 } catch (Throwable e2 ) {
@@ -168,11 +172,7 @@ protected void _onError(Throwable e) {
168172 try {
169173 unsubscribe ();
170174 } catch (Throwable unsubscribeException ) {
171- try {
172- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (unsubscribeException );
173- } catch (Throwable pluginException ) {
174- handlePluginException (pluginException );
175- }
175+ RxJavaPluginUtils .handleException (unsubscribeException );
176176 throw new RuntimeException ("Observer.onError not implemented and error while unsubscribing." , new CompositeException (Arrays .asList (e , unsubscribeException )));
177177 }
178178 throw (OnErrorNotImplementedException ) e2 ;
@@ -182,19 +182,11 @@ protected void _onError(Throwable e) {
182182 *
183183 * https://github.com/ReactiveX/RxJava/issues/198
184184 */
185- try {
186- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (e2 );
187- } catch (Throwable pluginException ) {
188- handlePluginException (pluginException );
189- }
185+ RxJavaPluginUtils .handleException (e2 );
190186 try {
191187 unsubscribe ();
192188 } catch (Throwable unsubscribeException ) {
193- try {
194- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (unsubscribeException );
195- } catch (Throwable pluginException ) {
196- handlePluginException (pluginException );
197- }
189+ RxJavaPluginUtils .handleException (unsubscribeException );
198190 throw new OnErrorFailedException ("Error occurred when trying to propagate error to Observer.onError and during unsubscription." , new CompositeException (Arrays .asList (e , e2 , unsubscribeException )));
199191 }
200192
@@ -205,25 +197,11 @@ protected void _onError(Throwable e) {
205197 try {
206198 unsubscribe ();
207199 } catch (RuntimeException unsubscribeException ) {
208- try {
209- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (unsubscribeException );
210- } catch (Throwable pluginException ) {
211- handlePluginException (pluginException );
212- }
200+ RxJavaPluginUtils .handleException (unsubscribeException );
213201 throw new OnErrorFailedException (unsubscribeException );
214202 }
215203 }
216204
217- private void handlePluginException (Throwable pluginException ) {
218- /*
219- * We don't want errors from the plugin to affect normal flow.
220- * Since the plugin should never throw this is a safety net
221- * and will complain loudly to System.err so it gets fixed.
222- */
223- System .err .println ("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException .getMessage ());
224- pluginException .printStackTrace ();
225- }
226-
227205 /**
228206 * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
229207 *
0 commit comments