Skip to content

Commit f809f1c

Browse files
committed
Merge pull request ReactiveX#75 from dlew/dlew/lifecycle-observable
Added LifecycleObservable binds
2 parents b232482 + febe37d commit f809f1c

File tree

5 files changed

+560
-0
lines changed

5 files changed

+560
-0
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package rx.android.lifecycle;
16+
17+
/**
18+
* Lifecycle events that can be emitted by Activities or Fragments.
19+
*/
20+
public enum LifecycleEvent {
21+
22+
ATTACH,
23+
CREATE,
24+
CREATE_VIEW,
25+
START,
26+
RESUME,
27+
PAUSE,
28+
STOP,
29+
DESTROY_VIEW,
30+
DESTROY,
31+
DETACH
32+
33+
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package rx.android.lifecycle;
16+
17+
import rx.Observable;
18+
import rx.functions.Func1;
19+
import rx.functions.Func2;
20+
21+
public class LifecycleObservable {
22+
23+
private LifecycleObservable() {
24+
throw new AssertionError("LifeCycleObservable cannot be instantiated.");
25+
}
26+
27+
/**
28+
* Binds the given source to a lifecycle.
29+
* <p/>
30+
* When the lifecycle event occurs, the source will cease to emit any notifications.
31+
*
32+
* @param lifecycle the lifecycle sequence
33+
* @param source the source sequence
34+
* @param event the event which should conclude notifications from the source
35+
*/
36+
public static <T> Observable<T> bindUntilLifecycleEvent(Observable<LifecycleEvent> lifecycle,
37+
Observable<T> source,
38+
final LifecycleEvent event) {
39+
if (lifecycle == null || source == null) {
40+
throw new IllegalArgumentException("Lifecycle and Observable must be given");
41+
}
42+
43+
return source.lift(
44+
new OperatorSubscribeUntil<T, LifecycleEvent>(
45+
lifecycle.takeFirst(new Func1<LifecycleEvent, Boolean>() {
46+
@Override
47+
public Boolean call(LifecycleEvent lifecycleEvent) {
48+
return lifecycleEvent == event;
49+
}
50+
})
51+
)
52+
);
53+
}
54+
55+
/**
56+
* Binds the given source to an Activity lifecycle.
57+
* <p/>
58+
* This helper automatically determines (based on the lifecycle sequence itself) when the source
59+
* should stop emitting items. In the case that the lifecycle sequence is in the
60+
* creation phase (CREATE, START, etc) it will choose the equivalent destructive phase (DESTROY,
61+
* STOP, etc). If used in the destructive phase, the notifications will cease at the next event;
62+
* for example, if used in PAUSE, it will unsubscribe in STOP.
63+
* <p/>
64+
* Due to the differences between the Activity and Fragment lifecycles, this method should only
65+
* be used for an Activity lifecycle.
66+
*
67+
* @param lifecycle the lifecycle sequence of an Activity
68+
* @param source the source sequence
69+
*/
70+
public static <T> Observable<T> bindActivityLifecycle(Observable<LifecycleEvent> lifecycle, Observable<T> source) {
71+
return bindLifecycle(lifecycle, source, ACTIVITY_LIFECYCLE);
72+
}
73+
74+
/**
75+
* Binds the given source to a Fragment lifecycle.
76+
* <p/>
77+
* This helper automatically determines (based on the lifecycle sequence itself) when the source
78+
* should stop emitting items. In the case that the lifecycle sequence is in the
79+
* creation phase (CREATE, START, etc) it will choose the equivalent destructive phase (DESTROY,
80+
* STOP, etc). If used in the destructive phase, the notifications will cease at the next event;
81+
* for example, if used in PAUSE, it will unsubscribe in STOP.
82+
* <p/>
83+
* Due to the differences between the Activity and Fragment lifecycles, this method should only
84+
* be used for a Fragment lifecycle.
85+
*
86+
* @param lifecycle the lifecycle sequence of a Fragment
87+
* @param source the source sequence
88+
*/
89+
public static <T> Observable<T> bindFragmentLifecycle(Observable<LifecycleEvent> lifecycle, Observable<T> source) {
90+
return bindLifecycle(lifecycle, source, FRAGMENT_LIFECYCLE);
91+
}
92+
93+
private static <T> Observable<T> bindLifecycle(Observable<LifecycleEvent> lifecycle,
94+
Observable<T> source,
95+
Func1<LifecycleEvent, LifecycleEvent> correspondingEvents) {
96+
if (lifecycle == null || source == null) {
97+
throw new IllegalArgumentException("Lifecycle and Observable must be given");
98+
}
99+
100+
// Make sure we're truly comparing a single stream to itself
101+
Observable<LifecycleEvent> sharedLifecycle = lifecycle.share();
102+
103+
// Keep emitting from source until the corresponding event occurs in the lifecycle
104+
return source.lift(
105+
new OperatorSubscribeUntil<T, Boolean>(
106+
Observable.combineLatest(
107+
sharedLifecycle.take(1).map(correspondingEvents),
108+
sharedLifecycle.skip(1),
109+
new Func2<LifecycleEvent, LifecycleEvent, Boolean>() {
110+
@Override
111+
public Boolean call(LifecycleEvent bindUntilEvent, LifecycleEvent lifecycleEvent) {
112+
return lifecycleEvent == bindUntilEvent;
113+
}
114+
})
115+
.takeFirst(new Func1<Boolean, Boolean>() {
116+
@Override
117+
public Boolean call(Boolean shouldComplete) {
118+
return shouldComplete;
119+
}
120+
})
121+
)
122+
);
123+
}
124+
125+
// Figures out which corresponding next lifecycle event in which to unsubscribe, for Activities
126+
private static final Func1<LifecycleEvent, LifecycleEvent> ACTIVITY_LIFECYCLE =
127+
new Func1<LifecycleEvent, LifecycleEvent>() {
128+
@Override
129+
public LifecycleEvent call(LifecycleEvent lastEvent) {
130+
if (lastEvent == null) {
131+
throw new NullPointerException("Cannot bind to null LifecycleEvent.");
132+
}
133+
134+
switch (lastEvent) {
135+
case CREATE:
136+
return LifecycleEvent.DESTROY;
137+
case START:
138+
return LifecycleEvent.STOP;
139+
case RESUME:
140+
return LifecycleEvent.PAUSE;
141+
case PAUSE:
142+
return LifecycleEvent.STOP;
143+
case STOP:
144+
return LifecycleEvent.DESTROY;
145+
case DESTROY:
146+
throw new IllegalStateException("Cannot bind to Activity lifecycle when outside of it.");
147+
case ATTACH:
148+
case CREATE_VIEW:
149+
case DESTROY_VIEW:
150+
case DETACH:
151+
throw new IllegalStateException("Cannot bind to " + lastEvent + " for an Activity.");
152+
default:
153+
throw new UnsupportedOperationException("Binding to LifecycleEvent " + lastEvent
154+
+ " not yet implemented");
155+
}
156+
}
157+
};
158+
159+
// Figures out which corresponding next lifecycle event in which to unsubscribe, for Fragments
160+
private static final Func1<LifecycleEvent, LifecycleEvent> FRAGMENT_LIFECYCLE =
161+
new Func1<LifecycleEvent, LifecycleEvent>() {
162+
@Override
163+
public LifecycleEvent call(LifecycleEvent lastEvent) {
164+
if (lastEvent == null) {
165+
throw new NullPointerException("Cannot bind to null LifecycleEvent.");
166+
}
167+
168+
switch (lastEvent) {
169+
case ATTACH:
170+
return LifecycleEvent.DETACH;
171+
case CREATE:
172+
return LifecycleEvent.DESTROY;
173+
case CREATE_VIEW:
174+
return LifecycleEvent.DESTROY_VIEW;
175+
case START:
176+
return LifecycleEvent.STOP;
177+
case RESUME:
178+
return LifecycleEvent.PAUSE;
179+
case PAUSE:
180+
return LifecycleEvent.STOP;
181+
case STOP:
182+
return LifecycleEvent.DESTROY_VIEW;
183+
case DESTROY_VIEW:
184+
return LifecycleEvent.DESTROY;
185+
case DESTROY:
186+
return LifecycleEvent.DETACH;
187+
case DETACH:
188+
throw new IllegalStateException("Cannot bind to Fragment lifecycle when outside of it.");
189+
default:
190+
throw new UnsupportedOperationException("Binding to LifecycleEvent " + lastEvent
191+
+ " not yet implemented");
192+
}
193+
}
194+
};
195+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package rx.android.lifecycle;
16+
17+
import rx.Observable;
18+
import rx.Subscriber;
19+
import rx.observers.SerializedSubscriber;
20+
21+
/**
22+
* Returns an Observable that emits the items from the source Observable until another Observable
23+
* emits an item.
24+
* <p>
25+
* Unlike takeUntil, this choose to unsubscribe the parent rather than calling onComplete().
26+
*/
27+
final class OperatorSubscribeUntil<T, R> implements Observable.Operator<T, T> {
28+
29+
private final Observable<? extends R> other;
30+
31+
public OperatorSubscribeUntil(final Observable<? extends R> other) {
32+
this.other = other;
33+
}
34+
35+
@Override
36+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
37+
final Subscriber<T> parent = new SerializedSubscriber<T>(child);
38+
39+
other.unsafeSubscribe(new Subscriber<R>(child) {
40+
41+
@Override
42+
public void onCompleted() {
43+
parent.unsubscribe();
44+
}
45+
46+
@Override
47+
public void onError(Throwable e) {
48+
parent.onError(e);
49+
}
50+
51+
@Override
52+
public void onNext(R t) {
53+
parent.unsubscribe();
54+
}
55+
56+
});
57+
58+
return parent;
59+
}
60+
}

0 commit comments

Comments
 (0)