Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package rx.lang.scala

import rx.functions.FuncN
import rx.Observable.OnSubscribeFunc
import rx.lang.scala.observables.ConnectableObservable
import scala.concurrent.duration
import java.util
import collection.JavaConversions._
import scala.collection.generic.CanBuildFrom
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{Iterable, Traversable, immutable}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag


/**
Expand Down Expand Up @@ -3970,6 +3974,163 @@ trait Observable[+T]
}
}
}

/**
* Returns an Observable that emits a single item, a collection composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @tparam Col the collection type to build.
* @return an Observable that emits a single item, a collection containing all of the items emitted by
* the source Observable.
*/
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, T, Col[T @uncheckedVariance]]): Observable[Col[T @uncheckedVariance]] = {
lift {
(subscriber: Subscriber[Col[T]]) => {
val b = cbf()
Subscriber[T](
subscriber,
(t: T) => {
b += t: Unit
},
e => subscriber.onError(e),
() => {
subscriber.onNext(b.result)
subscriber.onCompleted()
}
)
}
}
}

/**
* Returns an Observable that emits a single item, a `Traversable` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Traversable` containing all of the items emitted by
* the source Observable.
*/
def toTraversable: Observable[Traversable[T]] = to[Traversable]

/**
* Returns an Observable that emits a single item, a `List` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `List` containing all of the items emitted by
* the source Observable.
*/
def toList: Observable[List[T]] = to[List]

/**
* Returns an Observable that emits a single item, an `Iterable` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `Iterable` containing all of the items emitted by
* the source Observable.
*/
def toIterable: Observable[Iterable[T]] = to[Iterable]

/**
* Returns an Observable that emits a single item, an `Iterator` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `Iterator` containing all of the items emitted by
* the source Observable.
*/
def toIterator: Observable[Iterator[T]] = toIterable.map(_.iterator)

/**
* Returns an Observable that emits a single item, a `Stream` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Stream` containing all of the items emitted by
* the source Observable.
*/
def toStream: Observable[Stream[T]] = to[Stream]

/**
* Returns an Observable that emits a single item, an `IndexedSeq` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `IndexedSeq` containing all of the items emitted by
* the source Observable.
*/
def toIndexedSeq: Observable[immutable.IndexedSeq[T]] = to[immutable.IndexedSeq]

/**
* Returns an Observable that emits a single item, a `Vector` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Vector` containing all of the items emitted by
* the source Observable.
*/
def toVector: Observable[Vector[T]] = to[Vector]

/**
* Returns an Observable that emits a single item, a `Buffer` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Buffer` containing all of the items emitted by
* the source Observable.
*/
def toBuffer[U >: T]: Observable[mutable.Buffer[U]] = { // use U >: T because Buffer is invariant
val us: Observable[U] = this
us.to[ArrayBuffer]
}

/**
* Returns an Observable that emits a single item, a `Set` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, a `Set` containing all of the items emitted by
* the source Observable.
*/
def toSet[U >: T]: Observable[immutable.Set[U]] = { // use U >: T because Set is invariant
val us: Observable[U] = this
us.to[immutable.Set]
}

/**
* Returns an Observable that emits a single item, an `Array` composed of all the items emitted by
* the source Observable.
*
* Be careful not to use this operator on Observables that emit infinite or very large numbers
* of items, as you do not have the option to unsubscribe.
*
* @return an Observable that emits a single item, an `Array` containing all of the items emitted by
* the source Observable.
*/
def toArray[U >: T : ClassTag]: Observable[Array[U]] = // use U >: T because Array is invariant
toBuffer[U].map(_.toArray)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,64 @@ class ObservableTests extends JUnitSuite {
assertEquals(List("a", "b", "c"), o.toBlocking.toList)
assertTrue(called)
}

@Test
def testToTraversable() {
val o = Observable.items(1, 2, 3).toTraversable
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToList() {
val o = Observable.items(1, 2, 3).toList
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToIterable() {
val o = Observable.items(1, 2, 3).toIterable
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToIterator() {
val o = Observable.items(1, 2, 3).toIterator
assertEquals(Seq(1, 2, 3), o.toBlocking.single.toSeq)
}

@Test
def testToStream() {
val o = Observable.items(1, 2, 3).toStream
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToIndexedSeq() {
val o = Observable.items(1, 2, 3).toIndexedSeq
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToBuffer() {
val o = Observable.items(1, 2, 3).toBuffer
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToSet() {
val o = Observable.items(1, 2, 2).toSet
assertEquals(Set(1, 2), o.toBlocking.single)
}

@Test
def testToVector() {
val o = Observable.items(1, 2, 3).toVector
assertEquals(Seq(1, 2, 3), o.toBlocking.single)
}

@Test
def testToArray() {
val o = Observable.items(1, 2, 3).toArray
assertArrayEquals(Array(1, 2, 3), o.toBlocking.single)
}
}