|
1 | 1 | (ns rx.lang.clojure.core |
2 | | - (:refer-clojure :exclude [concat cons do drop drop-while empty |
| 2 | + (:refer-clojure :exclude [concat cons count cycle |
| 3 | + distinct do drop drop-while |
| 4 | + empty every? |
3 | 5 | filter first future |
4 | | - interpose into keep keep-indexed |
| 6 | + interpose into |
| 7 | + keep keep-indexed |
5 | 8 | map mapcat map-indexed |
6 | | - merge next partition reduce reductions |
| 9 | + merge next nth partition reduce reductions |
7 | 10 | rest seq some sort sort-by split-with |
8 | 11 | take take-while throw]) |
9 | 12 | (:require [rx.lang.clojure.interop :as iop] |
|
128 | 131 | (.unsubscribe s) |
129 | 132 | s) |
130 | 133 |
|
| 134 | +(defn subscribe-on |
| 135 | + "Cause subscriptions to the given observable to happen on the given scheduler. |
| 136 | +
|
| 137 | + Returns a new Observable. |
| 138 | +
|
| 139 | + See: |
| 140 | + rx.Observable/subscribeOn |
| 141 | + " |
| 142 | + [^rx.Scheduler s ^Observable xs] |
| 143 | + (.subscribeOn xs s)) |
| 144 | + |
| 145 | +(defn unsubscribe-on |
| 146 | + "Cause unsubscriptions from the given observable to happen on the given scheduler. |
| 147 | +
|
| 148 | + Returns a new Observable. |
| 149 | +
|
| 150 | + See: |
| 151 | + rx.Observable/unsubscribeOn |
| 152 | + " |
| 153 | + [^rx.Scheduler s ^Observable xs] |
| 154 | + (.unsubscribeOn xs s)) |
| 155 | + |
131 | 156 | (defn unsubscribed? |
132 | 157 | "Returns true if the given Subscription (or Subscriber) is unsubscribed. |
133 | 158 |
|
|
161 | 186 | [f] |
162 | 187 | (Observable/create ^Observable$OnSubscribe (iop/action* f))) |
163 | 188 |
|
164 | | -;################################################################################ |
165 | | - |
166 | 189 | (defn wrap-on-completed |
167 | 190 | "Wrap handler with code that automaticaly calls rx.Observable.onCompleted." |
168 | 191 | [handler] |
|
181 | 204 | (when-not (unsubscribed? observer) |
182 | 205 | (.onError observer e)))))) |
183 | 206 |
|
| 207 | +;################################################################################ |
| 208 | + |
| 209 | +(defn synchronize |
| 210 | + ([^Observable xs] |
| 211 | + (.synchronize xs)) |
| 212 | + ([lock ^Observable xs] |
| 213 | + (.synchronize xs lock))) |
| 214 | + |
184 | 215 | (defn ^Observable merge |
185 | 216 | "Observable.merge, renamed because merge means something else in Clojure |
186 | 217 |
|
|
214 | 245 | :else |
215 | 246 | (throw (IllegalArgumentException. (str "Don't know how to merge " (type os)))))) |
216 | 247 |
|
217 | | -(defn ^Observable zip |
218 | | - "rx.Observable.zip. You want map." |
219 | | - ([f ^Observable a ^Observable b] (Observable/zip a b (iop/fn* f))) |
220 | | - ([f ^Observable a ^Observable b ^Observable c] (Observable/zip a b c (iop/fn* f))) |
221 | | - ([f ^Observable a ^Observable b ^Observable c ^Observable d] (Observable/zip a b c d (iop/fn* f))) |
222 | | - ([f a b c d & more] |
223 | | - ; recurse on more and then pull everything together with 4 parameter version |
224 | | - (zip (fn [a b c more-value] |
225 | | - (apply f a b c more-value)) |
226 | | - a |
227 | | - b |
228 | | - c |
229 | | - (apply zip vector d more)))) |
230 | | - |
231 | | -(defmacro zip-let |
232 | | - [bindings & body] |
233 | | - (let [pairs (clojure.core/partition 2 bindings) |
234 | | - names (clojure.core/mapv clojure.core/first pairs) |
235 | | - values (clojure.core/map second pairs)] |
236 | | - `(zip (fn ~names ~@body) ~@values))) |
237 | 248 |
|
238 | 249 | ;################################################################################ |
239 | 250 |
|
|
309 | 320 | [^Observable os] |
310 | 321 | (Observable/concat os)) |
311 | 322 |
|
| 323 | +(defn count |
| 324 | + "Returns an Observable that emits the number of items is xs as a long. |
| 325 | +
|
| 326 | + See: |
| 327 | + rx.Observable/longCount |
| 328 | + " |
| 329 | + [^Observable xs] |
| 330 | + (.longCount xs)) |
| 331 | + |
| 332 | +(defn cycle |
| 333 | + "Returns an Observable that emits the items of xs repeatedly, forever. |
| 334 | +
|
| 335 | + TODO: Other sigs. |
| 336 | +
|
| 337 | + See: |
| 338 | + rx.Observable/repeat |
| 339 | + clojure.core/cycle |
| 340 | + " |
| 341 | + [^Observable xs] |
| 342 | + (.repeat xs)) |
| 343 | + |
| 344 | +(defn distinct |
| 345 | + "Returns an Observable of the elements of Observable xs with duplicates |
| 346 | + removed. key-fn, if provided, is a one arg function that determines the |
| 347 | + key used to determined duplicates. key-fn defaults to identity. |
| 348 | +
|
| 349 | + This implementation doesn't use rx.Observable/distinct because it doesn't |
| 350 | + honor Clojure's equality semantics. |
| 351 | +
|
| 352 | + See: |
| 353 | + clojure.core/distinct |
| 354 | + " |
| 355 | + ([xs] (distinct identity xs)) |
| 356 | + ([key-fn ^Observable xs] |
| 357 | + (let [op (fn->operator (fn [o] |
| 358 | + (let [seen (atom #{})] |
| 359 | + (->subscriber o |
| 360 | + (fn [o v] |
| 361 | + (let [key (key-fn v)] |
| 362 | + (when-not (contains? @seen key) |
| 363 | + (swap! seen conj key) |
| 364 | + (on-next o v))))))))] |
| 365 | + (lift op xs)))) |
| 366 | + |
312 | 367 | (defn ^Observable do |
313 | 368 | "Returns a new Observable that, for each x in Observable xs, executes (do-fn x), |
314 | 369 | presumably for its side effects, and then passes x along unchanged. |
|
318 | 373 |
|
319 | 374 | Example: |
320 | 375 |
|
321 | | - (->> (rx/seq->o [1 2 3]) |
322 | | - (rx/do println) |
323 | | - ...) |
| 376 | + (->> (rx/seq->o [1 2 3]) |
| 377 | + (rx/do println) |
| 378 | + ...) |
324 | 379 |
|
325 | 380 | Will print 1, 2, 3. |
| 381 | +
|
| 382 | + See: |
| 383 | + rx.Observable/doOnNext |
326 | 384 | " |
327 | | - [do-fn xs] |
328 | | - (map #(do (do-fn %) %) xs)) |
| 385 | + [do-fn ^Observable xs] |
| 386 | + (.doOnNext xs (iop/action* do-fn))) |
329 | 387 |
|
330 | 388 | (defn ^Observable drop |
331 | 389 | [n ^Observable xs] |
|
335 | 393 | [p ^Observable xs] |
336 | 394 | (.skipWhile xs (fn->predicate p))) |
337 | 395 |
|
| 396 | +(defn ^Observable every? |
| 397 | + "Returns an Observable that emits a single true value if (p x) is true for |
| 398 | + all x in xs. Otherwise emits false. |
| 399 | +
|
| 400 | + See: |
| 401 | + clojure.core/every? |
| 402 | + rx.Observable/all |
| 403 | + " |
| 404 | + [p ^Observable xs] |
| 405 | + (.all xs (fn->predicate p))) |
| 406 | + |
338 | 407 | (defn ^Observable filter |
339 | 408 | [p ^Observable xs] |
340 | 409 | (.filter xs (fn->predicate p))) |
|
349 | 418 | [^Observable xs] |
350 | 419 | (.takeFirst xs)) |
351 | 420 |
|
| 421 | +; TODO group-by |
| 422 | + |
352 | 423 | (defn interpose |
353 | 424 | [sep xs] |
354 | 425 | (let [op (fn->operator (fn [o] |
|
380 | 451 | [f xs] |
381 | 452 | (filter (complement nil?) (map-indexed f xs))) |
382 | 453 |
|
| 454 | +(defn ^Observable map* |
| 455 | + "Map a function over an Observable of Observables. |
| 456 | +
|
| 457 | + Each item from the first emitted Observable is the first arg, each |
| 458 | + item from the second emitted Observable is the second arg, and so on. |
| 459 | +
|
| 460 | + See: |
| 461 | + map |
| 462 | + clojure.core/map |
| 463 | + rx.Observable/zip |
| 464 | + " |
| 465 | + [f ^Observable observable] |
| 466 | + (Observable/zip observable |
| 467 | + ^rx.functions.FuncN (iop/fnN* f))) |
| 468 | + |
383 | 469 | (defn ^Observable map |
384 | | - "Map a function over an observable sequence. Unlike clojure.core/map, only supports up |
385 | | - to 4 simultaneous source sequences at the moment." |
386 | | - ([f ^Observable xs] (.map xs (iop/fn* f))) |
387 | | - ([f xs & observables] (apply zip f xs observables))) |
| 470 | + "Map a function over one or more observable sequences. |
| 471 | +
|
| 472 | + Each item from the first Observable is the first arg, each item |
| 473 | + from the second Observable is the second arg, and so on. |
| 474 | +
|
| 475 | + See: |
| 476 | + clojure.core/map |
| 477 | + rx.Observable/zip |
| 478 | + " |
| 479 | + [f & observables] |
| 480 | + (Observable/zip ^Iterable observables |
| 481 | + ^rx.functions.FuncN (iop/fnN* f))) |
388 | 482 |
|
389 | 483 | (defn ^Observable mapcat |
390 | 484 | "Returns an observable which, for each value x in xs, calls (f x), which must |
|
393 | 487 |
|
394 | 488 | See: |
395 | 489 | clojure.core/mapcat |
396 | | - rx.Observable/mapMany |
| 490 | + rx.Observable/flatMap |
397 | 491 | " |
398 | | - ([f ^Observable xs] (.mapMany xs (iop/fn* f))) |
399 | | - ; TODO multi-arg version |
400 | | - ) |
| 492 | + ([f ^Observable xs] (.flatMap xs (iop/fn* f)))) |
401 | 493 |
|
402 | 494 | (defn map-indexed |
403 | 495 | "Returns an observable that invokes (f index value) for each value of the input |
|
408 | 500 | " |
409 | 501 | [f xs] |
410 | 502 | (let [op (fn->operator (fn [o] |
411 | | - (let [n (atom -1)] |
412 | | - (->subscriber o |
413 | | - (fn [o v] (on-next o (f (swap! n inc) v)))))))] |
| 503 | + (let [n (atom -1)] |
| 504 | + (->subscriber o |
| 505 | + (fn [o v] (on-next o (f (swap! n inc) v)))))))] |
414 | 506 | (lift op xs))) |
415 | 507 |
|
416 | 508 | (def next |
|
421 | 513 | " |
422 | 514 | (partial drop 1)) |
423 | 515 |
|
424 | | -; TODO partition. Use Buffer whenever it's implemented. |
| 516 | +(defn nth |
| 517 | + "Returns an Observable that emits the value at the index in the given |
| 518 | + Observable. nth throws an IndexOutOfBoundsException unless not-found |
| 519 | + is supplied. |
| 520 | +
|
| 521 | + Note that the Observable is the *first* arg! |
| 522 | + " |
| 523 | + ([^Observable xs index] |
| 524 | + (.elementAt xs index)) |
| 525 | + ([^Observable xs index not-found] |
| 526 | + (.elementAtOrDefault xs index not-found))) |
| 527 | + |
| 528 | +; TODO partition. Use window |
425 | 529 |
|
426 | 530 | (defn ^Observable reduce |
427 | 531 | ([f ^Observable xs] (.reduce xs (iop/fn* f))) |
|
448 | 552 | (filter identity) |
449 | 553 | first)) |
450 | 554 |
|
451 | | -(defn sort |
452 | | - "Returns an observable that emits a single value which is a sorted sequence |
| 555 | +(defn sorted-list |
| 556 | + "Returns an observable that emits a *single value* which is a sorted List |
453 | 557 | of the items in coll, where the sort order is determined by comparing |
454 | 558 | items. If no comparator is supplied, uses compare. comparator must |
455 | 559 | implement java.util.Comparator. |
456 | 560 |
|
| 561 | + Use sort if you don't want the sequence squashed down to a List. |
| 562 | +
|
457 | 563 | See: |
458 | | - clojure.core/sort |
| 564 | + rx.Observable/toSortedList |
| 565 | + sort |
459 | 566 | " |
460 | | - ([coll] (sort clojure.core/compare coll)) |
| 567 | + ([coll] (sorted-list clojure.core/compare coll)) |
461 | 568 | ([comp ^Observable coll] |
462 | 569 | (.toSortedList coll (iop/fn [a b] |
463 | 570 | ; force to int so rxjava doesn't have a fit |
464 | 571 | (int (comp a b)))))) |
465 | 572 |
|
466 | | -(defn sort-by |
467 | | - "Returns an observable that emits a single value which is a sorted sequence |
| 573 | +(defn sorted-list-by |
| 574 | + "Returns an observable that emits a *single value* which is a sorted List |
468 | 575 | of the items in coll, where the sort order is determined by comparing |
469 | 576 | (keyfn item). If no comparator is supplied, uses compare. comparator must |
470 | 577 | implement java.util.Comparator. |
471 | 578 |
|
| 579 | + Use sort-by if you don't want the sequence squashed down to a List. |
| 580 | +
|
472 | 581 | See: |
473 | | - clojure.core/sort-by |
| 582 | + rx.Observable/toSortedList |
| 583 | + sort-by |
474 | 584 | " |
475 | | - ([keyfn coll] (sort-by keyfn clojure.core/compare coll)) |
| 585 | + ([keyfn coll] (sorted-list-by keyfn clojure.core/compare coll)) |
476 | 586 | ([keyfn comp ^Observable coll] |
477 | 587 | (.toSortedList coll (iop/fn [a b] |
478 | 588 | ; force to int so rxjava doesn't have a fit |
479 | 589 | (int (comp (keyfn a) (keyfn b))))))) |
480 | 590 |
|
| 591 | +(defn sort |
| 592 | + "Returns an observable that emits the items in xs, where the sort order is |
| 593 | + determined by comparing items. If no comparator is supplied, uses compare. |
| 594 | + comparator must implement java.util.Comparator. |
| 595 | +
|
| 596 | + See: |
| 597 | + sorted-list |
| 598 | + clojure.core/sort |
| 599 | + " |
| 600 | + ([xs] |
| 601 | + (->> xs |
| 602 | + (sorted-list) |
| 603 | + (mapcat seq->o))) |
| 604 | + ([comp xs] |
| 605 | + (->> xs |
| 606 | + (sorted-list comp) |
| 607 | + (mapcat seq->o)))) |
| 608 | + |
| 609 | +(defn sort-by |
| 610 | + "Returns an observable that emits the items in xs, where the sort order is |
| 611 | + determined by comparing (keyfn item). If no comparator is supplied, uses |
| 612 | + compare. comparator must implement java.util.Comparator. |
| 613 | +
|
| 614 | + See: |
| 615 | + clojure.core/sort-by |
| 616 | + " |
| 617 | + ([keyfn xs] |
| 618 | + (->> (sorted-list-by keyfn xs) |
| 619 | + (mapcat seq->o))) |
| 620 | + ([keyfn comp ^Observable xs] |
| 621 | + (->> xs |
| 622 | + (sorted-list-by keyfn comp) |
| 623 | + (mapcat seq->o)))) |
| 624 | + |
481 | 625 | (defn split-with |
482 | 626 | "Returns an observable that emits a pair of observables |
483 | 627 |
|
|
0 commit comments