1+ /**
2+ * Copyright one 2014 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package rx .internal .operators ;
18+
19+ import java .util .ArrayList ;
20+ import java .util .Collection ;
21+ import java .util .HashMap ;
22+ import java .util .Map ;
23+
24+ import rx .Observable ;
25+ import rx .Observable .OnSubscribe ;
26+ import rx .Subscriber ;
27+ import rx .exceptions .Exceptions ;
28+ import rx .functions .Func0 ;
29+ import rx .functions .Func1 ;
30+
31+ /**
32+ * Maps the elements of the source observable into a multimap
33+ * (Map<K, Collection<V>>) where each
34+ * key entry has a collection of the source's values.
35+ *
36+ * @see <a href="https://github.com/ReactiveX/RxJava/issues/97">Issue #97</a>
37+ * @param <T> the value type of the input
38+ * @param <K> the multimap-key type
39+ * @param <V> the multimap-value type
40+ */
41+ public final class OnSubscribeToMultimap <T , K , V > implements OnSubscribe <Map <K , Collection <V >>>, Func0 <Map <K , Collection <V >>> {
42+
43+ private final Func1 <? super T , ? extends K > keySelector ;
44+ private final Func1 <? super T , ? extends V > valueSelector ;
45+ private final Func0 <? extends Map <K , Collection <V >>> mapFactory ;
46+ private final Func1 <? super K , ? extends Collection <V >> collectionFactory ;
47+ private final Observable <T > source ;
48+
49+ /**
50+ * ToMultimap with key selector, custom value selector,
51+ * default HashMap factory and default ArrayList collection factory.
52+ * @param keySelector the function extracting the map-key from the main value
53+ * @param valueSelector the function extracting the map-value from the main value
54+ */
55+ public OnSubscribeToMultimap (
56+ Observable <T > source ,
57+ Func1 <? super T , ? extends K > keySelector ,
58+ Func1 <? super T , ? extends V > valueSelector ) {
59+ this (source , keySelector , valueSelector ,
60+ null ,
61+ DefaultMultimapCollectionFactory .<K , V >instance ());
62+ }
63+
64+ /**
65+ * ToMultimap with key selector, custom value selector,
66+ * custom Map factory and default ArrayList collection factory.
67+ * @param the observable source
68+ * @param keySelector the function extracting the map-key from the main value
69+ * @param valueSelector the function extracting the map-value from the main value
70+ * @param mapFactory function that returns a Map instance to store keys and values into
71+ */
72+ public OnSubscribeToMultimap (
73+ Observable <T > source ,
74+ Func1 <? super T , ? extends K > keySelector ,
75+ Func1 <? super T , ? extends V > valueSelector ,
76+ Func0 <? extends Map <K , Collection <V >>> mapFactory ) {
77+ this (source , keySelector , valueSelector ,
78+ mapFactory ,
79+ DefaultMultimapCollectionFactory .<K , V >instance ());
80+ }
81+
82+ /**
83+ * ToMultimap with key selector, custom value selector,
84+ * custom Map factory and custom collection factory.
85+ * @param source the observable source
86+ * @param keySelector the function extracting the map-key from the main value
87+ * @param valueSelector the function extracting the map-value from the main value
88+ * @param mapFactory function that returns a Map instance to store keys and values into
89+ * @param collectionFactory function that returns a Collection for a particular key to store values into
90+ */
91+ public OnSubscribeToMultimap (
92+ Observable <T > source ,
93+ Func1 <? super T , ? extends K > keySelector ,
94+ Func1 <? super T , ? extends V > valueSelector ,
95+ Func0 <? extends Map <K , Collection <V >>> mapFactory ,
96+ Func1 <? super K , ? extends Collection <V >> collectionFactory ) {
97+ this .source = source ;
98+ this .keySelector = keySelector ;
99+ this .valueSelector = valueSelector ;
100+ if (mapFactory == null ) {
101+ this .mapFactory = this ;
102+ } else {
103+ this .mapFactory = mapFactory ;
104+ }
105+ this .collectionFactory = collectionFactory ;
106+ }
107+
108+ // default map factory
109+ @ Override
110+ public Map <K , Collection <V >> call () {
111+ return new HashMap <K , Collection <V >>();
112+ }
113+
114+ @ Override
115+ public void call (final Subscriber <? super Map <K , Collection <V >>> subscriber ) {
116+
117+ Map <K , Collection <V >> map ;
118+ try {
119+ map = mapFactory .call ();
120+ } catch (Throwable ex ) {
121+ Exceptions .throwIfFatal (ex );
122+ subscriber .onError (ex );
123+ return ;
124+ }
125+ new ToMultimapSubscriber <T , K , V >(
126+ subscriber , map , keySelector , valueSelector , collectionFactory )
127+ .subscribeTo (source );
128+ }
129+
130+ private static final class ToMultimapSubscriber <T , K , V >
131+ extends DeferredScalarSubscriberSafe <T ,Map <K , Collection <V >>> {
132+
133+ private final Func1 <? super T , ? extends K > keySelector ;
134+ private final Func1 <? super T , ? extends V > valueSelector ;
135+ private final Func1 <? super K , ? extends Collection <V >> collectionFactory ;
136+
137+ ToMultimapSubscriber (
138+ Subscriber <? super Map <K , Collection <V >>> subscriber ,
139+ Map <K , Collection <V >> map ,
140+ Func1 <? super T , ? extends K > keySelector , Func1 <? super T , ? extends V > valueSelector ,
141+ Func1 <? super K , ? extends Collection <V >> collectionFactory ) {
142+ super (subscriber );
143+ this .value = map ;
144+ this .hasValue = true ;
145+ this .keySelector = keySelector ;
146+ this .valueSelector = valueSelector ;
147+ this .collectionFactory = collectionFactory ;
148+ }
149+
150+ @ Override
151+ public void onStart () {
152+ request (Long .MAX_VALUE );
153+ }
154+
155+ @ Override
156+ public void onNext (T t ) {
157+ if (done ){
158+ return ;
159+ }
160+ try {
161+ // any interaction with keySelector, valueSelector, collectionFactory, collection or value
162+ // may fail unexpectedly because their behaviour is customisable by the user. For this
163+ // reason we wrap their calls in try-catch block.
164+
165+ K key = keySelector .call (t );
166+ V v = valueSelector .call (t );
167+ Collection <V > collection = value .get (key );
168+ if (collection == null ) {
169+ collection = collectionFactory .call (key );
170+ value .put (key , collection );
171+ }
172+ collection .add (v );
173+ } catch (Throwable ex ) {
174+ Exceptions .throwIfFatal (ex );
175+ unsubscribe ();
176+ onError (ex );
177+ }
178+
179+ }
180+ }
181+
182+ /**
183+ * The default collection factory for a key in the multimap returning
184+ * an ArrayList independent of the key.
185+ * @param <K> the key type
186+ * @param <V> the value type
187+ */
188+ private static final class DefaultMultimapCollectionFactory <K , V >
189+ implements Func1 <K , Collection <V >> {
190+
191+ private static final DefaultMultimapCollectionFactory <Object ,Object > INSTANCE = new DefaultMultimapCollectionFactory <Object , Object >();
192+
193+ @ SuppressWarnings ("unchecked" )
194+ static <K , V > DefaultMultimapCollectionFactory <K ,V > instance () {
195+ return (DefaultMultimapCollectionFactory <K , V >) INSTANCE ;
196+ }
197+
198+ @ Override
199+ public Collection <V > call (K t1 ) {
200+ return new ArrayList <V >();
201+ }
202+ }
203+
204+ }
0 commit comments